# Source revision: f88e0aa07b76fff3d6a8d77275bfff30c7eef625
# Source path: third_party/subunit/python/subunit/tests/test_test_protocol2.py
#
#  subunit: extensions to Python unittest to get test results from subprocesses.
#  Copyright (C) 2013  Robert Collins <robertc@robertcollins.net>
#
#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
#  license at the users choice. A copy of both licenses are available in the
#  project source as Apache-2.0 and BSD. You may not use this file except in
#  compliance with one of these two licences.
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
#  license you chose for the specific language governing permissions and
#  limitations under that license.
#
16
from io import BytesIO
import datetime

from testtools import TestCase
from testtools.matchers import Contains, HasLength
from testtools.tests.test_testresult import TestStreamResultContract
from testtools.testresult.doubles import StreamResult

import subunit
import subunit.iso8601 as iso8601
27
# Pre-encoded packets shared by the encoder and decoder tests below: the
# encoder tests expect StreamResultToBytes to write exactly these bytes, and
# the decoder tests feed them to ByteStreamToStreamResult.
#
# Every packet starts with the 0xb3 signature byte, carries its own total
# length in the byte at offset 3 (all of these fit a one byte length), and
# ends with a four byte checksum.

# One packet per test status, all for test id "foo".
CONSTANT_ENUM = b'\xb3)\x01\x0c\x03foo\x08U_\x1b'
CONSTANT_INPROGRESS = b'\xb3)\x02\x0c\x03foo\x8e\xc1-\xb5'
CONSTANT_SUCCESS = b'\xb3)\x03\x0c\x03fooE\x9d\xfe\x10'
CONSTANT_UXSUCCESS = b'\xb3)\x04\x0c\x03fooX\x98\xce\xa8'
CONSTANT_SKIP = b'\xb3)\x05\x0c\x03foo\x93\xc4\x1d\r'
CONSTANT_FAIL = b'\xb3)\x06\x0c\x03foo\x15Po\xa3'
CONSTANT_XFAIL = b'\xb3)\x07\x0c\x03foo\xde\x0c\xbc\x06'

# Packets exercising one optional field each.
CONSTANT_EOF = b'\xb3!\x10\x08S\x15\x88\xdc'
CONSTANT_FILE_CONTENT = b'\xb3!@\x13\x06barney\x03wooA5\xe3\x8c'
CONSTANT_MIME = b'\xb3! #\x1aapplication/foo; charset=1x3Q\x15'
CONSTANT_TIMESTAMP = b'\xb3+\x03\x13<\x17T\xcf\x80\xaf\xc8\x03barI\x96>-'
CONSTANT_ROUTE_CODE = b'\xb3-\x03\x13\x03bar\x06source\x9cY9\x19'
CONSTANT_RUNNABLE = b'\xb3(\x03\x0c\x03foo\xe3\xea\xf5\xa4'

# The same two tags serialised in either order; the encoder test accepts
# whichever one is produced (presumably because the tags are passed as a set,
# whose iteration order is not fixed).
CONSTANT_TAGS = [
    b'\xb3)\x80\x15\x03bar\x02\x03foo\x03barTHn\xb4',
    b'\xb3)\x80\x15\x03bar\x02\x03bar\x03foo\xf8\xf1\x91o',
    ]
45
46
class TestStreamResultToBytesContract(TestCase, TestStreamResultContract):
    """Run the testtools StreamResult contract tests on StreamResultToBytes."""

    def _make_result(self):
        # Hook used by TestStreamResultContract to obtain the object under
        # test; the bytes written are discarded with the BytesIO.
        return subunit.StreamResultToBytes(BytesIO())
