9d530c5df0a6fa22fd96773478b6cb21062eff0c
[third_party/subunit] / python / subunit / tests / test_output_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Thomi Richards <thomi.richards@canonical.com>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17
18 import argparse
19 from collections import namedtuple
20 import datetime
21 from functools import partial
22 from io import BytesIO
23 from tempfile import NamedTemporaryFile
24 from testtools import TestCase
25 from testtools.matchers import (
26     Equals,
27     IsInstance,
28     Matcher,
29     MatchesListwise,
30     Mismatch,
31 )
32 from testtools.testresult.doubles import StreamResult
33
34 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
35 from subunit._output import (
36     generate_bytestream,
37     parse_arguments,
38     translate_command_name,
39     utc,
40     write_chunked_file,
41 )
42 import subunit._output as _o
43
44
class SafeArgumentParser(argparse.ArgumentParser):
    """An ArgumentParser whose exit() raises instead of leaving the process."""

    def exit(self, status=0, message=""):
        """Raise RuntimeError carrying the requested exit status and message.

        :param status: The exit status the parser asked for.
        :param message: The message the parser wanted to emit on exit.
        :raises RuntimeError: Always.
        """
        template = (
            "ArgumentParser requested to exit with status "
            " %d and message %r"
        )
        details = (status, message)
        raise RuntimeError(template % details)
50
51
# parse_arguments with ParserClass pre-bound to SafeArgumentParser, whose
# exit() raises RuntimeError rather than exiting. Presumably parse_arguments
# builds its parser from ParserClass -- confirm against subunit._output.
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
53
54
class OutputFilterArgumentTests(TestCase):

    """Tests for the command line argument parser."""

    _all_supported_commands = ('start', 'pass', 'fail', 'skip', 'exists')

    @staticmethod
    def _open_file_types():
        """Return the types an opened file object may have.

        Python 2 has a builtin 'file' type; Python 3 does not, and its file
        objects derive from io.IOBase instead.
        """
        import io
        try:
            return (file, io.IOBase)
        except NameError:
            # No builtin 'file' on this interpreter (Python 3).
            return (io.IOBase,)

    def _test_command(self, command, test_id):
        """Parse [command, test_id] and check both values are stored."""
        args = safe_parse_arguments(args=[command, test_id])

        self.assertThat(args.action, Equals(command))
        self.assertThat(args.test_id, Equals(test_id))

    def test_can_parse_all_commands_with_test_id(self):
        for command in self._all_supported_commands:
            self._test_command(command, self.getUniqueString())

    def test_command_translation(self):
        self.assertThat(translate_command_name('start'), Equals('inprogress'))
        self.assertThat(translate_command_name('pass'), Equals('success'))
        # The remaining commands are expected to translate to themselves.
        for command in ('fail', 'skip', 'exists'):
            self.assertThat(translate_command_name(command), Equals(command))

    def test_all_commands_parse_file_attachment(self):
        file_types = self._open_file_types()
        with NamedTemporaryFile() as tmp_file:
            for command in self._all_supported_commands:
                args = safe_parse_arguments(
                    args=[command, 'foo', '--attach-file', tmp_file.name]
                )
                # Each parse yields a fresh open file object; make sure it is
                # closed again once the test finishes.
                self.addCleanup(args.attach_file.close)
                self.assertThat(args.attach_file, IsInstance(*file_types))
                self.assertThat(args.attach_file.name, Equals(tmp_file.name))
