test: finalize suite_capture conversion to fixtures, drop config.py
[metze/wireshark/wip.git] / test / suite_capture.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Capture tests'''
11
12 import glob
13 import os
14 import subprocess
15 import subprocesstest
16 import sys
17 import time
18 import uuid
19 import fixtures
20
# Value used for the '-a duration:N' autostop condition in the capture checks.
capture_duration = 5

# Base name handed to filename_from_id() for the capture output file.
testout_pcap = 'testout.pcap'
# Snapshot length passed via '-s' and used as the frame.cap_len threshold in
# the snapshot length check.
snapshot_len = 96
25
@fixtures.fixture
def traffic_generator():
    '''
    Traffic generator fixture.

    Yields a tuple (start_func, cfilter). cfilter is a capture filter that
    matches the generated traffic. Calling start_func launches the ping
    process(es) against www.wireshark.org and returns a function that stops
    them early. Any processes still tracked are killed when the fixture is
    torn down.
    '''
    # XXX replace this by something that generates UDP traffic to localhost?
    # That would avoid external access which is forbidden by the Debian policy.
    platform = sys.platform
    instances = 1
    if platform.startswith('win32'):
        # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
        ping_cmd = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
        instances = 3
    elif platform.startswith(('linux', 'freebsd')):
        ping_cmd = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
    elif platform.startswith('darwin'):
        ping_cmd = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
    else:
        # XXX Other BSDs, Solaris, etc
        fixtures.skip('ping utility is unavailable - cannot generate traffic')

    running = []

    def stop_all():
        # Kill every tracked process first, then wait for each of them.
        for child in running:
            child.kill()
        for child in running:
            child.wait()
        del running[:]

    def start_all():
        for index in range(instances):
            if index:
                # Fake subsecond interval if the ping utility lacks support.
                time.sleep(0.1)
            running.append(subprocess.Popen(
                ping_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT))
        return stop_all

    try:
        yield start_all, 'icmp || icmp6'
    finally:
        stop_all()
68
69
@fixtures.fixture(scope='session')
def wireshark_k(wireshark_command):
    '''Return the wireshark_command arguments as a tuple with '-k' appended.'''
    return tuple(wireshark_command) + ('-k',)
73
def capture_command(*cmd_args, shell=False):
    '''
    Flatten a command and its arguments into a single command line.

    The first positional argument is either the program as a string, or a
    sequence holding the program followed by leading options (for example
    ('wireshark', '-k')). A sequence is expanded in place ahead of the
    remaining arguments.

    Returns the arguments joined by single spaces as one string when shell is
    true. Otherwise returns the argument sequence: the cmd_args tuple as
    passed when the first argument is a string, or a new list when the first
    argument was expanded.
    '''
    # isinstance (rather than an exact type comparison) keeps str subclasses
    # from being mistaken for a sequence and split into single characters.
    if not isinstance(cmd_args[0], str):
        # Assume something like ['wireshark', '-k']
        cmd_args = list(cmd_args[0]) + list(cmd_args[1:])
    if shell:
        cmd_args = ' '.join(cmd_args)
    return cmd_args
81
82
@fixtures.fixture
def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
    '''
    Fixture returning a check that captures 10 packets of generated traffic.

    The returned function takes the test case instance, the capture command
    (cmd: a program string or a sequence such as the wireshark_k tuple) and
    to_stdout. With to_stdout set, the capture is written to stdout ('-w -')
    and redirected into the output file by the shell; otherwise the tool is
    given the output file via '-w'.
    '''
    # cmd_dumpcap is not referenced below; presumably it is requested so the
    # check depends on dumpcap being available - confirm against the fixtures.
    start_traffic, cfilter = traffic_generator
    def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        stop_traffic = start_traffic()
        if to_stdout:
            # The command goes through the shell for the redirection, so the
            # interface name and the capture filter are quoted by hand.
            capture_proc = self.runProcess(capture_command(cmd,
                '-i', '"{}"'.format(capture_interface),
                '-p',
                '-w', '-',
                '-c', '10',
                '-a', 'duration:{}'.format(capture_duration),
                '-f', '"{}"'.format(cfilter),
                '>', testout_file,
                shell=True
            ),
            shell=True
            )
        else:
            capture_proc = self.runProcess(capture_command(cmd,
                '-i', capture_interface,
                '-p',
                '-w', testout_file,
                '-c', '10',
                '-a', 'duration:{}'.format(capture_duration),
                '-f', cfilter,
            ))
        stop_traffic()
        capture_returncode = capture_proc.returncode
        if capture_returncode != 0:
            # Record the command's '-D' output to help diagnose the failure.
            # capture_command flattens a sequence cmd; wrapping it in another
            # tuple would hand runProcess a nested sequence.
            self.log_fd.write('{} -D output:\n'.format(cmd))
            self.runProcess(capture_command(cmd, '-D'))
        self.assertEqual(capture_returncode, 0)
        if capture_returncode == 0:
            self.checkPacketCount(10)
    return check_capture_10_packets_real
119             self.checkPacketCount(10)
120     return check_capture_10_packets_real
121
122
@fixtures.fixture
def check_capture_fifo(cmd_dumpcap):
    '''
    Fixture returning a check that captures from a named pipe (fifo).

    The fixture is skipped on Windows. The returned function takes the test
    case instance and the capture command (cmd), feeds the 'slow' DHCP
    command's output into a fifo, captures from that fifo into the output
    file and expects 8 packets.
    '''
    if sys.platform == 'win32':
        fixtures.skip('Test requires OS fifo support.')

    def check_capture_fifo_real(self, cmd=None):
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        fifo_file = self.filename_from_id('testout.fifo')
        try:
            # If a previous test left its fifo laying around, e.g. from a failure, remove it.
            os.unlink(fifo_file)
        except OSError:
            # Best effort: usually the fifo simply does not exist. Only
            # filesystem errors are ignored; interrupts still propagate.
            pass
        os.mkfifo(fifo_file)
        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
        # Writer side of the fifo, started in the background via the shell.
        fifo_proc = self.startProcess(
            ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
            shell=True)
        capture_proc = self.runProcess(capture_command(cmd,
            '-i', fifo_file,
            '-p',
            '-w', testout_file,
            '-a', 'duration:{}'.format(capture_duration),
        ))
        fifo_proc.kill()
        self.assertTrue(os.path.isfile(testout_file))
        capture_returncode = capture_proc.returncode
        self.assertEqual(capture_returncode, 0)
        if capture_returncode == 0:
            self.checkPacketCount(8)
    return check_capture_fifo_real
153             self.checkPacketCount(8)
154     return check_capture_fifo_real
155
156
@fixtures.fixture
def check_capture_stdin(cmd_dumpcap):
    '''
    Fixture returning a check that captures from standard input.

    The returned function takes the test case instance and the capture
    command (cmd: a program string or a sequence such as the wireshark_k
    tuple). It pipes the 'slow' DHCP command's output into the capture
    command through the shell and expects 8 packets in the output file. For
    a command that carries the '-k' option, the console log level is raised
    and the start-up and capture start/stop messages are checked as well.
    '''
    # Capturing always requires dumpcap, hence the dependency on it.
    def check_capture_stdin_real(self, cmd=None):
        # Similar to suite_io.check_io_4_packets.
        self.assertIsNotNone(cmd)
        testout_file = self.filename_from_id(testout_pcap)
        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
        capture_cmd = capture_command(cmd,
            '-i', '-',
            '-w', testout_file,
            '-a', 'duration:{}'.format(capture_duration),
            shell=True
        )
        # A sequence command containing the '-k' option is the GUI. Test
        # membership in the whole sequence: cmd[0] is the program itself, so
        # checking it would only look for '-k' as a substring of its path.
        is_gui = not isinstance(cmd, str) and '-k' in cmd
        if is_gui:
            capture_cmd += ' -o console.log.level:127'
        pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
        pipe_returncode = pipe_proc.returncode
        self.assertEqual(pipe_returncode, 0)
        if is_gui:
            self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
            self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
            self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
        self.assertTrue(os.path.isfile(testout_file))
        if pipe_returncode == 0:
            self.checkPacketCount(8)
    return check_capture_stdin_real
183             self.checkPacketCount(8)
184     return check_capture_stdin_real
185
186
@fixtures.fixture
def check_capture_read_filter(capture_interface, traffic_generator):
    '''
    Fixture returning a check that captures with a read filter applied.

    The returned function takes the test case instance and the capture
    command (cmd). It captures generated traffic with '-2 -R' and a filter
    that is unlikely to match anything, then expects an empty output file.
    '''
    begin_traffic, capture_filter = traffic_generator

    def check_capture_read_filter_real(self, cmd=None):
        self.assertIsNotNone(cmd)
        out_path = self.filename_from_id(testout_pcap)
        end_traffic = begin_traffic()
        capture_args = (
            '-i', capture_interface,
            '-p',
            '-w', out_path,
            '-2',
            '-R', 'dcerpc.cn_call_id==123456',  # Something unlikely.
            '-c', '10',
            '-a', 'duration:{}'.format(capture_duration),
            '-f', capture_filter,
        )
        proc = self.runProcess(capture_command(cmd, *capture_args))
        end_traffic()
        exit_code = proc.returncode
        self.assertEqual(exit_code, 0)

        if exit_code != 0:
            return
        self.checkPacketCount(0)
    return check_capture_read_filter_real
209             self.checkPacketCount(0)
210     return check_capture_read_filter_real
211
@fixtures.fixture
def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
    '''
    Fixture returning a check that captures with a snapshot length.

    The returned function takes the test case instance and the capture
    command (cmd). It captures generated traffic with '-s snapshot_len',
    then uses tshark to write every packet whose captured length exceeds
    snapshot_len to a second file, which is expected to hold no packets.
    '''
    start_traffic, cfilter = traffic_generator
    def check_capture_snapshot_len_real(self, cmd=None):
        self.assertIsNotNone(cmd)
        stop_traffic = start_traffic()
        testout_file = self.filename_from_id(testout_pcap)
        capture_proc = self.runProcess(capture_command(cmd,
            '-i', capture_interface,
            '-p',
            '-w', testout_file,
            '-s', str(snapshot_len),
            '-a', 'duration:{}'.format(capture_duration),
            '-f', cfilter,
        ))
        stop_traffic()
        capture_returncode = capture_proc.returncode
        self.assertEqual(capture_returncode, 0)
        self.assertTrue(os.path.isfile(testout_file))

        # Use tshark to keep only the packets captured with more than
        # snapshot_len bytes; a correctly truncated capture has none.
        testout2_file = self.filename_from_id('testout2.pcap')

        filter_proc = self.runProcess((cmd_tshark,
            '-r', testout_file,
            '-w', testout2_file,
            '-Y', 'frame.cap_len>{}'.format(snapshot_len),
        ))
        # Check the filter step's own exit status: the capture status was
        # already verified above and says nothing about this tshark run.
        filter_returncode = filter_proc.returncode
        self.assertEqual(filter_returncode, 0)
        if filter_returncode == 0:
            self.checkPacketCount(0, cap_file=testout2_file)
    return check_capture_snapshot_len_real
243             self.checkPacketCount(0, cap_file=testout2_file)
244     return check_capture_snapshot_len_real
245
246
@fixtures.fixture
def check_dumpcap_autostop_stdin(cmd_dumpcap):
    '''
    Fixture returning a check for dumpcap autostop conditions on stdin input.

    The returned function takes the test case instance and exactly one of
    packets or filesize. It pipes the 'cat100' DHCP command's output into
    dumpcap with the matching '-a' condition, then checks either the packet
    count of the output file or that its size divided by 1000 is at least
    filesize.
    '''
    def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
        # Similar to check_capture_stdin.
        out_path = self.filename_from_id(testout_pcap)
        feed_cmd = subprocesstest.cat_dhcp_command('cat100')

        by_packets = packets is not None
        by_filesize = filesize is not None
        self.assertTrue(by_packets or by_filesize, 'Need one of packets or filesize')
        self.assertFalse(by_packets and by_filesize, 'Need one of packets or filesize')

        if by_packets:
            stop_condition = 'packets:{}'.format(packets)
        elif by_filesize:
            stop_condition = 'filesize:{}'.format(filesize)
        else:
            stop_condition = 'oops:invalid'

        dumpcap_args = [cmd_dumpcap, '-i', '-', '-w', out_path, '-a', stop_condition]
        pipeline = feed_cmd + ' | ' + ' '.join(dumpcap_args)
        proc = self.runProcess(pipeline, shell=True)
        exit_code = proc.returncode
        self.assertEqual(exit_code, 0)
        self.assertTrue(os.path.isfile(out_path))

        if exit_code == 0:
            if by_packets:
                self.checkPacketCount(packets)
            elif by_filesize:
                size_kb = os.path.getsize(out_path) / 1000
                self.assertGreaterEqual(size_kb, filesize)
    return check_dumpcap_autostop_stdin_real
279             self.assertGreaterEqual(capturekb, filesize)
280     return check_dumpcap_autostop_stdin_real
281
282
@fixtures.fixture
def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
    '''
    Fixture returning a check for dumpcap ring buffer output on stdin input.

    The returned function takes the test case instance and exactly one of
    packets or filesize. It pipes the 'cat100' DHCP command's output into
    dumpcap with '-a files:2' and the matching '-b' condition, expects
    exactly two output files, and checks each one for either the packet
    count or a size (divided by 1000) of at least filesize.
    '''
    def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
        # Similar to check_capture_stdin.
        tag = 'dhcp_rb_' + uuid.uuid4().hex[:6]  # Random ID
        base = '{}.{}'.format(self.id(), tag)
        out_path = base + '.pcapng'
        out_pattern = base + '_*.pcapng'
        feed_cmd = subprocesstest.cat_dhcp_command('cat100')

        by_packets = packets is not None
        by_filesize = filesize is not None
        self.assertTrue(by_packets or by_filesize, 'Need one of packets or filesize')
        self.assertFalse(by_packets and by_filesize, 'Need one of packets or filesize')

        if by_packets:
            switch_condition = 'packets:{}'.format(packets)
        elif by_filesize:
            switch_condition = 'filesize:{}'.format(filesize)
        else:
            switch_condition = 'oops:invalid'

        dumpcap_args = [
            cmd_dumpcap,
            '-i', '-',
            '-w', out_path,
            '-a', 'files:2',
            '-b', switch_condition,
        ]
        pipeline = feed_cmd + ' | ' + ' '.join(dumpcap_args)
        proc = self.runProcess(pipeline, shell=True)
        exit_code = proc.returncode
        self.assertEqual(exit_code, 0)
        if exit_code != 0:
            return

        # Register whatever the ring buffer produced for cleanup before
        # asserting on it.
        ring_files = glob.glob(out_pattern)
        for ring_file in ring_files:
            self.cleanup_files.append(ring_file)

        self.assertEqual(len(ring_files), 2)

        for ring_file in ring_files:
            self.assertTrue(os.path.isfile(ring_file))
            if by_packets:
                self.checkPacketCount(packets, cap_file=ring_file)
            elif by_filesize:
                size_kb = os.path.getsize(ring_file) / 1000
                self.assertGreaterEqual(size_kb, filesize)
    return check_dumpcap_ringbuffer_stdin_real
325                 self.assertGreaterEqual(capturekb, filesize)
326     return check_dumpcap_ringbuffer_stdin_real
327
328
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run with the wireshark_k command line.'''

    def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Wireshark'''
        check_capture_10_packets(self, cmd=wireshark_k)

    # Wireshark doesn't currently support writing to stdout while capturing,
    # so the stdout variant stays disabled:
    # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)

    def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
        '''Capture from a fifo using Wireshark'''
        check_capture_fifo(self, cmd=wireshark_k)

    def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
        '''Capture from stdin using Wireshark'''
        check_capture_stdin(self, cmd=wireshark_k)

    def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
        '''Capture truncated packets using Wireshark'''
        check_capture_snapshot_len(self, cmd=wireshark_k)
352
353
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run with the cmd_tshark command.'''

    def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=cmd_tshark)

    def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=cmd_tshark)

    def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=cmd_tshark)
376
377
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    '''Capture checks run with the cmd_dumpcap command.'''

    def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=cmd_dumpcap)

    def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=cmd_dumpcap)
400
401
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
    '''Dumpcap autostop checks fed from stdin.'''

    # Autostop conditions: duration, filesize, packets, files.
    # Only filesize and packets are exercised here.
    def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a file size limit'''
        check_dumpcap_autostop_stdin(self, filesize=15)

    def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
        '''Capture from stdin using Dumpcap until we reach a packet limit'''
        # 97 is the last prime before 100. Arbitrary.
        check_dumpcap_autostop_stdin(self, packets=97)
413
414
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
    '''Dumpcap ring buffer checks fed from stdin.'''

    # Ring buffer conditions: duration, interval, filesize, packets, files.
    # Only filesize and packets are exercised here.
    # Need a function that finds ringbuffer file names.
    def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
        check_dumpcap_ringbuffer_stdin(self, filesize=15)

    def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
        # 47 is the last prime before 50. Arbitrary.
        check_dumpcap_ringbuffer_stdin(self, packets=47)