Unit tests for ASTERIX I034
[metze/wireshark/wip.git] / test / suite_capture.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Capture tests'''
11
12 import fixtures
13 import glob
14 import hashlib
15 import os
16 import subprocess
17 import subprocesstest
18 import sys
19 import time
20 import uuid
21
# Upper bound, in seconds, passed to the capture tools as '-a duration:N'.
capture_duration = 5
# Snapshot length passed as '-s' by the snapshot-length check.
snapshot_len = 96

# Base names for per-test output files (expanded via filename_from_id()).
testout_pcap = 'testout.pcap'
testout_pcapng = 'testout.pcapng'
27
@fixtures.fixture
def traffic_generator():
    '''
    Traffic generator factory.

    Yields a (start_func, cfilter) tuple. cfilter is a capture filter that
    matches the generated traffic. Calling start_func() launches the traffic
    and returns a function that stops it early. Any process still running
    when the fixture is torn down is killed.

    Currently calls ping www.wireshark.org for 60 seconds.
    '''
    # XXX replace this by something that generates UDP traffic to localhost?
    # That would avoid external access which is forbidden by the Debian policy.
    platform = sys.platform
    ping_instances = 1
    if platform.startswith('win32'):
        # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
        ping_cmd = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
        # Several staggered instances stand in for a subsecond interval.
        ping_instances = 3
    elif platform.startswith('linux') or platform.startswith('freebsd'):
        ping_cmd = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
    elif platform.startswith('darwin'):
        ping_cmd = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
    else:
        # XXX Other BSDs, Solaris, etc
        fixtures.skip('ping utility is unavailable - cannot generate traffic')

    running = []

    def stop_all():
        # Signal every process first, then reap them, so that no ping keeps
        # running while we wait on another one.
        for child in running:
            child.kill()
        for child in running:
            child.wait()
        running.clear()

    def start_all():
        for index in range(ping_instances):
            if index > 0:
                # Fake subsecond interval if the ping utility lacks support.
                time.sleep(0.1)
            running.append(subprocess.Popen(
                ping_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT))
        return stop_all

    try:
        yield start_all, 'icmp || icmp6'
    finally:
        stop_all()
70
71
@fixtures.fixture(scope='session')
def wireshark_k(wireshark_command):
    '''Return the Wireshark command line as a tuple with '-k' appended.'''
    return tuple(wireshark_command) + ('-k',)
75
76
def capture_command(*cmd_args, shell=False):
    '''
    Assemble a command line from a program and its arguments.

    The first positional argument is either the program as a string, or a
    sequence of strings (e.g. ['wireshark', '-k']) that is flattened into the
    front of the argument list.

    Returns the arguments joined with single spaces when shell is true.
    Otherwise returns the argument sequence: the original tuple if the first
    argument was a string, a new list if it had to be flattened.

    Raises IndexError if called without positional arguments.
    '''
    # isinstance() rather than an exact type comparison, so that a str
    # subclass is still treated as one program name and not split into
    # individual characters.
    if not isinstance(cmd_args[0], str):
        # Assume something like ['wireshark', '-k']
        cmd_args = list(cmd_args[0]) + list(cmd_args[1:])
    if shell:
        cmd_args = ' '.join(cmd_args)
    return cmd_args
84
85
@fixtures.fixture
def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
    '''
    Factory for a check that captures 10 packets of generated traffic.

    The returned function takes the test case, the capture command (a string
    or a sequence such as the wireshark_k tuple) and a to_stdout flag. With
    to_stdout the capture is written to '-' and redirected into the output
    file through the shell; otherwise the tool writes the file itself.

    cmd_dumpcap is only requested as a dependency; it is not used directly.
    '''
    start_traffic, cfilter = traffic_generator

    def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        stop_traffic = start_traffic()
        if to_stdout:
            # Built as a single shell string so that '>' is interpreted as a
            # redirection; interface and filter are quoted for the shell.
            capture_proc = self.runProcess(capture_command(cmd,
                '-i', '"{}"'.format(capture_interface),
                '-p',
                '-w', '-',
                '-c', '10',
                '-a', 'duration:{}'.format(capture_duration),
                '-f', '"{}"'.format(cfilter),
                '>', testout_file,
                shell=True
            ),
            shell=True
            )
        else:
            capture_proc = self.runProcess(capture_command(cmd,
                '-i', capture_interface,
                '-p',
                '-w', testout_file,
                '-c', '10',
                '-a', 'duration:{}'.format(capture_duration),
                '-f', cfilter,
            ))
        stop_traffic()
        capture_returncode = capture_proc.returncode
        if capture_returncode != 0:
            # Log the interface list to help diagnose the failure. Go through
            # capture_command() so that a sequence-valued cmd is flattened
            # instead of being nested inside the argument tuple.
            self.log_fd.write('{} -D output:\n'.format(cmd))
            self.runProcess(capture_command(cmd, '-D'))
        self.assertEqual(capture_returncode, 0)
        self.checkPacketCount(10)
    return check_capture_10_packets_real
123
124
@fixtures.fixture
def check_capture_fifo(cmd_dumpcap):
    '''
    Factory for a check that captures from a named pipe.

    Skipped on win32, where os.mkfifo is not available. The returned function
    creates a fifo, feeds it from the 'slow' DHCP cat command in a background
    shell, captures from the fifo with the given command and expects 8
    packets in the output file.

    cmd_dumpcap is only requested as a dependency; it is not used directly.
    '''
    if sys.platform == 'win32':
        fixtures.skip('Test requires OS fifo support.')

    def check_capture_fifo_real(self, cmd=None):
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        fifo_file = self.filename_from_id('testout.fifo')
        try:
            # If a previous test left its fifo laying around, e.g. from a failure, remove it.
            os.unlink(fifo_file)
        except OSError:
            # Best effort: usually there is simply nothing to remove. Only
            # OS-level errors are ignored, not KeyboardInterrupt and friends.
            pass
        os.mkfifo(fifo_file)
        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
        fifo_proc = self.startProcess(
            ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
            shell=True)
        try:
            self.assertRun(capture_command(cmd,
                '-i', fifo_file,
                '-p',
                '-w', testout_file,
                '-a', 'duration:{}'.format(capture_duration),
            ))
        finally:
            # Stop the feeder even when the capture assertion fails.
            fifo_proc.kill()
        self.assertTrue(os.path.isfile(testout_file))
        self.checkPacketCount(8)
    return check_capture_fifo_real
154
155
@fixtures.fixture
def check_capture_stdin(cmd_dumpcap):
    '''
    Factory for a check that captures from standard input.

    The returned function pipes the 'slow' DHCP cat command into the given
    capture command through the shell and expects 8 packets in the output
    file. When the command is a sequence containing '-k' (the Wireshark GUI
    started in capture mode), console logging is enabled and the startup,
    capture-start and capture-stop messages are checked as well.
    '''
    # Capturing always requires dumpcap, hence the dependency on it.
    def check_capture_stdin_real(self, cmd=None):
        # Similar to suite_io.check_io_4_packets.
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
        capture_cmd = capture_command(cmd,
            '-i', '-',
            '-w', testout_file,
            '-a', 'duration:{}'.format(capture_duration),
            shell=True
        )
        # Look for the '-k' argument itself. Testing cmd[0] would only
        # substring-match the executable path, which leaves the GUI checks
        # below effectively disabled.
        is_gui = not isinstance(cmd, str) and '-k' in cmd
        if is_gui:
            capture_cmd += ' -o console.log.level:127'
        self.assertRun(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
        if is_gui:
            self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
            self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
            self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
        self.assertTrue(os.path.isfile(testout_file))
        self.checkPacketCount(8)
    return check_capture_stdin_real
181
182
@fixtures.fixture
def check_capture_read_filter(capture_interface, traffic_generator):
    '''
    Factory for a check that captures with a read filter ('-2 -R ...').

    The filter is chosen so that it is unlikely to match any of the generated
    traffic; the output file is therefore expected to hold 0 packets.
    '''
    start_traffic, cfilter = traffic_generator

    def check_capture_read_filter_real(self, cmd=None):
        self.assertIsNotNone(cmd)
        out_path = self.filename_from_id(testout_pcap)
        stop_traffic = start_traffic()
        capture_args = capture_command(cmd,
            '-i', capture_interface,
            '-p',
            '-w', out_path,
            '-2',
            '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
            '-c', '10',
            '-a', 'duration:{}'.format(capture_duration),
            '-f', cfilter,
        )
        self.assertRun(capture_args)
        stop_traffic()
        self.checkPacketCount(0)

    return check_capture_read_filter_real
203
@fixtures.fixture
def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
    '''
    Factory for a check that captures with a snapshot length ('-s').

    After capturing with '-s snapshot_len', the output is re-read with tshark
    and filtered down to frames whose captured length exceeds snapshot_len.
    The filtered file is expected to be empty.
    '''
    start_traffic, cfilter = traffic_generator

    def check_capture_snapshot_len_real(self, cmd=None):
        self.assertIsNotNone(cmd)
        stop_traffic = start_traffic()
        captured_path = self.filename_from_id(testout_pcap)
        capture_args = capture_command(cmd,
            '-i', capture_interface,
            '-p',
            '-w', captured_path,
            '-s', str(snapshot_len),
            '-a', 'duration:{}'.format(capture_duration),
            '-f', cfilter,
        )
        self.assertRun(capture_args)
        stop_traffic()
        self.assertTrue(os.path.isfile(captured_path))

        # Use tshark to keep only the packets whose captured length exceeds
        # the snapshot length. Nothing should survive that filter.
        oversized_path = self.filename_from_id('testout2.pcap')
        oversized_filter = 'frame.cap_len>{}'.format(snapshot_len)
        self.assertRun((cmd_tshark,
            '-r', captured_path,
            '-w', oversized_path,
            '-Y', oversized_filter,
        ))
        self.checkPacketCount(0, cap_file=oversized_path)

    return check_capture_snapshot_len_real
232
233
@fixtures.fixture
def check_dumpcap_autostop_stdin(cmd_dumpcap):
    '''
    Factory for a check of dumpcap's autostop ('-a') conditions.

    The returned function needs exactly one of packets or filesize. It pipes
    the 'cat100' DHCP command into dumpcap reading from stdin with the
    matching '-a packets:N' or '-a filesize:N' condition. For packets the
    output must hold exactly that many packets; for filesize the output size
    divided by 1000 must be at least the requested value.
    '''
    def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
        # Similar to check_capture_stdin.
        out_path = self.filename_from_id(testout_pcap)
        feeder_cmd = subprocesstest.cat_dhcp_command('cat100')

        by_packets = packets is not None
        by_filesize = filesize is not None
        self.assertTrue(by_packets or by_filesize, 'Need one of packets or filesize')
        self.assertFalse(by_packets and by_filesize, 'Need one of packets or filesize')

        if by_packets:
            stop_condition = 'packets:{}'.format(packets)
        elif by_filesize:
            stop_condition = 'filesize:{}'.format(filesize)
        else:
            stop_condition = 'oops:invalid'

        dumpcap_cmd = ' '.join([
            cmd_dumpcap,
            '-i', '-',
            '-w', out_path,
            '-a', stop_condition,
        ])
        self.assertRun(feeder_cmd + ' | ' + dumpcap_cmd, shell=True)
        self.assertTrue(os.path.isfile(out_path))

        if by_packets:
            self.checkPacketCount(packets)
        elif by_filesize:
            size_kb = os.path.getsize(out_path) / 1000
            self.assertGreaterEqual(size_kb, filesize)

    return check_dumpcap_autostop_stdin_real
264
265
@fixtures.fixture
def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
    '''
    Factory for a check of dumpcap's multiple-file ('-b') output.

    The returned function needs exactly one of packets or filesize. It pipes
    the 'cat100' DHCP command into dumpcap with '-a files:2' and the matching
    '-b packets:N' or '-b filesize:N' condition, then expects exactly two
    output files, each satisfying the packet count or minimum size.
    '''
    def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
        # Similar to check_capture_stdin.
        unique_tag = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
        test_id = self.id()
        out_path = '{}.{}.pcapng'.format(test_id, unique_tag)
        out_pattern = '{}.{}_*.pcapng'.format(self.id(), unique_tag)
        feeder_cmd = subprocesstest.cat_dhcp_command('cat100')

        by_packets = packets is not None
        by_filesize = filesize is not None
        self.assertTrue(by_packets or by_filesize, 'Need one of packets or filesize')
        self.assertFalse(by_packets and by_filesize, 'Need one of packets or filesize')

        if by_packets:
            switch_condition = 'packets:{}'.format(packets)
        elif by_filesize:
            switch_condition = 'filesize:{}'.format(filesize)
        else:
            switch_condition = 'oops:invalid'

        dumpcap_cmd = ' '.join([
            cmd_dumpcap,
            '-i', '-',
            '-w', out_path,
            '-a', 'files:2',
            '-b', switch_condition,
        ])
        self.assertRun(feeder_cmd + ' | ' + dumpcap_cmd, shell=True)

        # Register whatever was produced for cleanup before asserting on it.
        produced = glob.glob(out_pattern)
        for path in produced:
            self.cleanup_files.append(path)

        self.assertEqual(len(produced), 2)

        for path in produced:
            self.assertTrue(os.path.isfile(path))
            if by_packets:
                self.checkPacketCount(packets, cap_file=path)
            elif by_filesize:
                size_kb = os.path.getsize(path) / 1000
                self.assertGreaterEqual(size_kb, filesize)

    return check_dumpcap_ringbuffer_stdin_real
306
307
@fixtures.fixture
def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, capture_file):
    '''
    Factory for a check of how dumpcap handles pcapng sections.

    Skipped on win32 because the check feeds dumpcap through fifos.

    The returned function takes two flags:
      multi_input  - feed a second fifo in addition to the first one.
      multi_output - write two files ('-a files:2 -b packets:53') instead of
                     a single one.
    Each output file is checked for its packet count, its number of
    Interface Description Blocks and the User Application options it holds.
    With a single input and a single output the result must also be
    byte-for-byte identical to the concatenated input files.
    '''
    if sys.platform == 'win32':
        fixtures.skip('Test requires OS fifo support.')

    def check_dumpcap_pcapng_sections_real(self, multi_input=False, multi_output=False):
        # Make sure we always test multiple SHBs in an input.
        in_files_l = [[
            capture_file('many_interfaces.pcapng.1'),
            capture_file('many_interfaces.pcapng.2'),
        ]]
        if multi_input:
            in_files_l.append([capture_file('many_interfaces.pcapng.3')])

        # Default values for our validity tests
        check_val_d = {
            'filename': None,
            'packet_count': 0,
            'idb_count': 0,
            'ua_pt1_count': 0,
            'ua_pt2_count': 0,
            'ua_pt3_count': 0,
            'ua_dc_count': 0,
        }
        check_vals = [check_val_d]

        # One fifo plus one background feeder process per input group.
        fifo_files = []
        fifo_procs = []
        for in_files in in_files_l:
            fifo_file = self.filename_from_id('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files) + 1))
            fifo_files.append(fifo_file)
            # If a previous test left its fifo laying around, e.g. from a failure, remove it.
            try:
                os.unlink(fifo_file)
            except OSError:
                # Best effort: usually there is simply nothing to remove.
                pass
            os.mkfifo(fifo_file)
            cat_cmd = subprocesstest.cat_cap_file_command(in_files)
            fifo_procs.append(self.startProcess(('{0} > {1}'.format(cat_cmd, fifo_file)), shell=True))

        if multi_output:
            rb_unique = 'sections_rb_' + uuid.uuid4().hex[:6] # Random ID
            testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
            testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
            check_vals.append(check_val_d.copy())
            # check_vals[]['filename'] will be filled in below
        else:
            testout_file = self.filename_from_id(testout_pcapng)
            check_vals[0]['filename'] = testout_file

        # Capture commands and the values expected for each output file.
        if not multi_input and not multi_output:
            # Passthrough SHBs, single output file
            capture_cmd_args = (
                '-i', fifo_files[0],
                '-w', testout_file
            )
            check_vals[0].update(packet_count=79, idb_count=22, ua_pt1_count=1, ua_pt2_count=1)
        elif not multi_input and multi_output:
            # Passthrough SHBs, multiple output files
            capture_cmd_args = (
                '-i', fifo_files[0],
                '-w', testout_file,
                '-a', 'files:2',
                '-b', 'packets:53'
            )
            check_vals[0].update(packet_count=53, idb_count=11, ua_pt1_count=1)
            check_vals[1].update(packet_count=26, idb_count=22, ua_pt1_count=1, ua_pt2_count=1)
        elif multi_input and not multi_output:
            # Dumpcap SHBs, single output file
            capture_cmd_args = (
                '-i', fifo_files[0],
                '-i', fifo_files[1],
                '-w', testout_file
            )
            check_vals[0].update(packet_count=88, idb_count=35, ua_dc_count=1)
        else:
            # Dumpcap SHBs, multiple output files
            capture_cmd_args = (
                '-i', fifo_files[0],
                '-i', fifo_files[1],
                '-w', testout_file,
                '-a', 'files:2',
                '-b', 'packets:53'
            )
            check_vals[0].update(packet_count=53, idb_count=13, ua_dc_count=1)
            check_vals[1].update(packet_count=35, idb_count=35, ua_dc_count=1)

        capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args)

        try:
            self.assertRun(capture_cmd)
        finally:
            # Stop the feeders even when the capture assertion fails.
            for fifo_proc in fifo_procs:
                fifo_proc.kill()

        if multi_output:
            rb_files = sorted(glob.glob(testout_glob))
            # Register the files for cleanup before asserting on their
            # number, so that an unexpected count does not leave them behind.
            for rbf in rb_files:
                self.cleanup_files.append(rbf)
            self.assertEqual(len(rb_files), 2)
            for check_val, rbf in zip(check_vals, rb_files):
                self.assertTrue(os.path.isfile(rbf))
                check_val['filename'] = rbf

        # Output tests

        if not multi_input and not multi_output:
            # Check strict bit-for-bit passthrough.
            in_hash = hashlib.sha256()
            out_hash = hashlib.sha256()
            for in_file in in_files_l[0]:
                # NOTE(review): in_files_l already holds capture_file()
                # results; the second call is kept as is and is presumably
                # idempotent for resolved paths - confirm.
                in_cap_file = capture_file(in_file)
                with open(in_cap_file, 'rb') as f:
                    in_hash.update(f.read())
            with open(testout_file, 'rb') as f:
                out_hash.update(f.read())
            self.assertEqual(in_hash.hexdigest(), out_hash.hexdigest())

        # many_interfaces.pcapng.1 : 64 packets written by "Passthrough test #1"
        # many_interfaces.pcapng.2 : 15 packets written by "Passthrough test #2"
        # many_interfaces.pcapng.3 : 9 packets written by "Passthrough test #3"
        # Each has 11 interfaces.
        idb_compare_eq = True
        if multi_input and multi_output:
            # Having multiple inputs forces the use of threads. In our
            # case this means that non-packet block counts in the first
            # file in is nondeterministic.
            idb_compare_eq = False
        for check_val in check_vals:
            self.checkPacketCount(check_val['packet_count'], cap_file=check_val['filename'])

            tshark_proc = self.assertRun(capture_command(cmd_tshark,
                '-r', check_val['filename'],
                '-V',
                '-X', 'read_format:MIME Files Format'
            ))
            # XXX Are there any other sanity checks we should run?
            idb_found = self.countOutput(r'Block: Interface Description Block',
                proc=tshark_proc)
            if idb_compare_eq:
                self.assertEqual(idb_found, check_val['idb_count'])
            else:
                # Only the first file is compared loosely; later files are
                # compared exactly again.
                self.assertGreaterEqual(idb_found, check_val['idb_count'])
                idb_compare_eq = True
            self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #1',
                proc=tshark_proc), check_val['ua_pt1_count'])
            self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #2',
                proc=tshark_proc), check_val['ua_pt2_count'])
            self.assertEqual(self.countOutput(r'Option: User Application = Passthrough test #3',
                proc=tshark_proc), check_val['ua_pt3_count'])
            self.assertEqual(self.countOutput(r'Option: User Application = Dumpcap \(Wireshark\)',
                proc=tshark_proc), check_val['ua_dc_count'])
    return check_dumpcap_pcapng_sections_real
472
473
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run against Wireshark started with '-k'.'''

    def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Wireshark'''
        check_capture_10_packets(self, cmd=wireshark_k)

    # There is deliberately no "10 packets to stdout" variant here:
    # Wireshark doesn't currently support writing to stdout while capturing.

    def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
        '''Capture from a fifo using Wireshark'''
        check_capture_fifo(self, cmd=wireshark_k)

    def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
        '''Capture from stdin using Wireshark'''
        check_capture_stdin(self, cmd=wireshark_k)

    def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
        '''Capture truncated packets using Wireshark'''
        check_capture_snapshot_len(self, cmd=wireshark_k)
497
498
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run against TShark.'''

    def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=cmd_tshark)

    def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=cmd_tshark)

    def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=cmd_tshark)
