Don't make tests convert to and from bytes. Instead, just use a StreamResult double...
[third_party/subunit] / python / subunit / tests / test_output_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2013 Subunit Contributors
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 import datetime
18 from functools import partial
19 from io import BytesIO, StringIO, TextIOWrapper
20 import optparse
21 import sys
22 from tempfile import NamedTemporaryFile
23
24 from contextlib import contextmanager
25 from testtools import TestCase
26 from testtools.compat import _u
27 from testtools.matchers import (
28     Equals,
29     Matcher,
30     MatchesListwise,
31     Mismatch,
32     raises,
33 )
34 from testtools.testresult.doubles import StreamResult
35
36 from subunit.iso8601 import UTC
37 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
38 from subunit._output import (
39     _ALL_ACTIONS,
40     _FINAL_ACTIONS,
41     generate_stream_results,
42     parse_arguments,
43 )
44 import subunit._output as _o
45
46
class SafeOptionParser(optparse.OptionParser):
    """An OptionParser subclass that raises instead of calling sys.exit.

    Both termination hooks are overridden so that a parse failure surfaces as
    a RuntimeError carrying the parser's message, which tests can assert on.
    """

    def error(self, message):
        """Raise RuntimeError(message) instead of terminating the process."""
        raise RuntimeError(message)

    def exit(self, status=0, message=""):
        """Raise RuntimeError(message); the exit status is discarded."""
        raise RuntimeError(message)
55
56
# parse_arguments pre-bound to SafeOptionParser, so that parse failures in the
# tests below surface as RuntimeError rather than exiting the test process.
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeOptionParser)
58
59
class TestStatusArgParserTests(TestCase):
    """Argument-parsing checks that are repeated for every status action."""

    # One scenario per action: 'command' is the action name and 'option' is
    # the corresponding command-line flag.
    scenarios = [
        (cmd, dict(command=cmd, option='--' + cmd)) for cmd in _ALL_ACTIONS
    ]

    def _parse(self, *extra):
        """Parse this scenario's option followed by *extra* arguments."""
        return safe_parse_arguments(args=[self.option] + list(extra))

    def test_can_parse_all_commands_with_test_id(self):
        test_id = self.getUniqueString()
        parsed = self._parse(test_id)

        self.assertThat(parsed.action, Equals(self.command))
        self.assertThat(parsed.test_id, Equals(test_id))

    def test_all_commands_parse_file_attachment(self):
        with NamedTemporaryFile() as tmp_file:
            parsed = self._parse('foo', '--attach-file', tmp_file.name)
            self.assertThat(parsed.attach_file.name, Equals(tmp_file.name))

    def test_all_commands_accept_mimetype_argument(self):
        with NamedTemporaryFile() as tmp_file:
            parsed = self._parse(
                'foo',
                '--attach-file', tmp_file.name,
                '--mimetype', "text/plain",
            )
            self.assertThat(parsed.mimetype, Equals("text/plain"))

    def test_all_commands_accept_file_name_argument(self):
        with NamedTemporaryFile() as tmp_file:
            parsed = self._parse(
                'foo',
                '--attach-file', tmp_file.name,
                '--file-name', "foo",
            )
            self.assertThat(parsed.file_name, Equals("foo"))

    def test_all_commands_accept_tags_argument(self):
        parsed = self._parse(
            'foo', '--tag', "foo", "--tag", "bar", "--tag", "baz")
        self.assertThat(parsed.tags, Equals(["foo", "bar", "baz"]))

    def test_attach_file_with_hyphen_opens_stdin(self):
        # Substitute a fake stdin so that '-' reads known bytes.
        self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"Hello")))
        parsed = self._parse("foo", "--attach-file", "-")

        self.assertThat(parsed.attach_file.read(), Equals(b"Hello"))

    def test_attach_file_with_hyphen_sets_filename_to_stdin(self):
        parsed = self._parse("foo", "--attach-file", "-")

        self.assertThat(parsed.file_name, Equals("stdin"))

    def test_can_override_stdin_filename(self):
        parsed = self._parse("foo", "--attach-file", "-", '--file-name', 'foo')

        self.assertThat(parsed.file_name, Equals("foo"))

    def test_requires_test_id(self):
        expected_error = RuntimeError(
            'argument %s: must specify a single TEST_ID.' % self.option)
        # Calling _parse with no extra arguments passes the option alone.
        self.assertThat(self._parse, raises(expected_error))
