a9ac1aa9c0cde6d9f810e3f1aee002c9deb9d218
[metze/wireshark/wip.git] / test / suite_capture.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Capture tests'''
11
12 import config
13 import glob
14 import os
15 import subprocess
16 import subprocesstest
17 import sys
18 import time
19 import uuid
20 import fixtures
21
# Number passed to the capture tools as '-a duration:<n>' (autostop condition).
capture_duration = 5

# Base name of the capture output file; tests resolve it via self.filename_from_id().
testout_pcap = 'testout.pcap'
# Value passed to '-s' and used as the frame.cap_len threshold in the snapshot length check.
snapshot_len = 96
26
def start_pinging(self):
    '''Launch background ping processes and return them in a list.

    Each process is started with self.startProcess(config.args_ping). On
    Windows three processes are started, with 0.1 second pauses after the
    first two; on every other platform a single process is started.
    '''
    if not sys.platform.startswith('win32'):
        return [self.startProcess(config.args_ping)]

    # Fake '-i' with a subsecond interval by staggering several pings.
    started = []
    for pause in (0.1, 0.1, 0):
        started.append(self.startProcess(config.args_ping))
        time.sleep(pause)
    return started
37
def stop_pinging(ping_procs):
    '''Call kill() on every process in ping_procs, in order.'''
    for ping_proc in ping_procs:
        ping_proc.kill()
41
42
@fixtures.fixture(scope='session')
def wireshark_k(wireshark_command):
    '''Session fixture: the wireshark_command arguments followed by '-k', as a tuple.'''
    return tuple(wireshark_command) + ('-k',)
46
def capture_command(*cmd_args, shell=False):
    '''Build a command line from a program (or argument sequence) plus extra arguments.

    The first positional argument is either a program name (a string) or a
    sequence of arguments such as ['wireshark', '-k']; in the latter case
    the sequence is flattened in front of the remaining arguments.

    Returns:
        With shell=True, a single string of all arguments joined by spaces
        (no quoting is added; callers quote what needs quoting).
        Otherwise the arguments themselves: the original tuple when the
        first argument is a string, or a new list when it was a sequence.

    Raises:
        IndexError: if called without positional arguments.
    '''
    # isinstance (rather than an exact type comparison) keeps str subclasses
    # from being split into single characters by list().
    if not isinstance(cmd_args[0], str):
        # Assume something like ['wireshark', '-k']
        cmd_args = list(cmd_args[0]) + list(cmd_args[1:])
    if shell:
        return ' '.join(cmd_args)
    return cmd_args
54
55
@fixtures.fixture
def check_capture_10_packets(capture_interface, cmd_dumpcap):
    '''Fixture returning a helper that captures 10 ICMP packets from capture_interface.

    cmd_dumpcap is not referenced in the body; it is requested so that the
    fixture depends on dumpcap.
    '''
    def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
        '''Capture with cmd while pinging and check that 10 packets were written.

        Args:
            cmd: capture program, either a string or an argument sequence
                accepted by capture_command(). Must not be None.
            to_stdout: when true, run the capture through the shell, write
                to '-' and redirect stdout into the output file; otherwise
                pass the output file directly via '-w'.

        The test is skipped when config.args_ping is empty.
        '''
        # Similar to suite_io.check_io_4_packets.
        if not config.args_ping:
            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        ping_procs = start_pinging(self)
        try:
            if to_stdout:
                capture_proc = self.runProcess(capture_command(cmd,
                    '-i', '"{}"'.format(capture_interface),
                    '-p',
                    '-w', '-',
                    '-c', '10',
                    '-a', 'duration:{}'.format(capture_duration),
                    '-f', '"icmp || icmp6"',
                    '>', testout_file,
                    shell=True
                ),
                shell=True
                )
            else:
                capture_proc = self.runProcess(capture_command(cmd,
                    '-i', capture_interface,
                    '-p',
                    '-w', testout_file,
                    '-c', '10',
                    '-a', 'duration:{}'.format(capture_duration),
                    '-f', 'icmp || icmp6',
                ))
        finally:
            # Stop the background pings even if running the capture raised.
            stop_pinging(ping_procs)
        capture_returncode = capture_proc.returncode
        if capture_returncode != 0:
            # Log the interface list to help diagnose the failure. cmd may be
            # an argument sequence, so flatten it with capture_command()
            # instead of nesting it inside another tuple.
            self.log_fd.write('{} -D output:\n'.format(cmd))
            self.runProcess(capture_command(cmd, '-D'))
        # assertEqual raises on mismatch, so the count check below only runs
        # after a successful capture.
        self.assertEqual(capture_returncode, 0)
        self.checkPacketCount(10)
    return check_capture_10_packets_real
96
97
@fixtures.fixture
def check_capture_fifo(cmd_dumpcap):
    '''Fixture returning a helper that captures from a named pipe (fifo).

    Skipped on win32. cmd_dumpcap is not referenced in the body; it is
    requested so that the fixture depends on dumpcap.
    '''
    if sys.platform == 'win32':
        fixtures.skip('Test requires OS fifo support.')

    def check_capture_fifo_real(self, cmd=None):
        '''Feed the 'slow' DHCP command output into a fifo, capture from it with cmd and expect 8 packets.

        Args:
            cmd: capture program, either a string or an argument sequence
                accepted by capture_command(). Must not be None.
        '''
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        fifo_file = self.filename_from_id('testout.fifo')
        try:
            # If a previous test left its fifo laying around, e.g. from a failure, remove it.
            os.unlink(fifo_file)
        except OSError:
            # Best effort: normally the fifo simply does not exist. If the
            # removal failed for another reason, os.mkfifo() below reports it.
            pass
        os.mkfifo(fifo_file)
        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
        fifo_proc = self.startProcess(
            ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
            shell=True)
        try:
            capture_proc = self.runProcess(capture_command(cmd,
                '-i', fifo_file,
                '-p',
                '-w', testout_file,
                '-a', 'duration:{}'.format(capture_duration),
            ))
        finally:
            # Stop the fifo writer even if running the capture raised.
            fifo_proc.kill()
        self.assertTrue(os.path.isfile(testout_file))
        # assertEqual raises on mismatch, so the count check below only runs
        # after a successful capture.
        self.assertEqual(capture_proc.returncode, 0)
        self.checkPacketCount(8)
    return check_capture_fifo_real
130
131
@fixtures.fixture
def check_capture_stdin(cmd_dumpcap):
    '''Fixture returning a helper that captures packets piped to the capture program's stdin.'''
    # Capturing always requires dumpcap, hence the dependency on it.
    def check_capture_stdin_real(self, cmd=None):
        '''Pipe the 'slow' DHCP command output into cmd reading from '-i -' and expect 8 packets.

        Args:
            cmd: capture program, either a string or an argument sequence
                accepted by capture_command(). Must not be None. When it is
                a sequence containing '-k' it is treated as a GUI command:
                console logging is enabled and the startup, capture start
                and capture stop messages are checked in the output.
        '''
        # Similar to suite_io.check_io_4_packets.
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
        capture_cmd = capture_command(cmd,
            '-i', '-',
            '-w', testout_file,
            '-a', 'duration:{}'.format(capture_duration),
            shell=True
        )
        # Look for the '-k' flag among the arguments. Testing cmd[0] would
        # only do a substring search in the first argument (the executable),
        # which leaves the GUI checks below disabled.
        is_gui = not isinstance(cmd, str) and '-k' in cmd
        if is_gui:
            capture_cmd += ' -o console.log.level:127'
        pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
        self.assertEqual(pipe_proc.returncode, 0)
        if is_gui:
            self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
            self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
            self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
        self.assertTrue(os.path.isfile(testout_file))
        # The return code was asserted above, so reaching this point means
        # the pipeline exited with 0.
        self.checkPacketCount(8)
    return check_capture_stdin_real
