Test: Be more paranoid about our log output.
[metze/wireshark/wip.git] / test / suite_capture.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Capture tests'''
11
12 import config
13 import os
14 import re
15 import subprocess
16 import subprocesstest
17 import sys
18 import time
19 import unittest
20
21 capture_duration = 5
22
23 testout_pcap = 'testout.pcap'
24 snapshot_len = 96
25 capture_env = os.environ.copy()
26 capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
27
28 def start_pinging(self):
29     ping_procs = []
30     if sys.platform.startswith('win32'):
31         # Fake '-i' with a subsecond interval.
32         for st in (0.1, 0.1, 0):
33             ping_procs.append(self.startProcess(config.args_ping))
34             time.sleep(st)
35     else:
36         ping_procs.append(self.startProcess(config.args_ping))
37     return ping_procs
38
39 def stop_pinging(ping_procs):
40     for proc in ping_procs:
41         proc.kill()
42
43 def check_capture_10_packets(self, cmd=None, to_stdout=False):
44     # Similar to suite_io.check_io_4_packets.
45     if not config.canCapture():
46         self.skipTest('Test requires capture privileges and an interface.')
47     if cmd == config.cmd_wireshark and not config.canDisplay():
48         self.skipTest('Test requires a display.')
49     if not config.args_ping:
50         self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
51     self.assertIsNotNone(cmd)
52     testout_file = self.filename_from_id(testout_pcap)
53     ping_procs = start_pinging(self)
54     if to_stdout:
55         capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
56             '-i', '"{}"'.format(config.capture_interface),
57             '-p',
58             '-w', '-',
59             '-c', '10',
60             '-a', 'duration:{}'.format(capture_duration),
61             '-f', '"icmp || icmp6"',
62             '>', testout_file,
63             shell=True
64         ),
65         env=capture_env,
66         shell=True
67         )
68     else:
69         capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
70             '-i', config.capture_interface,
71             '-p',
72             '-w', testout_file,
73             '-c', '10',
74             '-a', 'duration:{}'.format(capture_duration),
75             '-f', 'icmp || icmp6',
76         ),
77         env=capture_env
78         )
79     capture_returncode = capture_proc.returncode
80     stop_pinging(ping_procs)
81     if capture_returncode != 0:
82         self.log_fd.write('{} -D output:\n'.format(cmd))
83         self.runProcess((cmd, '-D'))
84     self.assertEqual(capture_returncode, 0)
85     if (capture_returncode == 0):
86         self.checkPacketCount(10)
87
88 def check_capture_fifo(self, cmd=None):
89     if not config.canMkfifo():
90         self.skipTest('Test requires OS fifo support.')
91     if cmd == config.cmd_wireshark and not config.canDisplay():
92         self.skipTest('Test requires a display.')
93     self.assertIsNotNone(cmd)
94     capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
95     testout_file = self.filename_from_id(testout_pcap)
96     fifo_file = self.filename_from_id('testout.fifo')
97     try:
98         # If a previous test left its fifo laying around, e.g. from a failure, remove it.
99         os.unlink(fifo_file)
100     except:
101         pass
102     os.mkfifo(fifo_file)
103     slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
104     fifo_proc = self.startProcess(
105         ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
106         shell=True)
107     capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
108         '-i', fifo_file,
109         '-p',
110         '-w', testout_file,
111         '-a', 'duration:{}'.format(capture_duration),
112     ),
113     env=capture_env
114     )
115     fifo_proc.kill()
116     self.assertTrue(os.path.isfile(testout_file))
117     capture_returncode = capture_proc.returncode
118     self.assertEqual(capture_returncode, 0)
119     if (capture_returncode == 0):
120         self.checkPacketCount(8)
121
122 def check_capture_stdin(self, cmd=None):
123     # Similar to suite_io.check_io_4_packets.
124     if cmd == config.cmd_wireshark and not config.canDisplay():
125         self.skipTest('Test requires a display.')
126     self.assertIsNotNone(cmd)
127     capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
128     testout_file = self.filename_from_id(testout_pcap)
129     slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
130     capture_cmd = subprocesstest.capture_command(cmd,
131         '-i', '-',
132         '-w', testout_file,
133         '-a', 'duration:{}'.format(capture_duration),
134         shell=True
135     )
136     if cmd == config.cmd_wireshark:
137         capture_cmd += ' -o console.log.level:127'
138     pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True)
139     pipe_returncode = pipe_proc.returncode
140     self.assertEqual(pipe_returncode, 0)
141     if cmd == config.cmd_wireshark:
142         self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
143         self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
144         self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
145     self.assertTrue(os.path.isfile(testout_file))
146     if (pipe_returncode == 0):
147         self.checkPacketCount(8)
148
149 def check_capture_2multi_10packets(self, cmd=None):
150     # This was present in the Bash version but was incorrect and not part of any suite.
151     # It's apparently intended to test file rotation.
152     self.skipTest('Not yet implemented')
153
154 def check_capture_read_filter(self, cmd=None):
155     if not config.canCapture():
156         self.skipTest('Test requires capture privileges and an interface.')
157     if cmd == config.cmd_wireshark and not config.canDisplay():
158         self.skipTest('Test requires a display.')
159     if not config.args_ping:
160         self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
161     self.assertIsNotNone(cmd)
162     ping_procs = start_pinging(self)
163     testout_file = self.filename_from_id(testout_pcap)
164     capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
165         '-i', config.capture_interface,
166         '-p',
167         '-w', testout_file,
168         '-2',
169         '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
170         '-c', '10',
171         '-a', 'duration:{}'.format(capture_duration),
172         '-f', 'icmp || icmp6',
173     ),
174     env=capture_env
175     )
176     capture_returncode = capture_proc.returncode
177     stop_pinging(ping_procs)
178     self.assertEqual(capture_returncode, 0)
179
180     if (capture_returncode == 0):
181         self.checkPacketCount(0)
182
183 def check_capture_snapshot_len(self, cmd=None):
184     if not config.canCapture():
185         self.skipTest('Test requires capture privileges and an interface.')
186     if cmd == config.cmd_wireshark and not config.canDisplay():
187         self.skipTest('Test requires a display.')
188     if not config.args_ping:
189         self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
190     self.assertIsNotNone(cmd)
191     ping_procs = start_pinging(self)
192     testout_file = self.filename_from_id(testout_pcap)
193     capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
194         '-i', config.capture_interface,
195         '-p',
196         '-w', testout_file,
197         '-s', str(snapshot_len),
198         '-a', 'duration:{}'.format(capture_duration),
199         '-f', 'icmp || icmp6',
200     ),
201     env=capture_env
202     )
203     capture_returncode = capture_proc.returncode
204     stop_pinging(ping_procs)
205     self.assertEqual(capture_returncode, 0)
206     self.assertTrue(os.path.isfile(testout_file))
207
208     # Use tshark to filter out all packets larger than 68 bytes.
209     testout2_file = self.filename_from_id('testout2.pcap')
210
211     filter_proc = self.runProcess((config.cmd_tshark,
212         '-r', testout_file,
213         '-w', testout2_file,
214         '-Y', 'frame.cap_len>{}'.format(snapshot_len),
215     ))
216     filter_returncode = filter_proc.returncode
217     self.assertEqual(capture_returncode, 0)
218     if (capture_returncode == 0):
219         self.checkPacketCount(0, cap_file=testout2_file)
220
221 class case_wireshark_capture(subprocesstest.SubprocessTestCase):
222     def test_wireshark_capture_10_packets_to_file(self):
223         '''Capture 10 packets from the network to a file using Wireshark'''
224         check_capture_10_packets(self, cmd=config.cmd_wireshark)
225
226     # Wireshark doesn't currently support writing to stdout while capturing.
227     # def test_wireshark_capture_10_packets_to_stdout(self):
228     #     '''Capture 10 packets from the network to stdout using Wireshark'''
229     #     check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)
230
231     def test_wireshark_capture_from_fifo(self):
232         '''Capture from a fifo using Wireshark'''
233         check_capture_fifo(self, cmd=config.cmd_wireshark)
234
235     def test_wireshark_capture_from_stdin(self):
236         '''Capture from stdin using Wireshark'''
237         check_capture_stdin(self, cmd=config.cmd_wireshark)
238
239     def test_wireshark_capture_snapshot_len(self):
240         '''Capture truncated packets using Wireshark'''
241         check_capture_snapshot_len(self, cmd=config.cmd_wireshark)
242
243 class case_tshark_capture(subprocesstest.SubprocessTestCase):
244     def test_tshark_capture_10_packets_to_file(self):
245         '''Capture 10 packets from the network to a file using TShark'''
246         check_capture_10_packets(self, cmd=config.cmd_tshark)
247
248     def test_tshark_capture_10_packets_to_stdout(self):
249         '''Capture 10 packets from the network to stdout using TShark'''
250         check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)
251
252     def test_tshark_capture_from_fifo(self):
253         '''Capture from a fifo using TShark'''
254         check_capture_fifo(self, cmd=config.cmd_tshark)
255
256     def test_tshark_capture_from_stdin(self):
257         '''Capture from stdin using TShark'''
258         check_capture_stdin(self, cmd=config.cmd_tshark)
259
260     def test_tshark_capture_snapshot_len(self):
261         '''Capture truncated packets using TShark'''
262         check_capture_snapshot_len(self, cmd=config.cmd_tshark)
263
264 class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
265     def test_dumpcap_capture_10_packets_to_file(self):
266         '''Capture 10 packets from the network to a file using Dumpcap'''
267         check_capture_10_packets(self, cmd=config.cmd_dumpcap)
268
269     def test_dumpcap_capture_10_packets_to_stdout(self):
270         '''Capture 10 packets from the network to stdout using Dumpcap'''
271         check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)
272
273     def test_dumpcap_capture_from_fifo(self):
274         '''Capture from a fifo using Dumpcap'''
275         check_capture_fifo(self, cmd=config.cmd_dumpcap)
276
277     def test_dumpcap_capture_from_stdin(self):
278         '''Capture from stdin using Dumpcap'''
279         check_capture_stdin(self, cmd=config.cmd_dumpcap)
280
281     def test_dumpcap_capture_snapshot_len(self):
282         '''Capture truncated packets using Dumpcap'''
283         check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)