2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
23 testout_pcap = 'testout.pcap'
25 capture_env = os.environ.copy()
26 capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
28 def start_pinging(self):
30 if sys.platform.startswith('win32'):
31 # Fake '-i' with a subsecond interval.
32 for st in (0.1, 0.1, 0):
33 ping_procs.append(self.startProcess(config.args_ping))
36 ping_procs.append(self.startProcess(config.args_ping))
39 def stop_pinging(ping_procs):
40 for proc in ping_procs:
43 def check_capture_10_packets(self, cmd=None, to_stdout=False):
44 # Similar to suite_io.check_io_4_packets.
45 if not config.canCapture():
46 self.skipTest('Test requires capture privileges and an interface.')
47 if cmd == config.cmd_wireshark and not config.canDisplay():
48 self.skipTest('Test requires a display.')
49 if not config.args_ping:
50 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
51 self.assertIsNotNone(cmd)
52 testout_file = self.filename_from_id(testout_pcap)
53 ping_procs = start_pinging(self)
55 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
56 '-i', '"{}"'.format(config.capture_interface),
60 '-a', 'duration:{}'.format(capture_duration),
61 '-f', '"icmp || icmp6"',
69 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
70 '-i', config.capture_interface,
74 '-a', 'duration:{}'.format(capture_duration),
75 '-f', 'icmp || icmp6',
79 capture_returncode = capture_proc.returncode
80 stop_pinging(ping_procs)
81 if capture_returncode != 0:
82 self.log_fd.write('{} -D output:\n'.format(cmd))
83 self.runProcess((cmd, '-D'))
84 self.assertEqual(capture_returncode, 0)
85 if (capture_returncode == 0):
86 self.checkPacketCount(10)
88 def check_capture_fifo(self, cmd=None):
89 if not config.canMkfifo():
90 self.skipTest('Test requires OS fifo support.')
91 if cmd == config.cmd_wireshark and not config.canDisplay():
92 self.skipTest('Test requires a display.')
93 self.assertIsNotNone(cmd)
94 capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
95 testout_file = self.filename_from_id(testout_pcap)
96 fifo_file = self.filename_from_id('testout.fifo')
98 # If a previous test left its fifo laying around, e.g. from a failure, remove it.
103 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
104 fifo_proc = self.startProcess(
105 ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
107 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
111 '-a', 'duration:{}'.format(capture_duration),
116 self.assertTrue(os.path.isfile(testout_file))
117 capture_returncode = capture_proc.returncode
118 self.assertEqual(capture_returncode, 0)
119 if (capture_returncode == 0):
120 self.checkPacketCount(8)
122 def check_capture_stdin(self, cmd=None):
123 # Similar to suite_io.check_io_4_packets.
124 if cmd == config.cmd_wireshark and not config.canDisplay():
125 self.skipTest('Test requires a display.')
126 self.assertIsNotNone(cmd)
127 capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
128 testout_file = self.filename_from_id(testout_pcap)
129 slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
130 capture_cmd = subprocesstest.capture_command(cmd,
133 '-a', 'duration:{}'.format(capture_duration),
136 if cmd == config.cmd_wireshark:
137 capture_cmd += ' -o console.log.level:127'
138 pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True)
139 pipe_returncode = pipe_proc.returncode
140 self.assertEqual(pipe_returncode, 0)
141 if cmd == config.cmd_wireshark:
142 self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
143 self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
144 self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
145 self.assertTrue(os.path.isfile(testout_file))
146 if (pipe_returncode == 0):
147 self.checkPacketCount(8)
149 def check_capture_2multi_10packets(self, cmd=None):
150 # This was present in the Bash version but was incorrect and not part of any suite.
151 # It's apparently intended to test file rotation.
152 self.skipTest('Not yet implemented')
154 def check_capture_read_filter(self, cmd=None):
155 if not config.canCapture():
156 self.skipTest('Test requires capture privileges and an interface.')
157 if cmd == config.cmd_wireshark and not config.canDisplay():
158 self.skipTest('Test requires a display.')
159 if not config.args_ping:
160 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
161 self.assertIsNotNone(cmd)
162 ping_procs = start_pinging(self)
163 testout_file = self.filename_from_id(testout_pcap)
164 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
165 '-i', config.capture_interface,
169 '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
171 '-a', 'duration:{}'.format(capture_duration),
172 '-f', 'icmp || icmp6',
176 capture_returncode = capture_proc.returncode
177 stop_pinging(ping_procs)
178 self.assertEqual(capture_returncode, 0)
180 if (capture_returncode == 0):
181 self.checkPacketCount(0)
183 def check_capture_snapshot_len(self, cmd=None):
184 if not config.canCapture():
185 self.skipTest('Test requires capture privileges and an interface.')
186 if cmd == config.cmd_wireshark and not config.canDisplay():
187 self.skipTest('Test requires a display.')
188 if not config.args_ping:
189 self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
190 self.assertIsNotNone(cmd)
191 ping_procs = start_pinging(self)
192 testout_file = self.filename_from_id(testout_pcap)
193 capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
194 '-i', config.capture_interface,
197 '-s', str(snapshot_len),
198 '-a', 'duration:{}'.format(capture_duration),
199 '-f', 'icmp || icmp6',
203 capture_returncode = capture_proc.returncode
204 stop_pinging(ping_procs)
205 self.assertEqual(capture_returncode, 0)
206 self.assertTrue(os.path.isfile(testout_file))
208 # Use tshark to filter out all packets larger than 68 bytes.
209 testout2_file = self.filename_from_id('testout2.pcap')
211 filter_proc = self.runProcess((config.cmd_tshark,
214 '-Y', 'frame.cap_len>{}'.format(snapshot_len),
216 filter_returncode = filter_proc.returncode
217 self.assertEqual(capture_returncode, 0)
218 if (capture_returncode == 0):
219 self.checkPacketCount(0, cap_file=testout2_file)
221 class case_wireshark_capture(subprocesstest.SubprocessTestCase):
222 def test_wireshark_capture_10_packets_to_file(self):
223 '''Capture 10 packets from the network to a file using Wireshark'''
224 check_capture_10_packets(self, cmd=config.cmd_wireshark)
226 # Wireshark doesn't currently support writing to stdout while capturing.
227 # def test_wireshark_capture_10_packets_to_stdout(self):
228 # '''Capture 10 packets from the network to stdout using Wireshark'''
229 # check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)
231 def test_wireshark_capture_from_fifo(self):
232 '''Capture from a fifo using Wireshark'''
233 check_capture_fifo(self, cmd=config.cmd_wireshark)
235 def test_wireshark_capture_from_stdin(self):
236 '''Capture from stdin using Wireshark'''
237 check_capture_stdin(self, cmd=config.cmd_wireshark)
239 def test_wireshark_capture_snapshot_len(self):
240 '''Capture truncated packets using Wireshark'''
241 check_capture_snapshot_len(self, cmd=config.cmd_wireshark)
243 class case_tshark_capture(subprocesstest.SubprocessTestCase):
244 def test_tshark_capture_10_packets_to_file(self):
245 '''Capture 10 packets from the network to a file using TShark'''
246 check_capture_10_packets(self, cmd=config.cmd_tshark)
248 def test_tshark_capture_10_packets_to_stdout(self):
249 '''Capture 10 packets from the network to stdout using TShark'''
250 check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)
252 def test_tshark_capture_from_fifo(self):
253 '''Capture from a fifo using TShark'''
254 check_capture_fifo(self, cmd=config.cmd_tshark)
256 def test_tshark_capture_from_stdin(self):
257 '''Capture from stdin using TShark'''
258 check_capture_stdin(self, cmd=config.cmd_tshark)
260 def test_tshark_capture_snapshot_len(self):
261 '''Capture truncated packets using TShark'''
262 check_capture_snapshot_len(self, cmd=config.cmd_tshark)
264 class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
265 def test_dumpcap_capture_10_packets_to_file(self):
266 '''Capture 10 packets from the network to a file using Dumpcap'''
267 check_capture_10_packets(self, cmd=config.cmd_dumpcap)
269 def test_dumpcap_capture_10_packets_to_stdout(self):
270 '''Capture 10 packets from the network to stdout using Dumpcap'''
271 check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)
273 def test_dumpcap_capture_from_fifo(self):
274 '''Capture from a fifo using Dumpcap'''
275 check_capture_fifo(self, cmd=config.cmd_dumpcap)
277 def test_dumpcap_capture_from_stdin(self):
278 '''Capture from stdin using Dumpcap'''
279 check_capture_stdin(self, cmd=config.cmd_dumpcap)
281 def test_dumpcap_capture_snapshot_len(self):
282 '''Capture truncated packets using Dumpcap'''
283 check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)