521
522
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run against Dumpcap.'''

    def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=cmd_dumpcap)
545
546
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
    '''Dumpcap autostop checks; only filesize and packets are covered here.'''
    # duration, filesize, packets, files

    def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a file size limit'''
        check_dumpcap_autostop_stdin(self, filesize=15)

    def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a packet limit'''
        # Last prime before 100. Arbitrary.
        check_dumpcap_autostop_stdin(self, packets=97)
558
559
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
    '''Dumpcap multiple-file checks; only filesize and packets are covered here.'''
    # duration, interval, filesize, packets, files

    def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
        check_dumpcap_ringbuffer_stdin(self, filesize=15)

    def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
        # Last prime before 50. Arbitrary.
        check_dumpcap_ringbuffer_stdin(self, packets=47)
571
572
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_pcapng_sections(subprocesstest.SubprocessTestCase):
    '''Dumpcap pcapng section checks for each single/multi input and output combination.'''

    def test_dumpcap_pcapng_single_in_single_out(self, check_dumpcap_pcapng_sections):
        '''Capture from a single pcapng source using Dumpcap and write a single file'''
        check_dumpcap_pcapng_sections(self)

    def test_dumpcap_pcapng_single_in_multi_out(self, check_dumpcap_pcapng_sections):
        '''Capture from a single pcapng source using Dumpcap and write two files'''
        check_dumpcap_pcapng_sections(self, multi_output=True)

    def test_dumpcap_pcapng_multi_in_single_out(self, check_dumpcap_pcapng_sections):
        '''Capture from two pcapng sources using Dumpcap and write a single file'''
        check_dumpcap_pcapng_sections(self, multi_input=True)

    def test_dumpcap_pcapng_multi_in_multi_out(self, check_dumpcap_pcapng_sections):
        '''Capture from two pcapng sources using Dumpcap and write two files'''
        check_dumpcap_pcapng_sections(self, multi_input=True, multi_output=True)