2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2013 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
import codecs
import datetime
import select
import struct
import zlib

from io import UnsupportedOperation

from extras import safe_hasattr, try_imports
builtins = try_imports(['__builtin__', 'builtins'])

import subunit
import subunit.iso8601 as iso8601

# Bound once at import time; used on every string the parser decodes.
utf_8_decode = codecs.utf_8_decode

__all__ = [
    'ByteStreamToStreamResult',
    'StreamResultToBytes',
    ]

# Every subunit v2 packet starts with this signature byte.
SIGNATURE = b'\xb3'
# struct format strings for the fixed-width wire fields (big-endian).
FMT_8 = '>B'
FMT_16 = '>H'
FMT_24 = '>HB'
FMT_32 = '>I'
# Flag bits of the 16-bit packet flag word (subunit v2 wire format).
FLAG_TEST_ID = 0x0800
FLAG_ROUTE_CODE = 0x0400
FLAG_TIMESTAMP = 0x0200
FLAG_RUNNABLE = 0x0100
FLAG_TAGS = 0x0080
FLAG_MIME_TYPE = 0x0020
FLAG_EOF = 0x0010
FLAG_FILE_CONTENT = 0x0040
# Timestamps travel on the wire as (seconds, nanoseconds) offsets from the
# Unix epoch in UTC.
EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=iso8601.Utc())
# A single NUL byte element (an int on Python 3) for NUL-rejection checks.
NUL_ELEMENT = b'\0'[0]
class ParseError(Exception):
    """Internal signalling exception: a packet's bytes could not be parsed."""