128
129
class ArgParserTests(TestCase):
    """Parser checks that do not depend on a particular status action."""

    def _assert_parse_error(self, argv, message):
        """Assert that parsing *argv* raises RuntimeError(message)."""
        self.assertThat(
            partial(safe_parse_arguments, argv),
            raises(RuntimeError(message))
        )

    def test_can_parse_attach_file_without_test_id(self):
        with NamedTemporaryFile() as tmp_file:
            parsed = safe_parse_arguments(
                args=["--attach-file", tmp_file.name]
            )
            self.assertThat(parsed.attach_file.name, Equals(tmp_file.name))

    def test_can_run_without_args(self):
        # Passing is simply not raising on an empty command line.
        safe_parse_arguments([])

    def test_cannot_specify_more_than_one_status_command(self):
        self._assert_parse_error(
            ['--fail', 'foo', '--skip', 'bar'],
            'argument --skip: Only one status may be specified at once.'
        )

    def test_cannot_specify_mimetype_without_attach_file(self):
        self._assert_parse_error(
            ['--mimetype', 'foo'],
            'Cannot specify --mimetype without --attach-file'
        )

    def test_cannot_specify_filename_without_attach_file(self):
        self._assert_parse_error(
            ['--file-name', 'foo'],
            'Cannot specify --file-name without --attach-file'
        )

    def test_can_specify_tags_without_status_command(self):
        parsed = safe_parse_arguments(['--tag', 'foo'])
        self.assertEqual(['foo'], parsed.tags)

    def test_must_specify_tags_with_tags_options(self):
        self._assert_parse_error(
            ['--fail', 'foo', '--tag'],
            '--tag option requires 1 argument'
        )
173
174
def get_result_for(commands):
    """Return the StreamResult double produced by running *commands.

    Parses *commands as if they were specified on the command line, then
    passes the parsed arguments and a fresh StreamResult test double to the
    'generate_stream_results' function from subunit._output. The double is
    returned directly (there is no round trip through a byte stream), so
    callers can inspect the calls made on it through its '_events' list.

    :param commands: list of command-line argument strings.
    :return: the StreamResult double that received the generated events.
    """
    result = StreamResult()
    args = safe_parse_arguments(commands)
    generate_stream_results(args, result)
    return result
187
188
@contextmanager
def temp_file_contents(data):
    """Yield a named temporary file whose contents are 'data'.

    The file object is yielded positioned at its start, and the file is
    removed when the context exits.
    """
    with NamedTemporaryFile() as tmp:
        tmp.write(data)
        # Rewind so that reads through the yielded handle start at byte 0.
        tmp.seek(0)
        yield tmp
