Test: More fixes and updates.
[metze/wireshark/wip.git] / test / subprocesstest.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Subprocess test case superclass'''
11
12 import config
13 import difflib
14 import io
15 import os
16 import os.path
17 import re
18 import subprocess
19 import sys
20 import unittest
21
22 # To do:
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
25
26 # XXX This should probably be in config.py and settable from
27 # the command line.
28 if sys.version_info[0] >= 3:
29     process_timeout = 300 # Seconds
30
31 def capture_command(cmd, *args, **kwargs):
32     '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
33
34     If shell is true, return a string. Otherwise, return a list of arguments.'''
35     shell = kwargs.pop('shell', False)
36     if shell:
37         cap_cmd = ['"' + cmd + '"']
38     else:
39         cap_cmd = [cmd]
40     if cmd == config.cmd_wireshark:
41         cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
42     cap_cmd += args
43     if shell:
44         return ' '.join(cap_cmd)
45     else:
46         return cap_cmd
47
48 def cat_dhcp_command(mode):
49     '''Create a command string for dumping dhcp.pcap to stdout'''
50     # XXX Do this in Python in a thread?
51     sd_cmd = ''
52     if sys.executable:
53         sd_cmd = '"{}" '.format(sys.executable)
54     sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
55     return sd_cmd
56
57 class LoggingPopen(subprocess.Popen):
58     '''Run a process using subprocess.Popen. Capture and log its output.
59
60     Stdout and stderr are captured to memory and decoded as UTF-8. The
61     program command and output is written to log_fd.
62     '''
63     def __init__(self, proc_args, *args, **kwargs):
64         self.log_fd = kwargs.pop('log_fd', None)
65         kwargs['stdout'] = subprocess.PIPE
66         kwargs['stderr'] = subprocess.PIPE
67         # Make sure communicate() gives us bytes.
68         kwargs['universal_newlines'] = False
69         self.cmd_str = 'command ' + repr(proc_args)
70         super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
71         self.stdout_str = ''
72         self.stderr_str = ''
73
74     def wait_and_log(self):
75         '''Wait for the process to finish and log its output.'''
76         # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
77         if sys.version_info[0] >= 3:
78             out_data, err_data = self.communicate(timeout=process_timeout)
79             out_log = out_data.decode('UTF-8', 'replace')
80             err_log = err_data.decode('UTF-8', 'replace')
81         else:
82             out_data, err_data = self.communicate()
83             out_log = unicode(out_data, 'UTF-8', 'replace')
84             err_log = unicode(err_data, 'UTF-8', 'replace')
85         # Throwing a UnicodeDecodeError exception here is arguably a good thing.
86         self.stdout_str = out_data.decode('UTF-8', 'strict')
87         self.stderr_str = err_data.decode('UTF-8', 'strict')
88         self.log_fd.flush()
89         self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
90         self.log_fd.write(out_log)
91         self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
92         self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
93         self.log_fd.write(err_log)
94         self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
95         self.log_fd.flush()
96
97     def stop_process(self, kill=False):
98         '''Stop the process immediately.'''
99         if kill:
100             super(LoggingPopen, self).kill()
101         else:
102             super(LoggingPopen, self).terminate()
103
104     def terminate(self):
105         '''Terminate the process. Do not log its output.'''
106         # XXX Currently unused.
107         self.stop_process(kill=False)
108
109     def kill(self):
110         '''Kill the process. Do not log its output.'''
111         self.stop_process(kill=True)
112
113 class SubprocessTestCase(unittest.TestCase):
114     '''Run a program and gather its stdout and stderr.'''
115
116     def __init__(self, *args, **kwargs):
117         super(SubprocessTestCase, self).__init__(*args, **kwargs)
118         self.exit_ok = 0
119         self.exit_command_line = 1
120         self.exit_error = 2
121         self.exit_code = None
122         self.log_fname = None
123         self.log_fd = None
124         self.processes = []
125         self.cleanup_files = []
126         self.dump_files = []
127
128     def log_fd_write_bytes(self, log_data):
129         if sys.version_info[0] >= 3:
130             self.log_fd.write(log_data)
131         else:
132             self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))
133
134     def filename_from_id(self, filename):
135         '''Generate a filename prefixed with our test ID.'''
136         id_filename = self.id() + '.' + filename
137         if id_filename not in self.cleanup_files:
138             self.cleanup_files.append(id_filename)
139         return id_filename
140
141     def kill_processes(self):
142         '''Kill any processes we've opened so far'''
143         for proc in self.processes:
144             try:
145                 proc.kill()
146             except:
147                 pass
148
149     def run(self, result=None):
150         # Subclass run() so that we can do the following:
151         # - Open our log file and add it to the cleanup list.
152         # - Check our result before and after the run so that we can tell
153         #   if the current test was successful.
154
155         # Probably not needed, but shouldn't hurt.
156         self.kill_processes()
157         self.processes = []
158         self.log_fname = self.filename_from_id('log')
159         # Our command line utilities generate UTF-8. The log file endcoding
160         # needs to match that.
161         self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8')
162         self.cleanup_files.append(self.log_fname)
163         pre_run_problem_count = 0
164         if result:
165             pre_run_problem_count = len(result.failures) + len(result.errors)
166         try:
167             super(SubprocessTestCase, self).run(result=result)
168         except KeyboardInterrupt:
169             # XXX This doesn't seem to work on Windows, which is where we need it the most.
170             self.kill_processes()
171
172         # Tear down our test. We don't do this in tearDown() because Python 3
173         # updates "result" after calling tearDown().
174         self.kill_processes()
175         self.log_fd.close()
176         if result:
177             post_run_problem_count = len(result.failures) + len(result.errors)
178             if pre_run_problem_count != post_run_problem_count:
179                 self.dump_files.append(self.log_fname)
180                 # Leave some evidence behind.
181                 self.cleanup_files = []
182                 print('\nProcess output for {}:'.format(self.id()))
183                 with io.open(self.log_fname, 'r', encoding='UTF-8') as log_fd:
184                     for line in log_fd:
185                         sys.stdout.write(line)
186         for filename in self.cleanup_files:
187             try:
188                 os.unlink(filename)
189             except OSError:
190                 pass
191         self.cleanup_files = []
192
193     def getCaptureInfo(self, capinfos_args=None, cap_file=None):
194         '''Run capinfos on a capture file and log its output.
195
196         capinfos_args must be a sequence.
197         Default cap_file is <test id>.testout.pcap.'''
198         if not cap_file:
199             cap_file = self.filename_from_id('testout.pcap')
200         self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
201         capinfos_cmd = [config.cmd_capinfos]
202         if capinfos_args is not None:
203             capinfos_cmd += capinfos_args
204         capinfos_cmd.append(cap_file)
205         capinfos_stdout = str(subprocess.check_output(capinfos_cmd))
206         self.log_fd_write_bytes(capinfos_stdout)
207         return capinfos_stdout
208
209     def checkPacketCount(self, num_packets, cap_file=None):
210         '''Make sure a capture file contains a specific number of packets.'''
211         got_num_packets = False
212         capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
213         count_pat = 'Number of packets:\s+{}'.format(num_packets)
214         if re.search(count_pat, capinfos_testout):
215             got_num_packets = True
216         self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
217
218     def countOutput(self, search_pat, proc=None):
219         '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
220         match_count = 0
221         if proc is None:
222             proc = self.processes[-1]
223         # We might want to let the caller decide what we're searching.
224         out_data = proc.stdout_str + proc.stderr_str
225         search_re = re.compile(search_pat)
226         for line in out_data.splitlines():
227             if search_re.search(line):
228                 match_count += 1
229         return match_count
230
231     def grepOutput(self, search_pat, proc=None):
232         return self.countOutput(search_pat, proc) > 0
233
234     def diffOutput(self, blob_a, blob_b, *args, **kwargs):
235         '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
236
237         blob_a and blob_b must be UTF-8 strings.'''
238         lines_a = blob_a.splitlines()
239         lines_b = blob_b.splitlines()
240         diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
241         if len(diff) > 0:
242             if sys.version_info[0] < 3 and not isinstance(diff, unicode):
243                 diff = unicode(diff, 'UTF-8', 'replace')
244             self.log_fd.flush()
245             self.log_fd.write(u'-- Begin diff output --\n')
246             self.log_fd.writelines(diff)
247             self.log_fd.write(u'-- End diff output --\n')
248             return False
249         return True
250
251     def startProcess(self, proc_args, env=None, shell=False):
252         '''Start a process in the background. Returns a subprocess.Popen object.
253
254         You typically wait for it using waitProcess() or assertWaitProcess().'''
255         if env is None:
256             # Avoid using the test user's real environment by default.
257             env = config.test_env
258         proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
259         self.processes.append(proc)
260         return proc
261
262     def waitProcess(self, process):
263         '''Wait for a process to finish.'''
264         process.wait_and_log()
265         # XXX The shell version ran processes using a script called run_and_catch_crashes
266         # which looked for core dumps and printed stack traces if found. We might want
267         # to do something similar here. This may not be easy on modern Ubuntu systems,
268         # which default to using Apport: https://wiki.ubuntu.com/Apport
269
270     def assertWaitProcess(self, process, expected_return=0):
271         '''Wait for a process to finish and check its exit code.'''
272         process.wait_and_log()
273         self.assertEqual(process.returncode, expected_return)
274
275     def runProcess(self, args, env=None, shell=False):
276         '''Start a process and wait for it to finish.'''
277         process = self.startProcess(args, env=env, shell=shell)
278         process.wait_and_log()
279         return process
280
281     def assertRun(self, args, env=None, shell=False, expected_return=0):
282         '''Start a process and wait for it to finish. Check its return code.'''
283         process = self.runProcess(args, env=env, shell=shell)
284         self.assertEqual(process.returncode, expected_return)
285         return process