WireGuard: implement responder handshake decryption
[metze/wireshark/wip.git] / test / subprocesstest.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Subprocess test case superclass'''
11
12 import config
13 import difflib
14 import io
15 import os
16 import os.path
17 import re
18 import subprocess
19 import sys
20 import unittest
21
22 # To do:
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
25
26 # XXX This should probably be in config.py and settable from
27 # the command line.
28 if sys.version_info[0] >= 3:
29     process_timeout = 300 # Seconds
30
31 def capture_command(cmd, *args, **kwargs):
32     '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
33
34     If shell is true, return a string. Otherwise, return a list of arguments.'''
35     shell = kwargs.pop('shell', False)
36     if shell:
37         cap_cmd = ['"' + cmd + '"']
38     else:
39         cap_cmd = [cmd]
40     if cmd == config.cmd_wireshark:
41         cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
42     cap_cmd += args
43     if shell:
44         return ' '.join(cap_cmd)
45     else:
46         return cap_cmd
47
48 def cat_dhcp_command(mode):
49     '''Create a command string for dumping dhcp.pcap to stdout'''
50     # XXX Do this in Python in a thread?
51     sd_cmd = ''
52     if sys.executable:
53         sd_cmd = '"{}" '.format(sys.executable)
54     sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
55     return sd_cmd
56
57 class LoggingPopen(subprocess.Popen):
58     '''Run a process using subprocess.Popen. Capture and log its output.
59
60     Stdout and stderr are captured to memory and decoded as UTF-8. The
61     program command and output is written to log_fd.
62     '''
63     def __init__(self, proc_args, *args, **kwargs):
64         self.log_fd = kwargs.pop('log_fd', None)
65         kwargs['stdout'] = subprocess.PIPE
66         kwargs['stderr'] = subprocess.PIPE
67         # Make sure communicate() gives us bytes.
68         kwargs['universal_newlines'] = False
69         self.cmd_str = 'command ' + repr(proc_args)
70         super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
71         self.stdout_str = ''
72         self.stderr_str = ''
73
74     def wait_and_log(self):
75         '''Wait for the process to finish and log its output.'''
76         # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
77         if sys.version_info[0] >= 3:
78             out_data, err_data = self.communicate(timeout=process_timeout)
79             out_log = out_data.decode('UTF-8', 'replace')
80             err_log = err_data.decode('UTF-8', 'replace')
81         else:
82             out_data, err_data = self.communicate()
83             out_log = unicode(out_data, 'UTF-8', 'replace')
84             err_log = unicode(err_data, 'UTF-8', 'replace')
85         # Throwing a UnicodeDecodeError exception here is arguably a good thing.
86         self.stdout_str = out_data.decode('UTF-8', 'strict')
87         self.stderr_str = err_data.decode('UTF-8', 'strict')
88         self.log_fd.flush()
89         self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
90         self.log_fd.write(out_log)
91         self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
92         self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
93         self.log_fd.write(err_log)
94         self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
95         self.log_fd.flush()
96
97     def stop_process(self, kill=False):
98         '''Stop the process immediately.'''
99         if kill:
100             super(LoggingPopen, self).kill()
101         else:
102             super(LoggingPopen, self).terminate()
103
104     def terminate(self):
105         '''Terminate the process. Do not log its output.'''
106         # XXX Currently unused.
107         self.stop_process(kill=False)
108
109     def kill(self):
110         '''Kill the process. Do not log its output.'''
111         self.stop_process(kill=True)
112
113 class SubprocessTestCase(unittest.TestCase):
114     '''Run a program and gather its stdout and stderr.'''
115
116     def __init__(self, *args, **kwargs):
117         super(SubprocessTestCase, self).__init__(*args, **kwargs)
118         self.exit_ok = 0
119         self.exit_command_line = 1
120         self.exit_error = 2
121         self.exit_code = None
122         self.log_fname = None
123         self.log_fd = None
124         self.processes = []
125         self.cleanup_files = []
126         self.dump_files = []
127
128     def log_fd_write_bytes(self, log_data):
129         if sys.version_info[0] >= 3:
130             self.log_fd.write(log_data)
131         else:
132             self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))
133
134     def filename_from_id(self, filename):
135         '''Generate a filename prefixed with our test ID.'''
136         id_filename = self.id() + '.' + filename
137         if id_filename not in self.cleanup_files:
138             self.cleanup_files.append(id_filename)
139         return id_filename
140
141     def kill_processes(self):
142         '''Kill any processes we've opened so far'''
143         for proc in self.processes:
144             try:
145                 proc.kill()
146             except:
147                 pass
148
149     def _error_count(self, result):
150         if not result:
151             return 0
152         if hasattr(result, 'failures'):
153             # Python standard unittest runner
154             return len(result.failures) + len(result.errors)
155         if hasattr(result, '_excinfo'):
156             # pytest test runner
157             return len(result._excinfo or [])
158         self.fail("Unexpected test result %r" % result)
159
160     def run(self, result=None):
161         # Subclass run() so that we can do the following:
162         # - Open our log file and add it to the cleanup list.
163         # - Check our result before and after the run so that we can tell
164         #   if the current test was successful.
165
166         # Probably not needed, but shouldn't hurt.
167         self.kill_processes()
168         self.processes = []
169         self.log_fname = self.filename_from_id('log')
170         # Our command line utilities generate UTF-8. The log file endcoding
171         # needs to match that.
172         # XXX newline='\n' works for now, but we might have to do more work
173         # to handle line endings in the future.
174         self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
175         self.cleanup_files.append(self.log_fname)
176         pre_run_problem_count = self._error_count(result)
177         try:
178             super(SubprocessTestCase, self).run(result=result)
179         except KeyboardInterrupt:
180             # XXX This doesn't seem to work on Windows, which is where we need it the most.
181             self.kill_processes()
182
183         # Tear down our test. We don't do this in tearDown() because Python 3
184         # updates "result" after calling tearDown().
185         self.kill_processes()
186         self.log_fd.close()
187         if result:
188             post_run_problem_count = self._error_count(result)
189             if pre_run_problem_count != post_run_problem_count:
190                 self.dump_files.append(self.log_fname)
191                 # Leave some evidence behind.
192                 self.cleanup_files = []
193                 print('\nProcess output for {}:'.format(self.id()))
194                 with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
195                     for line in log_fd:
196                         sys.stdout.write(line)
197         for filename in self.cleanup_files:
198             try:
199                 os.unlink(filename)
200             except OSError:
201                 pass
202         self.cleanup_files = []
203
204     def getCaptureInfo(self, capinfos_args=None, cap_file=None):
205         '''Run capinfos on a capture file and log its output.
206
207         capinfos_args must be a sequence.
208         Default cap_file is <test id>.testout.pcap.'''
209         if not cap_file:
210             cap_file = self.filename_from_id('testout.pcap')
211         self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
212         capinfos_cmd = [config.cmd_capinfos]
213         if capinfos_args is not None:
214             capinfos_cmd += capinfos_args
215         capinfos_cmd.append(cap_file)
216         capinfos_data = subprocess.check_output(capinfos_cmd)
217         if sys.version_info[0] >= 3:
218             capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
219         else:
220             capinfos_stdout = unicode(capinfos_data, 'UTF-8', 'replace')
221         self.log_fd.write(capinfos_stdout)
222         return capinfos_stdout
223
224     def checkPacketCount(self, num_packets, cap_file=None):
225         '''Make sure a capture file contains a specific number of packets.'''
226         got_num_packets = False
227         capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
228         count_pat = 'Number of packets:\s+{}'.format(num_packets)
229         if re.search(count_pat, capinfos_testout):
230             got_num_packets = True
231         self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
232
233     def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
234         '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
235         match_count = 0
236         self.assertTrue(count_stdout or count_stderr, 'No output to count.')
237
238         if proc is None:
239             proc = self.processes[-1]
240
241         out_data = u''
242         if count_stdout:
243             out_data = proc.stdout_str
244         if count_stderr:
245             out_data += proc.stderr_str
246
247         if search_pat is None:
248             return len(out_data.splitlines())
249
250         search_re = re.compile(search_pat)
251         for line in out_data.splitlines():
252             if search_re.search(line):
253                 match_count += 1
254
255         return match_count
256
257     def grepOutput(self, search_pat, proc=None):
258         return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
259
260     def diffOutput(self, blob_a, blob_b, *args, **kwargs):
261         '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
262
263         blob_a and blob_b must be UTF-8 strings.'''
264         lines_a = blob_a.splitlines()
265         lines_b = blob_b.splitlines()
266         diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
267         if len(diff) > 0:
268             if sys.version_info[0] < 3 and not isinstance(diff, unicode):
269                 diff = unicode(diff, 'UTF-8', 'replace')
270             self.log_fd.flush()
271             self.log_fd.write(u'-- Begin diff output --\n')
272             self.log_fd.writelines(diff)
273             self.log_fd.write(u'-- End diff output --\n')
274             return False
275         return True
276
277     def startProcess(self, proc_args, stdin=None, env=None, shell=False):
278         '''Start a process in the background. Returns a subprocess.Popen object.
279
280         You typically wait for it using waitProcess() or assertWaitProcess().'''
281         if env is None:
282             # Avoid using the test user's real environment by default.
283             env = config.test_env
284         proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
285         self.processes.append(proc)
286         return proc
287
288     def waitProcess(self, process):
289         '''Wait for a process to finish.'''
290         process.wait_and_log()
291         # XXX The shell version ran processes using a script called run_and_catch_crashes
292         # which looked for core dumps and printed stack traces if found. We might want
293         # to do something similar here. This may not be easy on modern Ubuntu systems,
294         # which default to using Apport: https://wiki.ubuntu.com/Apport
295
296     def assertWaitProcess(self, process, expected_return=0):
297         '''Wait for a process to finish and check its exit code.'''
298         process.wait_and_log()
299         self.assertEqual(process.returncode, expected_return)
300
301     def runProcess(self, args, env=None, shell=False):
302         '''Start a process and wait for it to finish.'''
303         process = self.startProcess(args, env=env, shell=shell)
304         process.wait_and_log()
305         return process
306
307     def assertRun(self, args, env=None, shell=False, expected_return=0):
308         '''Start a process and wait for it to finish. Check its return code.'''
309         process = self.runProcess(args, env=env, shell=shell)
310         self.assertEqual(process.returncode, expected_return)
311         return process