test: convert capture tests to use fixtures, fix tests without dumpcap
[metze/wireshark/wip.git] / test / subprocesstest.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Subprocess test case superclass'''
11
12 import config
13 import difflib
14 import io
15 import os
16 import os.path
17 import re
18 import subprocess
19 import sys
20 import unittest
21
22 # To do:
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
25
26 # XXX This should probably be in config.py and settable from
27 # the command line.
28 process_timeout = 300 # Seconds
29
30 def cat_dhcp_command(mode):
31     '''Create a command string for dumping dhcp.pcap to stdout'''
32     # XXX Do this in Python in a thread?
33     sd_cmd = ''
34     if sys.executable:
35         sd_cmd = '"{}" '.format(sys.executable)
36     sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
37     return sd_cmd
38
39 class LoggingPopen(subprocess.Popen):
40     '''Run a process using subprocess.Popen. Capture and log its output.
41
42     Stdout and stderr are captured to memory and decoded as UTF-8. The
43     program command and output is written to log_fd.
44     '''
45     def __init__(self, proc_args, *args, **kwargs):
46         self.log_fd = kwargs.pop('log_fd', None)
47         kwargs['stdout'] = subprocess.PIPE
48         kwargs['stderr'] = subprocess.PIPE
49         # Make sure communicate() gives us bytes.
50         kwargs['universal_newlines'] = False
51         self.cmd_str = 'command ' + repr(proc_args)
52         super().__init__(proc_args, *args, **kwargs)
53         self.stdout_str = ''
54         self.stderr_str = ''
55
56     def wait_and_log(self):
57         '''Wait for the process to finish and log its output.'''
58         out_data, err_data = self.communicate(timeout=process_timeout)
59         out_log = out_data.decode('UTF-8', 'replace')
60         err_log = err_data.decode('UTF-8', 'replace')
61         self.log_fd.flush()
62         self.log_fd.write('-- Begin stdout for {} --\n'.format(self.cmd_str))
63         self.log_fd.write(out_log)
64         self.log_fd.write('-- End stdout for {} --\n'.format(self.cmd_str))
65         self.log_fd.write('-- Begin stderr for {} --\n'.format(self.cmd_str))
66         self.log_fd.write(err_log)
67         self.log_fd.write('-- End stderr for {} --\n'.format(self.cmd_str))
68         self.log_fd.flush()
69         # Throwing a UnicodeDecodeError exception here is arguably a good thing.
70         self.stdout_str = out_data.decode('UTF-8', 'strict')
71         self.stderr_str = err_data.decode('UTF-8', 'strict')
72
73     def stop_process(self, kill=False):
74         '''Stop the process immediately.'''
75         if kill:
76             super().kill()
77         else:
78             super().terminate()
79
80     def terminate(self):
81         '''Terminate the process. Do not log its output.'''
82         # XXX Currently unused.
83         self.stop_process(kill=False)
84
85     def kill(self):
86         '''Kill the process. Do not log its output.'''
87         self.stop_process(kill=True)
88
89 class SubprocessTestCase(unittest.TestCase):
90     '''Run a program and gather its stdout and stderr.'''
91
92     def __init__(self, *args, **kwargs):
93         super().__init__(*args, **kwargs)
94         self.exit_ok = 0
95         self.exit_command_line = 1
96         self.exit_error = 2
97         self.exit_code = None
98         self.log_fname = None
99         self.log_fd = None
100         self.processes = []
101         self.cleanup_files = []
102         self.dump_files = []
103
104     def log_fd_write_bytes(self, log_data):
105         self.log_fd.write(log_data)
106
107     def filename_from_id(self, filename):
108         '''Generate a filename prefixed with our test ID.'''
109         id_filename = self.id() + '.' + filename
110         if id_filename not in self.cleanup_files:
111             self.cleanup_files.append(id_filename)
112         return id_filename
113
114     def kill_processes(self):
115         '''Kill any processes we've opened so far'''
116         for proc in self.processes:
117             try:
118                 proc.kill()
119             except:
120                 pass
121
122     def _error_count(self, result):
123         if not result:
124             return 0
125         if hasattr(result, 'failures'):
126             # Python standard unittest runner
127             return len(result.failures) + len(result.errors)
128         if hasattr(result, '_excinfo'):
129             # pytest test runner
130             return len(result._excinfo or [])
131         self.fail("Unexpected test result %r" % result)
132
133     def run(self, result=None):
134         # Subclass run() so that we can do the following:
135         # - Open our log file and add it to the cleanup list.
136         # - Check our result before and after the run so that we can tell
137         #   if the current test was successful.
138
139         # Probably not needed, but shouldn't hurt.
140         self.kill_processes()
141         self.processes = []
142         self.log_fname = self.filename_from_id('log')
143         # Our command line utilities generate UTF-8. The log file endcoding
144         # needs to match that.
145         # XXX newline='\n' works for now, but we might have to do more work
146         # to handle line endings in the future.
147         self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
148         self.cleanup_files.append(self.log_fname)
149         pre_run_problem_count = self._error_count(result)
150         try:
151             super().run(result=result)
152         except KeyboardInterrupt:
153             # XXX This doesn't seem to work on Windows, which is where we need it the most.
154             self.kill_processes()
155
156         # Tear down our test. We don't do this in tearDown() because Python 3
157         # updates "result" after calling tearDown().
158         self.kill_processes()
159         self.log_fd.close()
160         if result:
161             post_run_problem_count = self._error_count(result)
162             if pre_run_problem_count != post_run_problem_count:
163                 self.dump_files.append(self.log_fname)
164                 # Leave some evidence behind.
165                 self.cleanup_files = []
166                 print('\nProcess output for {}:'.format(self.id()))
167                 with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
168                     for line in log_fd:
169                         sys.stdout.write(line)
170         for filename in self.cleanup_files:
171             try:
172                 os.unlink(filename)
173             except OSError:
174                 pass
175         self.cleanup_files = []
176
177     def getCaptureInfo(self, capinfos_args=None, cap_file=None):
178         '''Run capinfos on a capture file and log its output.
179
180         capinfos_args must be a sequence.
181         Default cap_file is <test id>.testout.pcap.'''
182         # XXX convert users to use a new fixture instead of this function.
183         cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
184         if not cap_file:
185             cap_file = self.filename_from_id('testout.pcap')
186         self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
187         capinfos_cmd = [cmd_capinfos]
188         if capinfos_args is not None:
189             capinfos_cmd += capinfos_args
190         capinfos_cmd.append(cap_file)
191         capinfos_data = subprocess.check_output(capinfos_cmd)
192         capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
193         self.log_fd.write(capinfos_stdout)
194         return capinfos_stdout
195
196     def checkPacketCount(self, num_packets, cap_file=None):
197         '''Make sure a capture file contains a specific number of packets.'''
198         got_num_packets = False
199         capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
200         count_pat = r'Number of packets:\s+{}'.format(num_packets)
201         if re.search(count_pat, capinfos_testout):
202             got_num_packets = True
203         self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
204
205     def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
206         '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
207         match_count = 0
208         self.assertTrue(count_stdout or count_stderr, 'No output to count.')
209
210         if proc is None:
211             proc = self.processes[-1]
212
213         out_data = ''
214         if count_stdout:
215             out_data = proc.stdout_str
216         if count_stderr:
217             out_data += proc.stderr_str
218
219         if search_pat is None:
220             return len(out_data.splitlines())
221
222         search_re = re.compile(search_pat)
223         for line in out_data.splitlines():
224             if search_re.search(line):
225                 match_count += 1
226
227         return match_count
228
229     def grepOutput(self, search_pat, proc=None):
230         return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
231
232     def diffOutput(self, blob_a, blob_b, *args, **kwargs):
233         '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
234
235         blob_a and blob_b must be UTF-8 strings.'''
236         lines_a = blob_a.splitlines()
237         lines_b = blob_b.splitlines()
238         diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
239         if len(diff) > 0:
240             self.log_fd.flush()
241             self.log_fd.write('-- Begin diff output --\n')
242             self.log_fd.writelines(diff)
243             self.log_fd.write('-- End diff output --\n')
244             return False
245         return True
246
247     def startProcess(self, proc_args, stdin=None, env=None, shell=False):
248         '''Start a process in the background. Returns a subprocess.Popen object.
249
250         You typically wait for it using waitProcess() or assertWaitProcess().'''
251         if env is None:
252             # Apply default test environment if no override is provided.
253             env = getattr(self, 'injected_test_env', None)
254             # Not all tests need test_env, but those that use runProcess or
255             # startProcess must either pass an explicit environment or load the
256             # fixture (via a test method parameter or class decorator).
257             assert not (env is None and hasattr(self, '_fixture_request')), \
258                 "Decorate class with @fixtures.mark_usefixtures('test_env')"
259         if env is None:
260             # Avoid using the test user's real environment by default.
261             env = config.test_env
262         proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
263         self.processes.append(proc)
264         return proc
265
266     def waitProcess(self, process):
267         '''Wait for a process to finish.'''
268         process.wait_and_log()
269         # XXX The shell version ran processes using a script called run_and_catch_crashes
270         # which looked for core dumps and printed stack traces if found. We might want
271         # to do something similar here. This may not be easy on modern Ubuntu systems,
272         # which default to using Apport: https://wiki.ubuntu.com/Apport
273
274     def assertWaitProcess(self, process, expected_return=0):
275         '''Wait for a process to finish and check its exit code.'''
276         process.wait_and_log()
277         self.assertEqual(process.returncode, expected_return)
278
279     def runProcess(self, args, env=None, shell=False):
280         '''Start a process and wait for it to finish.'''
281         process = self.startProcess(args, env=env, shell=shell)
282         process.wait_and_log()
283         return process
284
285     def assertRun(self, args, env=None, shell=False, expected_return=0):
286         '''Start a process and wait for it to finish. Check its return code.'''
287         process = self.runProcess(args, env=env, shell=shell)
288         self.assertEqual(process.returncode, expected_return)
289         return process