Import matcher used.
[third_party/subunit] / python / subunit / tests / test_output_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2013 Subunit Contributors
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 import datetime
18 from functools import partial
19 from io import BytesIO, StringIO, TextIOWrapper
20 import optparse
21 import sys
22 from tempfile import NamedTemporaryFile
23
24 from contextlib import contextmanager
25 from testtools import TestCase
26 from testtools.compat import _u
27 from testtools.matchers import (
28     Equals,
29     Matcher,
30     MatchesAny,
31     MatchesListwise,
32     Mismatch,
33     raises,
34 )
35 from testtools.testresult.doubles import StreamResult
36
37 from subunit.iso8601 import UTC
38 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
39 from subunit._output import (
40     _ALL_ACTIONS,
41     _FINAL_ACTIONS,
42     generate_stream_results,
43     parse_arguments,
44 )
45 import subunit._output as _o
46
47
48 class SafeOptionParser(optparse.OptionParser):
49     """An ArgumentParser class that doesn't call sys.exit."""
50
51     def exit(self, status=0, message=""):
52         raise RuntimeError(message)
53
54     def error(self, message):
55         raise RuntimeError(message)
56
57
58 safe_parse_arguments = partial(parse_arguments, ParserClass=SafeOptionParser)
59
60
61 class TestStatusArgParserTests(TestCase):
62
63     scenarios = [
64         (cmd, dict(command=cmd, option='--' + cmd)) for cmd in _ALL_ACTIONS
65     ]
66
67     def test_can_parse_all_commands_with_test_id(self):
68         test_id = self.getUniqueString()
69         args = safe_parse_arguments(args=[self.option, test_id])
70
71         self.assertThat(args.action, Equals(self.command))
72         self.assertThat(args.test_id, Equals(test_id))
73
74     def test_all_commands_parse_file_attachment(self):
75         with NamedTemporaryFile() as tmp_file:
76             args = safe_parse_arguments(
77                 args=[self.option, 'foo', '--attach-file', tmp_file.name]
78             )
79             self.assertThat(args.attach_file.name, Equals(tmp_file.name))
80
81     def test_all_commands_accept_mimetype_argument(self):
82         with NamedTemporaryFile() as tmp_file:
83             args = safe_parse_arguments(
84                 args=[self.option, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
85             )
86             self.assertThat(args.mimetype, Equals("text/plain"))
87
88     def test_all_commands_accept_file_name_argument(self):
89         with NamedTemporaryFile() as tmp_file:
90             args = safe_parse_arguments(
91                 args=[self.option, 'foo', '--attach-file', tmp_file.name, '--file-name', "foo"]
92             )
93             self.assertThat(args.file_name, Equals("foo"))
94
95     def test_all_commands_accept_tags_argument(self):
96         args = safe_parse_arguments(
97             args=[self.option, 'foo', '--tag', "foo", "--tag", "bar", "--tag", "baz"]
98         )
99         self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
100
101     def test_attach_file_with_hyphen_opens_stdin(self):
102         self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"Hello")))
103         args = safe_parse_arguments(
104             args=[self.option, "foo", "--attach-file", "-"]
105         )
106
107         self.assertThat(args.attach_file.read(), Equals(b"Hello"))
108
109     def test_attach_file_with_hyphen_sets_filename_to_stdin(self):
110         args = safe_parse_arguments(
111             args=[self.option, "foo", "--attach-file", "-"]
112         )
113
114         self.assertThat(args.file_name, Equals("stdin"))
115
116     def test_can_override_stdin_filename(self):
117         args = safe_parse_arguments(
118             args=[self.option, "foo", "--attach-file", "-", '--file-name', 'foo']
119         )
120
121         self.assertThat(args.file_name, Equals("foo"))
122
123     def test_requires_test_id(self):
124         fn = lambda: safe_parse_arguments(args=[self.option])
125         self.assertThat(
126             fn,
127             raises(RuntimeError('argument %s: must specify a single TEST_ID.' % self.option))
128         )
129
130
131 class ArgParserTests(TestCase):
132
133     def test_can_parse_attach_file_without_test_id(self):
134         with NamedTemporaryFile() as tmp_file:
135             args = safe_parse_arguments(
136                 args=["--attach-file", tmp_file.name]
137             )
138             self.assertThat(args.attach_file.name, Equals(tmp_file.name))
139
140     def test_can_run_without_args(self):
141         args = safe_parse_arguments([])
142
143     def test_cannot_specify_more_than_one_status_command(self):
144         fn = lambda: safe_parse_arguments(['--fail', 'foo', '--skip', 'bar'])
145         self.assertThat(
146             fn,
147             raises(RuntimeError('argument --skip: Only one status may be specified at once.'))
148         )
149
150     def test_cannot_specify_mimetype_without_attach_file(self):
151         fn = lambda: safe_parse_arguments(['--mimetype', 'foo'])
152         self.assertThat(
153             fn,
154             raises(RuntimeError('Cannot specify --mimetype without --attach-file'))
155         )
156
157     def test_cannot_specify_filename_without_attach_file(self):
158         fn = lambda: safe_parse_arguments(['--file-name', 'foo'])
159         self.assertThat(
160             fn,
161             raises(RuntimeError('Cannot specify --file-name without --attach-file'))
162         )
163
164     def test_can_specify_tags_without_status_command(self):
165         args = safe_parse_arguments(['--tag', 'foo'])
166         self.assertEqual(['foo'], args.tags)
167
168     def test_must_specify_tags_with_tags_options(self):
169         fn = lambda: safe_parse_arguments(['--fail', 'foo', '--tag'])
170         self.assertThat(
171             fn,
172             MatchesAny(
173                 raises(RuntimeError('--tag option requires 1 argument')),
174                 raises(RuntimeError('--tag option requires an argument')),
175             )
176         )
177
178 def get_result_for(commands):
179     """Get a result object from *commands.
180
181     Runs the 'generate_stream_results' function from subunit._output after
182     parsing *commands as if they were specified on the command line. The
183     resulting bytestream is then converted back into a result object and
184     returned.
185     """
186     result = StreamResult()
187     args = safe_parse_arguments(commands)
188     generate_stream_results(args, result)
189     return result
190
191
192 @contextmanager
193 def temp_file_contents(data):
194     """Create a temporary file on disk containing 'data'."""
195     with NamedTemporaryFile() as f:
196         f.write(data)
197         f.seek(0)
198         yield f
199
200
201 class StatusStreamResultTests(TestCase):
202
203     scenarios = [
204         (s, dict(status=s, option='--' + s)) for s in _ALL_ACTIONS
205     ]
206
207     _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
208
209     def setUp(self):
210         super(StatusStreamResultTests, self).setUp()
211         self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
212         self.test_id = self.getUniqueString()
213
214     def test_only_one_packet_is_generated(self):
215         result = get_result_for([self.option, self.test_id])
216         self.assertThat(
217             len(result._events),
218             Equals(3) # startTestRun and stopTestRun are also called, making 3 total.
219         )
220
221     def test_correct_status_is_generated(self):
222         result = get_result_for([self.option, self.test_id])
223
224         self.assertThat(
225             result._events[1],
226             MatchesStatusCall(test_status=self.status)
227         )
228
229     def test_all_commands_generate_tags(self):
230         result = get_result_for([self.option, self.test_id, '--tag', 'hello', '--tag', 'world'])
231         self.assertThat(
232             result._events[1],
233             MatchesStatusCall(test_tags=set(['hello', 'world']))
234         )
235
236     def test_all_commands_generate_timestamp(self):
237         result = get_result_for([self.option, self.test_id])
238
239         self.assertThat(
240             result._events[1],
241             MatchesStatusCall(timestamp=self._dummy_timestamp)
242         )
243
244     def test_all_commands_generate_correct_test_id(self):
245         result = get_result_for([self.option, self.test_id])
246
247         self.assertThat(
248             result._events[1],
249             MatchesStatusCall(test_id=self.test_id)
250         )
251
252     def test_file_is_sent_in_single_packet(self):
253         with temp_file_contents(b"Hello") as f:
254             result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
255
256             self.assertThat(
257                 result._events,
258                 MatchesListwise([
259                     MatchesStatusCall(call='startTestRun'),
260                     MatchesStatusCall(file_bytes=b'Hello', eof=True),
261                     MatchesStatusCall(call='stopTestRun'),
262                 ])
263             )
264
265     def test_can_read_binary_files(self):
266         with temp_file_contents(b"\xDE\xAD\xBE\xEF") as f:
267             result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
268
269             self.assertThat(
270                 result._events,
271                 MatchesListwise([
272                     MatchesStatusCall(call='startTestRun'),
273                     MatchesStatusCall(file_bytes=b"\xDE\xAD\xBE\xEF", eof=True),
274                     MatchesStatusCall(call='stopTestRun'),
275                 ])
276             )
277
278     def test_can_read_empty_files(self):
279         with temp_file_contents(b"") as f:
280             result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
281
282             self.assertThat(
283                 result._events,
284                 MatchesListwise([
285                     MatchesStatusCall(call='startTestRun'),
286                     MatchesStatusCall(file_bytes=b"", file_name=f.name, eof=True),
287                     MatchesStatusCall(call='stopTestRun'),
288                 ])
289             )
290
291     def test_can_read_stdin(self):
292         self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"\xFE\xED\xFA\xCE")))
293         result = get_result_for([self.option, self.test_id, '--attach-file', '-'])
294
295         self.assertThat(
296             result._events,
297             MatchesListwise([
298                 MatchesStatusCall(call='startTestRun'),
299                 MatchesStatusCall(file_bytes=b"\xFE\xED\xFA\xCE", file_name='stdin', eof=True),
300                 MatchesStatusCall(call='stopTestRun'),
301             ])
302         )
303
304     def test_file_is_sent_with_test_id(self):
305         with temp_file_contents(b"Hello") as f:
306             result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
307
308             self.assertThat(
309                 result._events,
310                 MatchesListwise([
311                     MatchesStatusCall(call='startTestRun'),
312                     MatchesStatusCall(test_id=self.test_id, file_bytes=b'Hello', eof=True),
313                     MatchesStatusCall(call='stopTestRun'),
314                 ])
315             )
316
317     def test_file_is_sent_with_test_status(self):
318         with temp_file_contents(b"Hello") as f:
319             result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
320
321             self.assertThat(
322                 result._events,
323                 MatchesListwise([
324                     MatchesStatusCall(call='startTestRun'),
325                     MatchesStatusCall(test_status=self.status, file_bytes=b'Hello', eof=True),
326                     MatchesStatusCall(call='stopTestRun'),
327                 ])
328             )
329
330     def test_file_chunk_size_is_honored(self):
331         with temp_file_contents(b"Hello") as f:
332             self.patch(_o, '_CHUNK_SIZE', 1)
333             result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
334
335             self.assertThat(
336                 result._events,
337                 MatchesListwise([
338                     MatchesStatusCall(call='startTestRun'),
339                     MatchesStatusCall(test_id=self.test_id, file_bytes=b'H', eof=False),
340                     MatchesStatusCall(test_id=self.test_id, file_bytes=b'e', eof=False),
341                     MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
342                     MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
343                     MatchesStatusCall(test_id=self.test_id, file_bytes=b'o', eof=True),
344                     MatchesStatusCall(call='stopTestRun'),
345                 ])
346             )
347
348     def test_file_mimetype_specified_once_only(self):
349         with temp_file_contents(b"Hi") as f:
350             self.patch(_o, '_CHUNK_SIZE', 1)
351             result = get_result_for([
352                 self.option,
353                 self.test_id,
354                 '--attach-file',
355                 f.name,
356                 '--mimetype',
357                 'text/plain',
358             ])
359
360             self.assertThat(
361                 result._events,
362                 MatchesListwise([
363                     MatchesStatusCall(call='startTestRun'),
364                     MatchesStatusCall(test_id=self.test_id, mime_type='text/plain', file_bytes=b'H', eof=False),
365                     MatchesStatusCall(test_id=self.test_id, mime_type=None, file_bytes=b'i', eof=True),
366                     MatchesStatusCall(call='stopTestRun'),
367                 ])
368             )
369
370     def test_tags_specified_once_only(self):
371         with temp_file_contents(b"Hi") as f:
372             self.patch(_o, '_CHUNK_SIZE', 1)
373             result = get_result_for([
374                 self.option,
375                 self.test_id,
376                 '--attach-file',
377                 f.name,
378                 '--tag',
379                 'foo',
380                 '--tag',
381                 'bar',
382             ])
383
384             self.assertThat(
385                 result._events,
386                 MatchesListwise([
387                     MatchesStatusCall(call='startTestRun'),
388                     MatchesStatusCall(test_id=self.test_id, test_tags=set(['foo', 'bar'])),
389                     MatchesStatusCall(test_id=self.test_id, test_tags=None),
390                     MatchesStatusCall(call='stopTestRun'),
391                 ])
392             )
393
394     def test_timestamp_specified_once_only(self):
395         with temp_file_contents(b"Hi") as f:
396             self.patch(_o, '_CHUNK_SIZE', 1)
397             result = get_result_for([
398                 self.option,
399                 self.test_id,
400                 '--attach-file',
401                 f.name,
402             ])
403
404             self.assertThat(
405                 result._events,
406                 MatchesListwise([
407                     MatchesStatusCall(call='startTestRun'),
408                     MatchesStatusCall(test_id=self.test_id, timestamp=self._dummy_timestamp),
409                     MatchesStatusCall(test_id=self.test_id, timestamp=None),
410                     MatchesStatusCall(call='stopTestRun'),
411                 ])
412             )
413
414     def test_test_status_specified_once_only(self):
415         with temp_file_contents(b"Hi") as f:
416             self.patch(_o, '_CHUNK_SIZE', 1)
417             result = get_result_for([
418                 self.option,
419                 self.test_id,
420                 '--attach-file',
421                 f.name,
422             ])
423
424             # 'inprogress' status should be on the first packet only, all other
425             # statuses should be on the last packet.
426             if self.status in _FINAL_ACTIONS:
427                 first_call = MatchesStatusCall(test_id=self.test_id, test_status=None)
428                 last_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status)
429             else:
430                 first_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status)
431                 last_call = MatchesStatusCall(test_id=self.test_id, test_status=None)
432             self.assertThat(
433                 result._events,
434                 MatchesListwise([
435                     MatchesStatusCall(call='startTestRun'),
436                     first_call,
437                     last_call,
438                     MatchesStatusCall(call='stopTestRun'),
439                 ])
440             )
441
442     def test_filename_can_be_overridden(self):
443         with temp_file_contents(b"Hello") as f:
444             specified_file_name = self.getUniqueString()
445             result = get_result_for([
446                 self.option,
447                 self.test_id,
448                 '--attach-file',
449                 f.name,
450                 '--file-name',
451                 specified_file_name])
452
453             self.assertThat(
454                 result._events,
455                 MatchesListwise([
456                     MatchesStatusCall(call='startTestRun'),
457                     MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'),
458                     MatchesStatusCall(call='stopTestRun'),
459                 ])
460             )
461
462     def test_file_name_is_used_by_default(self):
463         with temp_file_contents(b"Hello") as f:
464             result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
465
466             self.assertThat(
467                 result._events,
468                 MatchesListwise([
469                     MatchesStatusCall(call='startTestRun'),
470                     MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True),
471                     MatchesStatusCall(call='stopTestRun'),
472                 ])
473             )
474
475
476 class FileDataTests(TestCase):
477
478     def test_can_attach_file_without_test_id(self):
479         with temp_file_contents(b"Hello") as f:
480             result = get_result_for(['--attach-file', f.name])
481
482             self.assertThat(
483                 result._events,
484                 MatchesListwise([
485                     MatchesStatusCall(call='startTestRun'),
486                     MatchesStatusCall(test_id=None, file_bytes=b'Hello', eof=True),
487                     MatchesStatusCall(call='stopTestRun'),
488                 ])
489             )
490
491     def test_file_name_is_used_by_default(self):
492         with temp_file_contents(b"Hello") as f:
493             result = get_result_for(['--attach-file', f.name])
494
495             self.assertThat(
496                 result._events,
497                 MatchesListwise([
498                     MatchesStatusCall(call='startTestRun'),
499                     MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True),
500                     MatchesStatusCall(call='stopTestRun'),
501                 ])
502             )
503
504     def test_filename_can_be_overridden(self):
505         with temp_file_contents(b"Hello") as f:
506             specified_file_name = self.getUniqueString()
507             result = get_result_for([
508                 '--attach-file',
509                 f.name,
510                 '--file-name',
511                 specified_file_name
512             ])
513
514             self.assertThat(
515                 result._events,
516                 MatchesListwise([
517                     MatchesStatusCall(call='startTestRun'),
518                     MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'),
519                     MatchesStatusCall(call='stopTestRun'),
520                 ])
521             )
522
523     def test_files_have_timestamp(self):
524         _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
525         self.patch(_o, 'create_timestamp', lambda: _dummy_timestamp)
526
527         with temp_file_contents(b"Hello") as f:
528             specified_file_name = self.getUniqueString()
529             result = get_result_for([
530                 '--attach-file',
531                 f.name,
532             ])
533
534             self.assertThat(
535                 result._events,
536                 MatchesListwise([
537                     MatchesStatusCall(call='startTestRun'),
538                     MatchesStatusCall(file_bytes=b'Hello', timestamp=_dummy_timestamp),
539                     MatchesStatusCall(call='stopTestRun'),
540                 ])
541             )
542
543     def test_can_specify_tags_without_test_status(self):
544         result = get_result_for([
545             '--tag',
546             'foo',
547         ])
548
549         self.assertThat(
550             result._events,
551             MatchesListwise([
552                 MatchesStatusCall(call='startTestRun'),
553                 MatchesStatusCall(test_tags=set(['foo'])),
554                 MatchesStatusCall(call='stopTestRun'),
555             ])
556         )
557
558
559 class MatchesStatusCall(Matcher):
560
561     _position_lookup = {
562         'call': 0,
563         'test_id': 1,
564         'test_status': 2,
565         'test_tags': 3,
566         'runnable': 4,
567         'file_name': 5,
568         'file_bytes': 6,
569         'eof': 7,
570         'mime_type': 8,
571         'route_code': 9,
572         'timestamp': 10,
573     }
574
575     def __init__(self, **kwargs):
576         unknown_kwargs = list(filter(
577             lambda k: k not in self._position_lookup,
578             kwargs
579         ))
580         if unknown_kwargs:
581             raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
582         self._filters = kwargs
583
584     def match(self, call_tuple):
585         for k, v in self._filters.items():
586             try:
587                 pos = self._position_lookup[k]
588                 if call_tuple[pos] != v:
589                     return Mismatch(
590                         "Value for key is %r, not %r" % (call_tuple[pos], v)
591                     )
592             except IndexError:
593                 return Mismatch("Key %s is not present." % k)
594
595     def __str__(self):
596         return "<MatchesStatusCall %r>" % self._filters