2 # subunit: extensions to python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Thomi Richards <thomi.richards@canonical.com>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
19 from collections import namedtuple
21 from functools import partial
22 from io import BytesIO
23 from tempfile import NamedTemporaryFile
24 from testtools import TestCase
25 from testtools.matchers import (
32 from testtools.testresult.doubles import StreamResult
34 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
35 from subunit._output import (
38 translate_command_name,
42 import subunit._output as _o
45 class SafeArgumentParser(argparse.ArgumentParser):
47 def exit(self, status=0, message=""):
48 raise RuntimeError("ArgumentParser requested to exit with status "\
49 " %d and message %r" % (status, message))
52 safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
55 class OutputFilterArgumentTests(TestCase):
57 """Tests for the command line argument parser."""
59 _all_supported_commands = ('start', 'pass', 'fail', 'skip', 'exists')
61 def _test_command(self, command, test_id):
62 args = safe_parse_arguments(args=[command, test_id])
64 self.assertThat(args.action, Equals(command))
65 self.assertThat(args.test_id, Equals(test_id))
67 def test_can_parse_all_commands_with_test_id(self):
68 for command in self._all_supported_commands:
69 self._test_command(command, self.getUniqueString())
71 def test_command_translation(self):
72 self.assertThat(translate_command_name('start'), Equals('inprogress'))
73 self.assertThat(translate_command_name('pass'), Equals('success'))
74 for command in ('fail', 'skip', 'exists'):
75 self.assertThat(translate_command_name(command), Equals(command))
77 def test_all_commands_parse_file_attachment(self):
78 with NamedTemporaryFile() as tmp_file:
79 for command in self._all_supported_commands:
80 args = safe_parse_arguments(
81 args=[command, 'foo', '--attach-file', tmp_file.name]
83 self.assertThat(args.attach_file, IsInstance(file))
84 self.assertThat(args.attach_file.name, Equals(tmp_file.name))
86 def test_all_commands_accept_mimetype_argument(self):
87 for command in self._all_supported_commands:
88 args = safe_parse_arguments(
89 args=[command, 'foo', '--mimetype', "text/plain"]
91 self.assertThat(args.mimetype, Equals("text/plain"))
94 class ByteStreamCompatibilityTests(TestCase):
96 _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, utc)
99 super(ByteStreamCompatibilityTests, self).setUp()
100 self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
102 def _get_result_for(self, *commands):
103 """Get a result object from *commands.
105 Runs the 'generate_bytestream' function from subunit._output after
106 parsing *commands as if they were specified on the command line. The
107 resulting bytestream is then converted back into a result object and
113 for command_list in commands:
114 args = safe_parse_arguments(command_list)
115 output_writer = StreamResultToBytes(output_stream=stream)
116 generate_bytestream(args, output_writer)
120 case = ByteStreamToStreamResult(source=stream)
121 result = StreamResult()
125 def test_start_generates_inprogress(self):
126 result = self._get_result_for(
135 test_status='inprogress',
136 timestamp=self._dummy_timestamp,
140 def test_pass_generates_success(self):
141 result = self._get_result_for(
150 test_status='success',
151 timestamp=self._dummy_timestamp,
155 def test_fail_generates_fail(self):
156 result = self._get_result_for(
166 timestamp=self._dummy_timestamp,
170 def test_skip_generates_skip(self):
171 result = self._get_result_for(
181 timestamp=self._dummy_timestamp,
185 def test_exists_generates_exists(self):
186 result = self._get_result_for(
195 test_status='exists',
196 timestamp=self._dummy_timestamp,
201 class FileChunkingTests(TestCase):
203 def _write_chunk_file(self, file_data, chunk_size, mimetype=None):
204 """Write chunked data to a subunit stream, return a StreamResult object."""
206 output_writer = StreamResultToBytes(output_stream=stream)
208 with NamedTemporaryFile() as f:
212 write_chunked_file(f, 'foo_test', output_writer, chunk_size, mimetype)
216 case = ByteStreamToStreamResult(source=stream)
217 result = StreamResult()
221 def test_file_chunk_size_is_honored(self):
222 result = self._write_chunk_file("Hello", 1)
226 MatchesCall(call='status', file_bytes='H', mime_type=None, eof=False),
227 MatchesCall(call='status', file_bytes='e', mime_type=None, eof=False),
228 MatchesCall(call='status', file_bytes='l', mime_type=None, eof=False),
229 MatchesCall(call='status', file_bytes='l', mime_type=None, eof=False),
230 MatchesCall(call='status', file_bytes='o', mime_type=None, eof=False),
231 MatchesCall(call='status', file_bytes='', mime_type=None, eof=True),
235 def test_file_mimetype_is_honored(self):
236 result = self._write_chunk_file("SomeData", 1024, "text/plain")
240 MatchesCall(call='status', file_bytes='SomeData', mime_type="text/plain"),
241 MatchesCall(call='status', file_bytes='', mime_type="text/plain"),
246 class MatchesCall(Matcher):
262 def __init__(self, **kwargs):
263 unknown_kwargs = filter(
264 lambda k: k not in self._position_lookup,
268 raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
269 self._filters = kwargs
271 def match(self, call_tuple):
272 for k,v in self._filters.items():
274 pos = self._position_lookup[k]
275 if call_tuple[pos] != v:
276 return Mismatch("Value for key is %r, not %r" % (call_tuple[pos], v))
278 return Mismatch("Key %s is not present." % k)
281 return "<MatchesCall %r>" % self._filters