test: finalize suite_capture conversion to fixtures, drop config.py
[metze/wireshark/wip.git] / test / subprocesstest.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Subprocess test case superclass'''
11
12 import config
13 import difflib
14 import io
15 import os
16 import os.path
17 import re
18 import subprocess
19 import sys
20 import unittest
21
22 # To do:
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
25
26 process_timeout = 300 # Seconds
27
28 def cat_dhcp_command(mode):
29     '''Create a command string for dumping dhcp.pcap to stdout'''
30     # XXX Do this in Python in a thread?
31     sd_cmd = ''
32     if sys.executable:
33         sd_cmd = '"{}" '.format(sys.executable)
34     this_dir = os.path.dirname(__file__)
35     sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode)
36     return sd_cmd
37
38 class LoggingPopen(subprocess.Popen):
39     '''Run a process using subprocess.Popen. Capture and log its output.
40
41     Stdout and stderr are captured to memory and decoded as UTF-8. The
42     program command and output is written to log_fd.
43     '''
44     def __init__(self, proc_args, *args, **kwargs):
45         self.log_fd = kwargs.pop('log_fd', None)
46         kwargs['stdout'] = subprocess.PIPE
47         kwargs['stderr'] = subprocess.PIPE
48         # Make sure communicate() gives us bytes.
49         kwargs['universal_newlines'] = False
50         self.cmd_str = 'command ' + repr(proc_args)
51         super().__init__(proc_args, *args, **kwargs)
52         self.stdout_str = ''
53         self.stderr_str = ''
54
55     def wait_and_log(self):
56         '''Wait for the process to finish and log its output.'''
57         out_data, err_data = self.communicate(timeout=process_timeout)
58         out_log = out_data.decode('UTF-8', 'replace')
59         err_log = err_data.decode('UTF-8', 'replace')
60         self.log_fd.flush()
61         self.log_fd.write('-- Begin stdout for {} --\n'.format(self.cmd_str))
62         self.log_fd.write(out_log)
63         self.log_fd.write('-- End stdout for {} --\n'.format(self.cmd_str))
64         self.log_fd.write('-- Begin stderr for {} --\n'.format(self.cmd_str))
65         self.log_fd.write(err_log)
66         self.log_fd.write('-- End stderr for {} --\n'.format(self.cmd_str))
67         self.log_fd.flush()
68         # Throwing a UnicodeDecodeError exception here is arguably a good thing.
69         self.stdout_str = out_data.decode('UTF-8', 'strict')
70         self.stderr_str = err_data.decode('UTF-8', 'strict')
71
72     def stop_process(self, kill=False):
73         '''Stop the process immediately.'''
74         if kill:
75             super().kill()
76         else:
77             super().terminate()
78
79     def terminate(self):
80         '''Terminate the process. Do not log its output.'''
81         # XXX Currently unused.
82         self.stop_process(kill=False)
83
84     def kill(self):
85         '''Kill the process. Do not log its output.'''
86         self.stop_process(kill=True)
87
88 class SubprocessTestCase(unittest.TestCase):
89     '''Run a program and gather its stdout and stderr.'''
90
91     def __init__(self, *args, **kwargs):
92         super().__init__(*args, **kwargs)
93         self.exit_ok = 0
94         self.exit_command_line = 1
95         self.exit_error = 2
96         self.exit_code = None
97         self.log_fname = None
98         self.log_fd = None
99         self.processes = []
100         self.cleanup_files = []
101         self.dump_files = []
102
103     def log_fd_write_bytes(self, log_data):
104         self.log_fd.write(log_data)
105
106     def filename_from_id(self, filename):
107         '''Generate a filename prefixed with our test ID.'''
108         id_filename = self.id() + '.' + filename
109         if id_filename not in self.cleanup_files:
110             self.cleanup_files.append(id_filename)
111         return id_filename
112
113     def kill_processes(self):
114         '''Kill any processes we've opened so far'''
115         for proc in self.processes:
116             try:
117                 proc.kill()
118             except:
119                 pass
120
121     def _error_count(self, result):
122         if not result:
123             return 0
124         if hasattr(result, 'failures'):
125             # Python standard unittest runner
126             return len(result.failures) + len(result.errors)
127         if hasattr(result, '_excinfo'):
128             # pytest test runner
129             return len(result._excinfo or [])
130         self.fail("Unexpected test result %r" % result)
131
132     def run(self, result=None):
133         # Subclass run() so that we can do the following:
134         # - Open our log file and add it to the cleanup list.
135         # - Check our result before and after the run so that we can tell
136         #   if the current test was successful.
137
138         # Probably not needed, but shouldn't hurt.
139         self.kill_processes()
140         self.processes = []
141         self.log_fname = self.filename_from_id('log')
142         # Our command line utilities generate UTF-8. The log file endcoding
143         # needs to match that.
144         # XXX newline='\n' works for now, but we might have to do more work
145         # to handle line endings in the future.
146         self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
147         self.cleanup_files.append(self.log_fname)
148         pre_run_problem_count = self._error_count(result)
149         try:
150             super().run(result=result)
151         except KeyboardInterrupt:
152             # XXX This doesn't seem to work on Windows, which is where we need it the most.
153             self.kill_processes()
154
155         # Tear down our test. We don't do this in tearDown() because Python 3
156         # updates "result" after calling tearDown().
157         self.kill_processes()
158         self.log_fd.close()
159         if result:
160             post_run_problem_count = self._error_count(result)
161             if pre_run_problem_count != post_run_problem_count:
162                 self.dump_files.append(self.log_fname)
163                 # Leave some evidence behind.
164                 self.cleanup_files = []
165                 print('\nProcess output for {}:'.format(self.id()))
166                 with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
167                     for line in log_fd:
168                         sys.stdout.write(line)
169         for filename in self.cleanup_files:
170             try:
171                 os.unlink(filename)
172             except OSError:
173                 pass
174         self.cleanup_files = []
175
176     def getCaptureInfo(self, capinfos_args=None, cap_file=None):
177         '''Run capinfos on a capture file and log its output.
178
179         capinfos_args must be a sequence.
180         Default cap_file is <test id>.testout.pcap.'''
181         # XXX convert users to use a new fixture instead of this function.
182         cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
183         if not cap_file:
184             cap_file = self.filename_from_id('testout.pcap')
185         self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
186         capinfos_cmd = [cmd_capinfos]
187         if capinfos_args is not None:
188             capinfos_cmd += capinfos_args
189         capinfos_cmd.append(cap_file)
190         capinfos_data = subprocess.check_output(capinfos_cmd)
191         capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
192         self.log_fd.write(capinfos_stdout)
193         return capinfos_stdout
194
195     def checkPacketCount(self, num_packets, cap_file=None):
196         '''Make sure a capture file contains a specific number of packets.'''
197         got_num_packets = False
198         capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
199         count_pat = r'Number of packets:\s+{}'.format(num_packets)
200         if re.search(count_pat, capinfos_testout):
201             got_num_packets = True
202         self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
203
204     def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
205         '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
206         match_count = 0
207         self.assertTrue(count_stdout or count_stderr, 'No output to count.')
208
209         if proc is None:
210             proc = self.processes[-1]
211
212         out_data = ''
213         if count_stdout:
214             out_data = proc.stdout_str
215         if count_stderr:
216             out_data += proc.stderr_str
217
218         if search_pat is None:
219             return len(out_data.splitlines())
220
221         search_re = re.compile(search_pat)
222         for line in out_data.splitlines():
223             if search_re.search(line):
224                 match_count += 1
225
226         return match_count
227
228     def grepOutput(self, search_pat, proc=None):
229         return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
230
231     def diffOutput(self, blob_a, blob_b, *args, **kwargs):
232         '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
233
234         blob_a and blob_b must be UTF-8 strings.'''
235         lines_a = blob_a.splitlines()
236         lines_b = blob_b.splitlines()
237         diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
238         if len(diff) > 0:
239             self.log_fd.flush()
240             self.log_fd.write('-- Begin diff output --\n')
241             self.log_fd.writelines(diff)
242             self.log_fd.write('-- End diff output --\n')
243             return False
244         return True
245
246     def startProcess(self, proc_args, stdin=None, env=None, shell=False):
247         '''Start a process in the background. Returns a subprocess.Popen object.
248
249         You typically wait for it using waitProcess() or assertWaitProcess().'''
250         if env is None:
251             # Apply default test environment if no override is provided.
252             env = getattr(self, 'injected_test_env', None)
253             # Not all tests need test_env, but those that use runProcess or
254             # startProcess must either pass an explicit environment or load the
255             # fixture (via a test method parameter or class decorator).
256             assert not (env is None and hasattr(self, '_fixture_request')), \
257                 "Decorate class with @fixtures.mark_usefixtures('test_env')"
258         proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
259         self.processes.append(proc)
260         return proc
261
262     def waitProcess(self, process):
263         '''Wait for a process to finish.'''
264         process.wait_and_log()
265         # XXX The shell version ran processes using a script called run_and_catch_crashes
266         # which looked for core dumps and printed stack traces if found. We might want
267         # to do something similar here. This may not be easy on modern Ubuntu systems,
268         # which default to using Apport: https://wiki.ubuntu.com/Apport
269
270     def assertWaitProcess(self, process, expected_return=0):
271         '''Wait for a process to finish and check its exit code.'''
272         process.wait_and_log()
273         self.assertEqual(process.returncode, expected_return)
274
275     def runProcess(self, args, env=None, shell=False):
276         '''Start a process and wait for it to finish.'''
277         process = self.startProcess(args, env=env, shell=shell)
278         process.wait_and_log()
279         return process
280
281     def assertRun(self, args, env=None, shell=False, expected_return=0):
282         '''Start a process and wait for it to finish. Check its return code.'''
283         process = self.runProcess(args, env=env, shell=shell)
284         self.assertEqual(process.returncode, expected_return)
285         return process