85
86
class ByteStreamCompatibilityTests(TestCase):
    """Check the first status event generated for each supported command."""

    _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, utc)

    def setUp(self):
        super(ByteStreamCompatibilityTests, self).setUp()
        # Swap subunit._output.create_timestamp for a callable returning a
        # fixed value; the tests below expect that value as the timestamp.
        self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)

    def _get_result_for(self, *commands):
        """Return a StreamResult holding the events produced by *commands.

        Every item of *commands is an argument list, parsed as though it had
        been given on the command line and then handed to
        'generate_bytestream' from subunit._output. All output goes into one
        shared byte stream, which is finally decoded back into the returned
        result object.

        """
        byte_stream = BytesIO()
        for argv in commands:
            parsed = safe_parse_arguments(argv)
            writer = StreamResultToBytes(output_stream=byte_stream)
            generate_bytestream(parsed, writer)
        byte_stream.seek(0)

        recorded = StreamResult()
        ByteStreamToStreamResult(source=byte_stream).run(recorded)
        return recorded

    def _assert_first_event_status(self, command, expected_status):
        """Run 'command' for test id 'foo' and check the first event.

        The first recorded event must be a 'status' call for 'foo' carrying
        expected_status and the fixed dummy timestamp.
        """
        outcome = self._get_result_for([command, 'foo'])
        expected_call = MatchesCall(
            call='status',
            test_id='foo',
            test_status=expected_status,
            timestamp=self._dummy_timestamp,
        )
        self.assertThat(outcome._events[0], expected_call)

    def test_start_generates_inprogress(self):
        self._assert_first_event_status('start', 'inprogress')

    def test_pass_generates_success(self):
        self._assert_first_event_status('pass', 'success')

    def test_fail_generates_fail(self):
        self._assert_first_event_status('fail', 'fail')

    def test_skip_generates_skip(self):
        self._assert_first_event_status('skip', 'skip')

    def test_exists_generates_exists(self):
        self._assert_first_event_status('exists', 'exists')
192
193
class FileChunkingTests(TestCase):
    """Tests for write_chunked_file."""

    def _write_chunk_file(self, file_data, chunk_size):
        """Write chunked data to a subunit stream, return a StreamResult object.

        :param file_data: Bytes written to a temporary file, which is then
            passed to write_chunked_file.
        :param chunk_size: Chunk size passed to write_chunked_file.
        :return: A StreamResult populated by decoding the generated stream.
        """
        stream = BytesIO()
        output_writer = StreamResultToBytes(output_stream=stream)

        # NamedTemporaryFile opens in binary mode by default, so file_data
        # has to be bytes.
        with NamedTemporaryFile() as f:
            f.write(file_data)
            f.seek(0)

            write_chunked_file(f, 'foo_test', output_writer, chunk_size)

        stream.seek(0)

        case = ByteStreamToStreamResult(source=stream)
        result = StreamResult()
        case.run(result)
        return result

    def test_file_chunk_size_is_honored(self):
        # Bytes literals throughout: the temporary file is binary, and text
        # and bytes never compare equal on Python 3.
        result = self._write_chunk_file(b"Hello", 1)
        self.assertThat(
            result._events,
            MatchesListwise([
                MatchesCall(call='status', file_bytes=b'H', eof=False),
                MatchesCall(call='status', file_bytes=b'e', eof=False),
                MatchesCall(call='status', file_bytes=b'l', eof=False),
                MatchesCall(call='status', file_bytes=b'l', eof=False),
                MatchesCall(call='status', file_bytes=b'o', eof=False),
                MatchesCall(call='status', file_bytes=b'', eof=True),
            ])
        )
227
class MatchesCall(Matcher):
    """Match selected fields of an event tuple.

    Keyword arguments name the fields to compare; fields that are not named
    are ignored. The tests in this module apply it to the entries of a
    StreamResult double's '_events' list.
    """

    # Index of each field within the event tuple.
    _position_lookup = {
        'call': 0,
        'test_id': 1,
        'test_status': 2,
        'test_tags': 3,
        'runnable': 4,
        'file_name': 5,
        'file_bytes': 6,
        'eof': 7,
        'mime_type': 8,
        'route_code': 9,
        'timestamp': 10,
    }

    def __init__(self, **kwargs):
        """Store the expected field values.

        :param kwargs: Expected values keyed by field name; every key must be
            present in _position_lookup.
        :raises ValueError: If any keyword is not a known field name.
        """
        # Build a real list: a lazy filter() object is always truthy on
        # Python 3, which would make every construction raise.
        unknown_kwargs = sorted(
            k for k in kwargs if k not in self._position_lookup
        )
        if unknown_kwargs:
            raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
        self._filters = kwargs

    def match(self, call_tuple):
        """Compare call_tuple with the stored expectations.

        :param call_tuple: The event tuple to check.
        :return: None when every expected field matches, otherwise a
            Mismatch describing the first difference found.
        """
        for k, v in self._filters.items():
            try:
                actual = call_tuple[self._position_lookup[k]]
            except IndexError:
                return Mismatch("Key %s is not present." % k)
            if actual != v:
                # Report the value actually found, not its tuple position.
                return Mismatch(
                    "Value for key %s is %r, not %r" % (k, actual, v)
                )
        return None

    def __str__(self):
        return "<MatchesCall %r>" % self._filters
263