test: add suite_outputformats for json output regression testing.
[metze/wireshark/wip.git] / test / subprocesstest.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Subprocess test case superclass'''
11
12 import difflib
13 import io
14 import os
15 import os.path
16 import re
17 import subprocess
18 import sys
19 import unittest
20
21 # To do:
22 # - Add a subprocesstest.SkipUnlessCapture decorator?
23 # - Try to catch crashes? See the comments below in waitProcess.
24
25 process_timeout = 300 # Seconds
26
27 def cat_dhcp_command(mode):
28     '''Create a command string for dumping dhcp.pcap to stdout'''
29     # XXX Do this in Python in a thread?
30     sd_cmd = ''
31     if sys.executable:
32         sd_cmd = '"{}" '.format(sys.executable)
33     this_dir = os.path.dirname(__file__)
34     sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode)
35     return sd_cmd
36
37 def cat_cap_file_command(cap_files):
38     '''Create a command string for dumping one or more capture files to stdout'''
39     # XXX Do this in Python in a thread?
40     if isinstance(cap_files, str):
41         cap_files = [ cap_files ]
42     quoted_paths = ' '.join('"{}"'.format(cap_file) for cap_file in cap_files)
43     if sys.platform.startswith('win32'):
44         # https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-xp/bb491026(v=technet.10)
45         # says that the `type` command "displays the contents of a text
46         # file." Copy to the console instead.
47         return 'copy {} CON'.format(quoted_paths)
48     return 'cat {}'.format(quoted_paths)
49
50 class LoggingPopen(subprocess.Popen):
51     '''Run a process using subprocess.Popen. Capture and log its output.
52
53     Stdout and stderr are captured to memory and decoded as UTF-8. The
54     program command and output is written to log_fd.
55     '''
56     def __init__(self, proc_args, *args, **kwargs):
57         self.log_fd = kwargs.pop('log_fd', None)
58         kwargs['stdout'] = subprocess.PIPE
59         kwargs['stderr'] = subprocess.PIPE
60         # Make sure communicate() gives us bytes.
61         kwargs['universal_newlines'] = False
62         self.cmd_str = 'command ' + repr(proc_args)
63         super().__init__(proc_args, *args, **kwargs)
64         self.stdout_str = ''
65         self.stderr_str = ''
66
67     def wait_and_log(self):
68         '''Wait for the process to finish and log its output.'''
69         out_data, err_data = self.communicate(timeout=process_timeout)
70         out_log = out_data.decode('UTF-8', 'replace')
71         err_log = err_data.decode('UTF-8', 'replace')
72         self.log_fd.flush()
73         self.log_fd.write('-- Begin stdout for {} --\n'.format(self.cmd_str))
74         self.log_fd.write(out_log)
75         self.log_fd.write('-- End stdout for {} --\n'.format(self.cmd_str))
76         self.log_fd.write('-- Begin stderr for {} --\n'.format(self.cmd_str))
77         self.log_fd.write(err_log)
78         self.log_fd.write('-- End stderr for {} --\n'.format(self.cmd_str))
79         self.log_fd.flush()
80         # Throwing a UnicodeDecodeError exception here is arguably a good thing.
81         self.stdout_str = out_data.decode('UTF-8', 'strict')
82         self.stderr_str = err_data.decode('UTF-8', 'strict')
83
84     def stop_process(self, kill=False):
85         '''Stop the process immediately.'''
86         if kill:
87             super().kill()
88         else:
89             super().terminate()
90
91     def terminate(self):
92         '''Terminate the process. Do not log its output.'''
93         # XXX Currently unused.
94         self.stop_process(kill=False)
95
96     def kill(self):
97         '''Kill the process. Do not log its output.'''
98         self.stop_process(kill=True)
99
100 class SubprocessTestCase(unittest.TestCase):
101     '''Run a program and gather its stdout and stderr.'''
102
103     def __init__(self, *args, **kwargs):
104         super().__init__(*args, **kwargs)
105         self.exit_ok = 0
106         self.exit_command_line = 1
107         self.exit_error = 2
108         self.exit_code = None
109         self.log_fname = None
110         self.log_fd = None
111         self.processes = []
112         self.cleanup_files = []
113         self.dump_files = []
114
115     def log_fd_write_bytes(self, log_data):
116         self.log_fd.write(log_data)
117
118     def filename_from_id(self, filename):
119         '''Generate a filename prefixed with our test ID.'''
120         id_filename = self.id() + '.' + filename
121         if id_filename not in self.cleanup_files:
122             self.cleanup_files.append(id_filename)
123         return id_filename
124
125     def kill_processes(self):
126         '''Kill any processes we've opened so far'''
127         for proc in self.processes:
128             try:
129                 proc.kill()
130             except:
131                 pass
132
133     def _error_count(self, result):
134         if not result:
135             return 0
136         if hasattr(result, 'failures'):
137             # Python standard unittest runner
138             return len(result.failures) + len(result.errors)
139         if hasattr(result, '_excinfo'):
140             # pytest test runner
141             return len(result._excinfo or [])
142         self.fail("Unexpected test result %r" % result)
143
144     def run(self, result=None):
145         # Subclass run() so that we can do the following:
146         # - Open our log file and add it to the cleanup list.
147         # - Check our result before and after the run so that we can tell
148         #   if the current test was successful.
149
150         # Probably not needed, but shouldn't hurt.
151         self.kill_processes()
152         self.processes = []
153         self.log_fname = self.filename_from_id('log')
154         # Our command line utilities generate UTF-8. The log file endcoding
155         # needs to match that.
156         # XXX newline='\n' works for now, but we might have to do more work
157         # to handle line endings in the future.
158         self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
159         self.cleanup_files.append(self.log_fname)
160         pre_run_problem_count = self._error_count(result)
161         try:
162             super().run(result=result)
163         except KeyboardInterrupt:
164             # XXX This doesn't seem to work on Windows, which is where we need it the most.
165             self.kill_processes()
166
167         # Tear down our test. We don't do this in tearDown() because Python 3
168         # updates "result" after calling tearDown().
169         self.kill_processes()
170         self.log_fd.close()
171         if result:
172             post_run_problem_count = self._error_count(result)
173             if pre_run_problem_count != post_run_problem_count:
174                 self.dump_files.append(self.log_fname)
175                 # Leave some evidence behind.
176                 self.cleanup_files = []
177                 print('\nProcess output for {}:'.format(self.id()))
178                 with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
179                     for line in log_fd:
180                         sys.stdout.write(line)
181         for filename in self.cleanup_files:
182             try:
183                 os.unlink(filename)
184             except OSError:
185                 pass
186         self.cleanup_files = []
187
188     def getCaptureInfo(self, capinfos_args=None, cap_file=None):
189         '''Run capinfos on a capture file and log its output.
190
191         capinfos_args must be a sequence.
192         Default cap_file is <test id>.testout.pcap.'''
193         # XXX convert users to use a new fixture instead of this function.
194         cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
195         if not cap_file:
196             cap_file = self.filename_from_id('testout.pcap')
197         self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
198         capinfos_cmd = [cmd_capinfos]
199         if capinfos_args is not None:
200             capinfos_cmd += capinfos_args
201         capinfos_cmd.append(cap_file)
202         capinfos_data = subprocess.check_output(capinfos_cmd)
203         capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
204         self.log_fd.write(capinfos_stdout)
205         return capinfos_stdout
206
207     def checkPacketCount(self, num_packets, cap_file=None):
208         '''Make sure a capture file contains a specific number of packets.'''
209         got_num_packets = False
210         capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
211         count_pat = r'Number of packets:\s+{}'.format(num_packets)
212         if re.search(count_pat, capinfos_testout):
213             got_num_packets = True
214         self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
215
216     def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
217         '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
218         match_count = 0
219         self.assertTrue(count_stdout or count_stderr, 'No output to count.')
220
221         if proc is None:
222             proc = self.processes[-1]
223
224         out_data = ''
225         if count_stdout:
226             out_data = proc.stdout_str
227         if count_stderr:
228             out_data += proc.stderr_str
229
230         if search_pat is None:
231             return len(out_data.splitlines())
232
233         search_re = re.compile(search_pat)
234         for line in out_data.splitlines():
235             if search_re.search(line):
236                 match_count += 1
237
238         return match_count
239
240     def grepOutput(self, search_pat, proc=None):
241         return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
242
243     def diffOutput(self, blob_a, blob_b, *args, **kwargs):
244         '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
245
246         blob_a and blob_b must be UTF-8 strings.'''
247         lines_a = blob_a.splitlines()
248         lines_b = blob_b.splitlines()
249         diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
250         if len(diff) > 0:
251             self.log_fd.flush()
252             self.log_fd.write('-- Begin diff output --\n')
253             self.log_fd.writelines(diff)
254             self.log_fd.write('-- End diff output --\n')
255             return False
256         return True
257
258     def startProcess(self, proc_args, stdin=None, env=None, shell=False):
259         '''Start a process in the background. Returns a subprocess.Popen object.
260
261         You typically wait for it using waitProcess() or assertWaitProcess().'''
262         if env is None:
263             # Apply default test environment if no override is provided.
264             env = getattr(self, 'injected_test_env', None)
265             # Not all tests need test_env, but those that use runProcess or
266             # startProcess must either pass an explicit environment or load the
267             # fixture (via a test method parameter or class decorator).
268             assert not (env is None and hasattr(self, '_fixture_request')), \
269                 "Decorate class with @fixtures.mark_usefixtures('test_env')"
270         proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
271         self.processes.append(proc)
272         return proc
273
274     def waitProcess(self, process):
275         '''Wait for a process to finish.'''
276         process.wait_and_log()
277         # XXX The shell version ran processes using a script called run_and_catch_crashes
278         # which looked for core dumps and printed stack traces if found. We might want
279         # to do something similar here. This may not be easy on modern Ubuntu systems,
280         # which default to using Apport: https://wiki.ubuntu.com/Apport
281
282     def assertWaitProcess(self, process, expected_return=0):
283         '''Wait for a process to finish and check its exit code.'''
284         process.wait_and_log()
285         self.assertEqual(process.returncode, expected_return)
286
287     def runProcess(self, args, env=None, shell=False):
288         '''Start a process and wait for it to finish.'''
289         process = self.startProcess(args, env=env, shell=shell)
290         process.wait_and_log()
291         return process
292
293     def assertRun(self, args, env=None, shell=False, expected_return=0):
294         '''Start a process and wait for it to finish. Check its return code.'''
295         process = self.runProcess(args, env=env, shell=shell)
296         self.assertEqual(process.returncode, expected_return)
297         return process