2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
10 '''Subprocess test case superclass'''
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
# XXX This should probably live in config.py and be user-configurable
# instead of being hard-coded here.
# Only defined on Python 3: it is passed as the "timeout" argument of
# Popen.communicate(), which Python 2 does not accept.
if sys.version_info[0] >= 3:
    process_timeout = 300 # Seconds
def capture_command(cmd, *args, **kwargs):
    '''Convert the supplied arguments into a command suitable for SubprocessTestCase.

    cmd is the program to run; args are appended after it in order. When cmd
    equals config.cmd_wireshark the fixed options
    "-o gui.update.enabled:FALSE -k" are inserted ahead of args.

    The only keyword argument consulted is shell (default False).
    If shell is true, cmd is wrapped in double quotes and everything is
    joined with single spaces into one string. Otherwise, return a list of
    arguments with cmd left unquoted.'''
    shell = kwargs.pop('shell', False)
    # Only the shell form needs quoting; it protects program paths that
    # contain spaces.
    if shell:
        cap_cmd = ['"' + cmd + '"']
    else:
        cap_cmd = [cmd]
    if cmd == config.cmd_wireshark:
        cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
    cap_cmd += args
    if shell:
        return ' '.join(cap_cmd)
    return cap_cmd
def cat_dhcp_command(mode):
    '''Create a command string for dumping dhcp.pcap to stdout

    The string is the quoted Python interpreter path (omitted when
    sys.executable is empty) followed by util_dump_dhcp_pcap.py, located in
    config.this_dir, and mode as its single argument.'''
    # XXX Do this in Python in a thread?
    sd_cmd = ''
    if sys.executable:
        sd_cmd = '"{}" '.format(sys.executable)
    # mode rides along inside the final path component, so the result is
    # "<this_dir>/util_dump_dhcp_pcap.py <mode>".
    sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
    return sd_cmd
class LoggingPopen(subprocess.Popen):
    '''Run a process using subprocess.Popen. Capture and log its output.

    Stdout and stderr are captured to memory and decoded as UTF-8. The
    program command and output are written to log_fd.
    '''
    def __init__(self, proc_args, *args, **kwargs):
        '''Start proc_args with stdout and stderr redirected to pipes.

        The optional log_fd keyword argument is a text stream that
        wait_and_log() writes to; it is removed from kwargs before they are
        handed to subprocess.Popen. Any stdout, stderr or universal_newlines
        value supplied by the caller is overridden.'''
        self.log_fd = kwargs.pop('log_fd', None)
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.PIPE
        # Make sure communicate() gives us bytes.
        kwargs['universal_newlines'] = False
        self.cmd_str = 'command ' + repr(proc_args)
        super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
        # Filled in by wait_and_log(); defined up front so that reading them
        # before the process has been waited on yields an empty string.
        self.stdout_str = ''
        self.stderr_str = ''

    def wait_and_log(self):
        '''Wait for the process to finish and log its output.

        Sets stdout_str and stderr_str to the strictly decoded output, so
        invalid UTF-8 raises UnicodeDecodeError. On Python 3 the wait is
        bounded by the module-level process_timeout. Logging is skipped when
        no log_fd was supplied.'''
        # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
        if sys.version_info[0] >= 3:
            out_data, err_data = self.communicate(timeout=process_timeout)
            out_log = out_data.decode('UTF-8', 'replace')
            err_log = err_data.decode('UTF-8', 'replace')
        else:
            out_data, err_data = self.communicate()
            out_log = unicode(out_data, 'UTF-8', 'replace')
            err_log = unicode(err_data, 'UTF-8', 'replace')
        # Throwing a UnicodeDecodeError exception here is arguably a good thing.
        self.stdout_str = out_data.decode('UTF-8', 'strict')
        self.stderr_str = err_data.decode('UTF-8', 'strict')
        if self.log_fd is None:
            # log_fd is optional in __init__; without it there is nowhere to log.
            return
        self.log_fd.flush()
        self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
        self.log_fd.write(out_log)
        self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
        self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
        self.log_fd.write(err_log)
        self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
        self.log_fd.flush()

    def stop_process(self, kill=False):
        '''Stop the process immediately.

        Uses subprocess.Popen.kill() when kill is true, otherwise
        subprocess.Popen.terminate().'''
        # Call the superclass methods directly: kill() and terminate() are
        # overridden below to route through this method.
        if kill:
            super(LoggingPopen, self).kill()
        else:
            super(LoggingPopen, self).terminate()

    def terminate(self):
        '''Terminate the process. Do not log its output.'''
        # XXX Currently unused.
        self.stop_process(kill=False)

    def kill(self):
        '''Kill the process. Do not log its output.'''
        self.stop_process(kill=True)
