Remove bundled subunit.
[nivanova/samba-autobuild/.git] / lib / subunit / python / subunit / __init__.py
diff --git a/lib/subunit/python/subunit/__init__.py b/lib/subunit/python/subunit/__init__.py
deleted file mode 100644 (file)
index 42dcf29..0000000
+++ /dev/null
@@ -1,1314 +0,0 @@
-#
-#  subunit: extensions to Python unittest to get test results from subprocesses.
-#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
-#
-#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-#  license at the users choice. A copy of both licenses are available in the
-#  project source as Apache-2.0 and BSD. You may not use this file except in
-#  compliance with one of these two licences.
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
-#  license you chose for the specific language governing permissions and
-#  limitations under that license.
-#
-
-"""Subunit - a streaming test protocol
-
-Overview
-++++++++
-
-The ``subunit`` Python package provides a number of ``unittest`` extensions
-which can be used to cause tests to output Subunit, to parse Subunit streams
-into test activity, perform seamless test isolation within a regular test
-case and variously sort, filter and report on test runs.
-
-
-Key Classes
------------
-
-The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
-extension which will translate a test run into a Subunit stream.
-
-The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
-protocol and the ``unittest.TestCase`` object protocol. It is used to translate
-a stream into a test run, which regular ``unittest.TestResult`` objects can
-process and report/inspect.
-
-Subunit has support for non-blocking usage too, for use with asyncore or
-Twisted. See the ``TestProtocolServer`` parser class for more details.
-
-Subunit includes extensions to the Python ``TestResult`` protocol. These are
-all done in a compatible manner: ``TestResult`` objects that do not implement
-the extension methods will not cause errors to be raised, instead the extension
-will either lose fidelity (for instance, folding expected failures to success
-in Python versions < 2.7 or 3.1), or discard the extended data (for extra
-details, tags, timestamping and progress markers).
-
-The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
-``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
-which can be used instead of the usual python unittest parameter.
-When used the value of details should be a dict from ``string`` to
-``testtools.content.Content`` objects. This is a draft API being worked on with
-the Python Testing In Python mail list, with the goal of permitting a common
-way to provide additional data beyond a traceback, such as captured data from
-disk, logging messages etc. The reference for this API is in testtools (0.9.0
-and newer).
-
-The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
-remove tags in the test run that is currently executing. If called when no
-test is in progress (that is, if called outside of the ``startTest``,
-``stopTest`` pair), the the tags apply to all subsequent tests. If called
-when a test is in progress, then the tags only apply to that test.
-
-The ``time(a_datetime)`` method is called (if present) when a ``time:``
-directive is encountered in a Subunit stream. This is used to tell a TestResult
-about the time that events in the stream occurred at, to allow reconstructing
-test timing from a stream.
-
-The ``progress(offset, whence)`` method controls progress data for a stream.
-The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
-subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
-ignore the offset parameter.
-
-
-Python test support
--------------------
-
-``subunit.run`` is a convenience wrapper to run a Python test suite via
-the command line, reporting via Subunit::
-
-  $ python -m subunit.run mylib.tests.test_suite
-
-The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
-tests, allowing isolation between the test runner and some tests.
-
-Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
-tests that will fork() before that individual test is run.
-
-`ExecTestCase`` is a convenience wrapper for running an external
-program to get a Subunit stream and then report that back to an arbitrary
-result object::
-
- class AggregateTests(subunit.ExecTestCase):
-
-     def test_script_one(self):
-         './bin/script_one'
-
-     def test_script_two(self):
-         './bin/script_two'
-
- # Normally your normal test loading would take of this automatically,
- # It is only spelt out in detail here for clarity.
- suite = unittest.TestSuite([AggregateTests("test_script_one"),
-     AggregateTests("test_script_two")])
- # Create any TestResult class you like.
- result = unittest._TextTestResult(sys.stdout)
- # And run your suite as normal, Subunit will exec each external script as
- # needed and report to your result object.
- suite.run(result)
-
-Utility modules
----------------
-
-* subunit.chunked contains HTTP chunked encoding/decoding logic.
-* subunit.test_results contains TestResult helper classes.
-"""
-
-import os
-import re
-import subprocess
-import sys
-import unittest
-if sys.version_info > (3, 0):
-    from io import UnsupportedOperation as _UnsupportedOperation
-else:
-    _UnsupportedOperation = AttributeError
-
-
-from testtools import content, content_type, ExtendedToOriginalDecorator
-from testtools.content import TracebackContent
-from testtools.compat import _b, _u, BytesIO, StringIO
-try:
-    from testtools.testresult.real import _StringException
-    RemoteException = _StringException
-    # For testing: different pythons have different str() implementations.
-    if sys.version_info > (3, 0):
-        _remote_exception_str = "testtools.testresult.real._StringException"
-        _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
-    else:
-        _remote_exception_str = "_StringException" 
-        _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
-except ImportError:
-    raise ImportError ("testtools.testresult.real does not contain "
-        "_StringException, check your version.")
-from testtools import testresult
-
-from subunit import chunked, details, iso8601, test_results
-
-# same format as sys.version_info: "A tuple containing the five components of
-# the version number: major, minor, micro, releaselevel, and serial. All
-# values except releaselevel are integers; the release level is 'alpha',
-# 'beta', 'candidate', or 'final'. The version_info value corresponding to the
-# Python version 2.0 is (2, 0, 0, 'final', 0)."  Additionally we use a
-# releaselevel of 'dev' for unreleased under-development code.
-#
-# If the releaselevel is 'alpha' then the major/minor/micro components are not
-# established at this point, and setup.py will use a version of next-$(revno).
-# If the releaselevel is 'final', then the tarball will be major.minor.micro.
-# Otherwise it is major.minor.micro~$(revno).
-
-__version__ = (0, 0, 9, 'final', 0)
-
-PROGRESS_SET = 0
-PROGRESS_CUR = 1
-PROGRESS_PUSH = 2
-PROGRESS_POP = 3
-
-
-def test_suite():
-    import subunit.tests
-    return subunit.tests.test_suite()
-
-
-def join_dir(base_path, path):
-    """
-    Returns an absolute path to C{path}, calculated relative to the parent
-    of C{base_path}.
-
-    @param base_path: A path to a file or directory.
-    @param path: An absolute path, or a path relative to the containing
-    directory of C{base_path}.
-
-    @return: An absolute path to C{path}.
-    """
-    return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
-
-
-def tags_to_new_gone(tags):
-    """Split a list of tags into a new_set and a gone_set."""
-    new_tags = set()
-    gone_tags = set()
-    for tag in tags:
-        if tag[0] == '-':
-            gone_tags.add(tag[1:])
-        else:
-            new_tags.add(tag)
-    return new_tags, gone_tags
-
-
-class DiscardStream(object):
-    """A filelike object which discards what is written to it."""
-
-    def fileno(self):
-        raise _UnsupportedOperation()
-
-    def write(self, bytes):
-        pass
-
-    def read(self, len=0):
-        return _b('')
-
-
-class _ParserState(object):
-    """State for the subunit parser."""
-
-    def __init__(self, parser):
-        self.parser = parser
-        self._test_sym = (_b('test'), _b('testing'))
-        self._colon_sym = _b(':')
-        self._error_sym = (_b('error'),)
-        self._failure_sym = (_b('failure'),)
-        self._progress_sym = (_b('progress'),)
-        self._skip_sym = _b('skip')
-        self._success_sym = (_b('success'), _b('successful'))
-        self._tags_sym = (_b('tags'),)
-        self._time_sym = (_b('time'),)
-        self._xfail_sym = (_b('xfail'),)
-        self._uxsuccess_sym = (_b('uxsuccess'),)
-        self._start_simple = _u(" [")
-        self._start_multipart = _u(" [ multipart")
-
-    def addError(self, offset, line):
-        """An 'error:' directive has been read."""
-        self.parser.stdOutLineReceived(line)
-
-    def addExpectedFail(self, offset, line):
-        """An 'xfail:' directive has been read."""
-        self.parser.stdOutLineReceived(line)
-
-    def addFailure(self, offset, line):
-        """A 'failure:' directive has been read."""
-        self.parser.stdOutLineReceived(line)
-
-    def addSkip(self, offset, line):
-        """A 'skip:' directive has been read."""
-        self.parser.stdOutLineReceived(line)
-
-    def addSuccess(self, offset, line):
-        """A 'success:' directive has been read."""
-        self.parser.stdOutLineReceived(line)
-
-    def lineReceived(self, line):
-        """a line has been received."""
-        parts = line.split(None, 1)
-        if len(parts) == 2 and line.startswith(parts[0]):
-            cmd, rest = parts
-            offset = len(cmd) + 1
-            cmd = cmd.rstrip(self._colon_sym)
-            if cmd in self._test_sym:
-                self.startTest(offset, line)
-            elif cmd in self._error_sym:
-                self.addError(offset, line)
-            elif cmd in self._failure_sym:
-                self.addFailure(offset, line)
-            elif cmd in self._progress_sym:
-                self.parser._handleProgress(offset, line)
-            elif cmd in self._skip_sym:
-                self.addSkip(offset, line)
-            elif cmd in self._success_sym:
-                self.addSuccess(offset, line)
-            elif cmd in self._tags_sym:
-                self.parser._handleTags(offset, line)
-                self.parser.subunitLineReceived(line)
-            elif cmd in self._time_sym:
-                self.parser._handleTime(offset, line)
-                self.parser.subunitLineReceived(line)
-            elif cmd in self._xfail_sym:
-                self.addExpectedFail(offset, line)
-            elif cmd in self._uxsuccess_sym:
-                self.addUnexpectedSuccess(offset, line)
-            else:
-                self.parser.stdOutLineReceived(line)
-        else:
-            self.parser.stdOutLineReceived(line)
-
-    def lostConnection(self):
-        """Connection lost."""
-        self.parser._lostConnectionInTest(_u('unknown state of '))
-
-    def startTest(self, offset, line):
-        """A test start command received."""
-        self.parser.stdOutLineReceived(line)
-
-
-class _InTest(_ParserState):
-    """State for the subunit parser after reading a test: directive."""
-
-    def _outcome(self, offset, line, no_details, details_state):
-        """An outcome directive has been read.
-
-        :param no_details: Callable to call when no details are presented.
-        :param details_state: The state to switch to for details
-            processing of this outcome.
-        """
-        test_name = line[offset:-1].decode('utf8')
-        if self.parser.current_test_description == test_name:
-            self.parser._state = self.parser._outside_test
-            self.parser.current_test_description = None
-            no_details()
-            self.parser.client.stopTest(self.parser._current_test)
-            self.parser._current_test = None
-            self.parser.subunitLineReceived(line)
-        elif self.parser.current_test_description + self._start_simple == \
-            test_name:
-            self.parser._state = details_state
-            details_state.set_simple()
-            self.parser.subunitLineReceived(line)
-        elif self.parser.current_test_description + self._start_multipart == \
-            test_name:
-            self.parser._state = details_state
-            details_state.set_multipart()
-            self.parser.subunitLineReceived(line)
-        else:
-            self.parser.stdOutLineReceived(line)
-
-    def _error(self):
-        self.parser.client.addError(self.parser._current_test,
-            details={})
-
-    def addError(self, offset, line):
-        """An 'error:' directive has been read."""
-        self._outcome(offset, line, self._error,
-            self.parser._reading_error_details)
-
-    def _xfail(self):
-        self.parser.client.addExpectedFailure(self.parser._current_test,
-            details={})
-
-    def addExpectedFail(self, offset, line):
-        """An 'xfail:' directive has been read."""
-        self._outcome(offset, line, self._xfail,
-            self.parser._reading_xfail_details)
-
-    def _uxsuccess(self):
-        self.parser.client.addUnexpectedSuccess(self.parser._current_test)
-
-    def addUnexpectedSuccess(self, offset, line):
-        """A 'uxsuccess:' directive has been read."""
-        self._outcome(offset, line, self._uxsuccess,
-            self.parser._reading_uxsuccess_details)
-
-    def _failure(self):
-        self.parser.client.addFailure(self.parser._current_test, details={})
-
-    def addFailure(self, offset, line):
-        """A 'failure:' directive has been read."""
-        self._outcome(offset, line, self._failure,
-            self.parser._reading_failure_details)
-
-    def _skip(self):
-        self.parser.client.addSkip(self.parser._current_test, details={})
-
-    def addSkip(self, offset, line):
-        """A 'skip:' directive has been read."""
-        self._outcome(offset, line, self._skip,
-            self.parser._reading_skip_details)
-
-    def _succeed(self):
-        self.parser.client.addSuccess(self.parser._current_test, details={})
-
-    def addSuccess(self, offset, line):
-        """A 'success:' directive has been read."""
-        self._outcome(offset, line, self._succeed,
-            self.parser._reading_success_details)
-
-    def lostConnection(self):
-        """Connection lost."""
-        self.parser._lostConnectionInTest(_u(''))
-
-
-class _OutSideTest(_ParserState):
-    """State for the subunit parser outside of a test context."""
-
-    def lostConnection(self):
-        """Connection lost."""
-
-    def startTest(self, offset, line):
-        """A test start command received."""
-        self.parser._state = self.parser._in_test
-        test_name = line[offset:-1].decode('utf8')
-        self.parser._current_test = RemotedTestCase(test_name)
-        self.parser.current_test_description = test_name
-        self.parser.client.startTest(self.parser._current_test)
-        self.parser.subunitLineReceived(line)
-
-
-class _ReadingDetails(_ParserState):
-    """Common logic for readin state details."""
-
-    def endDetails(self):
-        """The end of a details section has been reached."""
-        self.parser._state = self.parser._outside_test
-        self.parser.current_test_description = None
-        self._report_outcome()
-        self.parser.client.stopTest(self.parser._current_test)
-
-    def lineReceived(self, line):
-        """a line has been received."""
-        self.details_parser.lineReceived(line)
-        self.parser.subunitLineReceived(line)
-
-    def lostConnection(self):
-        """Connection lost."""
-        self.parser._lostConnectionInTest(_u('%s report of ') %
-            self._outcome_label())
-
-    def _outcome_label(self):
-        """The label to describe this outcome."""
-        raise NotImplementedError(self._outcome_label)
-
-    def set_simple(self):
-        """Start a simple details parser."""
-        self.details_parser = details.SimpleDetailsParser(self)
-
-    def set_multipart(self):
-        """Start a multipart details parser."""
-        self.details_parser = details.MultipartDetailsParser(self)
-
-
-class _ReadingFailureDetails(_ReadingDetails):
-    """State for the subunit parser when reading failure details."""
-
-    def _report_outcome(self):
-        self.parser.client.addFailure(self.parser._current_test,
-            details=self.details_parser.get_details())
-
-    def _outcome_label(self):
-        return "failure"
-
-
-class _ReadingErrorDetails(_ReadingDetails):
-    """State for the subunit parser when reading error details."""
-
-    def _report_outcome(self):
-        self.parser.client.addError(self.parser._current_test,
-            details=self.details_parser.get_details())
-
-    def _outcome_label(self):
-        return "error"
-
-
-class _ReadingExpectedFailureDetails(_ReadingDetails):
-    """State for the subunit parser when reading xfail details."""
-
-    def _report_outcome(self):
-        self.parser.client.addExpectedFailure(self.parser._current_test,
-            details=self.details_parser.get_details())
-
-    def _outcome_label(self):
-        return "xfail"
-
-
-class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
-    """State for the subunit parser when reading uxsuccess details."""
-
-    def _report_outcome(self):
-        self.parser.client.addUnexpectedSuccess(self.parser._current_test,
-            details=self.details_parser.get_details())
-
-    def _outcome_label(self):
-        return "uxsuccess"
-
-
-class _ReadingSkipDetails(_ReadingDetails):
-    """State for the subunit parser when reading skip details."""
-
-    def _report_outcome(self):
-        self.parser.client.addSkip(self.parser._current_test,
-            details=self.details_parser.get_details("skip"))
-
-    def _outcome_label(self):
-        return "skip"
-
-
-class _ReadingSuccessDetails(_ReadingDetails):
-    """State for the subunit parser when reading success details."""
-
-    def _report_outcome(self):
-        self.parser.client.addSuccess(self.parser._current_test,
-            details=self.details_parser.get_details("success"))
-
-    def _outcome_label(self):
-        return "success"
-
-
-class TestProtocolServer(object):
-    """A parser for subunit.
-
-    :ivar tags: The current tags associated with the protocol stream.
-    """
-
-    def __init__(self, client, stream=None, forward_stream=None):
-        """Create a TestProtocolServer instance.
-
-        :param client: An object meeting the unittest.TestResult protocol.
-        :param stream: The stream that lines received which are not part of the
-            subunit protocol should be written to. This allows custom handling
-            of mixed protocols. By default, sys.stdout will be used for
-            convenience. It should accept bytes to its write() method.
-        :param forward_stream: A stream to forward subunit lines to. This
-            allows a filter to forward the entire stream while still parsing
-            and acting on it. By default forward_stream is set to
-            DiscardStream() and no forwarding happens.
-        """
-        self.client = ExtendedToOriginalDecorator(client)
-        if stream is None:
-            stream = sys.stdout
-            if sys.version_info > (3, 0):
-                stream = stream.buffer
-        self._stream = stream
-        self._forward_stream = forward_stream or DiscardStream()
-        # state objects we can switch too
-        self._in_test = _InTest(self)
-        self._outside_test = _OutSideTest(self)
-        self._reading_error_details = _ReadingErrorDetails(self)
-        self._reading_failure_details = _ReadingFailureDetails(self)
-        self._reading_skip_details = _ReadingSkipDetails(self)
-        self._reading_success_details = _ReadingSuccessDetails(self)
-        self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
-        self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
-        # start with outside test.
-        self._state = self._outside_test
-        # Avoid casts on every call
-        self._plusminus = _b('+-')
-        self._push_sym = _b('push')
-        self._pop_sym = _b('pop')
-
-    def _handleProgress(self, offset, line):
-        """Process a progress directive."""
-        line = line[offset:].strip()
-        if line[0] in self._plusminus:
-            whence = PROGRESS_CUR
-            delta = int(line)
-        elif line == self._push_sym:
-            whence = PROGRESS_PUSH
-            delta = None
-        elif line == self._pop_sym:
-            whence = PROGRESS_POP
-            delta = None
-        else:
-            whence = PROGRESS_SET
-            delta = int(line)
-        self.client.progress(delta, whence)
-
-    def _handleTags(self, offset, line):
-        """Process a tags command."""
-        tags = line[offset:].decode('utf8').split()
-        new_tags, gone_tags = tags_to_new_gone(tags)
-        self.client.tags(new_tags, gone_tags)
-
-    def _handleTime(self, offset, line):
-        # Accept it, but do not do anything with it yet.
-        try:
-            event_time = iso8601.parse_date(line[offset:-1])
-        except TypeError:
-            raise TypeError(_u("Failed to parse %r, got %r")
-                % (line, sys.exec_info[1]))
-        self.client.time(event_time)
-
-    def lineReceived(self, line):
-        """Call the appropriate local method for the received line."""
-        self._state.lineReceived(line)
-
-    def _lostConnectionInTest(self, state_string):
-        error_string = _u("lost connection during %stest '%s'") % (
-            state_string, self.current_test_description)
-        self.client.addError(self._current_test, RemoteError(error_string))
-        self.client.stopTest(self._current_test)
-
-    def lostConnection(self):
-        """The input connection has finished."""
-        self._state.lostConnection()
-
-    def readFrom(self, pipe):
-        """Blocking convenience API to parse an entire stream.
-
-        :param pipe: A file-like object supporting readlines().
-        :return: None.
-        """
-        for line in pipe.readlines():
-            self.lineReceived(line)
-        self.lostConnection()
-
-    def _startTest(self, offset, line):
-        """Internal call to change state machine. Override startTest()."""
-        self._state.startTest(offset, line)
-
-    def subunitLineReceived(self, line):
-        self._forward_stream.write(line)
-
-    def stdOutLineReceived(self, line):
-        self._stream.write(line)
-
-
-class TestProtocolClient(testresult.TestResult):
-    """A TestResult which generates a subunit stream for a test run.
-
-    # Get a TestSuite or TestCase to run
-    suite = make_suite()
-    # Create a stream (any object with a 'write' method). This should accept
-    # bytes not strings: subunit is a byte orientated protocol.
-    stream = file('tests.log', 'wb')
-    # Create a subunit result object which will output to the stream
-    result = subunit.TestProtocolClient(stream)
-    # Optionally, to get timing data for performance analysis, wrap the
-    # serialiser with a timing decorator
-    result = subunit.test_results.AutoTimingTestResultDecorator(result)
-    # Run the test suite reporting to the subunit result object
-    suite.run(result)
-    # Close the stream.
-    stream.close()
-    """
-
-    def __init__(self, stream):
-        testresult.TestResult.__init__(self)
-        stream = _make_stream_binary(stream)
-        self._stream = stream
-        self._progress_fmt = _b("progress: ")
-        self._bytes_eol = _b("\n")
-        self._progress_plus = _b("+")
-        self._progress_push = _b("push")
-        self._progress_pop = _b("pop")
-        self._empty_bytes = _b("")
-        self._start_simple = _b(" [\n")
-        self._end_simple = _b("]\n")
-
-    def addError(self, test, error=None, details=None):
-        """Report an error in test test.
-
-        Only one of error and details should be provided: conceptually there
-        are two separate methods:
-            addError(self, test, error)
-            addError(self, test, details)
-
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        :param details: New Testing-in-python drafted API; a dict from string
-            to subunit.Content objects.
-        """
-        self._addOutcome("error", test, error=error, details=details)
-        if self.failfast:
-            self.stop()
-
-    def addExpectedFailure(self, test, error=None, details=None):
-        """Report an expected failure in test test.
-
-        Only one of error and details should be provided: conceptually there
-        are two separate methods:
-            addError(self, test, error)
-            addError(self, test, details)
-
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        :param details: New Testing-in-python drafted API; a dict from string
-            to subunit.Content objects.
-        """
-        self._addOutcome("xfail", test, error=error, details=details)
-
-    def addFailure(self, test, error=None, details=None):
-        """Report a failure in test test.
-
-        Only one of error and details should be provided: conceptually there
-        are two separate methods:
-            addFailure(self, test, error)
-            addFailure(self, test, details)
-
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        :param details: New Testing-in-python drafted API; a dict from string
-            to subunit.Content objects.
-        """
-        self._addOutcome("failure", test, error=error, details=details)
-        if self.failfast:
-            self.stop()
-
-    def _addOutcome(self, outcome, test, error=None, details=None,
-        error_permitted=True):
-        """Report a failure in test test.
-
-        Only one of error and details should be provided: conceptually there
-        are two separate methods:
-            addOutcome(self, test, error)
-            addOutcome(self, test, details)
-
-        :param outcome: A string describing the outcome - used as the
-            event name in the subunit stream.
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        :param details: New Testing-in-python drafted API; a dict from string
-            to subunit.Content objects.
-        :param error_permitted: If True then one and only one of error or
-            details must be supplied. If False then error must not be supplied
-            and details is still optional.  """
-        self._stream.write(_b("%s: " % outcome) + self._test_id(test))
-        if error_permitted:
-            if error is None and details is None:
-                raise ValueError
-        else:
-            if error is not None:
-                raise ValueError
-        if error is not None:
-            self._stream.write(self._start_simple)
-            tb_content = TracebackContent(error, test)
-            for bytes in tb_content.iter_bytes():
-                self._stream.write(bytes)
-        elif details is not None:
-            self._write_details(details)
-        else:
-            self._stream.write(_b("\n"))
-        if details is not None or error is not None:
-            self._stream.write(self._end_simple)
-
-    def addSkip(self, test, reason=None, details=None):
-        """Report a skipped test."""
-        if reason is None:
-            self._addOutcome("skip", test, error=None, details=details)
-        else:
-            self._stream.write(_b("skip: %s [\n" % test.id()))
-            self._stream.write(_b("%s\n" % reason))
-            self._stream.write(self._end_simple)
-
-    def addSuccess(self, test, details=None):
-        """Report a success in a test."""
-        self._addOutcome("successful", test, details=details, error_permitted=False)
-
-    def addUnexpectedSuccess(self, test, details=None):
-        """Report an unexpected success in test test.
-
-        Details can optionally be provided: conceptually there
-        are two separate methods:
-            addError(self, test)
-            addError(self, test, details)
-
-        :param details: New Testing-in-python drafted API; a dict from string
-            to subunit.Content objects.
-        """
-        self._addOutcome("uxsuccess", test, details=details,
-            error_permitted=False)
-        if self.failfast:
-            self.stop()
-
-    def _test_id(self, test):
-        result = test.id()
-        if type(result) is not bytes:
-            result = result.encode('utf8')
-        return result
-
-    def startTest(self, test):
-        """Mark a test as starting its test run."""
-        super(TestProtocolClient, self).startTest(test)
-        self._stream.write(_b("test: ") + self._test_id(test) + _b("\n"))
-        self._stream.flush()
-
-    def stopTest(self, test):
-        super(TestProtocolClient, self).stopTest(test)
-        self._stream.flush()
-
-    def progress(self, offset, whence):
-        """Provide indication about the progress/length of the test run.
-
-        :param offset: Information about the number of tests remaining. If
-            whence is PROGRESS_CUR, then offset increases/decreases the
-            remaining test count. If whence is PROGRESS_SET, then offset
-            specifies exactly the remaining test count.
-        :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
-            PROGRESS_POP.
-        """
-        if whence == PROGRESS_CUR and offset > -1:
-            prefix = self._progress_plus
-            offset = _b(str(offset))
-        elif whence == PROGRESS_PUSH:
-            prefix = self._empty_bytes
-            offset = self._progress_push
-        elif whence == PROGRESS_POP:
-            prefix = self._empty_bytes
-            offset = self._progress_pop
-        else:
-            prefix = self._empty_bytes
-            offset = _b(str(offset))
-        self._stream.write(self._progress_fmt + prefix + offset +
-            self._bytes_eol)
-
-    def tags(self, new_tags, gone_tags):
-        """Inform the client about tags added/removed from the stream."""
-        if not new_tags and not gone_tags:
-            return
-        tags = set([tag.encode('utf8') for tag in new_tags])
-        tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
-        tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
-        self._stream.write(tag_line)
-
-    def time(self, a_datetime):
-        """Inform the client of the time.
-
-        ":param datetime: A datetime.datetime object.
-        """
-        time = a_datetime.astimezone(iso8601.Utc())
-        self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
-            time.year, time.month, time.day, time.hour, time.minute,
-            time.second, time.microsecond)))
-
-    def _write_details(self, details):
-        """Output details to the stream.
-
-        :param details: An extended details dict for a test outcome.
-        """
-        self._stream.write(_b(" [ multipart\n"))
-        for name, content in sorted(details.items()):
-            self._stream.write(_b("Content-Type: %s/%s" %
-                (content.content_type.type, content.content_type.subtype)))
-            parameters = content.content_type.parameters
-            if parameters:
-                self._stream.write(_b(";"))
-                param_strs = []
-                for param, value in parameters.items():
-                    param_strs.append("%s=%s" % (param, value))
-                self._stream.write(_b(",".join(param_strs)))
-            self._stream.write(_b("\n%s\n" % name))
-            encoder = chunked.Encoder(self._stream)
-            list(map(encoder.write, content.iter_bytes()))
-            encoder.close()
-
-    def done(self):
-        """Obey the testtools result.done() interface."""
-
-
-def RemoteError(description=_u("")):
-    return (_StringException, _StringException(description), None)
-
-
-class RemotedTestCase(unittest.TestCase):
-    """A class to represent test cases run in child processes.
-
-    Instances of this class are used to provide the Python test API a TestCase
-    that can be printed to the screen, introspected for metadata and so on.
-    However, as they are a simply a memoisation of a test that was actually
-    run in the past by a separate process, they cannot perform any interactive
-    actions.
-    """
-
-    def __eq__ (self, other):
-        try:
-            return self.__description == other.__description
-        except AttributeError:
-            return False
-
-    def __init__(self, description):
-        """Create a psuedo test case with description description."""
-        self.__description = description
-
-    def error(self, label):
-        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
-            label)
-
-    def setUp(self):
-        self.error("setUp")
-
-    def tearDown(self):
-        self.error("tearDown")
-
-    def shortDescription(self):
-        return self.__description
-
-    def id(self):
-        return "%s" % (self.__description,)
-
-    def __str__(self):
-        return "%s (%s)" % (self.__description, self._strclass())
-
-    def __repr__(self):
-        return "<%s description='%s'>" % \
-               (self._strclass(), self.__description)
-
-    def run(self, result=None):
-        if result is None: result = self.defaultTestResult()
-        result.startTest(self)
-        result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
-        result.stopTest(self)
-
-    def _strclass(self):
-        cls = self.__class__
-        return "%s.%s" % (cls.__module__, cls.__name__)
-
-
-class ExecTestCase(unittest.TestCase):
-    """A test case which runs external scripts for test fixtures."""
-
-    def __init__(self, methodName='runTest'):
-        """Create an instance of the class that will use the named test
-           method when executed. Raises a ValueError if the instance does
-           not have a method with the specified name.
-        """
-        unittest.TestCase.__init__(self, methodName)
-        testMethod = getattr(self, methodName)
-        self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
-                               testMethod.__doc__)
-
-    def countTestCases(self):
-        return 1
-
-    def run(self, result=None):
-        if result is None: result = self.defaultTestResult()
-        self._run(result)
-
-    def debug(self):
-        """Run the test without collecting errors in a TestResult"""
-        self._run(testresult.TestResult())
-
-    def _run(self, result):
-        protocol = TestProtocolServer(result)
-        process = subprocess.Popen(self.script, shell=True,
-            stdout=subprocess.PIPE)
-        _make_stream_binary(process.stdout)
-        output = process.communicate()[0]
-        protocol.readFrom(BytesIO(output))
-
-
-class IsolatedTestCase(unittest.TestCase):
-    """A TestCase which executes in a forked process.
-
-    Each test gets its own process, which has a performance overhead but will
-    provide excellent isolation from global state (such as django configs,
-    zope utilities and so on).
-    """
-
-    def run(self, result=None):
-        if result is None: result = self.defaultTestResult()
-        run_isolated(unittest.TestCase, self, result)
-
-
-class IsolatedTestSuite(unittest.TestSuite):
-    """A TestSuite which runs its tests in a forked process.
-
-    This decorator that will fork() before running the tests and report the
-    results from the child process using a Subunit stream.  This is useful for
-    handling tests that mutate global state, or are testing C extensions that
-    could crash the VM.
-    """
-
-    def run(self, result=None):
-        if result is None: result = testresult.TestResult()
-        run_isolated(unittest.TestSuite, self, result)
-
-
-def run_isolated(klass, self, result):
-    """Run a test suite or case in a subprocess, using the run method on klass.
-    """
-    c2pread, c2pwrite = os.pipe()
-    # fixme - error -> result
-    # now fork
-    pid = os.fork()
-    if pid == 0:
-        # Child
-        # Close parent's pipe ends
-        os.close(c2pread)
-        # Dup fds for child
-        os.dup2(c2pwrite, 1)
-        # Close pipe fds.
-        os.close(c2pwrite)
-
-        # at this point, sys.stdin is redirected, now we want
-        # to filter it to escape ]'s.
-        ### XXX: test and write that bit.
-        stream = os.fdopen(1, 'wb')
-        result = TestProtocolClient(stream)
-        klass.run(self, result)
-        stream.flush()
-        sys.stderr.flush()
-        # exit HARD, exit NOW.
-        os._exit(0)
-    else:
-        # Parent
-        # Close child pipe ends
-        os.close(c2pwrite)
-        # hookup a protocol engine
-        protocol = TestProtocolServer(result)
-        fileobj = os.fdopen(c2pread, 'rb')
-        protocol.readFrom(fileobj)
-        os.waitpid(pid, 0)
-        # TODO return code evaluation.
-    return result
-
-
-def TAP2SubUnit(tap, subunit):
-    """Filter a TAP pipe into a subunit pipe.
-
-    :param tap: A tap pipe/stream/file object.
-    :param subunit: A pipe/stream/file object to write subunit results to.
-    :return: The exit code to exit with.
-    """
-    BEFORE_PLAN = 0
-    AFTER_PLAN = 1
-    SKIP_STREAM = 2
-    state = BEFORE_PLAN
-    plan_start = 1
-    plan_stop = 0
-    def _skipped_test(subunit, plan_start):
-        # Some tests were skipped.
-        subunit.write('test test %d\n' % plan_start)
-        subunit.write('error test %d [\n' % plan_start)
-        subunit.write('test missing from TAP output\n')
-        subunit.write(']\n')
-        return plan_start + 1
-    # Test data for the next test to emit
-    test_name = None
-    log = []
-    result = None
-    def _emit_test():
-        "write out a test"
-        if test_name is None:
-            return
-        subunit.write("test %s\n" % test_name)
-        if not log:
-            subunit.write("%s %s\n" % (result, test_name))
-        else:
-            subunit.write("%s %s [\n" % (result, test_name))
-        if log:
-            for line in log:
-                subunit.write("%s\n" % line)
-            subunit.write("]\n")
-        del log[:]
-    for line in tap:
-        if state == BEFORE_PLAN:
-            match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
-            if match:
-                state = AFTER_PLAN
-                _, plan_stop, comment = match.groups()
-                plan_stop = int(plan_stop)
-                if plan_start > plan_stop and plan_stop == 0:
-                    # skipped file
-                    state = SKIP_STREAM
-                    subunit.write("test file skip\n")
-                    subunit.write("skip file skip [\n")
-                    subunit.write("%s\n" % comment)
-                    subunit.write("]\n")
-                continue
-        # not a plan line, or have seen one before
-        match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
-        if match:
-            # new test, emit current one.
-            _emit_test()
-            status, number, description, directive, directive_comment = match.groups()
-            if status == 'ok':
-                result = 'success'
-            else:
-                result = "failure"
-            if description is None:
-                description = ''
-            else:
-                description = ' ' + description
-            if directive is not None:
-                if directive.upper() == 'TODO':
-                    result = 'xfail'
-                elif directive.upper() == 'SKIP':
-                    result = 'skip'
-                if directive_comment is not None:
-                    log.append(directive_comment)
-            if number is not None:
-                number = int(number)
-                while plan_start < number:
-                    plan_start = _skipped_test(subunit, plan_start)
-            test_name = "test %d%s" % (plan_start, description)
-            plan_start += 1
-            continue
-        match = re.match("Bail out\!(?:\s*(.*))?\n", line)
-        if match:
-            reason, = match.groups()
-            if reason is None:
-                extra = ''
-            else:
-                extra = ' %s' % reason
-            _emit_test()
-            test_name = "Bail out!%s" % extra
-            result = "error"
-            state = SKIP_STREAM
-            continue
-        match = re.match("\#.*\n", line)
-        if match:
-            log.append(line[:-1])
-            continue
-        subunit.write(line)
-    _emit_test()
-    while plan_start <= plan_stop:
-        # record missed tests
-        plan_start = _skipped_test(subunit, plan_start)
-    return 0
-
-
-def tag_stream(original, filtered, tags):
-    """Alter tags on a stream.
-
-    :param original: The input stream.
-    :param filtered: The output stream.
-    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
-        '-TAG' commands.
-
-        A 'TAG' command will add the tag to the output stream,
-        and override any existing '-TAG' command in that stream.
-        Specifically:
-         * A global 'tags: TAG' will be added to the start of the stream.
-         * Any tags commands with -TAG will have the -TAG removed.
-
-        A '-TAG' command will remove the TAG command from the stream.
-        Specifically:
-         * A 'tags: -TAG' command will be added to the start of the stream.
-         * Any 'tags: TAG' command will have 'TAG' removed from it.
-        Additionally, any redundant tagging commands (adding a tag globally
-        present, or removing a tag globally removed) are stripped as a
-        by-product of the filtering.
-    :return: 0
-    """
-    new_tags, gone_tags = tags_to_new_gone(tags)
-    def write_tags(new_tags, gone_tags):
-        if new_tags or gone_tags:
-            filtered.write("tags: " + ' '.join(new_tags))
-            if gone_tags:
-                for tag in gone_tags:
-                    filtered.write("-" + tag)
-            filtered.write("\n")
-    write_tags(new_tags, gone_tags)
-    # TODO: use the protocol parser and thus don't mangle test comments.
-    for line in original:
-        if line.startswith("tags:"):
-            line_tags = line[5:].split()
-            line_new, line_gone = tags_to_new_gone(line_tags)
-            line_new = line_new - gone_tags
-            line_gone = line_gone - new_tags
-            write_tags(line_new, line_gone)
-        else:
-            filtered.write(line)
-    return 0
-
-
-class ProtocolTestCase(object):
-    """Subunit wire protocol to unittest.TestCase adapter.
-
-    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
-    calling a ProtocolTestCase or invoking the run() method will make a 'test
-    run' happen. The 'test run' will simply be a replay of the test activity
-    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
-    and ``countTestCases`` methods are not supported because there isn't a
-    sensible mapping for those methods.
-
-    # Get a stream (any object with a readline() method), in this case the
-    # stream output by the example from ``subunit.TestProtocolClient``.
-    stream = file('tests.log', 'rb')
-    # Create a parser which will read from the stream and emit
-    # activity to a unittest.TestResult when run() is called.
-    suite = subunit.ProtocolTestCase(stream)
-    # Create a result object to accept the contents of that stream.
-    result = unittest._TextTestResult(sys.stdout)
-    # 'run' the tests - process the stream and feed its contents to result.
-    suite.run(result)
-    stream.close()
-
-    :seealso: TestProtocolServer (the subunit wire protocol parser).
-    """
-
-    def __init__(self, stream, passthrough=None, forward=None):
-        """Create a ProtocolTestCase reading from stream.
-
-        :param stream: A filelike object which a subunit stream can be read
-            from.
-        :param passthrough: A stream pass non subunit input on to. If not
-            supplied, the TestProtocolServer default is used.
-        :param forward: A stream to pass subunit input on to. If not supplied
-            subunit input is not forwarded.
-        """
-        stream = _make_stream_binary(stream)
-        self._stream = stream
-        self._passthrough = passthrough
-        if forward is not None:
-            forward = _make_stream_binary(forward)
-        self._forward = forward
-
-    def __call__(self, result=None):
-        return self.run(result)
-
-    def run(self, result=None):
-        if result is None:
-            result = self.defaultTestResult()
-        protocol = TestProtocolServer(result, self._passthrough, self._forward)
-        line = self._stream.readline()
-        while line:
-            protocol.lineReceived(line)
-            line = self._stream.readline()
-        protocol.lostConnection()
-
-
-class TestResultStats(testresult.TestResult):
-    """A pyunit TestResult interface implementation for making statistics.
-
-    :ivar total_tests: The total tests seen.
-    :ivar passed_tests: The tests that passed.
-    :ivar failed_tests: The tests that failed.
-    :ivar seen_tags: The tags seen across all tests.
-    """
-
-    def __init__(self, stream):
-        """Create a TestResultStats which outputs to stream."""
-        testresult.TestResult.__init__(self)
-        self._stream = stream
-        self.failed_tests = 0
-        self.skipped_tests = 0
-        self.seen_tags = set()
-
-    @property
-    def total_tests(self):
-        return self.testsRun
-
-    def addError(self, test, err, details=None):
-        self.failed_tests += 1
-
-    def addFailure(self, test, err, details=None):
-        self.failed_tests += 1
-
-    def addSkip(self, test, reason, details=None):
-        self.skipped_tests += 1
-
-    def formatStats(self):
-        self._stream.write("Total tests:   %5d\n" % self.total_tests)
-        self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
-        self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
-        self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
-        tags = sorted(self.seen_tags)
-        self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
-
-    @property
-    def passed_tests(self):
-        return self.total_tests - self.failed_tests - self.skipped_tests
-
-    def tags(self, new_tags, gone_tags):
-        """Accumulate the seen tags."""
-        self.seen_tags.update(new_tags)
-
-    def wasSuccessful(self):
-        """Tells whether or not this result was a success"""
-        return self.failed_tests == 0
-
-
-def get_default_formatter():
-    """Obtain the default formatter to write to.
-
-    :return: A file-like object.
-    """
-    formatter = os.getenv("SUBUNIT_FORMATTER")
-    if formatter:
-        return os.popen(formatter, "w")
-    else:
-        stream = sys.stdout
-        if sys.version_info > (3, 0):
-            stream = stream.buffer
-        return stream
-
-
-def read_test_list(path):
-    """Read a list of test ids from a file on disk.
-
-    :param path: Path to the file
-    :return: Sequence of test ids
-    """
-    f = open(path, 'rb')
-    try:
-        return [l.rstrip("\n") for l in f.readlines()]
-    finally:
-        f.close()
-
-
-def _make_stream_binary(stream):
-    """Ensure that a stream will be binary safe. See _make_binary_on_windows.
-    
-    :return: A binary version of the same stream (some streams cannot be
-        'fixed' but can be unwrapped).
-    """
-    try:
-        fileno = stream.fileno()
-    except _UnsupportedOperation:
-        pass
-    else:
-        _make_binary_on_windows(fileno)
-    return _unwrap_text(stream)
-
-def _make_binary_on_windows(fileno):
-    """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
-    if sys.platform == "win32":
-        import msvcrt
-        msvcrt.setmode(fileno, os.O_BINARY)
-
-
-def _unwrap_text(stream):
-    """Unwrap stream if it is a text stream to get the original buffer."""
-    if sys.version_info > (3, 0):
-        try:
-            # Read streams
-            if type(stream.read(0)) is str:
-                return stream.buffer
-        except (_UnsupportedOperation, IOError):
-            # Cannot read from the stream: try via writes
-            try:
-                stream.write(_b(''))
-            except TypeError:
-                return stream.buffer
-    return stream