196
197
class StatusStreamResultTests(TestCase):
    """Checks on the events generated for each status action."""

    # One scenario per action: 'status' is the action name and 'option' is
    # the corresponding command-line flag.
    scenarios = [
        (s, dict(status=s, option='--' + s)) for s in _ALL_ACTIONS
    ]

    _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)

    def setUp(self):
        super(StatusStreamResultTests, self).setUp()
        # Pin the timestamp so generated events can be compared exactly.
        self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
        self.test_id = self.getUniqueString()

    def _run(self, *extra):
        """Generate results for this scenario's option, test id and *extra*."""
        return get_result_for([self.option, self.test_id] + list(extra))

    def _assert_status_events(self, result, *status_matchers):
        """Assert the events are startTestRun, *status_matchers*, stopTestRun."""
        expected = [MatchesStatusCall(call='startTestRun')]
        expected.extend(status_matchers)
        expected.append(MatchesStatusCall(call='stopTestRun'))
        self.assertThat(result._events, MatchesListwise(expected))

    def test_only_one_packet_is_generated(self):
        result = self._run()
        # startTestRun and stopTestRun are also called, making 3 total.
        self.assertThat(len(result._events), Equals(3))

    def test_correct_status_is_generated(self):
        result = self._run()

        self.assertThat(
            result._events[1],
            MatchesStatusCall(test_status=self.status)
        )

    def test_all_commands_generate_tags(self):
        result = self._run('--tag', 'hello', '--tag', 'world')

        self.assertThat(
            result._events[1],
            MatchesStatusCall(test_tags=set(['hello', 'world']))
        )

    def test_all_commands_generate_timestamp(self):
        result = self._run()

        self.assertThat(
            result._events[1],
            MatchesStatusCall(timestamp=self._dummy_timestamp)
        )

    def test_all_commands_generate_correct_test_id(self):
        result = self._run()

        self.assertThat(
            result._events[1],
            MatchesStatusCall(test_id=self.test_id)
        )

    def test_file_is_sent_in_single_packet(self):
        with temp_file_contents(b"Hello") as f:
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(file_bytes=b'Hello', eof=True)
            )

    def test_can_read_binary_files(self):
        with temp_file_contents(b"\xDE\xAD\xBE\xEF") as f:
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(file_bytes=b"\xDE\xAD\xBE\xEF", eof=True)
            )

    def test_can_read_empty_files(self):
        with temp_file_contents(b"") as f:
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(file_bytes=b"", file_name=f.name, eof=True)
            )

    def test_can_read_stdin(self):
        # Substitute a fake stdin so that '-' reads known bytes.
        self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"\xFE\xED\xFA\xCE")))
        result = self._run('--attach-file', '-')

        self._assert_status_events(
            result,
            MatchesStatusCall(file_bytes=b"\xFE\xED\xFA\xCE", file_name='stdin', eof=True)
        )

    def test_file_is_sent_with_test_id(self):
        with temp_file_contents(b"Hello") as f:
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'Hello', eof=True)
            )

    def test_file_is_sent_with_test_status(self):
        with temp_file_contents(b"Hello") as f:
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(test_status=self.status, file_bytes=b'Hello', eof=True)
            )

    def test_file_chunk_size_is_honored(self):
        with temp_file_contents(b"Hello") as f:
            # One-byte chunks force one status event per byte of the file.
            self.patch(_o, '_CHUNK_SIZE', 1)
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'H', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'e', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'o', eof=True)
            )

    def test_file_mimetype_specified_once_only(self):
        with temp_file_contents(b"Hi") as f:
            self.patch(_o, '_CHUNK_SIZE', 1)
            result = self._run(
                '--attach-file', f.name,
                '--mimetype', 'text/plain'
            )

            self._assert_status_events(
                result,
                MatchesStatusCall(test_id=self.test_id, mime_type='text/plain', file_bytes=b'H', eof=False),
                MatchesStatusCall(test_id=self.test_id, mime_type=None, file_bytes=b'i', eof=True)
            )

    def test_tags_specified_once_only(self):
        with temp_file_contents(b"Hi") as f:
            self.patch(_o, '_CHUNK_SIZE', 1)
            result = self._run(
                '--attach-file', f.name,
                '--tag', 'foo',
                '--tag', 'bar'
            )

            self._assert_status_events(
                result,
                MatchesStatusCall(test_id=self.test_id, test_tags=set(['foo', 'bar'])),
                MatchesStatusCall(test_id=self.test_id, test_tags=None)
            )

    def test_timestamp_specified_once_only(self):
        with temp_file_contents(b"Hi") as f:
            self.patch(_o, '_CHUNK_SIZE', 1)
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(test_id=self.test_id, timestamp=self._dummy_timestamp),
                MatchesStatusCall(test_id=self.test_id, timestamp=None)
            )

    def test_test_status_specified_once_only(self):
        with temp_file_contents(b"Hi") as f:
            self.patch(_o, '_CHUNK_SIZE', 1)
            result = self._run('--attach-file', f.name)

            # 'inprogress' status should be on the first packet only, all other
            # statuses should be on the last packet.
            if self.status in _FINAL_ACTIONS:
                first_status, last_status = None, self.status
            else:
                first_status, last_status = self.status, None

            self._assert_status_events(
                result,
                MatchesStatusCall(test_id=self.test_id, test_status=first_status),
                MatchesStatusCall(test_id=self.test_id, test_status=last_status)
            )

    def test_filename_can_be_overridden(self):
        with temp_file_contents(b"Hello") as f:
            specified_file_name = self.getUniqueString()
            result = self._run(
                '--attach-file', f.name,
                '--file-name', specified_file_name
            )

            self._assert_status_events(
                result,
                MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello')
            )

    def test_file_name_is_used_by_default(self):
        with temp_file_contents(b"Hello") as f:
            result = self._run('--attach-file', f.name)

            self._assert_status_events(
                result,
                MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True)
            )
