ded56b8616303b47656ab85523eb7d3db97cfb28
[third_party/subunit] / python / subunit / tests / test_test_protocol2.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2013  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 from io import BytesIO
18 import datetime
19
20 from testtools import TestCase
21 from testtools.matchers import Contains, HasLength
22 from testtools.tests.test_testresult import TestStreamResultContract
23 from testtools.testresult.doubles import StreamResult
24
25 import subunit
26 import subunit.iso8601 as iso8601
27
28 CONSTANT_ENUM = b'\xb3)\x01\x0c\x03foo\x08U_\x1b'
29 CONSTANT_INPROGRESS = b'\xb3)\x02\x0c\x03foo\x8e\xc1-\xb5'
30 CONSTANT_SUCCESS = b'\xb3)\x03\x0c\x03fooE\x9d\xfe\x10'
31 CONSTANT_UXSUCCESS = b'\xb3)\x04\x0c\x03fooX\x98\xce\xa8'
32 CONSTANT_SKIP = b'\xb3)\x05\x0c\x03foo\x93\xc4\x1d\r'
33 CONSTANT_FAIL = b'\xb3)\x06\x0c\x03foo\x15Po\xa3'
34 CONSTANT_XFAIL = b'\xb3)\x07\x0c\x03foo\xde\x0c\xbc\x06'
35 CONSTANT_EOF = b'\xb3!\x10\x08S\x15\x88\xdc'
36 CONSTANT_FILE_CONTENT = b'\xb3!@\x13\x06barney\x03wooA5\xe3\x8c'
37 CONSTANT_MIME = b'\xb3! #\x1aapplication/foo; charset=1x3Q\x15'
38 CONSTANT_TIMESTAMP = b'\xb3+\x03\x13<\x17T\xcf\x80\xaf\xc8\x03barI\x96>-'
39 CONSTANT_ROUTE_CODE = b'\xb3-\x03\x13\x03bar\x06source\x9cY9\x19'
40 CONSTANT_RUNNABLE = b'\xb3(\x03\x0c\x03foo\xe3\xea\xf5\xa4'
41 CONSTANT_TAGS = [
42     b'\xb3)\x80\x15\x03bar\x02\x03foo\x03barTHn\xb4',
43     b'\xb3)\x80\x15\x03bar\x02\x03bar\x03foo\xf8\xf1\x91o',
44     ]
45
46
47 class TestStreamResultToBytesContract(TestCase, TestStreamResultContract):
48     """Check that StreamResult behaves as testtools expects."""
49
50     def _make_result(self):
51         return subunit.StreamResultToBytes(BytesIO())
52
53
54 class TestStreamResultToBytes(TestCase):
55
56     def _make_result(self):
57         output = BytesIO()
58         return subunit.StreamResultToBytes(output), output
59
60     def test_numbers(self):
61         result = subunit.StreamResultToBytes(BytesIO())
62         packet = []
63         self.assertRaises(Exception, result._write_number, -1, packet)
64         self.assertEqual([], packet)
65         result._write_number(0, packet)
66         self.assertEqual([b'\x00'], packet)
67         del packet[:]
68         result._write_number(63, packet)
69         self.assertEqual([b'\x3f'], packet)
70         del packet[:]
71         result._write_number(64, packet)
72         self.assertEqual([b'\x40\x40'], packet)
73         del packet[:]
74         result._write_number(16383, packet)
75         self.assertEqual([b'\x7f\xff'], packet)
76         del packet[:]
77         result._write_number(16384, packet)
78         self.assertEqual([b'\x80\x40', b'\x00'], packet)
79         del packet[:]
80         result._write_number(4194303, packet)
81         self.assertEqual([b'\xbf\xff', b'\xff'], packet)
82         del packet[:]
83         result._write_number(4194304, packet)
84         self.assertEqual([b'\xc0\x40\x00\x00'], packet)
85         del packet[:]
86         result._write_number(1073741823, packet)
87         self.assertEqual([b'\xff\xff\xff\xff'], packet)
88         del packet[:]
89         self.assertRaises(Exception, result._write_number, 1073741824, packet)
90         self.assertEqual([], packet)
91
92     def test_volatile_length(self):
93         # if the length of the packet data before the length itself is
94         # considered is right on the boundary for length's variable length
95         # encoding, it is easy to get the length wrong by not accounting for
96         # length itself.
97         # that is, the encoder has to ensure that length == sum (length_of_rest
98         # + length_of_length)
99         result, output = self._make_result()
100         # 1 byte short:
101         result.status(file_name="", file_bytes=b'\xff'*0)
102         self.assertThat(output.getvalue(), HasLength(10))
103         self.assertEqual(b'\x0a', output.getvalue()[3:4])
104         output.seek(0)
105         output.truncate()
106         # 1 byte long:
107         result.status(file_name="", file_bytes=b'\xff'*53)
108         self.assertThat(output.getvalue(), HasLength(63))
109         self.assertEqual(b'\x3f', output.getvalue()[3:4])
110         output.seek(0)
111         output.truncate()
112         # 2 bytes short
113         result.status(file_name="", file_bytes=b'\xff'*54)
114         self.assertThat(output.getvalue(), HasLength(65))
115         self.assertEqual(b'\x40\x41', output.getvalue()[3:5])
116         output.seek(0)
117         output.truncate()
118         # 2 bytes long
119         result.status(file_name="", file_bytes=b'\xff'*16371)
120         self.assertThat(output.getvalue(), HasLength(16383))
121         self.assertEqual(b'\x7f\xff', output.getvalue()[3:5])
122         output.seek(0)
123         output.truncate()
124         # 3 bytes short
125         result.status(file_name="", file_bytes=b'\xff'*16372)
126         self.assertThat(output.getvalue(), HasLength(16385))
127         self.assertEqual(b'\x80\x40\x01', output.getvalue()[3:6])
128         output.seek(0)
129         output.truncate()
130         # 3 bytes long
131         result.status(file_name="", file_bytes=b'\xff'*4194289)
132         self.assertThat(output.getvalue(), HasLength(4194303))
133         self.assertEqual(b'\xbf\xff\xff', output.getvalue()[3:6])
134         output.seek(0)
135         output.truncate()
136         self.assertRaises(Exception, result.status, file_name="",
137             file_bytes=b'\xff'*4194290)
138
139     def test_trivial_enumeration(self):
140         result, output = self._make_result()
141         result.status("foo", 'exists')
142         self.assertEqual(CONSTANT_ENUM, output.getvalue())
143
144     def test_inprogress(self):
145         result, output = self._make_result()
146         result.status("foo", 'inprogress')
147         self.assertEqual(CONSTANT_INPROGRESS, output.getvalue())
148
149     def test_success(self):
150         result, output = self._make_result()
151         result.status("foo", 'success')
152         self.assertEqual(CONSTANT_SUCCESS, output.getvalue())
153
154     def test_uxsuccess(self):
155         result, output = self._make_result()
156         result.status("foo", 'uxsuccess')
157         self.assertEqual(CONSTANT_UXSUCCESS, output.getvalue())
158
159     def test_skip(self):
160         result, output = self._make_result()
161         result.status("foo", 'skip')
162         self.assertEqual(CONSTANT_SKIP, output.getvalue())
163
164     def test_fail(self):
165         result, output = self._make_result()
166         result.status("foo", 'fail')
167         self.assertEqual(CONSTANT_FAIL, output.getvalue())
168
169     def test_xfail(self):
170         result, output = self._make_result()
171         result.status("foo", 'xfail')
172         self.assertEqual(CONSTANT_XFAIL, output.getvalue())
173
174     def test_unknown_status(self):
175         result, output = self._make_result()
176         self.assertRaises(Exception, result.status, "foo", 'boo')
177         self.assertEqual(b'', output.getvalue())
178
179     def test_eof(self):
180         result, output = self._make_result()
181         result.status(eof=True)
182         self.assertEqual(CONSTANT_EOF, output.getvalue())
183
184     def test_file_content(self):
185         result, output = self._make_result()
186         result.status(file_name="barney", file_bytes=b"woo")
187         self.assertEqual(CONSTANT_FILE_CONTENT, output.getvalue())
188
189     def test_mime(self):
190         result, output = self._make_result()
191         result.status(mime_type="application/foo; charset=1")
192         self.assertEqual(CONSTANT_MIME, output.getvalue())
193
194     def test_route_code(self):
195         result, output = self._make_result()
196         result.status(test_id="bar", test_status='success',
197             route_code="source")
198         self.assertEqual(CONSTANT_ROUTE_CODE, output.getvalue())
199
200     def test_runnable(self):
201         result, output = self._make_result()
202         result.status("foo", 'success', runnable=False)
203         self.assertEqual(CONSTANT_RUNNABLE, output.getvalue())
204
205     def test_tags(self):
206         result, output = self._make_result()
207         result.status(test_id="bar", test_tags=set(['foo', 'bar']))
208         self.assertThat(CONSTANT_TAGS, Contains(output.getvalue()))
209
210     def test_timestamp(self):
211         timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45,
212             iso8601.Utc())
213         result, output = self._make_result()
214         result.status(test_id="bar", test_status='success', timestamp=timestamp)
215         self.assertEqual(CONSTANT_TIMESTAMP, output.getvalue())
216
217
218 class TestByteStreamToStreamResult(TestCase):
219
220     def test_non_subunit_encapsulated(self):
221         source = BytesIO(b"foo\nbar\n")
222         result = StreamResult()
223         subunit.ByteStreamToStreamResult(
224             source, non_subunit_name="stdout").run(result)
225         self.assertEqual([
226             ('status', None, None, None, True, 'stdout', b'f', False, None, None, None),
227             ('status', None, None, None, True, 'stdout', b'o', False, None, None, None),
228             ('status', None, None, None, True, 'stdout', b'o', False, None, None, None),
229             ('status', None, None, None, True, 'stdout', b'\n', False, None, None, None),
230             ('status', None, None, None, True, 'stdout', b'b', False, None, None, None),
231             ('status', None, None, None, True, 'stdout', b'a', False, None, None, None),
232             ('status', None, None, None, True, 'stdout', b'r', False, None, None, None),
233             ('status', None, None, None, True, 'stdout', b'\n', False, None, None, None),
234             ], result._events)
235         self.assertEqual(b'', source.read())
236
237     def test_signature_middle_utf8_char(self):
238         utf8_bytes = b'\xe3\xb3\x8a'
239         source = BytesIO(utf8_bytes)
240         # Should be treated as one character (it is u'\u3cca') and wrapped
241         result = StreamResult()
242         subunit.ByteStreamToStreamResult(
243             source, non_subunit_name="stdout").run(
244             result)
245         self.assertEqual([
246             ('status', None, None, None, True, 'stdout', b'\xe3', False, None, None, None),
247             ('status', None, None, None, True, 'stdout', b'\xb3', False, None, None, None),
248             ('status', None, None, None, True, 'stdout', b'\x8a', False, None, None, None),
249             ], result._events)
250
251     def test_non_subunit_disabled_raises(self):
252         source = BytesIO(b"foo\nbar\n")
253         result = StreamResult()
254         case = subunit.ByteStreamToStreamResult(source)
255         e = self.assertRaises(Exception, case.run, result)
256         self.assertEqual(b'f', e.args[1])
257         self.assertEqual(b'oo\nbar\n', source.read())
258         self.assertEqual([], result._events)
259
260     def test_trivial_enumeration(self):
261         source = BytesIO(CONSTANT_ENUM)
262         result = StreamResult()
263         subunit.ByteStreamToStreamResult(
264             source, non_subunit_name="stdout").run(result)
265         self.assertEqual(b'', source.read())
266         self.assertEqual([
267             ('status', 'foo', 'exists', None, True, None, None, False, None, None, None),
268             ], result._events)
269
270     def test_multiple_events(self):
271         source = BytesIO(CONSTANT_ENUM + CONSTANT_ENUM)
272         result = StreamResult()
273         subunit.ByteStreamToStreamResult(
274             source, non_subunit_name="stdout").run(result)
275         self.assertEqual(b'', source.read())
276         self.assertEqual([
277             ('status', 'foo', 'exists', None, True, None, None, False, None, None, None),
278             ('status', 'foo', 'exists', None, True, None, None, False, None, None, None),
279             ], result._events)
280
281     def test_inprogress(self):
282         self.check_event(CONSTANT_INPROGRESS, 'inprogress')
283
284     def test_success(self):
285         self.check_event(CONSTANT_SUCCESS, 'success')
286
287     def test_uxsuccess(self):
288         self.check_event(CONSTANT_UXSUCCESS, 'uxsuccess')
289
290     def test_skip(self):
291         self.check_event(CONSTANT_SKIP, 'skip')
292
293     def test_fail(self):
294         self.check_event(CONSTANT_FAIL, 'fail')
295
296     def test_xfail(self):
297         self.check_event(CONSTANT_XFAIL, 'xfail')
298
299     def check_events(self, source_bytes, events):
300         source = BytesIO(source_bytes)
301         result = StreamResult()
302         subunit.ByteStreamToStreamResult(
303             source, non_subunit_name="stdout").run(result)
304         self.assertEqual(b'', source.read())
305         self.assertEqual(events, result._events)
306
307     def check_event(self, source_bytes, test_status=None, test_id="foo",
308         route_code=None, timestamp=None, tags=None, mime_type=None,
309         file_name=None, file_bytes=None, eof=False, runnable=True):
310         event = self._event(test_id=test_id, test_status=test_status,
311             tags=tags, runnable=runnable, file_name=file_name,
312             file_bytes=file_bytes, eof=eof, mime_type=mime_type,
313             route_code=route_code, timestamp=timestamp)
314         self.check_events(source_bytes, [event])
315
316     def _event(self, test_status=None, test_id=None, route_code=None,
317         timestamp=None, tags=None, mime_type=None, file_name=None,
318         file_bytes=None, eof=False, runnable=True):
319         return ('status', test_id, test_status, tags, runnable, file_name,
320             file_bytes, eof, mime_type, route_code, timestamp)
321
322     def test_eof(self):
323         self.check_event(CONSTANT_EOF, test_id=None, eof=True)
324
325     def test_file_content(self):
326         self.check_event(CONSTANT_FILE_CONTENT,
327             test_id=None, file_name="barney", file_bytes=b"woo")
328
329     def test_file_content_length_into_checksum(self):
330         # A bad file content length which creeps into the checksum.
331         bad_file_length_content = b'\xb3!@\x13\x06barney\x04woo\xdc\xe2\xdb\x35'
332         self.check_events(bad_file_length_content, [
333             self._event(test_id="subunit.parser", eof=True,
334                 file_name="Packet data", file_bytes=bad_file_length_content),
335             self._event(test_id="subunit.parser", test_status="fail", eof=True,
336                 file_name="Parser Error",
337                 file_bytes=b"File content extends past end of packet: claimed 4 bytes, 3 available"),
338             ])
339
340     def test_packet_length_4_word_varint(self):
341         packet_data = b'\xb3!@\xc0\x00\x11'
342         self.check_events(packet_data, [
343             self._event(test_id="subunit.parser", eof=True,
344                 file_name="Packet data", file_bytes=packet_data),
345             self._event(test_id="subunit.parser", test_status="fail", eof=True,
346                 file_name="Parser Error",
347                 file_bytes=b"3 byte maximum given but 4 byte value found."),
348             ])
349
350     def test_mime(self):
351         self.check_event(CONSTANT_MIME,
352             test_id=None, mime_type='application/foo; charset=1')
353
354     def test_route_code(self):
355         self.check_event(CONSTANT_ROUTE_CODE,
356             'success', route_code="source", test_id="bar")
357
358     def test_runnable(self):
359         self.check_event(CONSTANT_RUNNABLE,
360             test_status='success', runnable=False)
361
362     def test_tags(self):
363         self.check_event(CONSTANT_TAGS[0],
364             None, tags=set(['foo', 'bar']), test_id="bar")
365
366     def test_timestamp(self):
367         timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45,
368             iso8601.Utc())
369         self.check_event(CONSTANT_TIMESTAMP,
370             'success', test_id='bar', timestamp=timestamp)
371
372     def test_bad_crc_errors_via_status(self):
373         file_bytes = CONSTANT_MIME[:-1] + b'\x00'
374         self.check_events( file_bytes, [
375             self._event(test_id="subunit.parser", eof=True,
376                 file_name="Packet data", file_bytes=file_bytes),
377             self._event(test_id="subunit.parser", test_status="fail", eof=True,
378                 file_name="Parser Error",
379                 file_bytes=b'Bad checksum - calculated (0x78335115), '
380                     b'stored (0x78335100)'),
381             ])
382
383     def test_not_utf8_in_string(self):
384         file_bytes = CONSTANT_ROUTE_CODE[:5] + b'\xb4' + CONSTANT_ROUTE_CODE[6:-4] + b'\xce\x56\xc6\x17'
385         self.check_events(file_bytes, [
386             self._event(test_id="subunit.parser", eof=True,
387                 file_name="Packet data", file_bytes=file_bytes),
388             self._event(test_id="subunit.parser", test_status="fail", eof=True,
389                 file_name="Parser Error",
390                 file_bytes=b'UTF8 string at offset 2 is not UTF8'),
391             ])
392
393     def test_NULL_in_string(self):
394         file_bytes = CONSTANT_ROUTE_CODE[:6] + b'\x00' + CONSTANT_ROUTE_CODE[7:-4] + b'\xd7\x41\xac\xfe'
395         self.check_events(file_bytes, [
396             self._event(test_id="subunit.parser", eof=True,
397                 file_name="Packet data", file_bytes=file_bytes),
398             self._event(test_id="subunit.parser", test_status="fail", eof=True,
399                 file_name="Parser Error",
400                 file_bytes=b'UTF8 string at offset 2 contains NUL byte'),
401             ])
402
403     def test_bad_utf8_stringlength(self):
404         file_bytes = CONSTANT_ROUTE_CODE[:4] + b'\x3f' + CONSTANT_ROUTE_CODE[5:-4] + b'\xbe\x29\xe0\xc2'
405         self.check_events(file_bytes, [
406             self._event(test_id="subunit.parser", eof=True,
407                 file_name="Packet data", file_bytes=file_bytes),
408             self._event(test_id="subunit.parser", test_status="fail", eof=True,
409                 file_name="Parser Error",
410                 file_bytes=b'UTF8 string at offset 2 extends past end of '
411                     b'packet: claimed 63 bytes, 10 available'),
412             ])
413
414     def test_route_code_and_file_content(self):
415         content = BytesIO()
416         subunit.StreamResultToBytes(content).status(
417             route_code='0', mime_type='text/plain', file_name='bar',
418             file_bytes=b'foo')
419         self.check_event(content.getvalue(), test_id=None, file_name='bar',
420             route_code='0', mime_type='text/plain', file_bytes=b'foo')