160
161
@fixtures.fixture
def check_capture_read_filter(capture_interface):
    '''Fixture returning a helper that captures with a read filter expected to match nothing.'''
    def check_capture_read_filter_real(self, cmd=None):
        '''Capture ICMP traffic with cmd using '-2 -R <filter>' and expect an empty result.

        The test is skipped when config.args_ping is empty; cmd must not be None.
        '''
        if not config.args_ping:
            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
        self.assertIsNotNone(cmd)
        pingers = start_pinging(self)
        out_path = self.filename_from_id(testout_pcap)
        full_cmd = capture_command(cmd,
            '-i', capture_interface,
            '-p',
            '-w', out_path,
            '-2',
            '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
            '-c', '10',
            '-a', 'duration:{}'.format(capture_duration),
            '-f', 'icmp || icmp6',
        )
        finished = self.runProcess(full_cmd)
        exit_code = finished.returncode
        stop_pinging(pingers)
        self.assertEqual(exit_code, 0)

        if exit_code != 0:
            return
        self.checkPacketCount(0)
    return check_capture_read_filter_real
187
@fixtures.fixture
def check_capture_snapshot_len(capture_interface, cmd_tshark):
    '''Fixture returning a helper that verifies captured packets are truncated to snapshot_len.'''
    def check_capture_snapshot_len_real(self, cmd=None):
        '''Capture ICMP traffic with '-s snapshot_len' and check that no frame exceeds it.

        Args:
            cmd: capture program, either a string or an argument sequence
                accepted by capture_command(). Must not be None.

        The test is skipped when config.args_ping is empty.
        '''
        if not config.args_ping:
            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
        self.assertIsNotNone(cmd)
        ping_procs = start_pinging(self)
        try:
            testout_file = self.filename_from_id(testout_pcap)
            capture_proc = self.runProcess(capture_command(cmd,
                '-i', capture_interface,
                '-p',
                '-w', testout_file,
                '-s', str(snapshot_len),
                '-a', 'duration:{}'.format(capture_duration),
                '-f', 'icmp || icmp6',
            ))
        finally:
            # Stop the background pings even if running the capture raised.
            stop_pinging(ping_procs)
        self.assertEqual(capture_proc.returncode, 0)
        self.assertTrue(os.path.isfile(testout_file))

        # Use tshark to keep only packets whose captured length exceeds
        # snapshot_len; a correctly truncated capture leaves none.
        testout2_file = self.filename_from_id('testout2.pcap')

        filter_proc = self.runProcess((cmd_tshark,
            '-r', testout_file,
            '-w', testout2_file,
            '-Y', 'frame.cap_len>{}'.format(snapshot_len),
        ))
        # Check the filter step itself; the capture return code was already
        # asserted above.
        self.assertEqual(filter_proc.returncode, 0)
        self.checkPacketCount(0, cap_file=testout2_file)
    return check_capture_snapshot_len_real
