2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
10 '''Subprocess test case superclass'''
22 # - Add a subprocesstest.SkipUnlessCapture decorator?
23 # - Try to catch crashes? See the comments below in waitProcess.
# Upper bound, in seconds, on how long LoggingPopen.wait_and_log() waits for
# a child process: it is passed as the timeout to communicate().
process_timeout = 300 # Seconds
def cat_dhcp_command(mode):
    '''Create a command string for dumping dhcp.pcap to stdout.

    The command runs util_dump_dhcp_pcap.py, looked up in the directory
    that contains this module, with the current Python interpreter.

    mode is appended verbatim as the script's only argument.

    Returns the complete command as a single string.'''
    # XXX Do this in Python in a thread?
    sd_cmd = ''
    # sys.executable may be an empty string or None when Python cannot
    # determine its own path; don't emit an empty quoted program name then.
    if sys.executable:
        sd_cmd = '"{}" '.format(sys.executable)
    this_dir = os.path.dirname(__file__)
    sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode)
    return sd_cmd
def cat_cap_file_command(cap_files):
    '''Build a command string which dumps one or more capture files to stdout.

    cap_files is either a single path string or an iterable of path
    strings. Each path is wrapped in double quotes.'''
    # XXX Do this in Python in a thread?
    paths = [cap_files] if isinstance(cap_files, str) else cap_files
    quoted_paths = ' '.join(['"{}"'.format(path) for path in paths])
    if not sys.platform.startswith('win32'):
        return 'cat {}'.format(quoted_paths)
    # https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-xp/bb491026(v=technet.10)
    # describes `type` as a command which "displays the contents of a text
    # file", so copy the files to the console device instead.
    return 'copy {} CON'.format(quoted_paths)
class LoggingPopen(subprocess.Popen):
    '''Run a process using subprocess.Popen. Capture and log its output.

    Stdout and stderr are captured to memory and decoded as UTF-8. The
    program command and output is written to log_fd.
    '''

    def __init__(self, proc_args, *args, **kwargs):
        '''Start proc_args with stdout and stderr redirected to pipes.

        The optional log_fd keyword argument is a writable text stream
        which receives the captured output; all other arguments are passed
        on to subprocess.Popen.'''
        self.log_fd = kwargs.pop('log_fd', None)
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.PIPE
        # Make sure communicate() gives us bytes.
        kwargs['universal_newlines'] = False
        self.cmd_str = 'command ' + repr(proc_args)
        # Defined up front so that they can be read even if wait_and_log()
        # is never called for this process.
        self.stdout_str = ''
        self.stderr_str = ''
        super().__init__(proc_args, *args, **kwargs)

    def wait_and_log(self):
        '''Wait for the process to finish and log its output.

        The decoded output is stored in stdout_str and stderr_str.

        Raises subprocess.TimeoutExpired if the process has not finished
        after process_timeout seconds, and UnicodeDecodeError if either
        stream is not valid UTF-8 (the output is logged first).'''
        out_data, err_data = self.communicate(timeout=process_timeout)

        # log_fd is optional in __init__, so only log when we were given one.
        if self.log_fd is not None:
            # Decode leniently so that invalid output still reaches the log.
            out_log = out_data.decode('UTF-8', 'replace')
            err_log = err_data.decode('UTF-8', 'replace')
            self.log_fd.write('-- Begin stdout for {} --\n'.format(self.cmd_str))
            self.log_fd.write(out_log)
            self.log_fd.write('-- End stdout for {} --\n'.format(self.cmd_str))
            self.log_fd.write('-- Begin stderr for {} --\n'.format(self.cmd_str))
            self.log_fd.write(err_log)
            self.log_fd.write('-- End stderr for {} --\n'.format(self.cmd_str))

        # Throwing a UnicodeDecodeError exception here is arguably a good thing.
        self.stdout_str = out_data.decode('UTF-8', 'strict')
        self.stderr_str = err_data.decode('UTF-8', 'strict')

    def stop_process(self, kill=False):
        '''Stop the process immediately.

        Kills the process if kill is true, otherwise terminates it.'''
        # Call the Popen implementations directly: terminate() and kill()
        # are overridden below and route back through this method.
        if kill:
            super().kill()
        else:
            super().terminate()
        # Reap the child and close its pipes. The output is discarded.
        self.communicate()

    def terminate(self):
        '''Terminate the process. Do not log its output.'''
        # XXX Currently unused.
        self.stop_process(kill=False)

    def kill(self):
        '''Kill the process. Do not log its output.'''
        self.stop_process(kill=True)
