2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
10 '''Subprocess test case superclass'''
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
# Timeout, in seconds, handed to communicate() by LoggingPopen.wait_and_log()
# when waiting for a child process to finish.
# XXX This should probably be in config.py and settable from outside this
# module instead of being hard-coded here.
process_timeout = 300 # Seconds
def cat_dhcp_command(mode):
    '''Create a command string for dumping dhcp.pcap to stdout.

    The command runs util_dump_dhcp_pcap.py, located in config.this_dir,
    with the current Python interpreter and appends mode after the script
    path. mode is inserted verbatim; its accepted values are defined by
    util_dump_dhcp_pcap.py, not here.

    Returns the complete command line as a single string.'''
    # XXX Do this in Python in a thread?
    script_path = os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py')
    sd_cmd = ''
    # sys.executable may be an empty string or None when Python cannot
    # determine its own path; don't emit a bogus quoted interpreter then.
    if sys.executable:
        sd_cmd = '"{}" '.format(sys.executable)
    sd_cmd += '{} {}'.format(script_path, mode)
    return sd_cmd
39 class LoggingPopen(subprocess.Popen):
40 '''Run a process using subprocess.Popen. Capture and log its output.
42 Stdout and stderr are captured to memory and decoded as UTF-8. The
43 program command and output is written to log_fd.
def __init__(self, proc_args, *args, **kwargs):
    '''Start proc_args with stdout and stderr captured as bytes.

    proc_args and any remaining positional/keyword arguments are passed
    through to subprocess.Popen, with these adjustments:
    - log_fd (keyword, default None) is removed from kwargs and stored
      as self.log_fd.
    - stdout and stderr are always set to subprocess.PIPE.
    - universal_newlines is always set to False.'''
    self.log_fd = kwargs.pop('log_fd', None)
    kwargs['stdout'] = subprocess.PIPE
    kwargs['stderr'] = subprocess.PIPE
    # Make sure communicate() gives us bytes.
    kwargs['universal_newlines'] = False
    self.cmd_str = 'command ' + repr(proc_args)
    # Give the decoded-output attributes a defined value up front so that
    # reading them before wait_and_log() has run yields an empty string
    # instead of raising AttributeError.
    self.stdout_str = ''
    self.stderr_str = ''
    super().__init__(proc_args, *args, **kwargs)
def wait_and_log(self):
    '''Wait for the process to finish and log its output.

    communicate() is called with a timeout of process_timeout seconds, so
    subprocess.TimeoutExpired propagates if the process runs longer.

    If a log_fd was supplied, stdout and stderr are written to it between
    Begin/End marker lines, decoded leniently (errors='replace') so that
    logging cannot fail on invalid bytes.

    Afterwards self.stdout_str and self.stderr_str hold the strictly
    decoded output; UnicodeDecodeError is raised if either stream is not
    valid UTF-8.'''
    out_data, err_data = self.communicate(timeout=process_timeout)

    # log_fd defaults to None in __init__; skip logging instead of failing
    # with AttributeError when no log file was provided.
    if self.log_fd is not None:
        streams = (
            ('stdout', out_data.decode('UTF-8', 'replace')),
            ('stderr', err_data.decode('UTF-8', 'replace')),
        )
        for stream_name, stream_log in streams:
            self.log_fd.write('-- Begin {} for {} --\n'.format(stream_name, self.cmd_str))
            self.log_fd.write(stream_log)
            self.log_fd.write('-- End {} for {} --\n'.format(stream_name, self.cmd_str))

    # Throwing a UnicodeDecodeError exception here is arguably a good thing.
    # It happens after logging so the raw output is still recorded.
    self.stdout_str = out_data.decode('UTF-8', 'strict')
    self.stderr_str = err_data.decode('UTF-8', 'strict')
73 def stop_process(self, kill=False):
74 '''Stop the process immediately.'''
81 '''Terminate the process. Do not log its output.'''
82 # XXX Currently unused.
83 self.stop_process(kill=False)
86 '''Kill the process. Do not log its output.'''
87 self.stop_process(kill=True)
89 class SubprocessTestCase(unittest.TestCase):
90 '''Run a program and gather its stdout and stderr.'''
def __init__(self, *args, **kwargs):
    '''Set up per-test bookkeeping. Arguments go to unittest.TestCase.'''
    super().__init__(*args, **kwargs)
    self.exit_command_line = 1
    # State used by run() and the process helpers. Initialise it here so
    # those methods never hit AttributeError on a fresh instance.
    self.log_fname = None   # Set by run() to '<test id>.log'.
    self.log_fd = None      # Opened by run(); passed to each LoggingPopen.
    self.processes = []     # Every process started via startProcess().
    self.cleanup_files = []
    self.dump_files = []    # Files kept as evidence after a failed test.
def log_fd_write_bytes(self, log_data):
    '''Write log_data to the test log.

    log_data may be bytes (or bytearray) or str. The log file is opened in
    text mode, which rejects bytes, so binary data is decoded as UTF-8
    first. Undecodable sequences are replaced rather than raising, the
    same lenient policy used when logging process output.'''
    if isinstance(log_data, (bytes, bytearray)):
        log_data = log_data.decode('UTF-8', 'replace')
    self.log_fd.write(log_data)
def filename_from_id(self, filename):
    '''Generate a filename prefixed with our test ID.

    The generated name is registered in self.cleanup_files (once, even if
    requested repeatedly).

    Returns the generated name, '<test id>.<filename>'.'''
    id_filename = self.id() + '.' + filename
    if id_filename not in self.cleanup_files:
        self.cleanup_files.append(id_filename)
    # Callers assign the result (e.g. the log file name), so hand it back.
    return id_filename
114 def kill_processes(self):
115 '''Kill any processes we've opened so far'''
116 for proc in self.processes:
122 def _error_count(self, result):
125 if hasattr(result, 'failures'):
126 # Python standard unittest runner
127 return len(result.failures) + len(result.errors)
128 if hasattr(result, '_excinfo'):
130 return len(result._excinfo or [])
131 self.fail("Unexpected test result %r" % result)
133 def run(self, result=None):
134 # Subclass run() so that we can do the following:
135 # - Open our log file and add it to the cleanup list.
136 # - Check our result before and after the run so that we can tell
137 # if the current test was successful.
139 # Probably not needed, but shouldn't hurt.
140 self.kill_processes()
142 self.log_fname = self.filename_from_id('log')
# Our command line utilities generate UTF-8. The log file encoding
144 # needs to match that.
145 # XXX newline='\n' works for now, but we might have to do more work
146 # to handle line endings in the future.
147 self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
148 self.cleanup_files.append(self.log_fname)
149 pre_run_problem_count = self._error_count(result)
151 super().run(result=result)
152 except KeyboardInterrupt:
153 # XXX This doesn't seem to work on Windows, which is where we need it the most.
154 self.kill_processes()
156 # Tear down our test. We don't do this in tearDown() because Python 3
157 # updates "result" after calling tearDown().
158 self.kill_processes()
161 post_run_problem_count = self._error_count(result)
162 if pre_run_problem_count != post_run_problem_count:
163 self.dump_files.append(self.log_fname)
164 # Leave some evidence behind.
165 self.cleanup_files = []
166 print('\nProcess output for {}:'.format(self.id()))
167 with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
169 sys.stdout.write(line)
170 for filename in self.cleanup_files:
175 self.cleanup_files = []
def getCaptureInfo(self, capinfos_args=None, cap_file=None):
    '''Run capinfos on a capture file and log its output.

    capinfos_args must be a sequence.
    Default cap_file is <test id>.testout.pcap.

    Returns capinfos' stdout decoded as UTF-8, with undecodable bytes
    replaced. The same text is written to the test log.
    Raises subprocess.CalledProcessError if capinfos exits non-zero.'''
    # XXX convert users to use a new fixture instead of this function.
    cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
    # Only fall back to the default name when the caller gave no file;
    # an explicit cap_file must not be overwritten.
    if not cap_file:
        cap_file = self.filename_from_id('testout.pcap')
    self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
    capinfos_cmd = [cmd_capinfos]
    if capinfos_args is not None:
        capinfos_cmd.extend(capinfos_args)
    capinfos_cmd.append(cap_file)
    capinfos_data = subprocess.check_output(capinfos_cmd)
    capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
    self.log_fd.write(capinfos_stdout)
    return capinfos_stdout
