2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
10 '''Subprocess test case superclass'''
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
26 # XXX This should probably be in config.py and settable from
28 if sys.version_info[0] >= 3:
29 process_timeout = 300 # Seconds
31 def capture_command(cmd, *args, **kwargs):
32 '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
34 If shell is true, return a string. Otherwise, return a list of arguments.'''
35 shell = kwargs.pop('shell', False)
37 cap_cmd = ['"' + cmd + '"']
40 if cmd == config.cmd_wireshark:
41 cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
44 return ' '.join(cap_cmd)
48 def cat_dhcp_command(mode):
49 '''Create a command string for dumping dhcp.pcap to stdout'''
50 # XXX Do this in Python in a thread?
53 sd_cmd = '"{}" '.format(sys.executable)
54 sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
57 class LoggingPopen(subprocess.Popen):
58 '''Run a process using subprocess.Popen. Capture and log its output.
60 Stdout and stderr are captured to memory and decoded as UTF-8. The
61 program command and output is written to log_fd.
63 def __init__(self, proc_args, *args, **kwargs):
64 self.log_fd = kwargs.pop('log_fd', None)
65 kwargs['stdout'] = subprocess.PIPE
66 kwargs['stderr'] = subprocess.PIPE
67 # Make sure communicate() gives us bytes.
68 kwargs['universal_newlines'] = False
69 self.cmd_str = 'command ' + repr(proc_args)
70 super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
74 def wait_and_log(self):
75 '''Wait for the process to finish and log its output.'''
76 # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
77 if sys.version_info[0] >= 3:
78 out_data, err_data = self.communicate(timeout=process_timeout)
79 out_log = out_data.decode('UTF-8', 'replace')
80 err_log = err_data.decode('UTF-8', 'replace')
82 out_data, err_data = self.communicate()
83 out_log = unicode(out_data, 'UTF-8', 'replace')
84 err_log = unicode(err_data, 'UTF-8', 'replace')
85 # Throwing a UnicodeDecodeError exception here is arguably a good thing.
86 self.stdout_str = out_data.decode('UTF-8', 'strict')
87 self.stderr_str = err_data.decode('UTF-8', 'strict')
89 self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
90 self.log_fd.write(out_log)
91 self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
92 self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
93 self.log_fd.write(err_log)
94 self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
97 def stop_process(self, kill=False):
98 '''Stop the process immediately.'''
100 super(LoggingPopen, self).kill()
102 super(LoggingPopen, self).terminate()
105 '''Terminate the process. Do not log its output.'''
106 # XXX Currently unused.
107 self.stop_process(kill=False)
110 '''Kill the process. Do not log its output.'''
111 self.stop_process(kill=True)
113 class SubprocessTestCase(unittest.TestCase):
114 '''Run a program and gather its stdout and stderr.'''
116 def __init__(self, *args, **kwargs):
117 super(SubprocessTestCase, self).__init__(*args, **kwargs)
119 self.exit_command_line = 1
121 self.exit_code = None
122 self.log_fname = None
125 self.cleanup_files = []
128 def log_fd_write_bytes(self, log_data):
129 if sys.version_info[0] >= 3:
130 self.log_fd.write(log_data)
132 self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))
134 def filename_from_id(self, filename):
135 '''Generate a filename prefixed with our test ID.'''
136 id_filename = self.id() + '.' + filename
137 if id_filename not in self.cleanup_files:
138 self.cleanup_files.append(id_filename)
141 def kill_processes(self):
142 '''Kill any processes we've opened so far'''
143 for proc in self.processes:
149 def _error_count(self, result):
152 if hasattr(result, 'failures'):
153 # Python standard unittest runner
154 return len(result.failures) + len(result.errors)
155 if hasattr(result, '_excinfo'):
157 return len(result._excinfo or [])
158 self.fail("Unexpected test result %r" % result)
160 def run(self, result=None):
161 # Subclass run() so that we can do the following:
162 # - Open our log file and add it to the cleanup list.
163 # - Check our result before and after the run so that we can tell
164 # if the current test was successful.
166 # Probably not needed, but shouldn't hurt.
167 self.kill_processes()
169 self.log_fname = self.filename_from_id('log')
170 # Our command line utilities generate UTF-8. The log file endcoding
171 # needs to match that.
172 # XXX newline='\n' works for now, but we might have to do more work
173 # to handle line endings in the future.
174 self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
175 self.cleanup_files.append(self.log_fname)
176 pre_run_problem_count = self._error_count(result)
178 super(SubprocessTestCase, self).run(result=result)
179 except KeyboardInterrupt:
180 # XXX This doesn't seem to work on Windows, which is where we need it the most.
181 self.kill_processes()
183 # Tear down our test. We don't do this in tearDown() because Python 3
184 # updates "result" after calling tearDown().
185 self.kill_processes()
188 post_run_problem_count = self._error_count(result)
189 if pre_run_problem_count != post_run_problem_count:
190 self.dump_files.append(self.log_fname)
191 # Leave some evidence behind.
192 self.cleanup_files = []
193 print('\nProcess output for {}:'.format(self.id()))
194 with io.open(self.log_fname, 'r', encoding='UTF-8', errors='backslashreplace') as log_fd:
196 sys.stdout.write(line)
197 for filename in self.cleanup_files:
202 self.cleanup_files = []
204 def getCaptureInfo(self, capinfos_args=None, cap_file=None):
205 '''Run capinfos on a capture file and log its output.
207 capinfos_args must be a sequence.
208 Default cap_file is <test id>.testout.pcap.'''
210 cap_file = self.filename_from_id('testout.pcap')
211 self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
212 capinfos_cmd = [config.cmd_capinfos]
213 if capinfos_args is not None:
214 capinfos_cmd += capinfos_args
215 capinfos_cmd.append(cap_file)
216 capinfos_data = subprocess.check_output(capinfos_cmd)
217 if sys.version_info[0] >= 3:
218 capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
220 capinfos_stdout = unicode(capinfos_data, 'UTF-8', 'replace')
221 self.log_fd.write(capinfos_stdout)
222 return capinfos_stdout
224 def checkPacketCount(self, num_packets, cap_file=None):
225 '''Make sure a capture file contains a specific number of packets.'''
226 got_num_packets = False
227 capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
228 count_pat = 'Number of packets:\s+{}'.format(num_packets)
229 if re.search(count_pat, capinfos_testout):
230 got_num_packets = True
231 self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
233 def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
234 '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
236 self.assertTrue(count_stdout or count_stderr, 'No output to count.')
239 proc = self.processes[-1]
243 out_data = proc.stdout_str
245 out_data += proc.stderr_str
247 if search_pat is None:
248 return len(out_data.splitlines())
250 search_re = re.compile(search_pat)
251 for line in out_data.splitlines():
252 if search_re.search(line):
257 def grepOutput(self, search_pat, proc=None):
258 return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
260 def diffOutput(self, blob_a, blob_b, *args, **kwargs):
261 '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
263 blob_a and blob_b must be UTF-8 strings.'''
264 lines_a = blob_a.splitlines()
265 lines_b = blob_b.splitlines()
266 diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
268 if sys.version_info[0] < 3 and not isinstance(diff, unicode):
269 diff = unicode(diff, 'UTF-8', 'replace')
271 self.log_fd.write(u'-- Begin diff output --\n')
272 self.log_fd.writelines(diff)
273 self.log_fd.write(u'-- End diff output --\n')
277 def startProcess(self, proc_args, stdin=None, env=None, shell=False):
278 '''Start a process in the background. Returns a subprocess.Popen object.
280 You typically wait for it using waitProcess() or assertWaitProcess().'''
282 # Avoid using the test user's real environment by default.
283 env = config.test_env
284 proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
285 self.processes.append(proc)
288 def waitProcess(self, process):
289 '''Wait for a process to finish.'''
290 process.wait_and_log()
291 # XXX The shell version ran processes using a script called run_and_catch_crashes
292 # which looked for core dumps and printed stack traces if found. We might want
293 # to do something similar here. This may not be easy on modern Ubuntu systems,
294 # which default to using Apport: https://wiki.ubuntu.com/Apport
296 def assertWaitProcess(self, process, expected_return=0):
297 '''Wait for a process to finish and check its exit code.'''
298 process.wait_and_log()
299 self.assertEqual(process.returncode, expected_return)
301 def runProcess(self, args, env=None, shell=False):
302 '''Start a process and wait for it to finish.'''
303 process = self.startProcess(args, env=env, shell=shell)
304 process.wait_and_log()
307 def assertRun(self, args, env=None, shell=False, expected_return=0):
308 '''Start a process and wait for it to finish. Check its return code.'''
309 process = self.runProcess(args, env=env, shell=shell)
310 self.assertEqual(process.returncode, expected_return)