Add support for passing mime-type on the command-line.
[third_party/subunit] / python / subunit / tests / test_output_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Thomi Richards <thomi.richards@canonical.com>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17
18 import argparse
19 from collections import namedtuple
20 import datetime
21 from functools import partial
22 from io import BytesIO
23 from tempfile import NamedTemporaryFile
24 from testtools import TestCase
25 from testtools.matchers import (
26     Equals,
27     IsInstance,
28     Matcher,
29     MatchesListwise,
30     Mismatch,
31 )
32 from testtools.testresult.doubles import StreamResult
33
34 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
35 from subunit._output import (
36     generate_bytestream,
37     parse_arguments,
38     translate_command_name,
39     utc,
40     write_chunked_file,
41 )
42 import subunit._output as _o
43
44
45 class SafeArgumentParser(argparse.ArgumentParser):
46
47     def exit(self, status=0, message=""):
48         raise RuntimeError("ArgumentParser requested to exit with status "\
49             " %d and message %r" % (status, message))
50
51
52 safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
53
54
55 class OutputFilterArgumentTests(TestCase):
56
57     """Tests for the command line argument parser."""
58
59     _all_supported_commands = ('start', 'pass', 'fail', 'skip', 'exists')
60
61     def _test_command(self, command, test_id):
62         args = safe_parse_arguments(args=[command, test_id])
63
64         self.assertThat(args.action, Equals(command))
65         self.assertThat(args.test_id, Equals(test_id))
66
67     def test_can_parse_all_commands_with_test_id(self):
68         for command in self._all_supported_commands:
69             self._test_command(command, self.getUniqueString())
70
71     def test_command_translation(self):
72         self.assertThat(translate_command_name('start'), Equals('inprogress'))
73         self.assertThat(translate_command_name('pass'), Equals('success'))
74         for command in ('fail', 'skip', 'exists'):
75             self.assertThat(translate_command_name(command), Equals(command))
76
77     def test_all_commands_parse_file_attachment(self):
78         with NamedTemporaryFile() as tmp_file:
79             for command in self._all_supported_commands:
80                 args = safe_parse_arguments(
81                     args=[command, 'foo', '--attach-file', tmp_file.name]
82                 )
83                 self.assertThat(args.attach_file, IsInstance(file))
84                 self.assertThat(args.attach_file.name, Equals(tmp_file.name))
85
86     def test_all_commands_accept_mimetype_argument(self):
87         for command in self._all_supported_commands:
88             args = safe_parse_arguments(
89                 args=[command, 'foo', '--mimetype', "text/plain"]
90             )
91             self.assertThat(args.mimetype, Equals("text/plain"))
92
93
94 class ByteStreamCompatibilityTests(TestCase):
95
96     _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, utc)
97
98     def setUp(self):
99         super(ByteStreamCompatibilityTests, self).setUp()
100         self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
101
102     def _get_result_for(self, *commands):
103         """Get a result object from *commands.
104
105         Runs the 'generate_bytestream' function from subunit._output after
106         parsing *commands as if they were specified on the command line. The
107         resulting bytestream is then converted back into a result object and
108         returned.
109
110         """
111         stream = BytesIO()
112
113         for command_list in commands:
114             args = safe_parse_arguments(command_list)
115             output_writer = StreamResultToBytes(output_stream=stream)
116             generate_bytestream(args, output_writer)
117
118         stream.seek(0)
119
120         case = ByteStreamToStreamResult(source=stream)
121         result = StreamResult()
122         case.run(result)
123         return result
124
125     def test_start_generates_inprogress(self):
126         result = self._get_result_for(
127             ['start', 'foo'],
128         )
129
130         self.assertThat(
131             result._events[0],
132             MatchesCall(
133                 call='status',
134                 test_id='foo',
135                 test_status='inprogress',
136                 timestamp=self._dummy_timestamp,
137             )
138         )
139
140     def test_pass_generates_success(self):
141         result = self._get_result_for(
142             ['pass', 'foo'],
143         )
144
145         self.assertThat(
146             result._events[0],
147             MatchesCall(
148                 call='status',
149                 test_id='foo',
150                 test_status='success',
151                 timestamp=self._dummy_timestamp,
152             )
153         )
154
155     def test_fail_generates_fail(self):
156         result = self._get_result_for(
157             ['fail', 'foo'],
158         )
159
160         self.assertThat(
161             result._events[0],
162             MatchesCall(
163                 call='status',
164                 test_id='foo',
165                 test_status='fail',
166                 timestamp=self._dummy_timestamp,
167             )
168         )
169
170     def test_skip_generates_skip(self):
171         result = self._get_result_for(
172             ['skip', 'foo'],
173         )
174
175         self.assertThat(
176             result._events[0],
177             MatchesCall(
178                 call='status',
179                 test_id='foo',
180                 test_status='skip',
181                 timestamp=self._dummy_timestamp,
182             )
183         )
184
185     def test_exists_generates_exists(self):
186         result = self._get_result_for(
187             ['exists', 'foo'],
188         )
189
190         self.assertThat(
191             result._events[0],
192             MatchesCall(
193                 call='status',
194                 test_id='foo',
195                 test_status='exists',
196                 timestamp=self._dummy_timestamp,
197             )
198         )
199
200
201 class FileChunkingTests(TestCase):
202
203     def _write_chunk_file(self, file_data, chunk_size, mimetype=None):
204         """Write chunked data to a subunit stream, return a StreamResult object."""
205         stream = BytesIO()
206         output_writer = StreamResultToBytes(output_stream=stream)
207
208         with NamedTemporaryFile() as f:
209             f.write(file_data)
210             f.seek(0)
211
212             write_chunked_file(f, 'foo_test', output_writer, chunk_size, mimetype)
213
214         stream.seek(0)
215
216         case = ByteStreamToStreamResult(source=stream)
217         result = StreamResult()
218         case.run(result)
219         return result
220
221     def test_file_chunk_size_is_honored(self):
222         result = self._write_chunk_file("Hello", 1)
223         self.assertThat(
224             result._events,
225             MatchesListwise([
226                 MatchesCall(call='status', file_bytes='H', mime_type=None, eof=False),
227                 MatchesCall(call='status', file_bytes='e', mime_type=None, eof=False),
228                 MatchesCall(call='status', file_bytes='l', mime_type=None, eof=False),
229                 MatchesCall(call='status', file_bytes='l', mime_type=None, eof=False),
230                 MatchesCall(call='status', file_bytes='o', mime_type=None, eof=False),
231                 MatchesCall(call='status', file_bytes='', mime_type=None, eof=True),
232             ])
233         )
234
235     def test_file_mimetype_is_honored(self):
236         result = self._write_chunk_file("SomeData", 1024, "text/plain")
237         self.assertThat(
238             result._events,
239             MatchesListwise([
240                 MatchesCall(call='status', file_bytes='SomeData', mime_type="text/plain"),
241                 MatchesCall(call='status', file_bytes='', mime_type="text/plain"),
242             ])
243         )
244
245
246 class MatchesCall(Matcher):
247
248     _position_lookup = {
249             'call': 0,
250             'test_id': 1,
251             'test_status': 2,
252             'test_tags': 3,
253             'runnable': 4,
254             'file_name': 5,
255             'file_bytes': 6,
256             'eof': 7,
257             'mime_type': 8,
258             'route_code': 9,
259             'timestamp': 10,
260         }
261
262     def __init__(self, **kwargs):
263         unknown_kwargs = filter(
264             lambda k: k not in self._position_lookup,
265             kwargs
266         )
267         if unknown_kwargs:
268             raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
269         self._filters = kwargs
270
271     def match(self, call_tuple):
272         for k,v in self._filters.items():
273             try:
274                 pos = self._position_lookup[k]
275                 if call_tuple[pos] != v:
276                     return Mismatch("Value for key is %r, not %r" % (call_tuple[pos], v))
277             except IndexError:
278                 return Mismatch("Key %s is not present." % k)
279
280     def __str__(self):
281         return "<MatchesCall %r>" % self._filters
282