$ python -m subunit.run mylib.tests.test_suite
"""
+import os
import sys
from testtools import ExtendedToStreamDecorator
if __name__ == '__main__':
+ # Disable the default buffering, for Python 2.x where pdb doesn't do it
+ # on non-ttys.
+ sys.stdout = os.fdopen(sys.stdout.fileno(), 'ab', 0)
stream = get_default_formatter()
runner = SubunitTestRunner
SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner,
import datetime
from io import UnsupportedOperation
+import os
import select
import struct
import zlib
import subunit.iso8601 as iso8601
__all__ = [
+ 'ByteStreamToStreamResult',
'StreamResultToBytes',
]
# Won't be able to select, fallback to
# one-byte-at-a-time.
break
- readable = select.select([self.source], [], [], 0.050)[0]
+ # Note: this has a very low timeout because with stdin, the
+ # BufferedIO layer typically has all the content available
+ # from the stream when e.g. pdb is dropped into, leading to
+ # select always timing out when in fact we could have read
+ # (from the buffer layer) - we typically fail to aggregate
+ # any content on 3.x Pythons.
+ readable = select.select([self.source], [], [], 0.000001)[0]
if readable:
- buffered.append(self.source.read(1))
+ content = self.source.read(1)
+ if len(content) and content[0] != SIGNATURE[0]:
+ buffered.append(content)
+ else:
+ # EOF or we hit a new packet.
+ break
if not readable or len(buffered) >= 1048576:
break
result.status(
file_name=self.non_subunit_name,
file_bytes=b''.join(buffered))
- continue
+ if not len(content) or content[0] != SIGNATURE[0]:
+ continue
+ # Fall through to process the packet whose first byte is in
+ # content.
try:
packet = [SIGNATURE]
self._parse(packet, result)