52
53
class TestStreamResultToBytes(TestCase):
    """Check the exact bytes StreamResultToBytes writes for status() calls."""

    def _make_result(self):
        """Return a ``(result, output)`` pair writing into a fresh BytesIO."""
        output = BytesIO()
        return subunit.StreamResultToBytes(output), output

    def _assert_status_bytes(self, expected, *args, **kwargs):
        """Call ``status(*args, **kwargs)`` and compare the bytes written.

        :param expected: The exact byte string the encoder should emit.
        """
        result, output = self._make_result()
        result.status(*args, **kwargs)
        self.assertEqual(expected, output.getvalue())

    def test_numbers(self):
        result = subunit.StreamResultToBytes(BytesIO())
        # Values just outside the encodable range are rejected and nothing is
        # appended to the packet.
        for unencodable in (-1, 1073741824):
            packet = []
            self.assertRaises(
                Exception, result._write_number, unencodable, packet)
            self.assertEqual([], packet)
        # Smallest and largest value for each encoded width (1-4 bytes).
        encodings = [
            (0, [b'\x00']),
            (63, [b'\x3f']),
            (64, [b'\x40\x40']),
            (16383, [b'\x7f\xff']),
            (16384, [b'\x80\x40', b'\x00']),
            (4194303, [b'\xbf\xff', b'\xff']),
            (4194304, [b'\xc0\x40\x00\x00']),
            (1073741823, [b'\xff\xff\xff\xff']),
            ]
        for value, expected in encodings:
            packet = []
            result._write_number(value, packet)
            self.assertEqual(expected, packet)

    def test_volatile_length(self):
        # if the length of the packet data before the length itself is
        # considered is right on the boundary for length's variable length
        # encoding, it is easy to get the length wrong by not accounting for
        # length itself.
        # that is, the encoder has to ensure that length == sum (length_of_rest
        # + length_of_length)
        result, output = self._make_result()
        # Each row is (payload size, total packet size, encoded length field).
        # Rows come in pairs: the smallest and the largest packet whose
        # length field occupies 1, 2 and then 3 bytes.
        boundaries = [
            (0, 10, b'\x0a'),
            (53, 63, b'\x3f'),
            (54, 65, b'\x40\x41'),
            (16371, 16383, b'\x7f\xff'),
            (16372, 16385, b'\x80\x40\x01'),
            (4194289, 4194303, b'\xbf\xff\xff'),
            ]
        for payload_size, packet_size, encoded_length in boundaries:
            result.status(file_name="", file_bytes=b'\xff' * payload_size)
            written = output.getvalue()
            self.assertThat(written, HasLength(packet_size))
            # The length field starts at offset 3 of the packet.
            self.assertEqual(
                encoded_length, written[3:3 + len(encoded_length)])
            # Reuse the same buffer for the next case.
            output.seek(0)
            output.truncate()
        # One more payload byte is more than the encoder accepts.
        self.assertRaises(Exception, result.status, file_name="",
            file_bytes=b'\xff' * 4194290)

    def test_trivial_enumeration(self):
        self._assert_status_bytes(CONSTANT_ENUM, "foo", 'exists')

    def test_inprogress(self):
        self._assert_status_bytes(CONSTANT_INPROGRESS, "foo", 'inprogress')

    def test_success(self):
        self._assert_status_bytes(CONSTANT_SUCCESS, "foo", 'success')

    def test_uxsuccess(self):
        self._assert_status_bytes(CONSTANT_UXSUCCESS, "foo", 'uxsuccess')

    def test_skip(self):
        self._assert_status_bytes(CONSTANT_SKIP, "foo", 'skip')

    def test_fail(self):
        self._assert_status_bytes(CONSTANT_FAIL, "foo", 'fail')

    def test_xfail(self):
        self._assert_status_bytes(CONSTANT_XFAIL, "foo", 'xfail')

    def test_unknown_status(self):
        result, output = self._make_result()
        self.assertRaises(Exception, result.status, "foo", 'boo')
        # Nothing may be written for a rejected status.
        self.assertEqual(b'', output.getvalue())

    def test_eof(self):
        self._assert_status_bytes(CONSTANT_EOF, eof=True)

    def test_file_content(self):
        self._assert_status_bytes(
            CONSTANT_FILE_CONTENT, file_name="barney", file_bytes=b"woo")

    def test_mime(self):
        self._assert_status_bytes(
            CONSTANT_MIME, mime_type="application/foo; charset=1")

    def test_route_code(self):
        self._assert_status_bytes(
            CONSTANT_ROUTE_CODE, test_id="bar", test_status='success',
            route_code="source")

    def test_runnable(self):
        self._assert_status_bytes(
            CONSTANT_RUNNABLE, "foo", 'success', runnable=False)

    def test_tags(self):
        result, output = self._make_result()
        result.status(test_id="bar", test_tags=set(['foo', 'bar']))
        # Either tag ordering listed in CONSTANT_TAGS is acceptable.
        self.assertThat(CONSTANT_TAGS, Contains(output.getvalue()))

    def test_timestamp(self):
        timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45,
            iso8601.Utc())
        self._assert_status_bytes(
            CONSTANT_TIMESTAMP, test_id="bar", test_status='success',
            timestamp=timestamp)
