2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
24 testout_pcap = 'testout.pcap'
27 def start_pinging(self):
29 if sys.platform.startswith('win32'):
30 # Fake '-i' with a subsecond interval.
31 for st in (0.1, 0.1, 0):
32 ping_procs.append(self.startProcess(config.args_ping))
35 ping_procs.append(self.startProcess(config.args_ping))
38 def stop_pinging(ping_procs):
39 for proc in ping_procs:
43 @fixtures.fixture(scope='session')
44 def wireshark_k(wireshark_command):
45 return tuple(list(wireshark_command) + ['-k'])
47 def capture_command(*cmd_args, shell=False):
48 if type(cmd_args[0]) != str:
49 # Assume something like ['wireshark', '-k']
50 cmd_args = list(cmd_args[0]) + list(cmd_args)[1:]
52 cmd_args = ' '.join(cmd_args)
57 def check_capture_10_packets(capture_interface, cmd_dumpcap):
58 def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
59 # Similar to suite_io.check_io_4_packets.
60 if not config.args_ping:
61 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
62 self.assertIsNotNone(cmd)
63 testout_file = self.filename_from_id(testout_pcap)
64 ping_procs = start_pinging(self)
66 capture_proc = self.runProcess(capture_command(cmd,
67 '-i', '"{}"'.format(capture_interface),
71 '-a', 'duration:{}'.format(capture_duration),
72 '-f', '"icmp || icmp6"',
79 capture_proc = self.runProcess(capture_command(cmd,
80 '-i', capture_interface,
84 '-a', 'duration:{}'.format(capture_duration),
85 '-f', 'icmp || icmp6',
87 capture_returncode = capture_proc.returncode
88 stop_pinging(ping_procs)
89 if capture_returncode != 0:
90 self.log_fd.write('{} -D output:\n'.format(cmd))
91 self.runProcess((cmd, '-D'))
92 self.assertEqual(capture_returncode, 0)
93 if (capture_returncode == 0):
94 self.checkPacketCount(10)
95 return check_capture_10_packets_real
99 def check_capture_fifo(cmd_dumpcap):
100 if sys.platform == 'win32':
101 fixtures.skip('Test requires OS fifo support.')
103 def check_capture_fifo_real(self, cmd=None):
104 self.assertIsNotNone(cmd)
105 testout_file = self.filename_from_id(testout_pcap)
106 fifo_file = self.filename_from_id('testout.fifo')
108 # If a previous test left its fifo laying around, e.g. from a failure, remove it.
113 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
114 fifo_proc = self.startProcess(
115 ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
117 capture_proc = self.runProcess(capture_command(cmd,
121 '-a', 'duration:{}'.format(capture_duration),
124 self.assertTrue(os.path.isfile(testout_file))
125 capture_returncode = capture_proc.returncode
126 self.assertEqual(capture_returncode, 0)
127 if (capture_returncode == 0):
128 self.checkPacketCount(8)
129 return check_capture_fifo_real
133 def check_capture_stdin(cmd_dumpcap):
134 # Capturing always requires dumpcap, hence the dependency on it.
135 def check_capture_stdin_real(self, cmd=None):
136 # Similar to suite_io.check_io_4_packets.
137 self.assertIsNotNone(cmd)
138 testout_file = self.filename_from_id(testout_pcap)
139 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
140 capture_cmd = capture_command(cmd,
143 '-a', 'duration:{}'.format(capture_duration),
146 is_gui = type(cmd) != str and '-k' in cmd[0]
148 capture_cmd += ' -o console.log.level:127'
149 pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
150 pipe_returncode = pipe_proc.returncode
151 self.assertEqual(pipe_returncode, 0)
153 self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
154 self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
155 self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
156 self.assertTrue(os.path.isfile(testout_file))
157 if (pipe_returncode == 0):
158 self.checkPacketCount(8)
159 return check_capture_stdin_real
163 def check_capture_read_filter(capture_interface):
164 def check_capture_read_filter_real(self, cmd=None):
165 if not config.args_ping:
166 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
167 self.assertIsNotNone(cmd)
168 ping_procs = start_pinging(self)
169 testout_file = self.filename_from_id(testout_pcap)
170 capture_proc = self.runProcess(capture_command(cmd,
171 '-i', capture_interface,
175 '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
177 '-a', 'duration:{}'.format(capture_duration),
178 '-f', 'icmp || icmp6',
180 capture_returncode = capture_proc.returncode
181 stop_pinging(ping_procs)
182 self.assertEqual(capture_returncode, 0)
184 if (capture_returncode == 0):
185 self.checkPacketCount(0)
186 return check_capture_read_filter_real
189 def check_capture_snapshot_len(capture_interface, cmd_tshark):
190 def check_capture_snapshot_len_real(self, cmd=None):
191 if not config.args_ping:
192 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
193 self.assertIsNotNone(cmd)
194 ping_procs = start_pinging(self)
195 testout_file = self.filename_from_id(testout_pcap)
196 capture_proc = self.runProcess(capture_command(cmd,
197 '-i', capture_interface,
200 '-s', str(snapshot_len),
201 '-a', 'duration:{}'.format(capture_duration),
202 '-f', 'icmp || icmp6',
204 capture_returncode = capture_proc.returncode
205 stop_pinging(ping_procs)
206 self.assertEqual(capture_returncode, 0)
207 self.assertTrue(os.path.isfile(testout_file))
209 # Use tshark to filter out all packets larger than 68 bytes.
210 testout2_file = self.filename_from_id('testout2.pcap')
212 filter_proc = self.runProcess((cmd_tshark,
215 '-Y', 'frame.cap_len>{}'.format(snapshot_len),
217 filter_returncode = filter_proc.returncode
218 self.assertEqual(capture_returncode, 0)
219 if (capture_returncode == 0):
220 self.checkPacketCount(0, cap_file=testout2_file)
221 return check_capture_snapshot_len_real
225 def check_dumpcap_autostop_stdin(cmd_dumpcap):
226 def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
227 # Similar to check_capture_stdin.
228 testout_file = self.filename_from_id(testout_pcap)
229 cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
230 condition='oops:invalid'
232 self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
233 self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
235 if packets is not None:
236 condition = 'packets:{}'.format(packets)
237 elif filesize is not None:
238 condition = 'filesize:{}'.format(filesize)
240 capture_cmd = ' '.join((cmd_dumpcap,
245 pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
246 pipe_returncode = pipe_proc.returncode
247 self.assertEqual(pipe_returncode, 0)
248 self.assertTrue(os.path.isfile(testout_file))
249 if (pipe_returncode != 0):
252 if packets is not None:
253 self.checkPacketCount(packets)
254 elif filesize is not None:
255 capturekb = os.path.getsize(testout_file) / 1000
256 self.assertGreaterEqual(capturekb, filesize)
257 return check_dumpcap_autostop_stdin_real
261 def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
262 def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
263 # Similar to check_capture_stdin.
264 rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
265 testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
266 testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
267 cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
268 condition='oops:invalid'
270 self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
271 self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
273 if packets is not None:
274 condition = 'packets:{}'.format(packets)
275 elif filesize is not None:
276 condition = 'filesize:{}'.format(filesize)
278 capture_cmd = ' '.join((cmd_dumpcap,
284 pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
285 pipe_returncode = pipe_proc.returncode
286 self.assertEqual(pipe_returncode, 0)
287 if (pipe_returncode != 0):
290 rb_files = glob.glob(testout_glob)
292 self.cleanup_files.append(rbf)
294 self.assertEqual(len(rb_files), 2)
297 self.assertTrue(os.path.isfile(rbf))
298 if packets is not None:
299 self.checkPacketCount(packets, cap_file=rbf)
300 elif filesize is not None:
301 capturekb = os.path.getsize(rbf) / 1000
302 self.assertGreaterEqual(capturekb, filesize)
303 return check_dumpcap_ringbuffer_stdin_real
306 @fixtures.mark_usefixtures('test_env')
307 @fixtures.uses_fixtures
308 class case_wireshark_capture(subprocesstest.SubprocessTestCase):
309 def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
310 '''Capture 10 packets from the network to a file using Wireshark'''
311 check_capture_10_packets(self, cmd=wireshark_k)
313 # Wireshark doesn't currently support writing to stdout while capturing.
314 # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
315 # '''Capture 10 packets from the network to stdout using Wireshark'''
316 # check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)
318 def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
319 '''Capture from a fifo using Wireshark'''
320 check_capture_fifo(self, cmd=wireshark_k)
322 def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
323 '''Capture from stdin using Wireshark'''
324 check_capture_stdin(self, cmd=wireshark_k)
326 def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
327 '''Capture truncated packets using Wireshark'''
328 check_capture_snapshot_len(self, cmd=wireshark_k)
331 @fixtures.mark_usefixtures('test_env')
332 @fixtures.uses_fixtures
333 class case_tshark_capture(subprocesstest.SubprocessTestCase):
334 def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
335 '''Capture 10 packets from the network to a file using TShark'''
336 check_capture_10_packets(self, cmd=cmd_tshark)
338 def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
339 '''Capture 10 packets from the network to stdout using TShark'''
340 check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)
342 def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
343 '''Capture from a fifo using TShark'''
344 check_capture_fifo(self, cmd=cmd_tshark)
346 def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
347 '''Capture from stdin using TShark'''
348 check_capture_stdin(self, cmd=cmd_tshark)
350 def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
351 '''Capture truncated packets using TShark'''
352 check_capture_snapshot_len(self, cmd=cmd_tshark)
355 @fixtures.mark_usefixtures('base_env')
356 @fixtures.uses_fixtures
357 class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
358 def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
359 '''Capture 10 packets from the network to a file using Dumpcap'''
360 check_capture_10_packets(self, cmd=cmd_dumpcap)
362 def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
363 '''Capture 10 packets from the network to stdout using Dumpcap'''
364 check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)
366 def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
367 '''Capture from a fifo using Dumpcap'''
368 check_capture_fifo(self, cmd=cmd_dumpcap)
370 def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
371 '''Capture from stdin using Dumpcap'''
372 check_capture_stdin(self, cmd=cmd_dumpcap)
374 def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
375 '''Capture truncated packets using Dumpcap'''
376 check_capture_snapshot_len(self, cmd=cmd_dumpcap)
379 @fixtures.mark_usefixtures('base_env')
380 @fixtures.uses_fixtures
381 class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
382 # duration, filesize, packets, files
383 def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
384 '''Capture from stdin using Dumpcap until we reach a file size limit'''
385 check_dumpcap_autostop_stdin(self, filesize=15)
387 def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
388 '''Capture from stdin using Dumpcap until we reach a packet limit'''
389 check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
392 @fixtures.mark_usefixtures('base_env')
393 @fixtures.uses_fixtures
394 class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
395 # duration, interval, filesize, packets, files
396 # Need a function that finds ringbuffer file names.
397 def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
398 '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
399 check_dumpcap_ringbuffer_stdin(self, filesize=15)
401 def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
402 '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
403 check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.