102b97043cfab9bd9e963741d731165181abd177
[third_party/subunit] / python / subunit / tests / test_output_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2013 'Subunit Contributors'
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17
18 import argparse
19 import datetime
20 from functools import partial
21 from io import BytesIO, StringIO
22 import sys
23 from tempfile import NamedTemporaryFile
24
25 from testscenarios import WithScenarios
26 from testtools import TestCase
27 from testtools.compat import _b, _u
28 from testtools.matchers import (
29     Equals,
30     IsInstance,
31     Matcher,
32     MatchesListwise,
33     Mismatch,
34     raises,
35 )
36 from testtools.testresult.doubles import StreamResult
37
38 from subunit.iso8601 import UTC
39 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
40 from subunit._output import (
41     generate_bytestream,
42     parse_arguments,
43     write_chunked_file,
44 )
45 import subunit._output as _o
46
47
48 class SafeArgumentParser(argparse.ArgumentParser):
49     """An ArgumentParser class that doesn't call sys.exit."""
50
51     def exit(self, status=0, message=""):
52         raise RuntimeError(message)
53
54
55 safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
56
57
58 class TestStatusArgParserTests(WithScenarios, TestCase):
59
60     scenarios = [
61         (cmd, dict(command=cmd, option='--' + cmd)) for cmd in (
62             'exists',
63             'fail',
64             'inprogress',
65             'skip',
66             'success',
67             'uxsuccess',
68             'xfail',
69         )
70     ]
71
72     def test_can_parse_all_commands_with_test_id(self):
73         test_id = self.getUniqueString()
74         args = safe_parse_arguments(args=[self.option, test_id])
75
76         self.assertThat(args.action, Equals(self.command))
77         self.assertThat(args.test_id, Equals(test_id))
78
79     def test_all_commands_parse_file_attachment(self):
80         with NamedTemporaryFile() as tmp_file:
81             args = safe_parse_arguments(
82                 args=[self.option, 'foo', '--attach-file', tmp_file.name]
83             )
84             self.assertThat(args.attach_file.name, Equals(tmp_file.name))
85
86     def test_all_commands_accept_mimetype_argument(self):
87         with NamedTemporaryFile() as tmp_file:
88             args = safe_parse_arguments(
89                 args=[self.option, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
90             )
91             self.assertThat(args.mimetype, Equals("text/plain"))
92
93     def test_all_commands_accept_tags_argument(self):
94         args = safe_parse_arguments(
95             args=[self.option, 'foo', '--tags', "foo,bar,baz"]
96         )
97         self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
98
99     def test_attach_file_with_hyphen_opens_stdin(self):
100         self.patch(_o, 'stdin', StringIO(_u("Hello")))
101         args = safe_parse_arguments(
102             args=[self.option, "foo", "--attach-file", "-"]
103         )
104
105         self.assertThat(args.attach_file.read(), Equals("Hello"))
106
107
108 class ArgParserTests(TestCase):
109
110     def setUp(self):
111         super(ArgParserTests, self).setUp()
112         # prevent ARgumentParser from printing to stderr:
113         self._stderr = BytesIO()
114         self.patch(argparse._sys, 'stderr', self._stderr)
115
116     def test_can_parse_attach_file_without_test_id(self):
117         with NamedTemporaryFile() as tmp_file:
118             args = safe_parse_arguments(
119                 args=["--attach-file", tmp_file.name]
120             )
121             self.assertThat(args.attach_file.name, Equals(tmp_file.name))
122
123     def test_cannot_specify_more_than_one_status_command(self):
124         fn = lambda: safe_parse_arguments(['--fail', 'foo', '--skip', 'bar'])
125         self.assertThat(
126             fn,
127             raises(RuntimeError('subunit-output: error: argument --skip: '\
128                 'Only one status may be specified at once.\n'))
129         )
130
131     def test_cannot_specify_mimetype_without_attach_file(self):
132         fn = lambda: safe_parse_arguments(['--mimetype', 'foo'])
133         self.assertThat(
134             fn,
135             raises(RuntimeError('subunit-output: error: Cannot specify '\
136                 '--mimetype without --attach-file\n'))
137         )
138
139     def test_cannot_specify_filename_without_attach_file(self):
140         fn = lambda: safe_parse_arguments(['--file-name', 'foo'])
141         self.assertThat(
142             fn,
143             raises(RuntimeError('subunit-output: error: Cannot specify '\
144                 '--file-name without --attach-file\n'))
145         )
146
147     def test_cannot_specify_tags_without_status_command(self):
148         fn = lambda: safe_parse_arguments(['--tags', 'foo'])
149         self.assertThat(
150             fn,
151             raises(RuntimeError('subunit-output: error: Cannot specify '\
152                 '--tags without a status command\n'))
153         )
154
155
156 def get_result_for(commands):
157     """Get a result object from *commands.
158
159     Runs the 'generate_bytestream' function from subunit._output after
160     parsing *commands as if they were specified on the command line. The
161     resulting bytestream is then converted back into a result object and
162     returned.
163     """
164     stream = BytesIO()
165
166     args = safe_parse_arguments(commands)
167     output_writer = StreamResultToBytes(output_stream=stream)
168     generate_bytestream(args, output_writer)
169
170     stream.seek(0)
171
172     case = ByteStreamToStreamResult(source=stream)
173     result = StreamResult()
174     case.run(result)
175     return result
176
177
178 class ByteStreamCompatibilityTests(WithScenarios, TestCase):
179
180     scenarios = [
181         (s, dict(status=s, option='--' + s)) for s in (
182             'exists',
183             'fail',
184             'inprogress',
185             'skip',
186             'success',
187             'uxsuccess',
188             'xfail',
189         )
190     ]
191
192     _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
193
194     def setUp(self):
195         super(ByteStreamCompatibilityTests, self).setUp()
196         self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
197
198
199     def test_correct_status_is_generated(self):
200         result = get_result_for([self.option, 'foo'])
201
202         self.assertThat(
203             result._events[0],
204             MatchesCall(
205                 call='status',
206                 test_id='foo',
207                 test_status=self.status,
208                 timestamp=self._dummy_timestamp,
209             )
210         )
211
212     def test_all_commands_accept_tags(self):
213         result = get_result_for([self.option, 'foo', '--tags', 'hello,world'])
214         self.assertThat(
215             result._events[0],
216             MatchesCall(
217                 call='status',
218                 test_id='foo',
219                 test_tags=set(['hello', 'world']),
220                 timestamp=self._dummy_timestamp,
221             )
222         )
223
224
225 class FileChunkingTests(WithScenarios, TestCase):
226
227     scenarios = [
228         ("With test_id", dict(test_id="foo")),
229         ("Without test_id", dict(test_id=None)),
230     ]
231
232     def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None, test_id=None):
233         """Write file data to a subunit stream, get a StreamResult object."""
234         stream = BytesIO()
235         output_writer = StreamResultToBytes(output_stream=stream)
236
237         with NamedTemporaryFile() as f:
238             self._tmp_filename = f.name
239             f.write(file_data)
240             f.seek(0)
241
242             write_chunked_file(
243                 file_obj=f,
244                 output_writer=output_writer,
245                 chunk_size=chunk_size,
246                 mime_type=mimetype,
247                 test_id=test_id,
248                 file_name=filename,
249             )
250
251         stream.seek(0)
252
253         case = ByteStreamToStreamResult(source=stream)
254         result = StreamResult()
255         case.run(result)
256         return result
257
258     def test_file_chunk_size_is_honored(self):
259         result = self._write_chunk_file(
260             file_data=_b("Hello"),
261             chunk_size=1,
262             test_id=self.test_id,
263         )
264         self.assertThat(
265             result._events,
266             MatchesListwise([
267                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('H'), mime_type=None, eof=False),
268                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('e'), mime_type=None, eof=False),
269                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
270                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
271                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('o'), mime_type=None, eof=False),
272                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type=None, eof=True),
273             ])
274         )
275
276     def test_file_mimetype_is_honored(self):
277         result = self._write_chunk_file(
278             file_data=_b("SomeData"),
279             mimetype="text/plain",
280             test_id=self.test_id,
281         )
282         self.assertThat(
283             result._events,
284             MatchesListwise([
285                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('SomeData'), mime_type="text/plain"),
286                 MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type="text/plain"),
287             ])
288         )
289
290     def test_file_name_is_honored(self):
291         result = self._write_chunk_file(
292             file_data=_b("data"),
293             filename="/some/name",
294             test_id=self.test_id
295         )
296         self.assertThat(
297             result._events,
298             MatchesListwise([
299                 MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
300                 MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
301             ])
302         )
303
304     def test_default_filename_is_used(self):
305         result = self._write_chunk_file(file_data=_b("data"))
306         self.assertThat(
307             result._events,
308             MatchesListwise([
309                 MatchesCall(call='status', file_name=self._tmp_filename),
310                 MatchesCall(call='status', file_name=self._tmp_filename),
311             ])
312         )
313
314
315 class MatchesCall(Matcher):
316
317     _position_lookup = {
318             'call': 0,
319             'test_id': 1,
320             'test_status': 2,
321             'test_tags': 3,
322             'runnable': 4,
323             'file_name': 5,
324             'file_bytes': 6,
325             'eof': 7,
326             'mime_type': 8,
327             'route_code': 9,
328             'timestamp': 10,
329         }
330
331     def __init__(self, **kwargs):
332         unknown_kwargs = list(filter(
333             lambda k: k not in self._position_lookup,
334             kwargs
335         ))
336         if unknown_kwargs:
337             raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
338         self._filters = kwargs
339
340     def match(self, call_tuple):
341         for k,v in self._filters.items():
342             try:
343                 pos = self._position_lookup[k]
344                 if call_tuple[pos] != v:
345                     return Mismatch(
346                         "Value for key is %r, not %r" % (call_tuple[pos], v)
347                     )
348             except IndexError:
349                 return Mismatch("Key %s is not present." % k)
350
351     def __str__(self):
352         return "<MatchesCall %r>" % self._filters
353