import subprocess
import sys
import unittest
-if sys.version_info > (3, 0):
+try:
from io import UnsupportedOperation as _UnsupportedOperation
-else:
+except ImportError:
_UnsupportedOperation = AttributeError
-
+from extras import safe_hasattr
from testtools import content, content_type, ExtendedToOriginalDecorator
from testtools.content import TracebackContent
from testtools.compat import _b, _u, BytesIO, StringIO
except ImportError:
raise ImportError ("testtools.testresult.real does not contain "
"_StringException, check your version.")
-from testtools import testresult
+from testtools import testresult, CopyStreamResult
from subunit import chunked, details, iso8601, test_results
+from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes
+
+# same format as sys.version_info: "A tuple containing the five components of
+# the version number: major, minor, micro, releaselevel, and serial. All
+# values except releaselevel are integers; the release level is 'alpha',
+# 'beta', 'candidate', or 'final'. The version_info value corresponding to the
+# Python version 2.0 is (2, 0, 0, 'final', 0)." Additionally we use a
+# releaselevel of 'dev' for unreleased under-development code.
+#
+# If the releaselevel is 'alpha' then the major/minor/micro components are not
+# established at this point, and setup.py will use a version of next-$(revno).
+# If the releaselevel is 'final', then the tarball will be major.minor.micro.
+# Otherwise it is major.minor.micro~$(revno).
+__version__ = (0, 0, 13, 'final', 0)
PROGRESS_SET = 0
PROGRESS_CUR = 1
def __init__(self, stream):
testresult.TestResult.__init__(self)
- stream = _make_stream_binary(stream)
+ stream = make_stream_binary(stream)
self._stream = stream
self._progress_fmt = _b("progress: ")
self._bytes_eol = _b("\n")
protocol = TestProtocolServer(result)
process = subprocess.Popen(self.script, shell=True,
stdout=subprocess.PIPE)
- _make_stream_binary(process.stdout)
+ make_stream_binary(process.stdout)
output = process.communicate()[0]
protocol.readFrom(BytesIO(output))
return result
-def TAP2SubUnit(tap, subunit):
+def TAP2SubUnit(tap, output_stream):
"""Filter a TAP pipe into a subunit pipe.
- :param tap: A tap pipe/stream/file object.
+ This should be invoked once per TAP script, as TAP scripts get
+ mapped to a single runnable case with multiple components.
+
+ :param tap: A tap pipe/stream/file object - should emit unicode strings.
:param subunit: A pipe/stream/file object to write subunit results to.
:return: The exit code to exit with.
"""
+ output = StreamResultToBytes(output_stream)
+ UTF8_TEXT = 'text/plain; charset=UTF8'
BEFORE_PLAN = 0
AFTER_PLAN = 1
SKIP_STREAM = 2
state = BEFORE_PLAN
plan_start = 1
plan_stop = 0
- def _skipped_test(subunit, plan_start):
- # Some tests were skipped.
- subunit.write('test test %d\n' % plan_start)
- subunit.write('error test %d [\n' % plan_start)
- subunit.write('test missing from TAP output\n')
- subunit.write(']\n')
- return plan_start + 1
# Test data for the next test to emit
test_name = None
log = []
result = None
+ def missing_test(plan_start):
+ output.status(test_id='test %d' % plan_start,
+ test_status='fail', runnable=False,
+ mime_type=UTF8_TEXT, eof=True, file_name="tap meta",
+ file_bytes=b"test missing from TAP output")
def _emit_test():
"write out a test"
if test_name is None:
return
- subunit.write("test %s\n" % test_name)
- if not log:
- subunit.write("%s %s\n" % (result, test_name))
- else:
- subunit.write("%s %s [\n" % (result, test_name))
if log:
- for line in log:
- subunit.write("%s\n" % line)
- subunit.write("]\n")
+ log_bytes = b'\n'.join(log_line.encode('utf8') for log_line in log)
+ mime_type = UTF8_TEXT
+ file_name = 'tap comment'
+ eof = True
+ else:
+ log_bytes = None
+ mime_type = None
+ file_name = None
+ eof = True
del log[:]
+ output.status(test_id=test_name, test_status=result,
+ file_bytes=log_bytes, mime_type=mime_type, eof=eof,
+ file_name=file_name, runnable=False)
for line in tap:
if state == BEFORE_PLAN:
match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
if plan_start > plan_stop and plan_stop == 0:
# skipped file
state = SKIP_STREAM
- subunit.write("test file skip\n")
- subunit.write("skip file skip [\n")
- subunit.write("%s\n" % comment)
- subunit.write("]\n")
+ output.status(test_id='file skip', test_status='skip',
+ file_bytes=comment.encode('utf8'), eof=True,
+ file_name='tap comment')
continue
# not a plan line, or have seen one before
match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
if status == 'ok':
result = 'success'
else:
- result = "failure"
+ result = "fail"
if description is None:
description = ''
else:
if number is not None:
number = int(number)
while plan_start < number:
- plan_start = _skipped_test(subunit, plan_start)
+ missing_test(plan_start)
+ plan_start += 1
test_name = "test %d%s" % (plan_start, description)
plan_start += 1
continue
extra = ' %s' % reason
_emit_test()
test_name = "Bail out!%s" % extra
- result = "error"
+ result = "fail"
state = SKIP_STREAM
continue
match = re.match("\#.*\n", line)
if match:
log.append(line[:-1])
continue
- subunit.write(line)
+ # Should look at buffering status and binding this to the prior result.
+ output.status(file_bytes=line.encode('utf8'), file_name='stdout',
+ mime_type=UTF8_TEXT)
_emit_test()
while plan_start <= plan_stop:
# record missed tests
- plan_start = _skipped_test(subunit, plan_start)
+ missing_test(plan_start)
+ plan_start += 1
return 0
:return: 0
"""
new_tags, gone_tags = tags_to_new_gone(tags)
- def write_tags(new_tags, gone_tags):
- if new_tags or gone_tags:
- filtered.write("tags: " + ' '.join(new_tags))
- if gone_tags:
- for tag in gone_tags:
- filtered.write("-" + tag)
- filtered.write("\n")
- write_tags(new_tags, gone_tags)
- # TODO: use the protocol parser and thus don't mangle test comments.
- for line in original:
- if line.startswith("tags:"):
- line_tags = line[5:].split()
- line_new, line_gone = tags_to_new_gone(line_tags)
- line_new = line_new - gone_tags
- line_gone = line_gone - new_tags
- write_tags(line_new, line_gone)
- else:
- filtered.write(line)
+ source = ByteStreamToStreamResult(original, non_subunit_name='stdout')
+ class Tagger(CopyStreamResult):
+ def status(self, **kwargs):
+ tags = kwargs.get('test_tags')
+ if not tags:
+ tags = set()
+ tags.update(new_tags)
+ tags.difference_update(gone_tags)
+ if tags:
+ kwargs['test_tags'] = tags
+ else:
+ kwargs['test_tags'] = None
+ super(Tagger, self).status(**kwargs)
+ output = Tagger([StreamResultToBytes(filtered)])
+ source.run(output)
return 0
:param forward: A stream to pass subunit input on to. If not supplied
subunit input is not forwarded.
"""
- stream = _make_stream_binary(stream)
+ stream = make_stream_binary(stream)
self._stream = stream
self._passthrough = passthrough
if forward is not None:
- forward = _make_stream_binary(forward)
+ forward = make_stream_binary(forward)
self._forward = forward
def __call__(self, result=None):
else:
stream = sys.stdout
if sys.version_info > (3, 0):
- stream = stream.buffer
+ if safe_hasattr(stream, 'buffer'):
+ stream = stream.buffer
return stream
f.close()
-def _make_stream_binary(stream):
+def make_stream_binary(stream):
"""Ensure that a stream will be binary safe. See _make_binary_on_windows.
:return: A binary version of the same stream (some streams cannot be
"""
try:
fileno = stream.fileno()
- except _UnsupportedOperation:
+ except (_UnsupportedOperation, AttributeError):
pass
else:
_make_binary_on_windows(fileno)
return _unwrap_text(stream)
+
def _make_binary_on_windows(fileno):
"""Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
if sys.platform == "win32":
def _unwrap_text(stream):
"""Unwrap stream if it is a text stream to get the original buffer."""
if sys.version_info > (3, 0):
+ unicode_type = str
+ else:
+ unicode_type = unicode
+ try:
+ # Read streams
+ if type(stream.read(0)) is unicode_type:
+ return stream.buffer
+ except (_UnsupportedOperation, IOError):
+ # Cannot read from the stream: try via writes
try:
- # Read streams
- if type(stream.read(0)) is str:
- return stream.buffer
- except (_UnsupportedOperation, IOError):
- # Cannot read from the stream: try via writes
- try:
- stream.write(_b(''))
- except TypeError:
- return stream.buffer
+ stream.write(_b(''))
+ except TypeError:
+ return stream.buffer
return stream