class StreamResultToBytes(object):
    """Convert StreamResult API calls to bytes.

    The StreamResult API is defined by testtools.StreamResult.
    """

    # Wire encoding for each test status: ORed into the low 3 bits of the
    # 16-bit packet flag word.
    status_mask = {
        None: 0,
        'exists': 0x1,
        'inprogress': 0x2,
        'success': 0x3,
        'uxsuccess': 0x4,
        'skip': 0x5,
        'fail': 0x6,
        'xfail': 0x7,
        }

    def __init__(self, output_stream):
        """Create a StreamResultToBytes with output written to output_stream.

        :param output_stream: A file-like object. Must support write(bytes)
            and flush() methods. Flush will be called after each write.
            The stream will be passed through subunit.make_stream_binary,
            to handle regular cases such as stdout.
        """
        self.output_stream = subunit.make_stream_binary(output_stream)

    def startTestRun(self):
        """No-op: the v2 byte stream has no explicit start-of-run marker."""

    def stopTestRun(self):
        """No-op: the v2 byte stream has no explicit end-of-run marker."""

    def status(self, test_id=None, test_status=None, test_tags=None,
        runnable=True, file_name=None, file_bytes=None, eof=False,
        mime_type=None, route_code=None, timestamp=None):
        """Serialise one status event as a single subunit v2 packet."""
        self._write_packet(test_id=test_id, test_status=test_status,
            test_tags=test_tags, runnable=runnable, file_name=file_name,
            file_bytes=file_bytes, eof=eof, mime_type=mime_type,
            route_code=route_code, timestamp=timestamp)

    def _write_utf8(self, a_string, packet):
        """Append a length-prefixed UTF8 encoding of a_string to packet."""
        utf8 = a_string.encode('utf-8')
        self._write_number(len(utf8), packet)
        packet.append(utf8)

    def _write_len16(self, length, packet):
        """Append length to packet as a fixed 16-bit big-endian value."""
        assert length < 65536
        packet.append(struct.pack(FMT_16, length))

    def _write_number(self, value, packet):
        """Append value to packet in the v2 variable-length encoding."""
        packet.extend(self._encode_number(value))

    def _encode_number(self, value):
        """Encode value as a subunit v2 variable-length integer.

        The top two bits of the first byte select the width (1, 2, 3 or 4
        bytes); the remaining bits carry the value big-endian, so at most
        30 bits of payload are available.

        :return: A list of bytes objects whose concatenation is the encoding.
        :raises ValueError: If value needs more than 30 bits.
        """
        assert value >= 0
        if value < 64:
            # One byte: prefix 0b00.
            return [struct.pack(FMT_8, value)]
        elif value < 16384:
            # Two bytes: prefix 0b01.
            value = value | 0x4000
            return [struct.pack(FMT_16, value)]
        elif value < 4194304:
            # Three bytes: prefix 0b10.
            value = value | 0x800000
            return [struct.pack(FMT_16, value >> 8),
                    struct.pack(FMT_8, value & 0xff)]
        elif value < 1073741824:
            # Four bytes: prefix 0b11.
            value = value | 0xc0000000
            return [struct.pack(FMT_32, value)]
        else:
            raise ValueError('value too large to encode: %r' % (value,))

    def _write_packet(self, test_id=None, test_status=None, test_tags=None,
        runnable=True, file_name=None, file_bytes=None, eof=False,
        mime_type=None, route_code=None, timestamp=None):
        """Build one complete packet for the event fields and write it out."""
        packet = [SIGNATURE]
        packet.append(b'FF') # placeholder for flags
        # placeholder for length, but see below as length is variable.
        packet.append(b'')
        flags = 0x2000 # Version 0x2
        if timestamp is not None:
            flags = flags | FLAG_TIMESTAMP
            since_epoch = timestamp - EPOCH
            nanoseconds = since_epoch.microseconds * 1000
            seconds = (since_epoch.seconds + since_epoch.days * 24 * 3600)
            packet.append(struct.pack(FMT_32, seconds))
            self._write_number(nanoseconds, packet)
        if test_id is not None:
            flags = flags | FLAG_TEST_ID
            self._write_utf8(test_id, packet)
        if test_tags:
            flags = flags | FLAG_TAGS
            self._write_number(len(test_tags), packet)
            for tag in test_tags:
                self._write_utf8(tag, packet)
        if runnable:
            flags = flags | FLAG_RUNNABLE
        if mime_type:
            flags = flags | FLAG_MIME_TYPE
            self._write_utf8(mime_type, packet)
        if file_name is not None:
            flags = flags | FLAG_FILE_CONTENT
            self._write_utf8(file_name, packet)
            self._write_number(len(file_bytes), packet)
            packet.append(file_bytes)
        if eof:
            flags = flags | FLAG_EOF
        if route_code is not None:
            flags = flags | FLAG_ROUTE_CODE
            self._write_utf8(route_code, packet)
        # 0x0008 - not used in v2.
        flags = flags | self.status_mask[test_status]
        packet[1] = struct.pack(FMT_16, flags)
        # The length field counts the whole packet including the length
        # field itself, so its own encoded width feeds back into the value:
        # pick the smallest self-consistent width. +4 covers the CRC32.
        base_length = sum(map(len, packet)) + 4
        if base_length <= 62:
            # one byte to encode length, 62+1 = 63
            length_length = 1
        elif base_length <= 16381:
            # two bytes to encode length, 16381+2 = 16383
            length_length = 2
        elif base_length <= 4194300:
            # three bytes to encode length, 4194300+3=4194303
            length_length = 3
        else:
            # Longer than policy:
            # TODO: chunk the packet automatically?
            # - strip all but file data
            # - do 4M chunks of that till done
            # - include original data in final chunk.
            raise ValueError("Length too long: %r" % base_length)
        packet[2:3] = self._encode_number(base_length + length_length)
        # We could either do a partial application of crc32 over each chunk
        # or a single join to a temp variable then a final join
        # or two writes (that python might then split).
        # For now, simplest code: join, crc32, join, output
        content = b''.join(packet)
        self.output_stream.write(content + struct.pack(
            FMT_32, zlib.crc32(content) & 0xffffffff))
        self.output_stream.flush()