def checkPacketCount(self, num_packets, cap_file=None):
    '''Make sure a capture file contains a specific number of packets.

    Runs getCaptureInfo() on cap_file (or its default when None) and fails
    the test unless the "Number of packets:" line reports exactly
    num_packets.'''
    capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
    # The trailing (?!\d) stops a prefix match: without it an expected
    # count of 1 would also accept "Number of packets: 10".
    count_pat = r'Number of packets:\s+{}(?!\d)'.format(num_packets)
    got_num_packets = re.search(count_pat, capinfos_testout) is not None
    self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
205 def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
206 '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
208 self.assertTrue(count_stdout or count_stderr, 'No output to count.')
211 proc = self.processes[-1]
215 out_data = proc.stdout_str
217 out_data += proc.stderr_str
219 if search_pat is None:
220 return len(out_data.splitlines())
222 search_re = re.compile(search_pat)
223 for line in out_data.splitlines():
224 if search_re.search(line):
def grepOutput(self, search_pat, proc=None):
    '''Return True if search_pat matches any line of the process output.

    Both stdout and stderr are searched, via countOutput(). proc is
    forwarded to countOutput() unchanged.'''
    match_count = self.countOutput(search_pat, count_stderr=True, proc=proc)
    return match_count > 0
def diffOutput(self, blob_a, blob_b, *args, **kwargs):
    '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.

    blob_a and blob_b must be UTF-8 strings.
    Extra positional and keyword arguments are passed on to
    difflib.unified_diff (e.g. fromfile, tofile).

    Returns True when the blobs have identical lines (nothing is logged),
    False otherwise.'''
    lines_a = blob_a.splitlines()
    lines_b = blob_b.splitlines()
    # splitlines() strips line terminators, so ask unified_diff for
    # terminator-free control lines as well; otherwise the header lines end
    # up followed by blank lines once joined. lineterm is the 6th optional
    # positional argument, so only default it if it wasn't passed that way.
    if len(args) < 6:
        kwargs.setdefault('lineterm', '')
    diff = '\n'.join(difflib.unified_diff(lines_a, lines_b, *args, **kwargs))
    if not diff:
        return True

    self.log_fd.write('-- Begin diff output --\n')
    # Terminate the last diff line so the End marker starts on its own line.
    self.log_fd.write(diff + '\n')
    self.log_fd.write('-- End diff output --\n')
    return False
def startProcess(self, proc_args, stdin=None, env=None, shell=False):
    '''Start a process in the background. Returns a subprocess.Popen object.

    You typically wait for it using waitProcess() or assertWaitProcess().

    env, when given, is used as-is. Otherwise the injected test
    environment is used if present, falling back to config.test_env.
    The started process is recorded in self.processes and its output is
    logged to self.log_fd.'''
    if env is None:
        # Apply default test environment if no override is provided.
        env = getattr(self, 'injected_test_env', None)
        # Not all tests need test_env, but those that use runProcess or
        # startProcess must either pass an explicit environment or load the
        # fixture (via a test method parameter or class decorator).
        assert not (env is None and hasattr(self, '_fixture_request')), \
            "Decorate class with @fixtures.mark_usefixtures('test_env')"
    if env is None:
        # Avoid using the test user's real environment by default.
        env = config.test_env
    proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
    self.processes.append(proc)
    return proc
def waitProcess(self, process):
    '''Block until process has finished, logging its output.

    This delegates to process.wait_and_log() and does not inspect the
    exit code.'''
    # XXX The shell version ran processes using a script called
    # run_and_catch_crashes which looked for core dumps and printed stack
    # traces if found. We might want to do something similar here. This may
    # not be easy on modern Ubuntu systems, which default to using Apport:
    # https://wiki.ubuntu.com/Apport
    process.wait_and_log()
def assertWaitProcess(self, process, expected_return=0):
    '''Wait for process to exit, log its output, then verify its exit code.

    The test fails (via assertEqual) when process.returncode differs from
    expected_return.'''
    process.wait_and_log()
    actual_return = process.returncode
    self.assertEqual(actual_return, expected_return)
def runProcess(self, args, env=None, shell=False):
    '''Start a process and wait for it to finish.

    args, env and shell are forwarded to startProcess().

    Returns the finished process object so callers can inspect its
    returncode and captured output.'''
    process = self.startProcess(args, env=env, shell=shell)
    process.wait_and_log()
    # assertRun() reads .returncode from our result, so hand the process back.
    return process
def assertRun(self, args, env=None, shell=False, expected_return=0):
    '''Start a process and wait for it to finish. Check its return code.

    args, env and shell are forwarded to runProcess(). The test fails via
    assertEqual if the process's returncode differs from expected_return.'''
    process = self.runProcess(args, env=env, shell=shell)
    self.assertEqual(process.returncode, expected_return)