eb1a4ffcab62eb2b0702387ad91386cf21e7ced2
[metze/wireshark/wip.git] / test / suite_capture.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Capture tests'''
11
12 import config
13 import glob
14 import os
15 import re
16 import subprocess
17 import subprocesstest
18 import sys
19 import time
20 import unittest
21 import uuid
22
# Value substituted into the '-a duration:{}' autostop argument of the
# capture helpers below.
capture_duration = 5

# Base name handed to filename_from_id() to build each test's output file name.
testout_pcap = 'testout.pcap'
# Passed as the '-s' value in check_capture_snapshot_len, which then checks
# that no captured frame has frame.cap_len above it.
snapshot_len = 96
27
def start_pinging(self):
    '''Start background ping process(es) and return them in a list.

    Every process is launched with self.startProcess(config.args_ping).
    On Windows three are started a fraction of a second apart; on other
    platforms a single one is started.
    '''
    if not sys.platform.startswith('win32'):
        return [self.startProcess(config.args_ping)]

    # Fake '-i' with a subsecond interval by staggering several pings.
    pingers = []
    for pause in (0.1, 0.1, 0):
        pingers.append(self.startProcess(config.args_ping))
        time.sleep(pause)
    return pingers
38
def stop_pinging(ping_procs):
    '''Kill every process in ping_procs, in iteration order.

    ping_procs is any iterable of objects exposing a kill() method, such as
    the list returned by start_pinging. Returns None.
    '''
    for pinger in ping_procs:
        pinger.kill()
42
def check_capture_10_packets(self, cmd=None, to_stdout=False):
    '''Capture ten ICMP/ICMPv6 packets from the live interface using cmd.

    Background pings are started to generate traffic. The capture goes to
    the per-test output file, either directly ('-w <file>') or, when
    to_stdout is true, via '-w -' and a shell redirect. The test is skipped
    when capturing is not possible, when Wireshark is requested without a
    display, or when no ping command is configured. On success the output
    file must contain exactly 10 packets.
    '''
    # Similar to suite_io.check_io_4_packets.
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)

    testout_file = self.filename_from_id(testout_pcap)
    pingers = start_pinging(self)
    autostop = 'duration:{}'.format(capture_duration)
    if to_stdout:
        # Goes through a shell so that the '>' redirect works; the interface
        # name and the capture filter are quoted for the shell.
        shell_cmd = subprocesstest.capture_command(cmd,
            '-i', '"{}"'.format(config.capture_interface),
            '-p',
            '-w', '-',
            '-c', '10',
            '-a', autostop,
            '-f', '"icmp || icmp6"',
            '>', testout_file,
            shell=True
        )
        capture_proc = self.runProcess(shell_cmd, shell=True)
    else:
        direct_cmd = subprocesstest.capture_command(cmd,
            '-i', config.capture_interface,
            '-p',
            '-w', testout_file,
            '-c', '10',
            '-a', autostop,
            '-f', 'icmp || icmp6',
        )
        capture_proc = self.runProcess(direct_cmd)

    status = capture_proc.returncode
    stop_pinging(pingers)
    if status != 0:
        # Put the output of '<cmd> -D' into the log to help diagnose the failure.
        self.log_fd.write('{} -D output:\n'.format(cmd))
        self.runProcess((cmd, '-D'))
    self.assertEqual(status, 0)
    if status == 0:
        self.checkPacketCount(10)
84
def check_capture_fifo(self, cmd=None):
    '''Capture from a named pipe using cmd and expect 8 packets.

    A fifo is created at the per-test 'testout.fifo' path, a background
    shell command (subprocesstest.cat_dhcp_command('slow')) is redirected
    into it, and cmd captures from the fifo into the per-test output file.
    The test is skipped when the OS has no fifo support or when Wireshark
    is requested without a display.
    '''
    if not config.canMkfifo():
        self.skipTest('Test requires OS fifo support.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    self.assertIsNotNone(cmd)
    testout_file = self.filename_from_id(testout_pcap)
    fifo_file = self.filename_from_id('testout.fifo')
    try:
        # If a previous test left its fifo lying around, e.g. from a failure, remove it.
        os.unlink(fifo_file)
    except OSError:
        # Best effort: usually the fifo simply does not exist. Only
        # filesystem errors are ignored, so KeyboardInterrupt and
        # programming errors still propagate.
        pass
    os.mkfifo(fifo_file)
    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
    # Writer side of the fifo; runs in the background while we capture.
    fifo_proc = self.startProcess(
        ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
        shell=True)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', fifo_file,
        '-p',
        '-w', testout_file,
        '-a', 'duration:{}'.format(capture_duration),
    ))
    fifo_proc.kill()
    self.assertTrue(os.path.isfile(testout_file))
    capture_returncode = capture_proc.returncode
    self.assertEqual(capture_returncode, 0)
    if capture_returncode == 0:
        self.checkPacketCount(8)
116
def check_capture_stdin(self, cmd=None):
    '''Capture from standard input using cmd and expect 8 packets.

    The output of subprocesstest.cat_dhcp_command('slow') is piped into
    "cmd -i -" through a shell, and the capture is written to the per-test
    output file. For Wireshark the console log level is raised and the
    process output is checked for its startup, capture-start and
    capture-stop messages. The test is skipped when Wireshark is requested
    without a display.
    '''
    # Similar to suite_io.check_io_4_packets.
    is_wireshark = cmd == config.cmd_wireshark
    if is_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    self.assertIsNotNone(cmd)
    testout_file = self.filename_from_id(testout_pcap)
    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
    capture_cmd = subprocesstest.capture_command(cmd,
        '-i', '-',
        '-w', testout_file,
        '-a', 'duration:{}'.format(capture_duration),
        shell=True
    )
    if is_wireshark:
        # Presumably needed so Wireshark prints the messages grepped for below.
        capture_cmd += ' -o console.log.level:127'
    pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
    pipe_returncode = pipe_proc.returncode
    self.assertEqual(pipe_returncode, 0)
    if is_wireshark:
        self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
        self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
        self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
    self.assertTrue(os.path.isfile(testout_file))
    if pipe_returncode == 0:
        self.checkPacketCount(8)
143
def check_capture_read_filter(self, cmd=None):
    '''Capture live ICMP/ICMPv6 traffic with a read filter that matches nothing.

    Background pings generate traffic while cmd captures with '-2' and a
    '-R' filter on an unlikely DCE/RPC call id, so the per-test output file
    is expected to hold 0 packets. The test is skipped when capturing is
    not possible, when Wireshark is requested without a display, or when no
    ping command is configured.
    '''
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)

    pingers = start_pinging(self)
    testout_file = self.filename_from_id(testout_pcap)
    filtered_capture_cmd = subprocesstest.capture_command(cmd,
        '-i', config.capture_interface,
        '-p',
        '-w', testout_file,
        '-2',
        '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
        '-c', '10',
        '-a', 'duration:{}'.format(capture_duration),
        '-f', 'icmp || icmp6',
    )
    capture_proc = self.runProcess(filtered_capture_cmd)
    status = capture_proc.returncode
    stop_pinging(pingers)
    self.assertEqual(status, 0)

    if status == 0:
        self.checkPacketCount(0)
170
def check_capture_snapshot_len(self, cmd=None):
    '''Capture with a snapshot length and verify that no frame exceeds it.

    Background pings generate traffic while cmd captures ICMP/ICMPv6 with
    '-s snapshot_len' into the per-test output file. TShark then rewrites
    that file keeping only frames whose frame.cap_len is greater than
    snapshot_len; the result must contain 0 packets. The test is skipped
    when capturing is not possible, when Wireshark is requested without a
    display, or when no ping command is configured.
    '''
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)
    ping_procs = start_pinging(self)
    testout_file = self.filename_from_id(testout_pcap)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', config.capture_interface,
        '-p',
        '-w', testout_file,
        '-s', str(snapshot_len),
        '-a', 'duration:{}'.format(capture_duration),
        '-f', 'icmp || icmp6',
    ))
    capture_returncode = capture_proc.returncode
    stop_pinging(ping_procs)
    self.assertEqual(capture_returncode, 0)
    self.assertTrue(os.path.isfile(testout_file))

    # Use tshark to keep only the packets whose captured length exceeds
    # snapshot_len. There should be none.
    testout2_file = self.filename_from_id('testout2.pcap')

    filter_proc = self.runProcess((config.cmd_tshark,
        '-r', testout_file,
        '-w', testout2_file,
        '-Y', 'frame.cap_len>{}'.format(snapshot_len),
    ))
    # Check the filter step itself; the capture step was verified above.
    filter_returncode = filter_proc.returncode
    self.assertEqual(filter_returncode, 0)
    if filter_returncode == 0:
        self.checkPacketCount(0, cap_file=testout2_file)
206
def check_dumpcap_autostop_stdin(self, packets=None, filesize=None):
    '''Feed dumpcap from stdin and verify one autostop condition.

    Exactly one of packets or filesize must be given. It is passed to
    dumpcap as '-a packets:<n>' or '-a filesize:<n>' while the output of
    subprocesstest.cat_dhcp_command('cat100') is piped in through a shell.

    With packets, the per-test output file must contain exactly that many
    packets. With filesize, the output file's size divided by 1000 must be
    at least filesize.
    '''
    # Similar to check_capture_stdin.
    cmd = config.cmd_dumpcap
    testout_file = self.filename_from_id(testout_pcap)
    cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')

    self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
    self.assertFalse(packets is not None and filesize is not None, 'Need only one of packets or filesize')

    # The assertions above guarantee that exactly one of the two is set.
    if packets is not None:
        condition = 'packets:{}'.format(packets)
    else:
        condition = 'filesize:{}'.format(filesize)

    capture_cmd = subprocesstest.capture_command(cmd,
        '-i', '-',
        '-w', testout_file,
        '-a', condition,
        shell=True
    )
    pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
    pipe_returncode = pipe_proc.returncode
    self.assertEqual(pipe_returncode, 0)
    self.assertTrue(os.path.isfile(testout_file))
    if pipe_returncode != 0:
        return

    if packets is not None:
        self.checkPacketCount(packets)
    else:
        # File size in units of 1000 bytes, compared against the requested limit.
        capturekb = os.path.getsize(testout_file) / 1000
        self.assertGreaterEqual(capturekb, filesize)
241
def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None):
    '''Feed dumpcap from stdin in ring-buffer mode and verify both files.

    Exactly one of packets or filesize must be given. It is passed to
    dumpcap as '-b packets:<n>' or '-b filesize:<n>' together with
    '-a files:2', while the output of
    subprocesstest.cat_dhcp_command('cat100') is piped in through a shell.

    Exactly two files matching the ring-buffer name pattern must exist
    afterwards. With packets, each must contain exactly that many packets.
    With filesize, each file's size divided by 1000 must be at least
    filesize. The files found are registered in self.cleanup_files.
    '''
    # Similar to check_capture_stdin.
    cmd = config.cmd_dumpcap
    rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
    testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
    # Matches names of the form <id>.<rb_unique>_<anything>.pcapng.
    testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
    cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')

    self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
    self.assertFalse(packets is not None and filesize is not None, 'Need only one of packets or filesize')

    # The assertions above guarantee that exactly one of the two is set.
    if packets is not None:
        condition = 'packets:{}'.format(packets)
    else:
        condition = 'filesize:{}'.format(filesize)

    capture_cmd = subprocesstest.capture_command(cmd,
        '-i', '-',
        '-w', testout_file,
        '-a', 'files:2',
        '-b', condition,
        shell=True
    )
    pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
    pipe_returncode = pipe_proc.returncode
    self.assertEqual(pipe_returncode, 0)
    if pipe_returncode != 0:
        return

    rb_files = glob.glob(testout_glob)
    # Register the files before asserting on them so they are tracked for
    # cleanup even when a check below fails.
    for rbf in rb_files:
        self.cleanup_files.append(rbf)

    self.assertEqual(len(rb_files), 2)

    for rbf in rb_files:
        self.assertTrue(os.path.isfile(rbf))
        if packets is not None:
            self.checkPacketCount(packets, cap_file=rbf)
        else:
            # File size in units of 1000 bytes, compared against the requested limit.
            capturekb = os.path.getsize(rbf) / 1000
            self.assertGreaterEqual(capturekb, filesize)
286
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture tests run with config.cmd_wireshark.

    Each test delegates to the module-level check_capture_* helper of the
    same name; those helpers skip the test when no display is available.
    '''
    def test_wireshark_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using Wireshark'''
        check_capture_10_packets(self, cmd=config.cmd_wireshark)

    # Wireshark doesn't currently support writing to stdout while capturing.
    # def test_wireshark_capture_10_packets_to_stdout(self):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)

    def test_wireshark_capture_from_fifo(self):
        '''Capture from a fifo using Wireshark'''
        check_capture_fifo(self, cmd=config.cmd_wireshark)

    def test_wireshark_capture_from_stdin(self):
        '''Capture from stdin using Wireshark'''
        check_capture_stdin(self, cmd=config.cmd_wireshark)

    def test_wireshark_capture_snapshot_len(self):
        '''Capture truncated packets using Wireshark'''
        check_capture_snapshot_len(self, cmd=config.cmd_wireshark)
308
class case_tshark_capture(subprocesstest.SubprocessTestCase):
    '''Capture tests run with config.cmd_tshark.

    Each test delegates to the matching module-level check_capture_* helper.
    '''
    def test_tshark_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=config.cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self):
        '''Capture 10 packets from the network to stdout using TShark'''
        check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=config.cmd_tshark)

    def test_tshark_capture_from_stdin(self):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=config.cmd_tshark)

    def test_tshark_capture_snapshot_len(self):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=config.cmd_tshark)
329
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    '''Capture tests run with config.cmd_dumpcap.

    Each test delegates to the matching module-level check_capture_* helper.
    '''
    def test_dumpcap_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_snapshot_len(self):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)
350
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
    '''Dumpcap autostop tests; each delegates to check_dumpcap_autostop_stdin.'''
    # duration, filesize, packets, files
    # NOTE(review): presumably the list of autostop criteria to cover; only
    # filesize and packets are exercised below.
    def test_dumpcap_autostop_filesize(self):
        '''Capture from stdin using Dumpcap until we reach a file size limit'''
        check_dumpcap_autostop_stdin(self, filesize=15)

    def test_dumpcap_autostop_packets(self):
        '''Capture from stdin using Dumpcap until we reach a packet limit'''
        check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
360
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
    '''Dumpcap ring-buffer tests; each delegates to check_dumpcap_ringbuffer_stdin.'''
    # duration, interval, filesize, packets, files
    # NOTE(review): presumably the list of ring-buffer criteria to cover; only
    # filesize and packets are exercised below.
    # Need a function that finds ringbuffer file names.
    def test_dumpcap_ringbuffer_filesize(self):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
        check_dumpcap_ringbuffer_stdin(self, filesize=15)

    def test_dumpcap_ringbuffer_packets(self):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
        check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.