2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
# Base names for capture output files. The checks in this file pass them to
# self.filename_from_id() to obtain the actual output path for a test.
testout_pcap = 'testout.pcap'
testout_pcapng = 'testout.pcapng'
29 def traffic_generator():
31 Traffic generator factory. Invoking it returns a tuple (start_func, cfilter)
32 where cfilter is a capture filter to match the generated traffic.
33 start_func can be invoked to start generating traffic and returns a function
34 which can be used to stop traffic generation early.
35 Currently calls ping www.wireshark.org for 60 seconds.
37 # XXX replace this by something that generates UDP traffic to localhost?
38 # That would avoid external access which is forbidden by the Debian policy.
40 if sys.platform.startswith('win32'):
41 # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
42 args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
44 elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
45 args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
46 elif sys.platform.startswith('darwin'):
47 args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
49 # XXX Other BSDs, Solaris, etc
50 fixtures.skip('ping utility is unavailable - cannot generate traffic')
58 def start_processes():
59 for i in range(nprocs):
61 # Fake subsecond interval if the ping utility lacks support.
63 proc = subprocess.Popen(args_ping, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
67 yield start_processes, 'icmp || icmp6'
@fixtures.fixture(scope='session')
def wireshark_k(wireshark_command):
    '''Return the wireshark_command arguments as a tuple with '-k' appended.'''
    # Build the result by tuple concatenation so the input sequence is
    # never modified.
    return tuple(wireshark_command) + ('-k',)
77 def capture_command(*cmd_args, shell=False):
78 if type(cmd_args[0]) != str:
79 # Assume something like ['wireshark', '-k']
80 cmd_args = list(cmd_args[0]) + list(cmd_args)[1:]
82 cmd_args = ' '.join(cmd_args)
87 def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
88 start_traffic, cfilter = traffic_generator
89 def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
90 self.assertIsNotNone(cmd)
91 testout_file = self.filename_from_id(testout_pcap)
92 stop_traffic = start_traffic()
94 capture_proc = self.runProcess(capture_command(cmd,
95 '-i', '"{}"'.format(capture_interface),
99 '-a', 'duration:{}'.format(capture_duration),
100 '-f', '"{}"'.format(cfilter),
107 capture_proc = self.runProcess(capture_command(cmd,
108 '-i', capture_interface,
112 '-a', 'duration:{}'.format(capture_duration),
116 capture_returncode = capture_proc.returncode
117 if capture_returncode != 0:
118 self.log_fd.write('{} -D output:\n'.format(cmd))
119 self.runProcess((cmd, '-D'))
120 self.assertEqual(capture_returncode, 0)
121 self.checkPacketCount(10)
122 return check_capture_10_packets_real
126 def check_capture_fifo(cmd_dumpcap):
127 if sys.platform == 'win32':
128 fixtures.skip('Test requires OS fifo support.')
130 def check_capture_fifo_real(self, cmd=None):
131 self.assertIsNotNone(cmd)
132 testout_file = self.filename_from_id(testout_pcap)
133 fifo_file = self.filename_from_id('testout.fifo')
        # If a previous test left its fifo lying around, e.g. from a failure, remove it.
140 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
141 fifo_proc = self.startProcess(
142 ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
144 capture_proc = self.assertRun(capture_command(cmd,
148 '-a', 'duration:{}'.format(capture_duration),
151 self.assertTrue(os.path.isfile(testout_file))
152 self.checkPacketCount(8)
153 return check_capture_fifo_real
157 def check_capture_stdin(cmd_dumpcap):
158 # Capturing always requires dumpcap, hence the dependency on it.
159 def check_capture_stdin_real(self, cmd=None):
160 # Similar to suite_io.check_io_4_packets.
161 self.assertIsNotNone(cmd)
162 testout_file = self.filename_from_id(testout_pcap)
163 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
164 capture_cmd = capture_command(cmd,
167 '-a', 'duration:{}'.format(capture_duration),
170 is_gui = type(cmd) != str and '-k' in cmd[0]
172 capture_cmd += ' -o console.log.level:127'
173 pipe_proc = self.assertRun(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
175 self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
176 self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
177 self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
178 self.assertTrue(os.path.isfile(testout_file))
179 self.checkPacketCount(8)
180 return check_capture_stdin_real
184 def check_capture_read_filter(capture_interface, traffic_generator):
185 start_traffic, cfilter = traffic_generator
186 def check_capture_read_filter_real(self, cmd=None):
187 self.assertIsNotNone(cmd)
188 testout_file = self.filename_from_id(testout_pcap)
189 stop_traffic = start_traffic()
190 capture_proc = self.assertRun(capture_command(cmd,
191 '-i', capture_interface,
195 '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
197 '-a', 'duration:{}'.format(capture_duration),
201 self.checkPacketCount(0)
202 return check_capture_read_filter_real
205 def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
206 start_traffic, cfilter = traffic_generator
207 def check_capture_snapshot_len_real(self, cmd=None):
208 self.assertIsNotNone(cmd)
209 stop_traffic = start_traffic()
210 testout_file = self.filename_from_id(testout_pcap)
211 capture_proc = self.assertRun(capture_command(cmd,
212 '-i', capture_interface,
215 '-s', str(snapshot_len),
216 '-a', 'duration:{}'.format(capture_duration),
220 self.assertTrue(os.path.isfile(testout_file))
        # Use tshark to select any packets whose captured length exceeds
        # snapshot_len; the resulting file is expected to contain none.
223 testout2_file = self.filename_from_id('testout2.pcap')
225 filter_proc = self.assertRun((cmd_tshark,
228 '-Y', 'frame.cap_len>{}'.format(snapshot_len),
230 self.checkPacketCount(0, cap_file=testout2_file)
231 return check_capture_snapshot_len_real
235 def check_dumpcap_autostop_stdin(cmd_dumpcap):
236 def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
237 # Similar to check_capture_stdin.
238 testout_file = self.filename_from_id(testout_pcap)
239 cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
240 condition='oops:invalid'
242 self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
243 self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
245 if packets is not None:
246 condition = 'packets:{}'.format(packets)
247 elif filesize is not None:
248 condition = 'filesize:{}'.format(filesize)
250 capture_cmd = ' '.join((cmd_dumpcap,
255 pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
256 self.assertTrue(os.path.isfile(testout_file))
258 if packets is not None:
259 self.checkPacketCount(packets)
260 elif filesize is not None:
261 capturekb = os.path.getsize(testout_file) / 1000
262 self.assertGreaterEqual(capturekb, filesize)
263 return check_dumpcap_autostop_stdin_real
267 def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
268 def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
269 # Similar to check_capture_stdin.
270 rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
271 testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
272 testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
273 cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
274 condition='oops:invalid'
276 self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
277 self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
279 if packets is not None:
280 condition = 'packets:{}'.format(packets)
281 elif filesize is not None:
282 condition = 'filesize:{}'.format(filesize)
284 capture_cmd = ' '.join((cmd_dumpcap,
290 pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
292 rb_files = glob.glob(testout_glob)
294 self.cleanup_files.append(rbf)
296 self.assertEqual(len(rb_files), 2)
299 self.assertTrue(os.path.isfile(rbf))
300 if packets is not None:
301 self.checkPacketCount(packets, cap_file=rbf)
302 elif filesize is not None:
303 capturekb = os.path.getsize(rbf) / 1000
304 self.assertGreaterEqual(capturekb, filesize)
305 return check_dumpcap_ringbuffer_stdin_real
309 def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, capture_file):
310 if sys.platform == 'win32':
311 fixtures.skip('Test requires OS fifo support.')
312 def check_dumpcap_pcapng_sections_real(self, multi_input=False, multi_output=False):
313 # Make sure we always test multiple SHBs in an input.
315 capture_file('many_interfaces.pcapng.1'),
316 capture_file('many_interfaces.pcapng.2')
319 in_files_l.append([ capture_file('many_interfaces.pcapng.3') ])
322 # Default values for our validity tests
332 check_vals = [ check_val_d ]
334 for in_files in in_files_l:
335 fifo_file = self.filename_from_id('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files) + 1))
336 fifo_files.append(fifo_file)
            # If a previous test left its fifo lying around, e.g. from a failure, remove it.
342 cat_cmd = subprocesstest.cat_cap_file_command(in_files)
343 fifo_procs.append(self.startProcess(('{0} > {1}'.format(cat_cmd, fifo_file)), shell=True))
346 rb_unique = 'sections_rb_' + uuid.uuid4().hex[:6] # Random ID
347 testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
348 testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
349 check_vals.append(check_val_d.copy())
350 # check_vals[]['filename'] will be filled in below
352 testout_file = self.filename_from_id(testout_pcapng)
353 check_vals[0]['filename'] = testout_file
356 if not multi_input and not multi_output:
357 # Passthrough SHBs, single output file
362 check_vals[0]['packet_count'] = 79
363 check_vals[0]['idb_count'] = 22
364 check_vals[0]['ua_pt1_count'] = 1
365 check_vals[0]['ua_pt2_count'] = 1
366 elif not multi_input and multi_output:
367 # Passthrough SHBs, multiple output files
374 check_vals[0]['packet_count'] = 53
375 check_vals[0]['idb_count'] = 11
376 check_vals[0]['ua_pt1_count'] = 1
377 check_vals[1]['packet_count'] = 26
378 check_vals[1]['idb_count'] = 22
379 check_vals[1]['ua_pt1_count'] = 1
380 check_vals[1]['ua_pt2_count'] = 1
381 elif multi_input and not multi_output:
382 # Dumpcap SHBs, single output file
388 check_vals[0]['packet_count'] = 88
389 check_vals[0]['idb_count'] = 35
390 check_vals[0]['ua_dc_count'] = 1
392 # Dumpcap SHBs, multiple output files
400 check_vals[0]['packet_count'] = 53
401 check_vals[0]['idb_count'] = 13
402 check_vals[0]['ua_dc_count'] = 1
403 check_vals[1]['packet_count'] = 35
404 check_vals[1]['idb_count'] = 35
405 check_vals[1]['ua_dc_count'] = 1
407 capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args)
409 capture_proc = self.assertRun(capture_cmd)
410 for fifo_proc in fifo_procs: fifo_proc.kill()
414 rb_files = sorted(glob.glob(testout_glob))
415 self.assertEqual(len(rb_files), 2)
416 check_vals[0]['filename'] = rb_files[0]
417 check_vals[1]['filename'] = rb_files[1]
420 self.cleanup_files.append(rbf)
421 self.assertTrue(os.path.isfile(rbf))
425 if not multi_input and not multi_output:
426 # Check strict bit-for-bit passthrough.
427 in_hash = hashlib.sha256()
428 out_hash = hashlib.sha256()
429 for in_file in in_files_l[0]:
430 in_cap_file = capture_file(in_file)
431 with open(in_cap_file, 'rb') as f:
432 in_hash.update(f.read())
433 with open(testout_file, 'rb') as f:
434 out_hash.update(f.read())
435 self.assertEqual(in_hash.hexdigest(), out_hash.hexdigest())
437 # many_interfaces.pcapng.1 : 64 packets written by "Passthrough test #1"
438 # many_interfaces.pcapng.2 : 15 packets written by "Passthrough test #2"
439 # many_interfaces.pcapng.3 : 9 packets written by "Passthrough test #3"
440 # Each has 11 interfaces.
441 idb_compare_eq = True
442 if multi_input and multi_output:
            # Having multiple inputs forces the use of threads. In our
            # case this means that the non-packet block counts in the
            # first file are nondeterministic.
446 idb_compare_eq = False
447 for check_val in check_vals:
448 self.checkPacketCount(check_val['packet_count'], cap_file=check_val['filename'])
450 tshark_proc = self.assertRun(capture_command(cmd_tshark,
451 '-r', check_val['filename'],
453 '-X', 'read_format:MIME Files Format'
455 # XXX Are there any other sanity checks we should run?
457 self.assertEqual(self.countOutput(r'Block: Interface Description Block',
458 proc=tshark_proc), check_val['idb_count'])
460 self.assertGreaterEqual(self.countOutput(r'Block: Interface Description Block',
461 proc=tshark_proc), check_val['idb_count'])
462 idb_compare_eq = True
463 self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #1',
464 proc=tshark_proc), check_val['ua_pt1_count'])
465 self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #2',
466 proc=tshark_proc), check_val['ua_pt2_count'])
467 self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #3',
468 proc=tshark_proc), check_val['ua_pt3_count'])
469 self.assertEqual(self.countOutput(r'Option: User Application = Dumpcap \(Wireshark\)',
470 proc=tshark_proc), check_val['ua_dc_count'])
471 return check_dumpcap_pcapng_sections_real
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    '''Run the shared capture checks with the wireshark_k command.

    Each test receives the command and a check_* callable as parameters and
    simply invokes the callable with this test case and cmd=wireshark_k.
    NOTE(review): the parameters are presumably injected by name through
    @fixtures.uses_fixtures -- confirm against the fixtures module.
    '''
    def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Wireshark'''
        check_capture_10_packets(self, cmd=wireshark_k)

    # Wireshark doesn't currently support writing to stdout while capturing,
    # so the to_stdout variant is kept here disabled.
    # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)

    def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
        '''Capture from a fifo using Wireshark'''
        check_capture_fifo(self, cmd=wireshark_k)

    def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
        '''Capture from stdin using Wireshark'''
        check_capture_stdin(self, cmd=wireshark_k)

    def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
        '''Capture truncated packets using Wireshark'''
        check_capture_snapshot_len(self, cmd=wireshark_k)
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture(subprocesstest.SubprocessTestCase):
    '''Run the shared capture checks with the cmd_tshark command.

    Each test invokes the check_* callable it receives with this test case
    and cmd=cmd_tshark.
    NOTE(review): the parameters are presumably injected by name through
    @fixtures.uses_fixtures -- confirm against the fixtures module.
    '''
    def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using TShark'''
        # Same check as the to-file test, but with to_stdout=True.
        check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=cmd_tshark)

    def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=cmd_tshark)

    def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=cmd_tshark)
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    '''Run the shared capture checks with the cmd_dumpcap command.

    Unlike the Wireshark and TShark cases, this class is marked with the
    'base_env' fixture rather than 'test_env'.
    NOTE(review): the parameters are presumably injected by name through
    @fixtures.uses_fixtures -- confirm against the fixtures module.
    '''
    def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        # Same check as the to-file test, but with to_stdout=True.
        check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=cmd_dumpcap)

    # NOTE(review): the parameters below are listed in the opposite order from
    # the sibling tests (check fixture first, command second). This is harmless
    # if parameters are resolved by name -- confirm before reordering.
    def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=cmd_dumpcap)
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
    '''Dumpcap autostop tests, driven by check_dumpcap_autostop_stdin.

    The check requires exactly one of packets= or filesize= to be given.
    '''
    # Autostop criteria: duration, filesize, packets, files.
    # Only filesize and packets have tests here.
    def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a file size limit'''
        # The check compares filesize against the output size divided by 1000.
        check_dumpcap_autostop_stdin(self, filesize=15)

    def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a packet limit'''
        check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
    '''Dumpcap ring buffer tests, driven by check_dumpcap_ringbuffer_stdin.

    The check requires exactly one of packets= or filesize= to be given and
    asserts that exactly two ring buffer files were written.
    '''
    # Ring buffer criteria: duration, interval, filesize, packets, files.
    # Only filesize and packets have tests here.
    def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
        # The check compares filesize against each file's size divided by 1000.
        check_dumpcap_ringbuffer_stdin(self, filesize=15)

    def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
        check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_pcapng_sections(subprocesstest.SubprocessTestCase):
    '''Dumpcap pcapng section tests, driven by check_dumpcap_pcapng_sections.

    The four tests cover every combination of the check's multi_input and
    multi_output flags, both of which default to False.
    '''
    def test_dumpcap_pcapng_single_in_single_out(self, check_dumpcap_pcapng_sections):
        '''Capture from a single pcapng source using Dumpcap and write a single file'''
        # Default flags: the check also compares input and output hashes here.
        check_dumpcap_pcapng_sections(self)

    def test_dumpcap_pcapng_single_in_multi_out(self, check_dumpcap_pcapng_sections):
        '''Capture from a single pcapng source using Dumpcap and write two files'''
        check_dumpcap_pcapng_sections(self, multi_output=True)

    def test_dumpcap_pcapng_multi_in_single_out(self, check_dumpcap_pcapng_sections):
        '''Capture from two pcapng sources using Dumpcap and write a single file'''
        check_dumpcap_pcapng_sections(self, multi_input=True)

    def test_dumpcap_pcapng_multi_in_multi_out(self, check_dumpcap_pcapng_sections):
        '''Capture from two pcapng sources using Dumpcap and write two files'''
        # With both flags set the check relaxes its Interface Description
        # Block count comparison for the first output file to "at least".
        check_dumpcap_pcapng_sections(self, multi_input=True, multi_output=True)