class SubprocessTestCase(unittest.TestCase):
    '''Run a program and gather its stdout and stderr.'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): exit_ok and exit_error are assumed values bracketing
        # exit_command_line; confirm them against the programs under test.
        self.exit_ok = 0
        self.exit_command_line = 1
        self.exit_error = 2
        self.exit_code = None
        self.log_fname = None
        self.log_fd = None
        # Processes started by startProcess(), oldest first.
        self.processes = []
        # Files removed at the end of run().
        self.cleanup_files = []
        # Log files of tests which reported a failure or an error.
        self.dump_files = []

    def log_fd_write_bytes(self, log_data):
        '''Write log_data to the log file.

        run() opens the log as UTF-8 text, so bytes are decoded first, with
        undecodable sequences replaced. Strings are written as they are.'''
        if isinstance(log_data, (bytes, bytearray)):
            log_data = log_data.decode('UTF-8', 'replace')
        self.log_fd.write(log_data)

    def filename_from_id(self, filename):
        '''Generate a filename prefixed with our test ID.

        The generated name is also added to the cleanup list. Returns the
        generated name.'''
        id_filename = self.id() + '.' + filename
        if id_filename not in self.cleanup_files:
            self.cleanup_files.append(id_filename)
        return id_filename

    def kill_processes(self):
        '''Kill any processes we've opened so far'''
        for proc in self.processes:
            try:
                proc.kill()
            except OSError:
                # Best effort: the process may already have gone away.
                pass

    def _error_count(self, result):
        '''Return the number of failures and errors recorded in result.

        A result of None counts as zero problems.'''
        if result is None:
            # unittest creates its own result object in this case, so there
            # is nothing for us to inspect.
            return 0
        if hasattr(result, 'failures'):
            # Python standard unittest runner
            return len(result.failures) + len(result.errors)
        if hasattr(result, '_excinfo'):
            # Result objects which collect exception info in _excinfo.
            return len(result._excinfo or [])
        self.fail("Unexpected test result %r" % result)

    def run(self, result=None):
        '''Run the test with a per-test log file.

        If the test adds a failure or an error to result, the log is printed
        to stdout and the test's files are kept. Otherwise every file in the
        cleanup list is removed.'''
        # Subclass run() so that we can do the following:
        # - Open our log file and add it to the cleanup list.
        # - Check our result before and after the run so that we can tell
        #   if the current test was successful.

        # Probably not needed, but shouldn't hurt.
        self.kill_processes()
        self.processes = []
        self.log_fname = self.filename_from_id('log')
        # Our command line utilities generate UTF-8. The log file encoding
        # needs to match that.
        # XXX newline='\n' works for now, but we might have to do more work
        # to handle line endings in the future.
        self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
        # filename_from_id() registers the name itself; don't list it twice.
        if self.log_fname not in self.cleanup_files:
            self.cleanup_files.append(self.log_fname)
        pre_run_problem_count = self._error_count(result)
        try:
            super().run(result=result)
        except KeyboardInterrupt:
            # XXX This doesn't seem to work on Windows, which is where we need it the most.
            self.kill_processes()

        # Tear down our test. We don't do this in tearDown() because Python 3
        # updates "result" after calling tearDown().
        self.kill_processes()
        self.log_fd.close()
        post_run_problem_count = self._error_count(result)
        if pre_run_problem_count != post_run_problem_count:
            self.dump_files.append(self.log_fname)
            # Leave some evidence behind.
            self.cleanup_files = []
            print('\nProcess output for {}:'.format(self.id()))
            with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
                for line in log_fd:
                    sys.stdout.write(line)
        for filename in self.cleanup_files:
            try:
                os.remove(filename)
            except OSError:
                # A registered file may never have been created.
                pass
        self.cleanup_files = []

    def getCaptureInfo(self, capinfos_args=None, cap_file=None):
        '''Run capinfos on a capture file and log its output.

        capinfos_args must be a sequence.
        Default cap_file is <test id>.testout.pcap.

        Returns the capinfos output decoded as UTF-8.'''
        # XXX convert users to use a new fixture instead of this function.
        cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
        if cap_file is None:
            cap_file = self.filename_from_id('testout.pcap')
        self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
        capinfos_cmd = [cmd_capinfos]
        if capinfos_args is not None:
            capinfos_cmd += capinfos_args
        capinfos_cmd.append(cap_file)
        capinfos_data = subprocess.check_output(capinfos_cmd)
        capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
        self.log_fd.write(capinfos_stdout)
        return capinfos_stdout

    def checkPacketCount(self, num_packets, cap_file=None):
        '''Make sure a capture file contains a specific number of packets.'''
        capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
        # The lookahead stops a count of 1 from also matching 10, 11, ...
        count_pat = r'Number of packets:\s+{}(?!\d)'.format(num_packets)
        got_num_packets = re.search(count_pat, capinfos_testout) is not None
        self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))

    def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
        '''Returns the number of output lines (search_pat=None), otherwise returns a match count.

        count_stdout and count_stderr select which streams are examined.
        proc defaults to the most recently started process.'''
        self.assertTrue(count_stdout or count_stderr, 'No output to count.')

        if proc is None:
            proc = self.processes[-1]

        # Split each stream on its own so that an unterminated last line of
        # stdout is not glued to the first line of stderr.
        out_lines = []
        if count_stdout:
            out_lines += proc.stdout_str.splitlines()
        if count_stderr:
            out_lines += proc.stderr_str.splitlines()

        if search_pat is None:
            return len(out_lines)

        search_re = re.compile(search_pat)
        return sum(1 for line in out_lines if search_re.search(line))

    def grepOutput(self, search_pat, proc=None):
        '''Return True if search_pat matches any line of stdout or stderr.'''
        return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0

    def diffOutput(self, blob_a, blob_b, *args, **kwargs):
        '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.

        blob_a and blob_b must be UTF-8 strings. Extra arguments are passed
        on to difflib.unified_diff. Returns True if there is no difference.'''
        lines_a = blob_a.splitlines()
        lines_b = blob_b.splitlines()
        diff = '\n'.join(difflib.unified_diff(lines_a, lines_b, *args, **kwargs))
        if not diff:
            return True
        self.log_fd.write('-- Begin diff output --\n')
        # Terminate the last diff line so the end marker starts a new line.
        self.log_fd.write(diff + '\n')
        self.log_fd.write('-- End diff output --\n')
        return False

    def startProcess(self, proc_args, stdin=None, env=None, shell=False):
        '''Start a process in the background. Returns a subprocess.Popen object.

        You typically wait for it using waitProcess() or assertWaitProcess().'''
        if env is None:
            # Apply default test environment if no override is provided.
            env = getattr(self, 'injected_test_env', None)
            # Not all tests need test_env, but those that use runProcess or
            # startProcess must either pass an explicit environment or load the
            # fixture (via a test method parameter or class decorator).
            assert not (env is None and hasattr(self, '_fixture_request')), \
                "Decorate class with @fixtures.mark_usefixtures('test_env')"
        proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
        self.processes.append(proc)
        return proc

    def waitProcess(self, process):
        '''Wait for a process to finish.'''
        process.wait_and_log()
        # XXX The shell version ran processes using a script called run_and_catch_crashes
        # which looked for core dumps and printed stack traces if found. We might want
        # to do something similar here. This may not be easy on modern Ubuntu systems,
        # which default to using Apport: https://wiki.ubuntu.com/Apport

    def assertWaitProcess(self, process, expected_return=0):
        '''Wait for a process to finish and check its exit code.'''
        process.wait_and_log()
        self.assertEqual(process.returncode, expected_return)

    def runProcess(self, args, env=None, shell=False):
        '''Start a process and wait for it to finish. Returns the process.'''
        process = self.startProcess(args, env=env, shell=shell)
        process.wait_and_log()
        return process

    def assertRun(self, args, env=None, shell=False, expected_return=0):
        '''Start a process and wait for it to finish. Check its return code.

        Returns the finished process.'''
        process = self.runProcess(args, env=env, shell=shell)
        self.assertEqual(process.returncode, expected_return)
        return process