222
223
@fixtures.fixture
def check_dumpcap_autostop_stdin(cmd_dumpcap):
    '''Fixture returning a helper that checks a dumpcap '-a' autostop condition on stdin input.'''
    def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
        '''Pipe the 'cat100' DHCP command output into dumpcap and verify the autostop result.

        Exactly one of packets or filesize must be given. With packets the
        output file's packet count is checked; with filesize the output
        file size divided by 1000 must be at least filesize.
        '''
        # Similar to check_capture_stdin.
        out_path = self.filename_from_id(testout_pcap)
        feeder_cmd = subprocesstest.cat_dhcp_command('cat100')

        self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
        self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')

        if packets is not None:
            stop_condition = 'packets:{}'.format(packets)
        elif filesize is not None:
            stop_condition = 'filesize:{}'.format(filesize)
        else:
            stop_condition = 'oops:invalid'

        dumpcap_args = (cmd_dumpcap, '-i', '-', '-w', out_path, '-a', stop_condition)
        pipeline = cat_sep = None
        pipeline = feeder_cmd + ' | ' + ' '.join(dumpcap_args)
        finished = self.runProcess(pipeline, shell=True)
        exit_code = finished.returncode
        self.assertEqual(exit_code, 0)
        self.assertTrue(os.path.isfile(out_path))
        if exit_code == 0:
            if packets is not None:
                self.checkPacketCount(packets)
            elif filesize is not None:
                size_kb = os.path.getsize(out_path) / 1000
                self.assertGreaterEqual(size_kb, filesize)
    return check_dumpcap_autostop_stdin_real