216
217
class TestByteStreamToStreamResult(TestCase):
    """Feed byte streams to ByteStreamToStreamResult and check the events."""

    def _non_subunit_events(self, data):
        """Return the events expected when ``data`` is not subunit content.

        The tests expect one 'stdout' file event per input byte.
        """
        return [
            self._event(file_name='stdout', file_bytes=data[pos:pos + 1])
            for pos in range(len(data))]

    def _parser_error_events(self, packet, message):
        """Return the two events expected for an undecodable packet.

        :param packet: The raw bytes that were fed to the parser.
        :param message: The expected parser error text, as bytes.
        """
        return [
            self._event(test_id="subunit.parser", eof=True,
                file_name="Packet data", file_bytes=packet),
            self._event(test_id="subunit.parser", test_status="fail",
                eof=True, file_name="Parser Error", file_bytes=message),
            ]

    def test_non_subunit_encapsulated(self):
        data = b"foo\nbar\n"
        source = BytesIO(data)
        result = StreamResult()
        subunit.ByteStreamToStreamResult(
            source, non_subunit_name="stdout").run(result)
        self.assertEqual(self._non_subunit_events(data), result._events)
        self.assertEqual(b'', source.read())

    def test_signature_middle_utf8_char(self):
        # b'\xe3\xb3\x8a' is the UTF-8 encoding of u'\u3cca'. Its middle byte
        # is 0xb3, the byte every packet starts with; here it must not be
        # taken as the start of a packet. All three bytes are expected to be
        # forwarded, one event per byte, as non-subunit content.
        utf8_bytes = b'\xe3\xb3\x8a'
        source = BytesIO(utf8_bytes)
        result = StreamResult()
        subunit.ByteStreamToStreamResult(
            source, non_subunit_name="stdout").run(result)
        self.assertEqual(
            self._non_subunit_events(utf8_bytes), result._events)

    def test_non_subunit_disabled_raises(self):
        source = BytesIO(b"foo\nbar\n")
        result = StreamResult()
        case = subunit.ByteStreamToStreamResult(source)
        e = self.assertRaises(Exception, case.run, result)
        # Only the first byte is consumed, and it is reported on the error.
        self.assertEqual(b'f', e.args[1])
        self.assertEqual(b'oo\nbar\n', source.read())
        self.assertEqual([], result._events)

    def test_trivial_enumeration(self):
        self.check_events(CONSTANT_ENUM, [
            self._event(test_id='foo', test_status='exists'),
            ])

    def test_multiple_events(self):
        self.check_events(CONSTANT_ENUM + CONSTANT_ENUM, [
            self._event(test_id='foo', test_status='exists'),
            self._event(test_id='foo', test_status='exists'),
            ])

    def test_inprogress(self):
        self.check_event(CONSTANT_INPROGRESS, 'inprogress')

    def test_success(self):
        self.check_event(CONSTANT_SUCCESS, 'success')

    def test_uxsuccess(self):
        self.check_event(CONSTANT_UXSUCCESS, 'uxsuccess')

    def test_skip(self):
        self.check_event(CONSTANT_SKIP, 'skip')

    def test_fail(self):
        self.check_event(CONSTANT_FAIL, 'fail')

    def test_xfail(self):
        self.check_event(CONSTANT_XFAIL, 'xfail')

    def check_events(self, source_bytes, events):
        """Parse ``source_bytes`` and assert exactly ``events`` are emitted.

        Also asserts that the whole input was consumed and that any file
        attachment content is delivered as bytes.
        """
        source = BytesIO(source_bytes)
        result = StreamResult()
        subunit.ByteStreamToStreamResult(
            source, non_subunit_name="stdout").run(result)
        self.assertEqual(b'', source.read())
        self.assertEqual(events, result._events)
        #- any file attachments should be byte contents [as users assume that].
        for event in result._events:
            # event[5] is file_name and event[6] is file_bytes (see _event).
            if event[5] is not None:
                self.assertIsInstance(event[6], bytes)

    def check_event(self, source_bytes, test_status=None, test_id="foo",
        route_code=None, timestamp=None, tags=None, mime_type=None,
        file_name=None, file_bytes=None, eof=False, runnable=True):
        """Assert ``source_bytes`` parses to the single event described.

        Unlike _event, test_id defaults to "foo".
        """
        event = self._event(test_id=test_id, test_status=test_status,
            tags=tags, runnable=runnable, file_name=file_name,
            file_bytes=file_bytes, eof=eof, mime_type=mime_type,
            route_code=route_code, timestamp=timestamp)
        self.check_events(source_bytes, [event])

    def _event(self, test_status=None, test_id=None, route_code=None,
        timestamp=None, tags=None, mime_type=None, file_name=None,
        file_bytes=None, eof=False, runnable=True):
        """Build an event tuple in the order the tests compare against
        ``StreamResult._events``."""
        return ('status', test_id, test_status, tags, runnable, file_name,
            file_bytes, eof, mime_type, route_code, timestamp)

    def test_eof(self):
        self.check_event(CONSTANT_EOF, test_id=None, eof=True)

    def test_file_content(self):
        self.check_event(CONSTANT_FILE_CONTENT,
            test_id=None, file_name="barney", file_bytes=b"woo")

    def test_file_content_length_into_checksum(self):
        # A bad file content length which creeps into the checksum.
        bad_file_length_content = (
            b'\xb3!@\x13\x06barney\x04woo\xdc\xe2\xdb\x35')
        self.check_events(
            bad_file_length_content,
            self._parser_error_events(
                bad_file_length_content,
                b"File content extends past end of packet: "
                b"claimed 4 bytes, 3 available"))

    def test_packet_length_4_word_varint(self):
        packet_data = b'\xb3!@\xc0\x00\x11'
        self.check_events(
            packet_data,
            self._parser_error_events(
                packet_data,
                b"3 byte maximum given but 4 byte value found."))

    def test_mime(self):
        self.check_event(CONSTANT_MIME,
            test_id=None, mime_type='application/foo; charset=1')

    def test_route_code(self):
        self.check_event(CONSTANT_ROUTE_CODE,
            'success', route_code="source", test_id="bar")

    def test_runnable(self):
        self.check_event(CONSTANT_RUNNABLE,
            test_status='success', runnable=False)

    def test_tags(self):
        self.check_event(CONSTANT_TAGS[0],
            None, tags=set(['foo', 'bar']), test_id="bar")

    def test_timestamp(self):
        timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45,
            iso8601.Utc())
        self.check_event(CONSTANT_TIMESTAMP,
            'success', test_id='bar', timestamp=timestamp)

    def test_bad_crc_errors_via_status(self):
        # Corrupt only the last byte of the stored checksum.
        file_bytes = CONSTANT_MIME[:-1] + b'\x00'
        self.check_events(
            file_bytes,
            self._parser_error_events(
                file_bytes,
                b'Bad checksum - calculated (0x78335115), '
                b'stored (0x78335100)'))

    def test_not_utf8_in_string(self):
        # Replace one byte inside the first string and patch the trailing
        # four bytes so that the string check is what fails.
        file_bytes = (
            CONSTANT_ROUTE_CODE[:5] + b'\xb4' + CONSTANT_ROUTE_CODE[6:-4]
            + b'\xce\x56\xc6\x17')
        self.check_events(
            file_bytes,
            self._parser_error_events(
                file_bytes, b'UTF8 string at offset 2 is not UTF8'))

    def test_NULL_in_string(self):
        file_bytes = (
            CONSTANT_ROUTE_CODE[:6] + b'\x00' + CONSTANT_ROUTE_CODE[7:-4]
            + b'\xd7\x41\xac\xfe')
        self.check_events(
            file_bytes,
            self._parser_error_events(
                file_bytes, b'UTF8 string at offset 2 contains NUL byte'))

    def test_bad_utf8_stringlength(self):
        # Overwrite the string's length prefix with 63 (0x3f).
        file_bytes = (
            CONSTANT_ROUTE_CODE[:4] + b'\x3f' + CONSTANT_ROUTE_CODE[5:-4]
            + b'\xbe\x29\xe0\xc2')
        self.check_events(
            file_bytes,
            self._parser_error_events(
                file_bytes,
                b'UTF8 string at offset 2 extends past end of '
                b'packet: claimed 63 bytes, 10 available'))

    def test_route_code_and_file_content(self):
        # Round trip: encode with StreamResultToBytes, then decode.
        content = BytesIO()
        subunit.StreamResultToBytes(content).status(
            route_code='0', mime_type='text/plain', file_name='bar',
            file_bytes=b'foo')
        self.check_event(content.getvalue(), test_id=None, file_name='bar',
            route_code='0', mime_type='text/plain', file_bytes=b'foo')