2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
# Base name of the capture output file. The check_* helpers below pass it to
# self.filename_from_id() to obtain the path they call testout_file.
25 testout_pcap = 'testout.pcap'
# Launch the ping command(s) described by config.args_ping through
# self.startProcess() and collect the resulting processes in ping_procs.
# Callers use the result as `ping_procs = start_pinging(self)` and later hand
# it to stop_pinging().
# NOTE(review): several original lines are missing from this view (the
# embedded numbering skips 29, 34, 35 and 37), so the initialisation of
# ping_procs, the else branch header and the return statement are not visible.
28 def start_pinging(self):
30 if sys.platform.startswith('win32'):
31 # Fake '-i' with a subsecond interval.
# The tuple has three entries; presumably one ping is started per entry and
# `st` is used as a delay in an elided line -- TODO confirm.
32 for st in (0.1, 0.1, 0):
33 ping_procs.append(self.startProcess(config.args_ping))
# Presumably the non-Windows path (its branch header is not visible): a
# single ping process is started.
36 ping_procs.append(self.startProcess(config.args_ping))
# Walk over the processes previously returned by start_pinging().
# NOTE(review): the loop body (embedded line 41) is missing from this view,
# so the action applied to each process cannot be confirmed from here.
39 def stop_pinging(ping_procs):
40 for proc in ping_procs:
# Run `cmd` as a live capture on config.capture_interface with an
# "icmp || icmp6" capture filter while pings run in the background, then
# expect 10 packets via self.checkPacketCount(10).
#
# Parameters:
#   self      -- the running test case; supplies skipTest, runProcess,
#                filename_from_id, log_fd and the assert helpers.
#   cmd       -- capture program to run; must not be None.
#   to_stdout -- not referenced by any visible line; presumably selects
#                between the two invocations below -- TODO confirm.
#
# NOTE(review): the embedded numbering has gaps (54, 57-59, 62-67, 70-72, 75),
# so the branch headers and part of each argument list are not visible.
43 def check_capture_10_packets(self, cmd=None, to_stdout=False):
44 # Similar to suite_io.check_io_4_packets.
# Skip (rather than fail) when the environment cannot run the test.
45 if not config.canCapture():
46 self.skipTest('Test requires capture privileges and an interface.')
47 if cmd == config.cmd_wireshark and not config.canDisplay():
48 self.skipTest('Test requires a display.')
49 if not config.args_ping:
50 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
51 self.assertIsNotNone(cmd)
52 testout_file = self.filename_from_id(testout_pcap)
# Pings are started before the capture so there is traffic to capture.
53 ping_procs = start_pinging(self)
# First invocation: the interface name and the filter carry embedded double
# quotes, which suggests the command line is interpreted by a shell here --
# TODO confirm against the elided arguments.
55 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
56 '-i', '"{}"'.format(config.capture_interface),
60 '-a', 'duration:{}'.format(capture_duration),
61 '-f', '"icmp || icmp6"',
# Second invocation: same options, passed without the extra quoting.
68 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
69 '-i', config.capture_interface,
73 '-a', 'duration:{}'.format(capture_duration),
74 '-f', 'icmp || icmp6',
76 capture_returncode = capture_proc.returncode
77 stop_pinging(ping_procs)
# On failure, log the output of `cmd -D` before asserting, as a diagnostic.
78 if capture_returncode != 0:
79 self.log_fd.write('{} -D output:\n'.format(cmd))
80 self.runProcess((cmd, '-D'))
81 self.assertEqual(capture_returncode, 0)
# The return code is re-tested here even though it was just asserted to be 0.
82 if (capture_returncode == 0):
83 self.checkPacketCount(10)
# Feed the output of the 'slow' DHCP cat command into a fifo, run `cmd` as a
# capture, and expect 8 packets via self.checkPacketCount(8).
#
# Parameters:
#   self -- the running test case (skipTest, startProcess, runProcess,
#           filename_from_id and the assert helpers are used).
#   cmd  -- capture program to run; must not be None.
#
# NOTE(review): the embedded numbering has gaps (94, 96-99, 103, 105-107,
# 109-110), so the fifo clean-up/creation code and most capture arguments are
# not visible in this view.
85 def check_capture_fifo(self, cmd=None):
86 if not config.canMkfifo():
87 self.skipTest('Test requires OS fifo support.')
88 if cmd == config.cmd_wireshark and not config.canDisplay():
89 self.skipTest('Test requires a display.')
90 self.assertIsNotNone(cmd)
# capture_file is not referenced by any other visible line of this function.
91 capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
92 testout_file = self.filename_from_id(testout_pcap)
93 fifo_file = self.filename_from_id('testout.fifo')
95 # If a previous test left its fifo laying around, e.g. from a failure, remove it.
100 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
# The writer is started with startProcess(), whereas the capture below uses
# runProcess(); the command string redirects the cat output into the fifo.
101 fifo_proc = self.startProcess(
102 ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
104 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
108 '-a', 'duration:{}'.format(capture_duration),
# The output file must exist and the capture must have exited with 0.
111 self.assertTrue(os.path.isfile(testout_file))
112 capture_returncode = capture_proc.returncode
113 self.assertEqual(capture_returncode, 0)
# The return code is re-tested here even though it was just asserted to be 0.
114 if (capture_returncode == 0):
115 self.checkPacketCount(8)
# Pipe the output of the 'slow' DHCP cat command into `cmd` through a shell
# pipeline and expect 8 packets via self.checkPacketCount(8).
#
# Parameters:
#   self -- the running test case (skipTest, runProcess, grepOutput,
#           filename_from_id and the assert helpers are used).
#   cmd  -- capture program to run; must not be None.
#
# NOTE(review): the embedded numbering has gaps (126-127, 129-130), so most
# arguments of the capture command are not visible in this view.
117 def check_capture_stdin(self, cmd=None):
118 # Similar to suite_io.check_io_4_packets.
119 if cmd == config.cmd_wireshark and not config.canDisplay():
120 self.skipTest('Test requires a display.')
121 self.assertIsNotNone(cmd)
# capture_file is not referenced by any other visible line of this function.
122 capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
123 testout_file = self.filename_from_id(testout_pcap)
124 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
# capture_cmd is concatenated with strings below, so capture_command() is
# expected to return a string here.
125 capture_cmd = subprocesstest.capture_command(cmd,
128 '-a', 'duration:{}'.format(capture_duration),
# For Wireshark, an extra console log preference is appended; the messages
# grepped for further down presumably depend on it -- TODO confirm.
131 if cmd == config.cmd_wireshark:
132 capture_cmd += ' -o console.log.level:127'
133 pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
134 pipe_returncode = pipe_proc.returncode
135 self.assertEqual(pipe_returncode, 0)
# NOTE(review): indentation is lost in this view, so it cannot be told which
# of the following grepOutput assertions are guarded by the Wireshark check.
136 if cmd == config.cmd_wireshark:
137 self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
138 self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
139 self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
140 self.assertTrue(os.path.isfile(testout_file))
# The return code is re-tested here even though it was asserted to be 0 above.
141 if (pipe_returncode == 0):
142 self.checkPacketCount(8)
# Run `cmd` as a live capture with a read filter ('-R') that is not expected
# to match any traffic, and expect 0 packets via self.checkPacketCount(0).
#
# Parameters:
#   self -- the running test case (skipTest, runProcess, filename_from_id and
#           the assert helpers are used).
#   cmd  -- capture program to run; must not be None.
#
# NOTE(review): the embedded numbering has gaps (156-158, 160, 163, 167), so
# part of the argument list is not visible in this view.
144 def check_capture_read_filter(self, cmd=None):
# Skip (rather than fail) when the environment cannot run the test.
145 if not config.canCapture():
146 self.skipTest('Test requires capture privileges and an interface.')
147 if cmd == config.cmd_wireshark and not config.canDisplay():
148 self.skipTest('Test requires a display.')
149 if not config.args_ping:
150 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
151 self.assertIsNotNone(cmd)
152 ping_procs = start_pinging(self)
# testout_file is not referenced by any other visible line; presumably it is
# passed to the capture command in an elided argument -- TODO confirm.
153 testout_file = self.filename_from_id(testout_pcap)
154 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
155 '-i', config.capture_interface,
159 '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
161 '-a', 'duration:{}'.format(capture_duration),
162 '-f', 'icmp || icmp6',
164 capture_returncode = capture_proc.returncode
165 stop_pinging(ping_procs)
166 self.assertEqual(capture_returncode, 0)
# The return code is re-tested here even though it was just asserted to be 0.
168 if (capture_returncode == 0):
169 self.checkPacketCount(0)
# Run `cmd` as a live capture with a snapshot length of snapshot_len ('-s'),
# then use tshark with the display filter frame.cap_len>snapshot_len and
# expect 0 packets in testout2_file, i.e. no captured frame exceeds the limit.
#
# Parameters:
#   self -- the running test case (skipTest, runProcess, filename_from_id and
#           the assert helpers are used).
#   cmd  -- capture program to run; must not be None.
#
# NOTE(review): the embedded numbering has gaps (183-184, 188, 193, 196,
# 198-199, 201), so part of both argument lists is not visible in this view.
171 def check_capture_snapshot_len(self, cmd=None):
# Skip (rather than fail) when the environment cannot run the test.
172 if not config.canCapture():
173 self.skipTest('Test requires capture privileges and an interface.')
174 if cmd == config.cmd_wireshark and not config.canDisplay():
175 self.skipTest('Test requires a display.')
176 if not config.args_ping:
177 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
178 self.assertIsNotNone(cmd)
179 ping_procs = start_pinging(self)
180 testout_file = self.filename_from_id(testout_pcap)
181 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
182 '-i', config.capture_interface,
185 '-s', str(snapshot_len),
186 '-a', 'duration:{}'.format(capture_duration),
187 '-f', 'icmp || icmp6',
189 capture_returncode = capture_proc.returncode
190 stop_pinging(ping_procs)
191 self.assertEqual(capture_returncode, 0)
192 self.assertTrue(os.path.isfile(testout_file))
# NOTE(review): the comment below says 68 bytes, but the filter is built from
# snapshot_len, whose value is not visible here -- the comment may be stale.
194 # Use tshark to filter out all packets larger than 68 bytes.
195 testout2_file = self.filename_from_id('testout2.pcap')
197 filter_proc = self.runProcess((config.cmd_tshark,
200 '-Y', 'frame.cap_len>{}'.format(snapshot_len),
# NOTE(review): filter_returncode is assigned but never read; the assertion
# and guard that follow test capture_returncode again, so a failing tshark
# filter run is not detected by them. They look like they were meant to check
# filter_returncode -- confirm and fix.
202 filter_returncode = filter_proc.returncode
203 self.assertEqual(capture_returncode, 0)
204 if (capture_returncode == 0):
205 self.checkPacketCount(0, cap_file=testout2_file)
# Pipe the 'cat100' DHCP command into dumpcap with an autostop condition built
# from exactly one of `packets` or `filesize`, then verify the output file.
#
# Parameters:
#   self     -- the running test case (runProcess, filename_from_id,
#               checkPacketCount and the assert helpers are used).
#   packets  -- packet count used for the 'packets:N' condition, or None.
#   filesize -- size used for the 'filesize:N' condition, or None; compared
#               below against the output size in bytes divided by 1000.
#
# NOTE(review): the embedded numbering has gaps (214, 217, 222, 224-228,
# 234-235), so the capture arguments and the body of the failure branch are
# not visible in this view.
207 def check_dumpcap_autostop_stdin(self, packets=None, filesize=None):
208 # Similar to check_capture_stdin.
209 cmd = config.cmd_dumpcap
# capture_file is not referenced by any other visible line of this function.
210 capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
211 testout_file = self.filename_from_id(testout_pcap)
212 cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
# Placeholder value; it is replaced below once the arguments are validated.
213 condition='oops:invalid'
# Together these two assertions require exactly one of packets/filesize.
215 self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
216 self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
218 if packets is not None:
219 condition = 'packets:{}'.format(packets)
220 elif filesize is not None:
221 condition = 'filesize:{}'.format(filesize)
# capture_cmd is concatenated into a shell pipeline below, so
# capture_command() is expected to return a string here.
223 capture_cmd = subprocesstest.capture_command(cmd,
229 pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
230 pipe_returncode = pipe_proc.returncode
231 self.assertEqual(pipe_returncode, 0)
232 self.assertTrue(os.path.isfile(testout_file))
# The body of this branch is not visible in this view.
233 if (pipe_returncode != 0):
# Packet mode: the file must hold exactly `packets` packets.
# Size mode: the file size in units of 1000 bytes must be at least `filesize`.
236 if packets is not None:
237 self.checkPacketCount(packets)
238 elif filesize is not None:
239 capturekb = os.path.getsize(testout_file) / 1000
240 self.assertGreaterEqual(capturekb, filesize)
# Pipe the 'cat100' DHCP command into dumpcap with a condition built from
# exactly one of `packets` or `filesize`, then expect exactly two output files
# matching testout_glob and verify each of them.
#
# Parameters:
#   self     -- the running test case (runProcess, id, cleanup_files,
#               checkPacketCount and the assert helpers are used).
#   packets  -- packet count used for the 'packets:N' condition, or None.
#   filesize -- size used for the 'filesize:N' condition, or None; compared
#               below against each file's size in bytes divided by 1000.
#
# NOTE(review): the embedded numbering has gaps (251, 254, 259, 261-266,
# 271-272, 274, 276, 278-279), so the capture arguments, the failure branch
# body and the loop headers over rb_files are not visible in this view.
242 def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None):
243 # Similar to check_capture_stdin.
244 cmd = config.cmd_dumpcap
# capture_file is not referenced by any other visible line of this function.
245 capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
# A random six-hex-digit tag is embedded in the file name and in the glob
# pattern used further down to find the files dumpcap produced.
246 rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
247 testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
248 testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
249 cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
# Placeholder value; it is replaced below once the arguments are validated.
250 condition='oops:invalid'
# Together these two assertions require exactly one of packets/filesize.
252 self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
253 self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
255 if packets is not None:
256 condition = 'packets:{}'.format(packets)
257 elif filesize is not None:
258 condition = 'filesize:{}'.format(filesize)
# capture_cmd is concatenated into a shell pipeline below, so
# capture_command() is expected to return a string here.
260 capture_cmd = subprocesstest.capture_command(cmd,
267 pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
268 pipe_returncode = pipe_proc.returncode
269 self.assertEqual(pipe_returncode, 0)
# The body of this branch is not visible in this view.
270 if (pipe_returncode != 0):
273 rb_files = glob.glob(testout_glob)
# `rbf` is presumably the loop variable of an elided `for` over rb_files; each
# file found is added to self.cleanup_files -- TODO confirm.
275 self.cleanup_files.append(rbf)
277 self.assertEqual(len(rb_files), 2)
# Presumably a second elided loop over rb_files performs the per-file checks.
280 self.assertTrue(os.path.isfile(rbf))
281 if packets is not None:
282 self.checkPacketCount(packets, cap_file=rbf)
283 elif filesize is not None:
284 capturekb = os.path.getsize(rbf) / 1000
285 self.assertGreaterEqual(capturekb, filesize)
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    """Capture checks run with config.cmd_wireshark as the capture command."""

    def test_wireshark_capture_10_packets_to_file(self):
        """Capture 10 packets from the network to a file using Wireshark"""
        check_capture_10_packets(self, config.cmd_wireshark)

    # There is deliberately no "to stdout" variant for Wireshark:
    # Wireshark doesn't currently support writing to stdout while capturing.
    # def test_wireshark_capture_10_packets_to_stdout(self):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)

    def test_wireshark_capture_from_fifo(self):
        """Capture from a fifo using Wireshark"""
        check_capture_fifo(self, config.cmd_wireshark)

    def test_wireshark_capture_from_stdin(self):
        """Capture from stdin using Wireshark"""
        check_capture_stdin(self, config.cmd_wireshark)

    def test_wireshark_capture_snapshot_len(self):
        """Capture truncated packets using Wireshark"""
        check_capture_snapshot_len(self, config.cmd_wireshark)
class case_tshark_capture(subprocesstest.SubprocessTestCase):
    """Capture checks run with config.cmd_tshark as the capture command."""

    def test_tshark_capture_10_packets_to_file(self):
        """Capture 10 packets from the network to a file using TShark"""
        check_capture_10_packets(self, config.cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self):
        """Capture 10 packets from the network to stdout using TShark"""
        check_capture_10_packets(self, config.cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self):
        """Capture from a fifo using TShark"""
        check_capture_fifo(self, config.cmd_tshark)

    def test_tshark_capture_from_stdin(self):
        """Capture from stdin using TShark"""
        check_capture_stdin(self, config.cmd_tshark)

    def test_tshark_capture_snapshot_len(self):
        """Capture truncated packets using TShark"""
        check_capture_snapshot_len(self, config.cmd_tshark)
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    """Capture checks run with config.cmd_dumpcap as the capture command."""

    def test_dumpcap_capture_10_packets_to_file(self):
        """Capture 10 packets from the network to a file using Dumpcap"""
        check_capture_10_packets(self, config.cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self):
        """Capture 10 packets from the network to stdout using Dumpcap"""
        check_capture_10_packets(self, config.cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self):
        """Capture from a fifo using Dumpcap"""
        check_capture_fifo(self, config.cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self):
        """Capture from stdin using Dumpcap"""
        check_capture_stdin(self, config.cmd_dumpcap)

    def test_dumpcap_capture_snapshot_len(self):
        """Capture truncated packets using Dumpcap"""
        check_capture_snapshot_len(self, config.cmd_dumpcap)
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
    """Dumpcap autostop checks fed from stdin."""

    # Possible autostop conditions: duration, filesize, packets, files.
    # Only filesize and packets are exercised here.

    def test_dumpcap_autostop_filesize(self):
        """Capture from stdin using Dumpcap until we reach a file size limit"""
        size_limit = 15
        check_dumpcap_autostop_stdin(self, filesize=size_limit)

    def test_dumpcap_autostop_packets(self):
        """Capture from stdin using Dumpcap until we reach a packet limit"""
        # Last prime before 100. Arbitrary.
        packet_limit = 97
        check_dumpcap_autostop_stdin(self, packets=packet_limit)
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
    """Dumpcap ring buffer checks fed from stdin."""

    # Possible ring buffer conditions: duration, interval, filesize, packets,
    # files. Only filesize and packets are exercised here.
    # Need a function that finds ringbuffer file names.

    def test_dumpcap_ringbuffer_filesize(self):
        """Capture from stdin using Dumpcap and write multiple files until we reach a file size limit"""
        size_limit = 15
        check_dumpcap_ringbuffer_stdin(self, filesize=size_limit)

    def test_dumpcap_ringbuffer_packets(self):
        """Capture from stdin using Dumpcap and write multiple files until we reach a packet limit"""
        # Last prime before 50. Arbitrary.
        packet_limit = 47
        check_dumpcap_ringbuffer_stdin(self, packets=packet_limit)