2 # subunit: extensions to python unittest to get test results from subprocesses.
3 # Copyright (C) 2013 'Subunit Contributors'
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
20 from functools import partial
21 from io import BytesIO, StringIO
23 from tempfile import NamedTemporaryFile
25 from testscenarios import WithScenarios
26 from testtools import TestCase
27 from testtools.compat import _b, _u
28 from testtools.matchers import (
36 from testtools.testresult.doubles import StreamResult
38 from subunit.iso8601 import UTC
39 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
40 from subunit._output import (
45 import subunit._output as _o
class SafeArgumentParser(argparse.ArgumentParser):
    """ArgumentParser variant that raises instead of calling sys.exit.

    argparse funnels fatal parse errors through ``exit``; overriding it lets
    a test observe the error message as an exception.
    """

    def exit(self, status=0, message=""):
        """Raise ``RuntimeError`` carrying *message*; *status* is ignored.

        Raises:
            RuntimeError: always, with *message* as its only argument.
        """
        raise RuntimeError(message)
# parse_arguments bound to SafeArgumentParser, so argument errors surface as
# RuntimeError instead of going through sys.exit.
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
58 class TestStatusArgParserTests(WithScenarios, TestCase):
61 (cmd, dict(command=cmd, option='--' + cmd)) for cmd in (
def test_can_parse_all_commands_with_test_id(self):
    """The scenario's option parses with a test id; action and id are recorded."""
    expected_id = self.getUniqueString()

    parsed = safe_parse_arguments(args=[self.option, expected_id])

    self.assertThat(parsed.action, Equals(self.command))
    self.assertThat(parsed.test_id, Equals(expected_id))
def test_all_commands_parse_file_attachment(self):
    """``--attach-file`` parses alongside the scenario's option."""
    with NamedTemporaryFile() as attachment:
        argv = [self.option, 'foo', '--attach-file', attachment.name]
        parsed = safe_parse_arguments(args=argv)
        # The parsed attribute is compared by its .name against the temp file's.
        self.assertThat(parsed.attach_file.name, Equals(attachment.name))
def test_all_commands_accept_mimetype_argument(self):
    """``--mimetype`` given with ``--attach-file`` is stored on the parsed args."""
    with NamedTemporaryFile() as attachment:
        argv = [
            self.option, 'foo',
            '--attach-file', attachment.name,
            '--mimetype', "text/plain",
        ]
        parsed = safe_parse_arguments(args=argv)
        self.assertThat(parsed.mimetype, Equals("text/plain"))
def test_all_commands_accept_tags_argument(self):
    """A comma-separated ``--tags`` value is parsed into a list of tags."""
    argv = [self.option, 'foo', '--tags', "foo,bar,baz"]
    parsed = safe_parse_arguments(args=argv)
    self.assertThat(parsed.tags, Equals(["foo", "bar", "baz"]))
def test_attach_file_with_hyphen_opens_stdin(self):
    """``--attach-file -`` yields a file object that reads the patched stdin."""
    # Replace the stdin attribute of subunit._output with known content.
    self.patch(_o, 'stdin', StringIO(_u("Hello")))
    argv = [self.option, "foo", "--attach-file", "-"]

    parsed = safe_parse_arguments(args=argv)

    self.assertThat(parsed.attach_file.read(), Equals("Hello"))
108 class ArgParserTests(TestCase):
111 super(ArgParserTests, self).setUp()
112 # prevent ArgumentParser from printing to stderr:
113 self._stderr = BytesIO()
114 self.patch(argparse._sys, 'stderr', self._stderr)
def test_can_parse_attach_file_without_test_id(self):
    """``--attach-file`` on its own, with no status option or test id, parses."""
    with NamedTemporaryFile() as attachment:
        parsed = safe_parse_arguments(args=["--attach-file", attachment.name])
        self.assertThat(parsed.attach_file.name, Equals(attachment.name))
123 def test_cannot_specify_more_than_one_status_command(self):
124 fn = lambda: safe_parse_arguments(['--fail', 'foo', '--skip', 'bar'])
127 raises(RuntimeError('subunit-output: error: argument --skip: '\
128 'Only one status may be specified at once.\n'))
131 def test_cannot_specify_mimetype_without_attach_file(self):
132 fn = lambda: safe_parse_arguments(['--mimetype', 'foo'])
135 raises(RuntimeError('subunit-output: error: Cannot specify '\
136 '--mimetype without --attach-file\n'))
139 def test_cannot_specify_filename_without_attach_file(self):
140 fn = lambda: safe_parse_arguments(['--file-name', 'foo'])
143 raises(RuntimeError('subunit-output: error: Cannot specify '\
144 '--file-name without --attach-file\n'))
147 def test_cannot_specify_tags_without_status_command(self):
148 fn = lambda: safe_parse_arguments(['--tags', 'foo'])
151 raises(RuntimeError('subunit-output: error: Cannot specify '\
152 '--tags without a status command\n'))
156 def get_result_for(commands):
157 """Get a result object from *commands.
159 Runs the 'generate_bytestream' function from subunit._output after
160 parsing *commands as if they were specified on the command line. The
161 resulting bytestream is then converted back into a result object and
166 args = safe_parse_arguments(commands)
167 output_writer = StreamResultToBytes(output_stream=stream)
168 generate_bytestream(args, output_writer)
172 case = ByteStreamToStreamResult(source=stream)
173 result = StreamResult()
178 class ByteStreamCompatibilityTests(WithScenarios, TestCase):
181 (s, dict(status=s, option='--' + s)) for s in (
192 _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
195 super(ByteStreamCompatibilityTests, self).setUp()
196 self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
199 def test_correct_status_is_generated(self):
200 result = get_result_for([self.option, 'foo'])
207 test_status=self.status,
208 timestamp=self._dummy_timestamp,
212 def test_all_commands_accept_tags(self):
213 result = get_result_for([self.option, 'foo', '--tags', 'hello,world'])
219 test_tags=set(['hello', 'world']),
220 timestamp=self._dummy_timestamp,
225 class FileChunkingTests(WithScenarios, TestCase):
228 ("With test_id", dict(test_id="foo")),
229 ("Without test_id", dict(test_id=None)),
232 def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None, test_id=None):
233 """Write file data to a subunit stream, get a StreamResult object."""
235 output_writer = StreamResultToBytes(output_stream=stream)
237 with NamedTemporaryFile() as f:
238 self._tmp_filename = f.name
244 output_writer=output_writer,
245 chunk_size=chunk_size,
253 case = ByteStreamToStreamResult(source=stream)
254 result = StreamResult()
258 def test_file_chunk_size_is_honored(self):
259 result = self._write_chunk_file(
260 file_data=_b("Hello"),
262 test_id=self.test_id,
267 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('H'), mime_type=None, eof=False),
268 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('e'), mime_type=None, eof=False),
269 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
270 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
271 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('o'), mime_type=None, eof=False),
272 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type=None, eof=True),
276 def test_file_mimetype_is_honored(self):
277 result = self._write_chunk_file(
278 file_data=_b("SomeData"),
279 mimetype="text/plain",
280 test_id=self.test_id,
285 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('SomeData'), mime_type="text/plain"),
286 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type="text/plain"),
290 def test_file_name_is_honored(self):
291 result = self._write_chunk_file(
292 file_data=_b("data"),
293 filename="/some/name",
299 MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
300 MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
304 def test_default_filename_is_used(self):
305 result = self._write_chunk_file(file_data=_b("data"))
309 MatchesCall(call='status', file_name=self._tmp_filename),
310 MatchesCall(call='status', file_name=self._tmp_filename),
315 class MatchesCall(Matcher):
331 def __init__(self, **kwargs):
332 unknown_kwargs = list(filter(
333 lambda k: k not in self._position_lookup,
337 raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
338 self._filters = kwargs
340 def match(self, call_tuple):
341 for k,v in self._filters.items():
343 pos = self._position_lookup[k]
344 if call_tuple[pos] != v:
346 "Value for key is %r, not %r" % (call_tuple[pos], v)
349 return Mismatch("Key %s is not present." % k)
352 return "<MatchesCall %r>" % self._filters