class ByteStreamToStreamResult(object):
    """Parse a subunit byte stream.

    Mixed streams that contain non-subunit content is supported when a
    non_subunit_name is passed to the constructor. The default is to raise an
    error containing the non-subunit byte after it has been read from the
    stream.

    Typical use:

       >>> case = ByteStreamToStreamResult(sys.stdin.buffer)
       >>> result = StreamResult()
       >>> result.startTestRun()
       >>> case.run(result)
       >>> result.stopTestRun()
    """

    # Maps the low three flag bits back to a status string; the inverse of
    # StreamResultToBytes.status_mask.
    status_lookup = {
        0x0: None,
        0x1: 'exists',
        0x2: 'inprogress',
        0x3: 'success',
        0x4: 'uxsuccess',
        0x5: 'skip',
        0x6: 'fail',
        0x7: 'xfail',
        }

    def __init__(self, source, non_subunit_name=None):
        """Create a ByteStreamToStreamResult.

        :param source: A file like object to read bytes from. Must support
            read(<count>) and return bytes. The file is not closed by
            ByteStreamToStreamResult. subunit.make_stream_binary() is
            called on the stream to get it into bytes mode.
        :param non_subunit_name: If set to non-None, non subunit content
            encountered in the stream will be converted into file packets
            labelled with this name.
        """
        self.non_subunit_name = non_subunit_name
        self.source = subunit.make_stream_binary(source)
        # Incremental decoder lets us tell whether a byte falls in the middle
        # of a multi-byte UTF8 character in non-subunit content, so we never
        # mistake such a byte for a packet signature.
        self.codec = codecs.lookup('utf8').incrementaldecoder()

    def run(self, result):
        """Parse source and emit events to result.

        This is a blocking call: it will run until EOF is detected on source.

        :param result: A StreamResult-like object that receives status()
            calls for each parsed packet (and for non-subunit content).
        """
        self.codec.reset()
        mid_character = False
        while True:
            # We're in blocking mode; read one char
            content = self.source.read(1)
            if not content:
                # EOF.
                return
            if not mid_character and content[0] == SIGNATURE[0]:
                # Start of a subunit packet: hand off to the packet parser.
                self._parse_packet(result)
                continue
            if self.non_subunit_name is None:
                raise Exception("Non subunit content", content)
            try:
                if self.codec.decode(content):
                    # End of a character.
                    mid_character = False
                else:
                    mid_character = True
            except UnicodeDecodeError:
                # Bad unicode, not our concern.
                mid_character = False
            # Aggregate all content that is not subunit until either
            # 1MiB is accumulated or 50ms has passed with no input.
            # Both are arbitrary amounts intended to give a simple
            # balance between efficiency (avoiding death by a thousand
            # one-byte packets), buffering (avoiding overlarge state
            # being hidden on intermediary nodes) and interactivity
            # (when driving a debugger, slow response to typing is
            # annoying).
            buffered = [content]
            while len(buffered[-1]):
                try:
                    self.source.fileno()
                except (UnsupportedOperation, AttributeError):
                    # Won't be able to select, fallback to
                    # one-byte-at-a-time.
                    break
                # Note: this has a very low timeout because with stdin, the
                # BufferedIO layer typically has all the content available
                # from the stream when e.g. pdb is dropped into, leading to
                # select always timing out when in fact we could have read
                # (from the buffer layer) - we typically fail to aggregate
                # any content on 3.x Pythons.
                readable = select.select([self.source], [], [], 0.000001)[0]
                if readable:
                    content = self.source.read(1)
                    if not len(content):
                        # EOF - break and emit buffered.
                        break
                    if not mid_character and content[0] == SIGNATURE[0]:
                        # New packet, break, emit buffered, then parse.
                        break
                    buffered.append(content)
                    # Feed into the codec.
                    try:
                        if self.codec.decode(content):
                            # End of a character.
                            mid_character = False
                        else:
                            mid_character = True
                    except UnicodeDecodeError:
                        # Bad unicode, not our concern.
                        mid_character = False
                if not readable or len(buffered) >= 1048576:
                    # timeout or too much data, emit what we have.
                    break
            result.status(
                file_name=self.non_subunit_name,
                file_bytes=b''.join(buffered))
            if mid_character or not len(content) or content[0] != SIGNATURE[0]:
                continue
            # Otherwise, parse a data packet.
            self._parse_packet(result)

    def _parse_packet(self, result):
        """Parse one packet from source, reporting parse failures to result.

        A ParseError is converted into two synthetic events on result (the
        raw packet bytes, then the error text) rather than aborting the
        whole stream.
        """
        try:
            packet = [SIGNATURE]
            self._parse(packet, result)
        except ParseError as error:
            result.status(test_id="subunit.parser", eof=True,
                file_name="Packet data", file_bytes=b''.join(packet),
                mime_type="application/octet-stream")
            result.status(test_id="subunit.parser", test_status='fail',
                eof=True, file_name="Parser Error",
                file_bytes=(error.args[0]).encode('utf8'),
                mime_type="text/plain;charset=utf8")

    def _to_bytes(self, data, pos, length):
        """Return a slice of data from pos for length as bytes."""
        # memoryview in 2.7.3 and 3.2 isn't directly usable with struct :(.
        # see https://bugs.launchpad.net/subunit/+bug/1216163
        result = data[pos:pos+length]
        if type(result) is not bytes:
            return result.tobytes()
        return result

    def _parse_varint(self, data, pos, max_3_bytes=False):
        """Decode a variable-length integer from data starting at pos.

        :param max_3_bytes: If True, reject 4-byte encodings (the packet
            length field is capped at 3 bytes on the wire).
        :return: (value, bytes_consumed).
        :raises ParseError: If max_3_bytes is set and a 4-byte value occurs.
        """
        # because the only incremental IO we do is at the start, and the 32 bit
        # CRC means we can always safely read enough to cover any varint, we
        # can be sure that there should be enough data - and if not it is an
        # error not a normal situation.
        data_0 = struct.unpack(FMT_8, self._to_bytes(data, pos, 1))[0]
        # Top two bits select the width; the rest is payload (big-endian).
        typeenum = data_0 & 0xc0
        value_0 = data_0 & 0x3f
        if typeenum == 0x00:
            return value_0, 1
        elif typeenum == 0x40:
            data_1 = struct.unpack(FMT_8, self._to_bytes(data, pos+1, 1))[0]
            return (value_0 << 8) | data_1, 2
        elif typeenum == 0x80:
            data_1 = struct.unpack(FMT_16, self._to_bytes(data, pos+1, 2))[0]
            return (value_0 << 16) | data_1, 3
        else:
            if max_3_bytes:
                raise ParseError('3 byte maximum given but 4 byte value found.')
            data_1, data_2 = struct.unpack(FMT_24, self._to_bytes(data, pos+1, 3))
            result = (value_0 << 24) | data_1 << 8 | data_2
            return result, 4

    def _parse(self, packet, result):
        """Parse the rest of one packet (after the signature byte).

        :param packet: A list whose first element is the signature byte;
            raw fragments are appended as they are read so that a caller
            catching ParseError can report exactly the bytes consumed.
        :raises ParseError: On any structural or checksum problem.
        """
        # 2 bytes flags, at most 3 bytes length.
        packet.append(self.source.read(5))
        flags = struct.unpack(FMT_16, packet[-1][:2])[0]
        length, consumed = self._parse_varint(
            packet[-1], 2, max_3_bytes=True)
        # 6 == signature (1) + flags (2) + maximum length width (3).
        remainder = self.source.read(length - 6)
        if len(remainder) != length - 6:
            raise ParseError(
                'Short read - got %d bytes, wanted %d bytes' % (
                len(remainder), length - 6))
        if consumed != 3:
            # Avoid having to parse torn values
            packet[-1] += remainder
            pos = 2 + consumed
        else:
            # Avoid copying potentially lots of data.
            packet.append(remainder)
            pos = 0
        crc = zlib.crc32(packet[0])
        for fragment in packet[1:-1]:
            crc = zlib.crc32(fragment, crc)
        crc = zlib.crc32(packet[-1][:-4], crc) & 0xffffffff
        packet_crc = struct.unpack(FMT_32, packet[-1][-4:])[0]
        if crc != packet_crc:
            # Bad CRC, report it and stop parsing the packet.
            raise ParseError(
                'Bad checksum - calculated (0x%x), stored (0x%x)' % (
                crc, packet_crc))
        if safe_hasattr(builtins, 'memoryview'):
            # Avoid copying the body for each slice taken below.
            body = memoryview(packet[-1])
        else:
            body = packet[-1]
        # Discard the trailing CRC-32.
        body = body[:-4]
        # One packet could have both file and status data; the Python API
        # presents these separately (perhaps it shouldn't?)
        if flags & FLAG_TIMESTAMP:
            seconds = struct.unpack(FMT_32, self._to_bytes(body, pos, 4))[0]
            nanoseconds, consumed = self._parse_varint(body, pos+4)
            pos = pos + 4 + consumed
            timestamp = EPOCH + datetime.timedelta(
                seconds=seconds, microseconds=nanoseconds/1000)
        else:
            timestamp = None
        if flags & FLAG_TEST_ID:
            test_id, pos = self._read_utf8(body, pos)
        else:
            test_id = None
        if flags & FLAG_TAGS:
            tag_count, consumed = self._parse_varint(body, pos)
            pos += consumed
            test_tags = set()
            for _ in range(tag_count):
                tag, pos = self._read_utf8(body, pos)
                test_tags.add(tag)
        else:
            test_tags = None
        if flags & FLAG_MIME_TYPE:
            mime_type, pos = self._read_utf8(body, pos)
        else:
            mime_type = None
        if flags & FLAG_FILE_CONTENT:
            file_name, pos = self._read_utf8(body, pos)
            content_length, consumed = self._parse_varint(body, pos)
            pos += consumed
            file_bytes = self._to_bytes(body, pos, content_length)
            if len(file_bytes) != content_length:
                raise ParseError('File content extends past end of packet: '
                    'claimed %d bytes, %d available' % (
                    content_length, len(file_bytes)))
            pos += content_length
        else:
            file_name = None
            file_bytes = None
        if flags & FLAG_ROUTE_CODE:
            route_code, pos = self._read_utf8(body, pos)
        else:
            route_code = None
        # 0x0008 - not used in v2.
        runnable = bool(flags & FLAG_RUNNABLE)
        eof = bool(flags & FLAG_EOF)
        test_status = self.status_lookup[flags & 0x0007]
        result.status(test_id=test_id, test_status=test_status,
            test_tags=test_tags, runnable=runnable, mime_type=mime_type,
            eof=eof, file_name=file_name, file_bytes=file_bytes,
            route_code=route_code, timestamp=timestamp)

    def _read_utf8(self, buf, pos):
        """Read one length-prefixed UTF8 string from buf at pos.

        :return: (decoded_string, new_position).
        :raises ParseError: If the string is truncated, contains a NUL
            byte, or is not valid UTF8.
        """
        length, consumed = self._parse_varint(buf, pos)
        pos += consumed
        utf8_bytes = buf[pos:pos+length]
        if length != len(utf8_bytes):
            raise ParseError(
                'UTF8 string at offset %d extends past end of packet: '
                'claimed %d bytes, %d available' % (pos - 2, length,
                len(utf8_bytes)))
        if NUL_ELEMENT in utf8_bytes:
            raise ParseError('UTF8 string at offset %d contains NUL byte' % (
                pos-2,))
        try:
            utf8, decoded_bytes = utf_8_decode(utf8_bytes)
            if decoded_bytes != length:
                raise ParseError("Invalid (partially decodable) string at "
                    "offset %d, %d undecoded bytes" % (
                    pos-2, length - decoded_bytes))
            return utf8, length+pos
        except UnicodeDecodeError:
            raise ParseError('UTF8 string at offset %d is not UTF8' % (pos-2,))