2 # subunit: extensions to python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Thomi Richards <thomi.richards@canonical.com>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
19 from collections import namedtuple
21 from functools import partial
22 from io import BytesIO
23 from tempfile import NamedTemporaryFile
24 from testtools import TestCase
25 from testtools.matchers import (
32 from testtools.testresult.doubles import StreamResult
34 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
35 from subunit._output import (
38 translate_command_name,
42 import subunit._output as _o
class SafeArgumentParser(argparse.ArgumentParser):
    """An ArgumentParser whose ``exit`` raises instead of exiting.

    Overriding ``exit`` lets a test observe a parser failure as an ordinary
    exception.
    """

    def exit(self, status=0, message=""):
        """Raise RuntimeError describing the exit the parser asked for.

        :param status: the exit status the parser requested.
        :param message: the message that accompanied the request.
        :raises RuntimeError: always; the text carries both values.
        """
        # A single literal keeps the spacing right: the previous pair of
        # adjacent literals each supplied a space, giving "status  2".
        raise RuntimeError(
            "ArgumentParser requested to exit with status %d and message %r"
            % (status, message)
        )
# parse_arguments with ParserClass pre-bound to SafeArgumentParser, so the
# tests below call it without naming the parser class each time.
safe_parse_arguments = partial(
    parse_arguments,
    ParserClass=SafeArgumentParser,
)
class OutputFilterArgumentTests(TestCase):

    """Checks on how the command line argument parser handles each command."""

    _all_supported_commands = ('start', 'pass', 'fail', 'skip', 'exists')

    def _test_command(self, command, test_id):
        """Parse ``command test_id`` and check both values were recorded."""
        parsed = safe_parse_arguments(args=[command, test_id])

        self.assertThat(parsed.action, Equals(command))
        self.assertThat(parsed.test_id, Equals(test_id))

    def test_can_parse_all_commands_with_test_id(self):
        """Every supported command parses together with a unique test id."""
        for name in self._all_supported_commands:
            self._test_command(name, self.getUniqueString())

    def test_command_translation(self):
        """'start' and 'pass' are renamed; the other commands map to themselves."""
        expectations = (
            ('start', 'inprogress'),
            ('pass', 'success'),
            ('fail', 'fail'),
            ('skip', 'skip'),
            ('exists', 'exists'),
        )
        for name, translated in expectations:
            self.assertThat(translate_command_name(name), Equals(translated))

    def test_all_commands_parse_file_attachment(self):
        """Every supported command accepts ``--attach-file`` with a real path."""
        with NamedTemporaryFile() as tmp_file:
            attachment_path = tmp_file.name
            for name in self._all_supported_commands:
                parsed = safe_parse_arguments(
                    args=[name, 'foo', '--attach-file', attachment_path]
                )
                attached = parsed.attach_file
                # NOTE(review): ``file`` is the Python 2 built-in file type;
                # the name is not defined on Python 3.
                self.assertThat(attached, IsInstance(file))
                self.assertThat(attached.name, Equals(attachment_path))
# Tests that run generate_bytestream on parsed command lines and read the
# output back through ByteStreamToStreamResult.
# NOTE(review): several lines of this class are not visible in this excerpt;
# the comments below describe only the lines that are shown.
87 class ByteStreamCompatibilityTests(TestCase):
# Fixed timestamp (2013-01-01 00:00:00, tzinfo ``utc``) expected by the tests.
89 _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, utc)
# NOTE(review): the ``def setUp(self):`` header that presumably precedes the
# next two lines is not visible here.
92 super(ByteStreamCompatibilityTests, self).setUp()
# Patch create_timestamp on subunit._output (imported as _o) so that it
# returns the fixed timestamp above.
93 self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
95 def _get_result_for(self, *commands):
96 """Get a result object from *commands.
98 Runs the 'generate_bytestream' function from subunit._output after
99 parsing *commands as if they were specified on the command line. The
100 resulting bytestream is then converted back into a result object and
# Each entry of *commands is an argument list handed to safe_parse_arguments.
# NOTE(review): the creation of ``stream`` is not visible in this excerpt.
106 for command_list in commands:
107 args = safe_parse_arguments(command_list)
108 output_writer = StreamResultToBytes(output_stream=stream)
109 generate_bytestream(args, output_writer)
# Build a reader over the same stream and a StreamResult double to receive
# its events; presumably the reader is run against ``result`` in lines not
# shown -- confirm.
113 case = ByteStreamToStreamResult(source=stream)
114 result = StreamResult()
# The tests below each expect an event carrying the fixed timestamp; where
# the expectation is visible, the test_status is 'inprogress', 'success' and
# 'exists' respectively. The command lists they pass are not visible here.
118 def test_start_generates_inprogress(self):
119 result = self._get_result_for(
128 test_status='inprogress',
129 timestamp=self._dummy_timestamp,
133 def test_pass_generates_success(self):
134 result = self._get_result_for(
143 test_status='success',
144 timestamp=self._dummy_timestamp,
148 def test_fail_generates_fail(self):
149 result = self._get_result_for(
159 timestamp=self._dummy_timestamp,
163 def test_skip_generates_skip(self):
164 result = self._get_result_for(
174 timestamp=self._dummy_timestamp,
178 def test_exists_generates_exists(self):
179 result = self._get_result_for(
188 test_status='exists',
189 timestamp=self._dummy_timestamp,
# Tests for write_chunked_file.
# NOTE(review): several lines of this class are not visible in this excerpt;
# the comments below describe only the lines that are shown.
194 class FileChunkingTests(TestCase):
196 def _write_chunk_file(self, file_data, chunk_size):
197 """Write chunked data to a subunit stream, return a StreamResult object."""
# NOTE(review): the creation of ``stream`` is not visible in this excerpt.
199 output_writer = StreamResultToBytes(output_stream=stream)
# The temporary file is the one passed to write_chunked_file, under the test
# id 'foo_test'; presumably file_data is written to it first in lines not
# shown -- confirm.
201 with NamedTemporaryFile() as f:
205 write_chunked_file(f, 'foo_test', output_writer, chunk_size)
# Build a reader over the same stream and a StreamResult double to receive
# its events.
209 case = ByteStreamToStreamResult(source=stream)
210 result = StreamResult()
# With chunk_size 1, "Hello" is expected as five 'status' calls of one
# character each with eof=False, then one with empty file_bytes and eof=True.
214 def test_file_chunk_size_is_honored(self):
215 result = self._write_chunk_file("Hello", 1)
219 MatchesCall(call='status', file_bytes='H', eof=False),
220 MatchesCall(call='status', file_bytes='e', eof=False),
221 MatchesCall(call='status', file_bytes='l', eof=False),
222 MatchesCall(call='status', file_bytes='l', eof=False),
223 MatchesCall(call='status', file_bytes='o', eof=False),
224 MatchesCall(call='status', file_bytes='', eof=True),
# Matcher that compares selected positions of a call tuple against the
# keyword values given at construction.
# NOTE(review): the class docstring, the ``_position_lookup`` definition and
# several control-flow lines are not visible in this excerpt.
228 class MatchesCall(Matcher):
244 def __init__(self, **kwargs):
# Collect the keyword names that are not keys of self._position_lookup.
# NOTE(review): on Python 3 filter() returns a lazy iterator that is always
# truthy; if the guard not shown here tests ``unknown_kwargs`` directly it
# would always raise -- confirm.
245 unknown_kwargs = filter(
246 lambda k: k not in self._position_lookup,
250 raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
# The accepted keywords are kept as the filters that match() checks.
251 self._filters = kwargs
253 def match(self, call_tuple):
# For each filter, look up its position via _position_lookup and compare the
# value found there in call_tuple with the expected value.
254 for k,v in self._filters.items():
256 if call_tuple[self._position_lookup[k]] != v:
# NOTE(review): the message interpolates the tuple position
# (self._position_lookup[k]) rather than the value found in call_tuple;
# this looks unintended -- confirm.
257 return Mismatch("Value for key is %r, not %r" % (self._position_lookup[k], v))
259 return Mismatch("Key %s is not present." % k)
# NOTE(review): the header of the method this return belongs to is not
# visible; it renders the matcher with its filters.
262 return "<MatchesCall %r>" % self._filters