61bab5f970da5cd707c9913f0bce4a06ca9f9ca8
[metze/wireshark/wip.git] / test / subprocesstest.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 #
8 # SPDX-License-Identifier: GPL-2.0-or-later
9 #
10 '''Subprocess test case superclass'''
11
12 import config
13 import difflib
14 import io
15 import os
16 import os.path
17 import re
18 import subprocess
19 import sys
20 import unittest
21
22 # To do:
23 # - Add a subprocesstest.SkipUnlessCapture decorator?
24 # - Try to catch crashes? See the comments below in waitProcess.
25
26 # XXX This should probably be in config.py and settable from
27 # the command line.
28 if sys.version_info[0] >= 3:
29     process_timeout = 300 # Seconds
30
31 def capture_command(cmd, *args, **kwargs):
32     '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
33
34     If shell is true, return a string. Otherwise, return a list of arguments.'''
35     shell = kwargs.pop('shell', False)
36     if shell:
37         cap_cmd = ['"' + cmd + '"']
38     else:
39         cap_cmd = [cmd]
40     if cmd == config.cmd_wireshark:
41         cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
42     cap_cmd += args
43     if shell:
44         return ' '.join(cap_cmd)
45     else:
46         return cap_cmd
47
48 def cat_dhcp_command(mode):
49     '''Create a command string for dumping dhcp.pcap to stdout'''
50     # XXX Do this in Python in a thread?
51     sd_cmd = ''
52     if sys.executable:
53         sd_cmd = '"{}" '.format(sys.executable)
54     sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
55     return sd_cmd
56
57 class LoggingPopen(subprocess.Popen):
58     '''Run a process using subprocess.Popen. Capture and log its output.
59
60     Stdout and stderr are captured to memory and decoded as UTF-8. The
61     program command and output is written to log_fd.
62     '''
63     def __init__(self, proc_args, *args, **kwargs):
64         self.log_fd = kwargs.pop('log_fd', None)
65         kwargs['stdout'] = subprocess.PIPE
66         kwargs['stderr'] = subprocess.PIPE
67         # Make sure communicate() gives us bytes.
68         kwargs['universal_newlines'] = False
69         self.cmd_str = 'command ' + repr(proc_args)
70         super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
71         self.stdout_str = ''
72         self.stderr_str = ''
73
74     def wait_and_log(self):
75         '''Wait for the process to finish and log its output.'''
76         # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
77         if sys.version_info[0] >= 3:
78             out_data, err_data = self.communicate(timeout=process_timeout)
79             out_log = out_data.decode('UTF-8', 'replace')
80             err_log = err_data.decode('UTF-8', 'replace')
81         else:
82             out_data, err_data = self.communicate()
83             out_log = unicode(out_data, 'UTF-8', 'replace')
84             err_log = unicode(err_data, 'UTF-8', 'replace')
85         # Throwing a UnicodeDecodeError exception here is arguably a good thing.
86         self.stdout_str = out_data.decode('UTF-8', 'strict')
87         self.stderr_str = err_data.decode('UTF-8', 'strict')
88         self.log_fd.flush()
89         self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
90         self.log_fd.write(out_log)
91         self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
92         self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
93         self.log_fd.write(err_log)
94         self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
95         self.log_fd.flush()
96
97     def stop_process(self, kill=False):
98         '''Stop the process immediately.'''
99         if kill:
100             super(LoggingPopen, self).kill()
101         else:
102             super(LoggingPopen, self).terminate()
103
104     def terminate(self):
105         '''Terminate the process. Do not log its output.'''
106         # XXX Currently unused.
107         self.stop_process(kill=False)
108
109     def kill(self):
110         '''Kill the process. Do not log its output.'''
111         self.stop_process(kill=True)
112
113 class SubprocessTestCase(unittest.TestCase):
114     '''Run a program and gather its stdout and stderr.'''
115
116     def __init__(self, *args, **kwargs):
117         super(SubprocessTestCase, self).__init__(*args, **kwargs)
118         self.exit_ok = 0
119         self.exit_command_line = 1
120         self.exit_error = 2
121         self.exit_code = None
122         self.log_fname = None
123         self.log_fd = None
124         self.processes = []
125         self.cleanup_files = []
126         self.dump_files = []
127
128     def log_fd_write_bytes(self, log_data):
129         if sys.version_info[0] >= 3:
130             self.log_fd.write(log_data)
131         else:
132             self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))
133
134     def filename_from_id(self, filename):
135         '''Generate a filename prefixed with our test ID.'''
136         id_filename = self.id() + '.' + filename
137         if id_filename not in self.cleanup_files:
138             self.cleanup_files.append(id_filename)
139         return id_filename
140
141     def kill_processes(self):
142         '''Kill any processes we've opened so far'''
143         for proc in self.processes:
144             try:
145                 proc.kill()
146             except:
147                 pass
148
149     def run(self, result=None):
150         # Subclass run() so that we can do the following:
151         # - Open our log file and add it to the cleanup list.
152         # - Check our result before and after the run so that we can tell
153         #   if the current test was successful.
154
155         # Probably not needed, but shouldn't hurt.
156         self.kill_processes()
157         self.processes = []
158         self.log_fname = self.filename_from_id('log')
159         # Our command line utilities generate UTF-8. The log file endcoding
160         # needs to match that.
161         # XXX newline='\n' works for now, but we might have to do more work
162         # to handle line endings in the future.
163         self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8', newline='\n')
164         self.cleanup_files.append(self.log_fname)
165         pre_run_problem_count = 0
166         if result:
167             pre_run_problem_count = len(result.failures) + len(result.errors)
168         try:
169             super(SubprocessTestCase, self).run(result=result)
170         except KeyboardInterrupt:
171             # XXX This doesn't seem to work on Windows, which is where we need it the most.
172             self.kill_processes()
173
174         # Tear down our test. We don't do this in tearDown() because Python 3
175         # updates "result" after calling tearDown().
176         self.kill_processes()
177         self.log_fd.close()
178         if result:
179             post_run_problem_count = len(result.failures) + len(result.errors)
180             if pre_run_problem_count != post_run_problem_count:
181                 self.dump_files.append(self.log_fname)
182                 # Leave some evidence behind.
183                 self.cleanup_files = []
184                 print('\nProcess output for {}:'.format(self.id()))
185                 with io.open(self.log_fname, 'r', encoding='UTF-8') as log_fd:
186                     for line in log_fd:
187                         sys.stdout.write(line)
188         for filename in self.cleanup_files:
189             try:
190                 os.unlink(filename)
191             except OSError:
192                 pass
193         self.cleanup_files = []
194
195     def getCaptureInfo(self, capinfos_args=None, cap_file=None):
196         '''Run capinfos on a capture file and log its output.
197
198         capinfos_args must be a sequence.
199         Default cap_file is <test id>.testout.pcap.'''
200         if not cap_file:
201             cap_file = self.filename_from_id('testout.pcap')
202         self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
203         capinfos_cmd = [config.cmd_capinfos]
204         if capinfos_args is not None:
205             capinfos_cmd += capinfos_args
206         capinfos_cmd.append(cap_file)
207         capinfos_data = subprocess.check_output(capinfos_cmd)
208         if sys.version_info[0] >= 3:
209             capinfos_stdout = capinfos_data.decode('UTF-8', 'replace')
210         else:
211             capinfos_stdout = unicode(capinfos_data, 'UTF-8', 'replace')
212         self.log_fd.write(capinfos_stdout)
213         return capinfos_stdout
214
215     def checkPacketCount(self, num_packets, cap_file=None):
216         '''Make sure a capture file contains a specific number of packets.'''
217         got_num_packets = False
218         capinfos_testout = self.getCaptureInfo(cap_file=cap_file)
219         count_pat = 'Number of packets:\s+{}'.format(num_packets)
220         if re.search(count_pat, capinfos_testout):
221             got_num_packets = True
222         self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
223
224     def countOutput(self, search_pat=None, count_stdout=True, count_stderr=False, proc=None):
225         '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
226         match_count = 0
227         self.assertTrue(count_stdout or count_stderr, 'No output to count.')
228
229         if proc is None:
230             proc = self.processes[-1]
231
232         out_data = u''
233         if count_stdout:
234             out_data = proc.stdout_str
235         if count_stderr:
236             out_data += proc.stderr_str
237
238         if search_pat is None:
239             return len(out_data.splitlines())
240
241         search_re = re.compile(search_pat)
242         for line in out_data.splitlines():
243             if search_re.search(line):
244                 match_count += 1
245
246         return match_count
247
248     def grepOutput(self, search_pat, proc=None):
249         return self.countOutput(search_pat, count_stderr=True, proc=proc) > 0
250
251     def diffOutput(self, blob_a, blob_b, *args, **kwargs):
252         '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
253
254         blob_a and blob_b must be UTF-8 strings.'''
255         lines_a = blob_a.splitlines()
256         lines_b = blob_b.splitlines()
257         diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
258         if len(diff) > 0:
259             if sys.version_info[0] < 3 and not isinstance(diff, unicode):
260                 diff = unicode(diff, 'UTF-8', 'replace')
261             self.log_fd.flush()
262             self.log_fd.write(u'-- Begin diff output --\n')
263             self.log_fd.writelines(diff)
264             self.log_fd.write(u'-- End diff output --\n')
265             return False
266         return True
267
268     def startProcess(self, proc_args, env=None, shell=False):
269         '''Start a process in the background. Returns a subprocess.Popen object.
270
271         You typically wait for it using waitProcess() or assertWaitProcess().'''
272         if env is None:
273             # Avoid using the test user's real environment by default.
274             env = config.test_env
275         proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
276         self.processes.append(proc)
277         return proc
278
279     def waitProcess(self, process):
280         '''Wait for a process to finish.'''
281         process.wait_and_log()
282         # XXX The shell version ran processes using a script called run_and_catch_crashes
283         # which looked for core dumps and printed stack traces if found. We might want
284         # to do something similar here. This may not be easy on modern Ubuntu systems,
285         # which default to using Apport: https://wiki.ubuntu.com/Apport
286
287     def assertWaitProcess(self, process, expected_return=0):
288         '''Wait for a process to finish and check its exit code.'''
289         process.wait_and_log()
290         self.assertEqual(process.returncode, expected_return)
291
292     def runProcess(self, args, env=None, shell=False):
293         '''Start a process and wait for it to finish.'''
294         process = self.startProcess(args, env=env, shell=shell)
295         process.wait_and_log()
296         return process
297
298     def assertRun(self, args, env=None, shell=False, expected_return=0):
299         '''Start a process and wait for it to finish. Check its return code.'''
300         process = self.runProcess(args, env=env, shell=shell)
301         self.assertEqual(process.returncode, expected_return)
302         return process