113 class SubprocessTestCase(unittest.TestCase):
114 '''Run a program and gather its stdout and stderr.'''
116 def __init__(self, *args, **kwargs):
117 super(SubprocessTestCase, self).__init__(*args, **kwargs)
119 self.exit_command_line = 1
121 self.exit_code = None
122 self.log_fname = None
125 self.cleanup_files = []
128 def log_fd_write_bytes(self, log_data):
129 if sys.version_info[0] >= 3:
130 self.log_fd.write(log_data)
132 self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))
134 def filename_from_id(self, filename):
135 '''Generate a filename prefixed with our test ID.'''
136 id_filename = self.id() + '.' + filename
137 if id_filename not in self.cleanup_files:
138 self.cleanup_files.append(id_filename)
141 def kill_processes(self):
142 '''Kill any processes we've opened so far'''
143 for proc in self.processes:
149 def run(self, result=None):
150 # Subclass run() so that we can do the following:
151 # - Open our log file and add it to the cleanup list.
152 # - Check our result before and after the run so that we can tell
153 # if the current test was successful.
155 # Probably not needed, but shouldn't hurt.
156 self.kill_processes()
158 self.log_fname = self.filename_from_id('log')
159 # Our command line utilities generate UTF-8. The log file endcoding
160 # needs to match that.
161 self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8')
162 self.cleanup_files.append(self.log_fname)
163 pre_run_problem_count = 0
165 pre_run_problem_count = len(result.failures) + len(result.errors)
167 super(SubprocessTestCase, self).run(result=result)
168 except KeyboardInterrupt:
169 # XXX This doesn't seem to work on Windows, which is where we need it the most.
170 self.kill_processes()
172 # Tear down our test. We don't do this in tearDown() because Python 3
173 # updates "result" after calling tearDown().
174 self.kill_processes()
177 post_run_problem_count = len(result.failures) + len(result.errors)
178 if pre_run_problem_count != post_run_problem_count:
179 self.dump_files.append(self.log_fname)
180 # Leave some evidence behind.
181 self.cleanup_files = []
182 print('\nProcess output for {}:'.format(self.id()))
183 with io.open(self.log_fname, 'r', encoding='UTF-8') as log_fd:
185 sys.stdout.write(line)
186 for filename in self.cleanup_files:
191 self.cleanup_files = []
193 def getCaptureInfo(self, capinfos_args=None, cap_file=None):
194 '''Run capinfos on a capture file and log its output.
196 capinfos_args must be a sequence.
197 Default cap_file is <test id>.testout.pcap.'''
199 cap_file = self.filename_from_id('testout.pcap')
200 self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
201 capinfos_cmd = [config.cmd_capinfos]
202 if capinfos_args is not None:
203 capinfos_cmd += capinfos_args
204 capinfos_cmd.append(cap_file)
205 capinfos_stdout = str(subprocess.check_output(capinfos_cmd))
206 self.log_fd_write_bytes(capinfos_stdout)
207 return capinfos_stdout
209 def checkPacketCount(self, num_packets, cap_file=None):
210 '''Make sure a capture file contains a specific number of packets.'''
211 got_num_packets = False
212 capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
213 count_pat = 'Number of packets:\s+{}'.format(num_packets)
214 if re.search(count_pat, capinfos_testout):
215 got_num_packets = True
216 self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
218 def countOutput(self, search_pat, proc=None):
219 '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
222 proc = self.processes[-1]
223 # We might want to let the caller decide what we're searching.
224 out_data = proc.stdout_str + proc.stderr_str
225 search_re = re.compile(search_pat)
226 for line in out_data.splitlines():
227 if search_re.search(line):
231 def grepOutput(self, search_pat, proc=None):
232 return self.countOutput(search_pat, proc) > 0
234 def diffOutput(self, blob_a, blob_b, *args, **kwargs):
235 '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
237 blob_a and blob_b must be UTF-8 strings.'''
238 lines_a = blob_a.splitlines()
239 lines_b = blob_b.splitlines()
240 diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
242 if sys.version_info[0] < 3 and not isinstance(diff, unicode):
243 diff = unicode(diff, 'UTF-8', 'replace')
245 self.log_fd.write(u'-- Begin diff output --\n')
246 self.log_fd.writelines(diff)
247 self.log_fd.write(u'-- End diff output --\n')
251 def startProcess(self, proc_args, env=None, shell=False):
252 '''Start a process in the background. Returns a subprocess.Popen object.
254 You typically wait for it using waitProcess() or assertWaitProcess().'''
256 # Avoid using the test user's real environment by default.
257 env = config.test_env
258 proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
259 self.processes.append(proc)
262 def waitProcess(self, process):
263 '''Wait for a process to finish.'''
264 process.wait_and_log()
265 # XXX The shell version ran processes using a script called run_and_catch_crashes
266 # which looked for core dumps and printed stack traces if found. We might want
267 # to do something similar here. This may not be easy on modern Ubuntu systems,
268 # which default to using Apport: https://wiki.ubuntu.com/Apport
270 def assertWaitProcess(self, process, expected_return=0):
271 '''Wait for a process to finish and check its exit code.'''
272 process.wait_and_log()
273 self.assertEqual(process.returncode, expected_return)
275 def runProcess(self, args, env=None, shell=False):
276 '''Start a process and wait for it to finish.'''
277 process = self.startProcess(args, env=env, shell=shell)
278 process.wait_and_log()
281 def assertRun(self, args, env=None, shell=False, expected_return=0):
282 '''Start a process and wait for it to finish. Check its return code.'''
283 process = self.runProcess(args, env=env, shell=shell)
284 self.assertEqual(process.returncode, expected_return)