471
472
class FileDataTests(TestCase):
    """Checks for attaching files and tags without specifying a test."""

    def test_can_attach_file_without_test_id(self):
        with temp_file_contents(b"Hello") as f:
            result = get_result_for(['--attach-file', f.name])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(call='startTestRun'),
                    MatchesStatusCall(test_id=None, file_bytes=b'Hello', eof=True),
                    MatchesStatusCall(call='stopTestRun'),
                ])
            )

    def test_file_name_is_used_by_default(self):
        with temp_file_contents(b"Hello") as f:
            result = get_result_for(['--attach-file', f.name])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(call='startTestRun'),
                    MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True),
                    MatchesStatusCall(call='stopTestRun'),
                ])
            )

    def test_filename_can_be_overridden(self):
        with temp_file_contents(b"Hello") as f:
            specified_file_name = self.getUniqueString()
            result = get_result_for([
                '--attach-file',
                f.name,
                '--file-name',
                specified_file_name
            ])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(call='startTestRun'),
                    MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'),
                    MatchesStatusCall(call='stopTestRun'),
                ])
            )

    def test_files_have_timestamp(self):
        # Pin the timestamp so the generated event can be compared exactly.
        _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
        self.patch(_o, 'create_timestamp', lambda: _dummy_timestamp)

        with temp_file_contents(b"Hello") as f:
            result = get_result_for([
                '--attach-file',
                f.name,
            ])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(call='startTestRun'),
                    MatchesStatusCall(file_bytes=b'Hello', timestamp=_dummy_timestamp),
                    MatchesStatusCall(call='stopTestRun'),
                ])
            )

    def test_can_specify_tags_without_test_status(self):
        result = get_result_for([
            '--tag',
            'foo',
        ])

        self.assertThat(
            result._events,
            MatchesListwise([
                MatchesStatusCall(call='startTestRun'),
                MatchesStatusCall(test_tags=set(['foo'])),
                MatchesStatusCall(call='stopTestRun'),
            ])
        )
554
555
class MatchesStatusCall(Matcher):
    """Match selected fields of one event recorded by a StreamResult double.

    Each keyword argument names a field from '_position_lookup' and gives the
    value that field must have in the matched event tuple. Fields that are
    not named are ignored, so a matcher only constrains what a test cares
    about.
    """

    # Index of each field within a recorded event tuple; index 0 holds the
    # name of the call that was recorded.
    _position_lookup = {
        'call': 0,
        'test_id': 1,
        'test_status': 2,
        'test_tags': 3,
        'runnable': 4,
        'file_name': 5,
        'file_bytes': 6,
        'eof': 7,
        'mime_type': 8,
        'route_code': 9,
        'timestamp': 10,
    }

    def __init__(self, **kwargs):
        """Store the expected field values.

        :raises ValueError: if a keyword is not a key of '_position_lookup'.
        """
        unknown_kwargs = [k for k in kwargs if k not in self._position_lookup]
        if unknown_kwargs:
            raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
        self._filters = kwargs

    def match(self, call_tuple):
        """Compare *call_tuple* with the expected field values.

        :param call_tuple: an event tuple indexed as in '_position_lookup'.
        :return: None when every expected field matches, otherwise a Mismatch
            describing the first field found to differ or to be missing.
        """
        for key, expected in self._filters.items():
            pos = self._position_lookup[key]
            try:
                actual = call_tuple[pos]
            except IndexError:
                # The event tuple is too short to contain this field.
                return Mismatch("Key %s is not present." % key)
            if actual != expected:
                return Mismatch(
                    "Value for key %s is %r, not %r" % (key, actual, expected)
                )
        return None

    def __str__(self):
        return "<MatchesStatusCall %r>" % self._filters