258
259
@fixtures.fixture
def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
    '''Fixture returning a helper that checks dumpcap multi-file ('-b') output on stdin input.'''
    def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
        '''Pipe the 'cat100' DHCP command output into dumpcap with '-a files:2 -b <condition>'.

        Exactly one of packets or filesize must be given. Two output files
        are expected; each one is checked for the packet count, or for a
        size (divided by 1000) of at least filesize.
        '''
        # Similar to check_capture_stdin.
        unique_tag = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
        out_path = '{}.{}.pcapng'.format(self.id(), unique_tag)
        out_pattern = '{}.{}_*.pcapng'.format(self.id(), unique_tag)
        feeder_cmd = subprocesstest.cat_dhcp_command('cat100')

        self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
        self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')

        if packets is not None:
            switch_condition = 'packets:{}'.format(packets)
        elif filesize is not None:
            switch_condition = 'filesize:{}'.format(filesize)
        else:
            switch_condition = 'oops:invalid'

        dumpcap_args = (cmd_dumpcap, '-i', '-', '-w', out_path, '-a', 'files:2', '-b', switch_condition)
        pipeline = feeder_cmd + ' | ' + ' '.join(dumpcap_args)
        finished = self.runProcess(pipeline, shell=True)
        exit_code = finished.returncode
        self.assertEqual(exit_code, 0)
        if exit_code != 0:
            return

        ring_files = glob.glob(out_pattern)
        # Register every produced file for cleanup before checking them.
        for ring_file in ring_files:
            self.cleanup_files.append(ring_file)

        self.assertEqual(len(ring_files), 2)

        for ring_file in ring_files:
            self.assertTrue(os.path.isfile(ring_file))
            if packets is not None:
                self.checkPacketCount(packets, cap_file=ring_file)
            elif filesize is not None:
                size_kb = os.path.getsize(ring_file) / 1000
                self.assertGreaterEqual(size_kb, filesize)
    return check_dumpcap_ringbuffer_stdin_real
304
305
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run with the wireshark_k command line (Wireshark arguments plus '-k').

    Each test receives its check helper and the command as fixtures and
    simply invokes the helper with cmd=wireshark_k.
    '''
    def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Wireshark'''
        check_capture_10_packets(self, cmd=wireshark_k)

    # Wireshark doesn't currently support writing to stdout while capturing.
    # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)

    def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
        '''Capture from a fifo using Wireshark'''
        check_capture_fifo(self, cmd=wireshark_k)

    def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
        '''Capture from stdin using Wireshark'''
        check_capture_stdin(self, cmd=wireshark_k)

    def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
        '''Capture truncated packets using Wireshark'''
        check_capture_snapshot_len(self, cmd=wireshark_k)
329
330
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run with the cmd_tshark fixture as the capture command.

    Each test receives its check helper and the command as fixtures and
    simply invokes the helper with cmd=cmd_tshark.
    '''
    def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=cmd_tshark)

    def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=cmd_tshark)

    def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=cmd_tshark)
353
354
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run with the cmd_dumpcap fixture as the capture command.

    Each test receives its check helper and the command as fixtures and
    simply invokes the helper with cmd=cmd_dumpcap.
    '''
    def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=cmd_dumpcap)
377
378
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
    '''Dumpcap autostop checks driven through the check_dumpcap_autostop_stdin helper.'''
    # duration, filesize, packets, files
    # NOTE(review): the list above presumably names the autostop conditions;
    # only filesize and packets have tests in this class.
    def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a file size limit'''
        check_dumpcap_autostop_stdin(self, filesize=15)

    def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a packet limit'''
        check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
390
391
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
    '''Dumpcap multi-file checks driven through the check_dumpcap_ringbuffer_stdin helper.'''
    # duration, interval, filesize, packets, files
    # Need a function that finds ringbuffer file names.
    # NOTE(review): the list above presumably names the ring buffer switch
    # conditions; only filesize and packets have tests in this class.
    def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
        check_dumpcap_ringbuffer_stdin(self, filesize=15)

    def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
        check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.