subunit: Update to latest upstream snapshot.
authorJelmer Vernooij <jelmer@samba.org>
Sat, 27 Aug 2011 14:07:38 +0000 (16:07 +0200)
committerJelmer Vernooij <jelmer@samba.org>
Sat, 27 Aug 2011 14:07:38 +0000 (16:07 +0200)
22 files changed:
lib/subunit/INSTALL
lib/subunit/NEWS
lib/subunit/README
lib/subunit/c/tests/test_child.c
lib/subunit/filters/subunit-filter
lib/subunit/filters/subunit-ls
lib/subunit/python/subunit/__init__.py
lib/subunit/python/subunit/chunked.py
lib/subunit/python/subunit/details.py
lib/subunit/python/subunit/iso8601.py
lib/subunit/python/subunit/run.py
lib/subunit/python/subunit/test_results.py
lib/subunit/python/subunit/tests/TestUtil.py
lib/subunit/python/subunit/tests/sample-script.py
lib/subunit/python/subunit/tests/test_chunked.py
lib/subunit/python/subunit/tests/test_details.py
lib/subunit/python/subunit/tests/test_subunit_filter.py
lib/subunit/python/subunit/tests/test_subunit_stats.py
lib/subunit/python/subunit/tests/test_subunit_tags.py
lib/subunit/python/subunit/tests/test_tap2subunit.py
lib/subunit/python/subunit/tests/test_test_protocol.py
lib/subunit/python/subunit/tests/test_test_results.py

index 06122552eda893b7fe3ec22e2c7a005a393d586e..2a053d8ad6db18fa1067dc10354e5e3358d5eddb 100644 (file)
@@ -14,7 +14,7 @@ Dependencies
 * Python for the filters
 * 'testtools' (On Debian and Ubuntu systems the 'python-testtools' package,
   the testtools package on pypi, or https://launchpad.net/testtools) for
-  the extended test API which permits attachments. Version 0.9.8 or newer is 
+  the extended test API which permits attachments. Version 0.9.11 or newer is 
   required. Of particular note, http://testtools.python-hosting.com/ is not
   the testtools you want.
 * A C compiler for the C bindings
index f1fd9ce06fbdadc04c7fda7b64d3f5cf4e6e66bd..f91b11a00ef1146399cf20a590280e77e99d13a6 100644 (file)
@@ -10,13 +10,74 @@ test ids and also filter via a test id list file thanks to improvements in
 ``testtools.run``. See the testtools manual, or testrepository - a major
 user of such functionality.
 
+Additionally the protocol now has a keyword uxsuccess for Unexpected Success
+reporting. Older parsers will report tests with this status code as 'lost
+connection'.
+
 IMPROVEMENTS
 ~~~~~~~~~~~~
 
+* Add ``TimeCollapsingDecorator`` which collapses multiple sequential time()
+  calls into just the first and last. (Jonathan Lange)
+
+* Add ``TagCollapsingDecorator`` which collapses many tags() calls into one
+  where possible. (Jonathan Lange, Robert Collins)
+
+* Force flush of writes to stdout in c/tests/test_child.
+  (Jelmer Vernooij, #687611)
+
+* Provisional Python 3.x support.
+  (Robert Collins, Tres Seaver, Martin[gz], #666819)
+
+* ``subunit.chunked.Decoder`` Python class takes a new ``strict`` option,
+  which defaults to ``True``. When ``False``, the ``Decoder`` will accept
+  incorrect input that is still unambiguous. i.e. subunit will not barf if
+  a \r is missing from the input. (Martin Pool)
+
+* ``subunit-filter`` preserves the relative ordering of ``time:`` statements,
+  so you can now use filtered streams to gather data about how long it takes
+  to run a test. (Jonathan Lange, #716554)
+
+* ``subunit-ls`` now handles a stream with time: instructions that start
+  partway through the stream (which may lead to strange times) more gracefully.
+  (Robert Collins, #785954)
+
+* ``subunit-ls`` should handle the new test outcomes in Python2.7 better.
+  (Robert Collins, #785953)
+
+* ``TestResultFilter`` now collapses sequential calls to time().
+  (Jonathan Lange, #567150)
+
+* ``TestResultDecorator.tags()`` now actually works, and is no longer a buggy
+  copy/paste of ``TestResultDecorator.time()``. (Jonathan Lange, #681828)
+
+* ``TestResultFilter`` now supports a ``fixup_expected_failures``
+  argument. (Jelmer Vernooij, #755241)
+
 * The ``subunit.run`` Python module supports ``-l`` and ``--load-list`` as
   per ``testtools.run``. This required a dependency bump due to a small
   API change in ``testtools``. (Robert Collins)
 
+* The help for subunit-filter was confusing about the behaviour of ``-f`` /
+  ``--no-failure``. (Robert Collins, #703392)
+
+* The Python2.7 / testtools addUnexpectedSuccess API is now supported. This
+  required adding a new status code to the protocol. (Robert Collins, #654474)
+
+BUG FIXES
+~~~~~~~~~
+
+* Add 'subunit --no-xfail', which will omit expected failures from the subunit
+  stream. (John Arbash Meinel, #623642)
+
+* Add 'subunit -F/--only-genuine-failures' which sets all of '--no-skips',
+  '--no-xfail', '--no-passthrough, '--no-success', and gives you just the
+  failure stream. (John Arbash Meinel)
+
+CHANGES
+~~~~~~~
+
+* Newer testtools is needed as part of the Python 3 support. (Robert Collins)
 
 0.0.6
 -----
index 6ac258485fc80e623ff07f9d124520bc24422745..4818a057bff2c0343f025332e900ec0fa8365b38 100644 (file)
@@ -142,23 +142,26 @@ line orientated and consists of either directives and their parameters, or
 when outside a DETAILS region unexpected lines which are not interpreted by
 the parser - they should be forwarded unaltered.
 
-test|testing|test:|testing: test label
-success|success:|successful|successful: test label
-success|success:|successful|successful: test label DETAILS
-failure: test label
-failure: test label DETAILS
-error: test label
-error: test label DETAILS
-skip[:] test label
-skip[:] test label DETAILS
-xfail[:] test label
-xfail[:] test label DETAILS
+test|testing|test:|testing: test LABEL
+success|success:|successful|successful: test LABEL
+success|success:|successful|successful: test LABEL DETAILS
+failure: test LABEL
+failure: test LABEL DETAILS
+error: test LABEL
+error: test LABEL DETAILS
+skip[:] test LABEL
+skip[:] test LABEL DETAILS
+xfail[:] test LABEL
+xfail[:] test LABEL DETAILS
+uxsuccess[:] test LABEL
+uxsuccess[:] test LABEL DETAILS
 progress: [+|-]X
 progress: push
 progress: pop
 tags: [-]TAG ...
 time: YYYY-MM-DD HH:MM:SSZ
 
+LABEL: UTF8*
 DETAILS ::= BRACKETED | MULTIPART
 BRACKETED ::= '[' CR UTF8-lines ']' CR
 MULTIPART ::= '[ multipart' CR PART* ']' CR
@@ -200,13 +203,14 @@ directive for the most recently started test).
 The time directive acts as a clock event - it sets the time for all future
 events. The value should be a valid ISO8601 time.
 
-The skip result is used to indicate a test that was found by the runner but not
-fully executed due to some policy or dependency issue. This is represented in
-python using the addSkip interface that testtools
-(https://edge.launchpad.net/testtools) defines. When communicating with a non
-skip aware test result, the test is reported as an error.
-The xfail result is used to indicate a test that was expected to fail failing
-in the expected manner. As this is a normal condition for such tests it is
-represented as a successful test in Python.
-In future, skip and xfail results will be represented semantically in Python,
-but some discussion is underway on the right way to do this.
+The skip, xfail and uxsuccess outcomes are not supported by all testing
+environments. In Python the testttools (https://launchpad.net/testtools)
+library is used to translate these automatically if an older Python version
+that does not support them is in use. See the testtools documentation for the
+translation policy.
+
+skip is used to indicate a test was discovered but not executed. xfail is used
+to indicate a test that errored in some expected fashion (also know as "TODO"
+tests in some frameworks). uxsuccess is used to indicate and unexpected success
+where a test though to be failing actually passes. It is complementary to
+xfail.
index 0744599b9f96103faf57602de241d6f5223e5849..1318322ab20dfb121511245f7031596757b1a2e4 100644 (file)
@@ -16,6 +16,7 @@
  **/
 
 #include <stdlib.h>
+#include <stdio.h>
 #include <unistd.h>
 #include <string.h>
 #include <check.h>
@@ -57,6 +58,8 @@ test_stdout_function(char const * expected,
      * DEAL.
      */
     function();
+    /* flush writes on FILE object to file descriptor */
+    fflush(stdout);
     /* restore stdout now */
     if (dup2(old_stdout, 1) != 1) {
       close(old_stdout);
index c06a03a827f5580c49eb43ff386ab3b70c1c8429..7f5620f151d0bae68d5424dd99b1f6f344ca2772 100755 (executable)
@@ -28,13 +28,13 @@ Remember to quote shell metacharacters.
 
 from optparse import OptionParser
 import sys
-import unittest
 import re
 
 from subunit import (
     DiscardStream,
     ProtocolTestCase,
     TestProtocolClient,
+    read_test_list,
     )
 from subunit.test_results import TestResultFilter
 
@@ -46,24 +46,43 @@ parser.add_option("-e", "--no-error", action="store_true",
 parser.add_option("--failure", action="store_false",
     help="include failures", default=False, dest="failure")
 parser.add_option("-f", "--no-failure", action="store_true",
-    help="include failures", dest="failure")
+    help="exclude failures", dest="failure")
+parser.add_option("--passthrough", action="store_false",
+    help="Show all non subunit input.", default=False, dest="no_passthrough")
 parser.add_option("--no-passthrough", action="store_true",
     help="Hide all non subunit input.", default=False, dest="no_passthrough")
 parser.add_option("-s", "--success", action="store_false",
     help="include successes", dest="success")
-parser.add_option("--no-skip", action="store_true",
-    help="exclude skips", dest="skip")
 parser.add_option("--no-success", action="store_true",
     help="exclude successes", default=True, dest="success")
+parser.add_option("--no-skip", action="store_true",
+    help="exclude skips", dest="skip")
+parser.add_option("--xfail", action="store_false",
+    help="include expected falures", default=True, dest="xfail")
+parser.add_option("--no-xfail", action="store_true",
+    help="exclude expected falures", default=True, dest="xfail")
 parser.add_option("-m", "--with", type=str,
     help="regexp to include (case-sensitive by default)",
     action="append", dest="with_regexps")
+parser.add_option("--fixup-expected-failures", type=str,
+    help="File with list of test ids that are expected to fail; on failure "
+         "their result will be changed to xfail; on success they will be "
+         "changed to error.", dest="fixup_expected_failures", action="append")
 parser.add_option("--without", type=str,
     help="regexp to exclude (case-sensitive by default)",
     action="append", dest="without_regexps")
 
-(options, args) = parser.parse_args()
+def only_genuine_failures_callback(option, opt, value, parser):
+    parser.rargs.insert(0, '--no-passthrough')
+    parser.rargs.insert(0, '--no-xfail')
+    parser.rargs.insert(0, '--no-skip')
+    parser.rargs.insert(0, '--no-success')
+
+parser.add_option("-F", "--only-genuine-failures", action="callback",
+    callback=only_genuine_failures_callback,
+    help="Only pass through failures and exceptions.")
 
+(options, args) = parser.parse_args()
 
 def _compile_re_from_list(l):
     return re.compile("|".join(l), re.MULTILINE)
@@ -91,11 +110,15 @@ def _make_regexp_filter(with_regexps, without_regexps):
 
 regexp_filter = _make_regexp_filter(options.with_regexps,
         options.without_regexps)
+fixup_expected_failures = set()
+for path in options.fixup_expected_failures or ():
+    fixup_expected_failures.update(read_test_list(path))
 result = TestProtocolClient(sys.stdout)
 result = TestResultFilter(result, filter_error=options.error,
     filter_failure=options.failure, filter_success=options.success,
-    filter_skip=options.skip,
-    filter_predicate=regexp_filter)
+    filter_skip=options.skip, filter_xfail=options.xfail,
+    filter_predicate=regexp_filter,
+    fixup_expected_failures=fixup_expected_failures)
 if options.no_passthrough:
     passthrough_stream = DiscardStream()
 else:
index 86461347d36b047ddbdf83b61a5e23dff3a4819a..82db4c371ac1e820472d6869c60c86ee549d5815 100755 (executable)
@@ -20,7 +20,10 @@ from optparse import OptionParser
 import sys
 
 from subunit import DiscardStream, ProtocolTestCase
-from subunit.test_results import TestIdPrintingResult
+from subunit.test_results import (
+    AutoTimingTestResultDecorator,
+    TestIdPrintingResult,
+    )
 
 
 parser = OptionParser(description=__doc__)
@@ -30,7 +33,8 @@ parser.add_option("--times", action="store_true",
 parser.add_option("--no-passthrough", action="store_true",
     help="Hide all non subunit input.", default=False, dest="no_passthrough")
 (options, args) = parser.parse_args()
-result = TestIdPrintingResult(sys.stdout, options.times)
+result = AutoTimingTestResultDecorator(
+    TestIdPrintingResult(sys.stdout, options.times))
 if options.no_passthrough:
     passthrough_stream = DiscardStream()
 else:
index b2c7a29237c900566b01450769a328c5cc7e5cbd..b4c939756f268277e07ff92fff000eed7b69d424 100644 (file)
@@ -6,7 +6,7 @@
 #  license at the users choice. A copy of both licenses are available in the
 #  project source as Apache-2.0 and BSD. You may not use this file except in
 #  compliance with one of these two licences.
-#  
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
@@ -49,7 +49,7 @@ details, tags, timestamping and progress markers).
 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
 which can be used instead of the usual python unittest parameter.
-When used the value of details should be a dict from ``string`` to 
+When used the value of details should be a dict from ``string`` to
 ``testtools.content.Content`` objects. This is a draft API being worked on with
 the Python Testing In Python mail list, with the goal of permitting a common
 way to provide additional data beyond a traceback, such as captured data from
@@ -58,13 +58,13 @@ and newer).
 
 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
 remove tags in the test run that is currently executing. If called when no
-test is in progress (that is, if called outside of the ``startTest``, 
-``stopTest`` pair), the the tags apply to all sebsequent tests. If called
+test is in progress (that is, if called outside of the ``startTest``,
+``stopTest`` pair), the the tags apply to all subsequent tests. If called
 when a test is in progress, then the tags only apply to that test.
 
 The ``time(a_datetime)`` method is called (if present) when a ``time:``
 directive is encountered in a Subunit stream. This is used to tell a TestResult
-about the time that events in the stream occured at, to allow reconstructing
+about the time that events in the stream occurred at, to allow reconstructing
 test timing from a stream.
 
 The ``progress(offset, whence)`` method controls progress data for a stream.
@@ -87,7 +87,7 @@ tests, allowing isolation between the test runner and some tests.
 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
 tests that will fork() before that individual test is run.
 
-`ExecTestCase`` is a convenience wrapper for running an external 
+`ExecTestCase`` is a convenience wrapper for running an external
 program to get a Subunit stream and then report that back to an arbitrary
 result object::
 
@@ -98,7 +98,7 @@ result object::
 
      def test_script_two(self):
          './bin/script_two'
+
  # Normally your normal test loading would take of this automatically,
  # It is only spelt out in detail here for clarity.
  suite = unittest.TestSuite([AggregateTests("test_script_one"),
@@ -116,26 +116,30 @@ Utility modules
 * subunit.test_results contains TestResult helper classes.
 """
 
-import datetime
 import os
 import re
-from StringIO import StringIO
 import subprocess
 import sys
 import unittest
 
-import iso8601
 from testtools import content, content_type, ExtendedToOriginalDecorator
+from testtools.compat import _b, _u, BytesIO, StringIO
 try:
     from testtools.testresult.real import _StringException
     RemoteException = _StringException
-    _remote_exception_str = '_StringException' # For testing.
+    # For testing: different pythons have different str() implementations.
+    if sys.version_info > (3, 0):
+        _remote_exception_str = "testtools.testresult.real._StringException"
+        _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
+    else:
+        _remote_exception_str = "_StringException" 
+        _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
 except ImportError:
     raise ImportError ("testtools.testresult.real does not contain "
         "_StringException, check your version.")
 from testtools import testresult
 
-import chunked, details, test_results
+from subunit import chunked, details, iso8601, test_results
 
 
 PROGRESS_SET = 0
@@ -187,6 +191,19 @@ class _ParserState(object):
 
     def __init__(self, parser):
         self.parser = parser
+        self._test_sym = (_b('test'), _b('testing'))
+        self._colon_sym = _b(':')
+        self._error_sym = (_b('error'),)
+        self._failure_sym = (_b('failure'),)
+        self._progress_sym = (_b('progress'),)
+        self._skip_sym = _b('skip')
+        self._success_sym = (_b('success'), _b('successful'))
+        self._tags_sym = (_b('tags'),)
+        self._time_sym = (_b('time'),)
+        self._xfail_sym = (_b('xfail'),)
+        self._uxsuccess_sym = (_b('uxsuccess'),)
+        self._start_simple = _u(" [")
+        self._start_multipart = _u(" [ multipart")
 
     def addError(self, offset, line):
         """An 'error:' directive has been read."""
@@ -214,27 +231,29 @@ class _ParserState(object):
         if len(parts) == 2 and line.startswith(parts[0]):
             cmd, rest = parts
             offset = len(cmd) + 1
-            cmd = cmd.rstrip(':')
-            if cmd in ('test', 'testing'):
+            cmd = cmd.rstrip(self._colon_sym)
+            if cmd in self._test_sym:
                 self.startTest(offset, line)
-            elif cmd == 'error':
+            elif cmd in self._error_sym:
                 self.addError(offset, line)
-            elif cmd == 'failure':
+            elif cmd in self._failure_sym:
                 self.addFailure(offset, line)
-            elif cmd == 'progress':
+            elif cmd in self._progress_sym:
                 self.parser._handleProgress(offset, line)
-            elif cmd == 'skip':
+            elif cmd in self._skip_sym:
                 self.addSkip(offset, line)
-            elif cmd in ('success', 'successful'):
+            elif cmd in self._success_sym:
                 self.addSuccess(offset, line)
-            elif cmd in ('tags',):
+            elif cmd in self._tags_sym:
                 self.parser._handleTags(offset, line)
                 self.parser.subunitLineReceived(line)
-            elif cmd in ('time',):
+            elif cmd in self._time_sym:
                 self.parser._handleTime(offset, line)
                 self.parser.subunitLineReceived(line)
-            elif cmd == 'xfail':
+            elif cmd in self._xfail_sym:
                 self.addExpectedFail(offset, line)
+            elif cmd in self._uxsuccess_sym:
+                self.addUnexpectedSuccess(offset, line)
             else:
                 self.parser.stdOutLineReceived(line)
         else:
@@ -242,7 +261,7 @@ class _ParserState(object):
 
     def lostConnection(self):
         """Connection lost."""
-        self.parser._lostConnectionInTest(u'unknown state of ')
+        self.parser._lostConnectionInTest(_u('unknown state of '))
 
     def startTest(self, offset, line):
         """A test start command received."""
@@ -254,24 +273,26 @@ class _InTest(_ParserState):
 
     def _outcome(self, offset, line, no_details, details_state):
         """An outcome directive has been read.
-        
+
         :param no_details: Callable to call when no details are presented.
         :param details_state: The state to switch to for details
             processing of this outcome.
         """
-        if self.parser.current_test_description == line[offset:-1]:
+        test_name = line[offset:-1].decode('utf8')
+        if self.parser.current_test_description == test_name:
             self.parser._state = self.parser._outside_test
             self.parser.current_test_description = None
             no_details()
             self.parser.client.stopTest(self.parser._current_test)
             self.parser._current_test = None
             self.parser.subunitLineReceived(line)
-        elif self.parser.current_test_description + " [" == line[offset:-1]:
+        elif self.parser.current_test_description + self._start_simple == \
+            test_name:
             self.parser._state = details_state
             details_state.set_simple()
             self.parser.subunitLineReceived(line)
-        elif self.parser.current_test_description + " [ multipart" == \
-            line[offset:-1]:
+        elif self.parser.current_test_description + self._start_multipart == \
+            test_name:
             self.parser._state = details_state
             details_state.set_multipart()
             self.parser.subunitLineReceived(line)
@@ -296,6 +317,14 @@ class _InTest(_ParserState):
         self._outcome(offset, line, self._xfail,
             self.parser._reading_xfail_details)
 
+    def _uxsuccess(self):
+        self.parser.client.addUnexpectedSuccess(self.parser._current_test)
+
+    def addUnexpectedSuccess(self, offset, line):
+        """A 'uxsuccess:' directive has been read."""
+        self._outcome(offset, line, self._uxsuccess,
+            self.parser._reading_uxsuccess_details)
+
     def _failure(self):
         self.parser.client.addFailure(self.parser._current_test, details={})
 
@@ -322,7 +351,7 @@ class _InTest(_ParserState):
 
     def lostConnection(self):
         """Connection lost."""
-        self.parser._lostConnectionInTest(u'')
+        self.parser._lostConnectionInTest(_u(''))
 
 
 class _OutSideTest(_ParserState):
@@ -334,8 +363,9 @@ class _OutSideTest(_ParserState):
     def startTest(self, offset, line):
         """A test start command received."""
         self.parser._state = self.parser._in_test
-        self.parser._current_test = RemotedTestCase(line[offset:-1])
-        self.parser.current_test_description = line[offset:-1]
+        test_name = line[offset:-1].decode('utf8')
+        self.parser._current_test = RemotedTestCase(test_name)
+        self.parser.current_test_description = test_name
         self.parser.client.startTest(self.parser._current_test)
         self.parser.subunitLineReceived(line)
 
@@ -357,7 +387,7 @@ class _ReadingDetails(_ParserState):
 
     def lostConnection(self):
         """Connection lost."""
-        self.parser._lostConnectionInTest(u'%s report of ' %
+        self.parser._lostConnectionInTest(_u('%s report of ') %
             self._outcome_label())
 
     def _outcome_label(self):
@@ -382,7 +412,7 @@ class _ReadingFailureDetails(_ReadingDetails):
 
     def _outcome_label(self):
         return "failure"
+
 
 class _ReadingErrorDetails(_ReadingDetails):
     """State for the subunit parser when reading error details."""
@@ -406,6 +436,17 @@ class _ReadingExpectedFailureDetails(_ReadingDetails):
         return "xfail"
 
 
+class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
+    """State for the subunit parser when reading uxsuccess details."""
+
+    def _report_outcome(self):
+        self.parser.client.addUnexpectedSuccess(self.parser._current_test,
+            details=self.details_parser.get_details())
+
+    def _outcome_label(self):
+        return "uxsuccess"
+
+
 class _ReadingSkipDetails(_ReadingDetails):
     """State for the subunit parser when reading skip details."""
 
@@ -430,7 +471,7 @@ class _ReadingSuccessDetails(_ReadingDetails):
 
 class TestProtocolServer(object):
     """A parser for subunit.
-    
+
     :ivar tags: The current tags associated with the protocol stream.
     """
 
@@ -441,8 +482,8 @@ class TestProtocolServer(object):
         :param stream: The stream that lines received which are not part of the
             subunit protocol should be written to. This allows custom handling
             of mixed protocols. By default, sys.stdout will be used for
-            convenience.
-        :param forward_stream: A stream to forward subunit lines to. This 
+            convenience. It should accept bytes to its write() method.
+        :param forward_stream: A stream to forward subunit lines to. This
             allows a filter to forward the entire stream while still parsing
             and acting on it. By default forward_stream is set to
             DiscardStream() and no forwarding happens.
@@ -450,6 +491,8 @@ class TestProtocolServer(object):
         self.client = ExtendedToOriginalDecorator(client)
         if stream is None:
             stream = sys.stdout
+            if sys.version_info > (3, 0):
+                stream = stream.buffer
         self._stream = stream
         self._forward_stream = forward_stream or DiscardStream()
         # state objects we can switch too
@@ -460,19 +503,24 @@ class TestProtocolServer(object):
         self._reading_skip_details = _ReadingSkipDetails(self)
         self._reading_success_details = _ReadingSuccessDetails(self)
         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
+        self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
         # start with outside test.
         self._state = self._outside_test
+        # Avoid casts on every call
+        self._plusminus = _b('+-')
+        self._push_sym = _b('push')
+        self._pop_sym = _b('pop')
 
     def _handleProgress(self, offset, line):
         """Process a progress directive."""
         line = line[offset:].strip()
-        if line[0] in '+-':
+        if line[0] in self._plusminus:
             whence = PROGRESS_CUR
             delta = int(line)
-        elif line == "push":
+        elif line == self._push_sym:
             whence = PROGRESS_PUSH
             delta = None
-        elif line == "pop":
+        elif line == self._pop_sym:
             whence = PROGRESS_POP
             delta = None
         else:
@@ -482,7 +530,7 @@ class TestProtocolServer(object):
 
     def _handleTags(self, offset, line):
         """Process a tags command."""
-        tags = line[offset:].split()
+        tags = line[offset:].decode('utf8').split()
         new_tags, gone_tags = tags_to_new_gone(tags)
         self.client.tags(new_tags, gone_tags)
 
@@ -490,8 +538,9 @@ class TestProtocolServer(object):
         # Accept it, but do not do anything with it yet.
         try:
             event_time = iso8601.parse_date(line[offset:-1])
-        except TypeError, e:
-            raise TypeError("Failed to parse %r, got %r" % (line, e))
+        except TypeError:
+            raise TypeError(_u("Failed to parse %r, got %r")
+                % (line, sys.exec_info[1]))
         self.client.time(event_time)
 
     def lineReceived(self, line):
@@ -499,7 +548,7 @@ class TestProtocolServer(object):
         self._state.lineReceived(line)
 
     def _lostConnectionInTest(self, state_string):
-        error_string = u"lost connection during %stest '%s'" % (
+        error_string = _u("lost connection during %stest '%s'") % (
             state_string, self.current_test_description)
         self.client.addError(self._current_test, RemoteError(error_string))
         self.client.stopTest(self._current_test)
@@ -510,7 +559,7 @@ class TestProtocolServer(object):
 
     def readFrom(self, pipe):
         """Blocking convenience API to parse an entire stream.
-        
+
         :param pipe: A file-like object supporting readlines().
         :return: None.
         """
@@ -531,10 +580,11 @@ class TestProtocolServer(object):
 
 class TestProtocolClient(testresult.TestResult):
     """A TestResult which generates a subunit stream for a test run.
-    
+
     # Get a TestSuite or TestCase to run
     suite = make_suite()
-    # Create a stream (any object with a 'write' method)
+    # Create a stream (any object with a 'write' method). This should accept
+    # bytes not strings: subunit is a byte orientated protocol.
     stream = file('tests.log', 'wb')
     # Create a subunit result object which will output to the stream
     result = subunit.TestProtocolClient(stream)
@@ -551,10 +601,18 @@ class TestProtocolClient(testresult.TestResult):
         testresult.TestResult.__init__(self)
         self._stream = stream
         _make_stream_binary(stream)
+        self._progress_fmt = _b("progress: ")
+        self._bytes_eol = _b("\n")
+        self._progress_plus = _b("+")
+        self._progress_push = _b("push")
+        self._progress_pop = _b("pop")
+        self._empty_bytes = _b("")
+        self._start_simple = _b(" [\n")
+        self._end_simple = _b("]\n")
 
     def addError(self, test, error=None, details=None):
         """Report an error in test test.
-        
+
         Only one of error and details should be provided: conceptually there
         are two separate methods:
             addError(self, test, error)
@@ -569,7 +627,7 @@ class TestProtocolClient(testresult.TestResult):
 
     def addExpectedFailure(self, test, error=None, details=None):
         """Report an expected failure in test test.
-        
+
         Only one of error and details should be provided: conceptually there
         are two separate methods:
             addError(self, test, error)
@@ -584,7 +642,7 @@ class TestProtocolClient(testresult.TestResult):
 
     def addFailure(self, test, error=None, details=None):
         """Report a failure in test test.
-        
+
         Only one of error and details should be provided: conceptually there
         are two separate methods:
             addFailure(self, test, error)
@@ -597,9 +655,10 @@ class TestProtocolClient(testresult.TestResult):
         """
         self._addOutcome("failure", test, error=error, details=details)
 
-    def _addOutcome(self, outcome, test, error=None, details=None):
+    def _addOutcome(self, outcome, test, error=None, details=None,
+        error_permitted=True):
         """Report a failure in test test.
-        
+
         Only one of error and details should be provided: conceptually there
         are two separate methods:
             addOutcome(self, test, error)
@@ -611,43 +670,60 @@ class TestProtocolClient(testresult.TestResult):
             exc_info tuple.
         :param details: New Testing-in-python drafted API; a dict from string
             to subunit.Content objects.
-        """
-        self._stream.write("%s: %s" % (outcome, test.id()))
-        if error is None and details is None:
-            raise ValueError
+        :param error_permitted: If True then one and only one of error or
+            details must be supplied. If False then error must not be supplied
+            and details is still optional.  """
+        self._stream.write(_b("%s: %s" % (outcome, test.id())))
+        if error_permitted:
+            if error is None and details is None:
+                raise ValueError
+        else:
+            if error is not None:
+                raise ValueError
         if error is not None:
-            self._stream.write(" [\n")
+            self._stream.write(self._start_simple)
             # XXX: this needs to be made much stricter, along the lines of
             # Martin[gz]'s work in testtools. Perhaps subunit can use that?
             for line in self._exc_info_to_unicode(error, test).splitlines():
                 self._stream.write(("%s\n" % line).encode('utf8'))
-        else:
+        elif details is not None:
             self._write_details(details)
-        self._stream.write("]\n")
+        else:
+            self._stream.write(_b("\n"))
+        if details is not None or error is not None:
+            self._stream.write(self._end_simple)
 
     def addSkip(self, test, reason=None, details=None):
         """Report a skipped test."""
         if reason is None:
             self._addOutcome("skip", test, error=None, details=details)
         else:
-            self._stream.write("skip: %s [\n" % test.id())
-            self._stream.write("%s\n" % reason)
-            self._stream.write("]\n")
+            self._stream.write(_b("skip: %s [\n" % test.id()))
+            self._stream.write(_b("%s\n" % reason))
+            self._stream.write(self._end_simple)
 
     def addSuccess(self, test, details=None):
         """Report a success in a test."""
-        self._stream.write("successful: %s" % test.id())
-        if not details:
-            self._stream.write("\n")
-        else:
-            self._write_details(details)
-            self._stream.write("]\n")
-    addUnexpectedSuccess = addSuccess
+        self._addOutcome("successful", test, details=details, error_permitted=False)
+
+    def addUnexpectedSuccess(self, test, details=None):
+        """Report an unexpected success in test test.
+
+        Details can optionally be provided: conceptually there
+        are two separate methods:
+            addError(self, test)
+            addError(self, test, details)
+
+        :param details: New Testing-in-python drafted API; a dict from string
+            to subunit.Content objects.
+        """
+        self._addOutcome("uxsuccess", test, details=details,
+            error_permitted=False)
 
     def startTest(self, test):
         """Mark a test as starting its test run."""
         super(TestProtocolClient, self).startTest(test)
-        self._stream.write("test: %s\n" % test.id())
+        self._stream.write(_b("test: %s\n" % test.id()))
         self._stream.flush()
 
     def stopTest(self, test):
@@ -665,16 +741,19 @@ class TestProtocolClient(testresult.TestResult):
             PROGRESS_POP.
         """
         if whence == PROGRESS_CUR and offset > -1:
-            prefix = "+"
+            prefix = self._progress_plus
+            offset = _b(str(offset))
         elif whence == PROGRESS_PUSH:
-            prefix = ""
-            offset = "push"
+            prefix = self._empty_bytes
+            offset = self._progress_push
         elif whence == PROGRESS_POP:
-            prefix = ""
-            offset = "pop"
+            prefix = self._empty_bytes
+            offset = self._progress_pop
         else:
-            prefix = ""
-        self._stream.write("progress: %s%s\n" % (prefix, offset))
+            prefix = self._empty_bytes
+            offset = _b(str(offset))
+        self._stream.write(self._progress_fmt + prefix + offset +
+            self._bytes_eol)
 
     def time(self, a_datetime):
         """Inform the client of the time.
@@ -682,42 +761,42 @@ class TestProtocolClient(testresult.TestResult):
         ":param datetime: A datetime.datetime object.
         """
         time = a_datetime.astimezone(iso8601.Utc())
-        self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
+        self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
             time.year, time.month, time.day, time.hour, time.minute,
-            time.second, time.microsecond))
+            time.second, time.microsecond)))
 
     def _write_details(self, details):
         """Output details to the stream.
 
         :param details: An extended details dict for a test outcome.
         """
-        self._stream.write(" [ multipart\n")
-        for name, content in sorted(details.iteritems()):
-            self._stream.write("Content-Type: %s/%s" %
-                (content.content_type.type, content.content_type.subtype))
+        self._stream.write(_b(" [ multipart\n"))
+        for name, content in sorted(details.items()):
+            self._stream.write(_b("Content-Type: %s/%s" %
+                (content.content_type.type, content.content_type.subtype)))
             parameters = content.content_type.parameters
             if parameters:
-                self._stream.write(";")
+                self._stream.write(_b(";"))
                 param_strs = []
-                for param, value in parameters.iteritems():
+                for param, value in parameters.items():
                     param_strs.append("%s=%s" % (param, value))
-                self._stream.write(",".join(param_strs))
-            self._stream.write("\n%s\n" % name)
+                self._stream.write(_b(",".join(param_strs)))
+            self._stream.write(_b("\n%s\n" % name))
             encoder = chunked.Encoder(self._stream)
-            map(encoder.write, content.iter_bytes())
+            list(map(encoder.write, content.iter_bytes()))
             encoder.close()
 
     def done(self):
         """Obey the testtools result.done() interface."""
 
 
-def RemoteError(description=u""):
+def RemoteError(description=_u("")):
     return (_StringException, _StringException(description), None)
 
 
 class RemotedTestCase(unittest.TestCase):
     """A class to represent test cases run in child processes.
-    
+
     Instances of this class are used to provide the Python test API a TestCase
     that can be printed to the screen, introspected for metadata and so on.
     However, as they are a simply a memoisation of a test that was actually
@@ -761,7 +840,7 @@ class RemotedTestCase(unittest.TestCase):
     def run(self, result=None):
         if result is None: result = self.defaultTestResult()
         result.startTest(self)
-        result.addError(self, RemoteError(u"Cannot run RemotedTestCases.\n"))
+        result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
         result.stopTest(self)
 
     def _strclass(self):
@@ -795,14 +874,16 @@ class ExecTestCase(unittest.TestCase):
 
     def _run(self, result):
         protocol = TestProtocolServer(result)
-        output = subprocess.Popen(self.script, shell=True,
-            stdout=subprocess.PIPE).communicate()[0]
-        protocol.readFrom(StringIO(output))
+        process = subprocess.Popen(self.script, shell=True,
+            stdout=subprocess.PIPE)
+        _make_stream_binary(process.stdout)
+        output = process.communicate()[0]
+        protocol.readFrom(BytesIO(output))
 
 
 class IsolatedTestCase(unittest.TestCase):
     """A TestCase which executes in a forked process.
-    
+
     Each test gets its own process, which has a performance overhead but will
     provide excellent isolation from global state (such as django configs,
     zope utilities and so on).
@@ -815,7 +896,7 @@ class IsolatedTestCase(unittest.TestCase):
 
 class IsolatedTestSuite(unittest.TestSuite):
     """A TestSuite which runs its tests in a forked process.
-    
+
     This decorator that will fork() before running the tests and report the
     results from the child process using a Subunit stream.  This is useful for
     handling tests that mutate global state, or are testing C extensions that
@@ -846,10 +927,10 @@ def run_isolated(klass, self, result):
         # at this point, sys.stdin is redirected, now we want
         # to filter it to escape ]'s.
         ### XXX: test and write that bit.
-
-        result = TestProtocolClient(sys.stdout)
+        stream = os.fdopen(1, 'wb')
+        result = TestProtocolClient(stream)
         klass.run(self, result)
-        sys.stdout.flush()
+        stream.flush()
         sys.stderr.flush()
         # exit HARD, exit NOW.
         os._exit(0)
@@ -859,7 +940,8 @@ def run_isolated(klass, self, result):
         os.close(c2pwrite)
         # hookup a protocol engine
         protocol = TestProtocolServer(result)
-        protocol.readFrom(os.fdopen(c2pread, 'rU'))
+        fileobj = os.fdopen(c2pread, 'rb')
+        protocol.readFrom(fileobj)
         os.waitpid(pid, 0)
         # TODO return code evaluation.
     return result
@@ -867,7 +949,7 @@ def run_isolated(klass, self, result):
 
 def TAP2SubUnit(tap, subunit):
     """Filter a TAP pipe into a subunit pipe.
-    
+
     :param tap: A tap pipe/stream/file object.
     :param subunit: A pipe/stream/file object to write subunit results to.
     :return: The exit code to exit with.
@@ -875,7 +957,6 @@ def TAP2SubUnit(tap, subunit):
     BEFORE_PLAN = 0
     AFTER_PLAN = 1
     SKIP_STREAM = 2
-    client = TestProtocolClient(subunit)
     state = BEFORE_PLAN
     plan_start = 1
     plan_stop = 0
@@ -1025,11 +1106,11 @@ class ProtocolTestCase(object):
     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
     and ``countTestCases`` methods are not supported because there isn't a
     sensible mapping for those methods.
-    
+
     # Get a stream (any object with a readline() method), in this case the
     # stream output by the example from ``subunit.TestProtocolClient``.
     stream = file('tests.log', 'rb')
-    # Create a parser which will read from the stream and emit 
+    # Create a parser which will read from the stream and emit
     # activity to a unittest.TestResult when run() is called.
     suite = subunit.ProtocolTestCase(stream)
     # Create a result object to accept the contents of that stream.
@@ -1055,7 +1136,6 @@ class ProtocolTestCase(object):
         _make_stream_binary(stream)
         self._passthrough = passthrough
         self._forward = forward
-        _make_stream_binary(forward)
 
     def __call__(self, result=None):
         return self.run(result)
@@ -1073,7 +1153,7 @@ class ProtocolTestCase(object):
 
 class TestResultStats(testresult.TestResult):
     """A pyunit TestResult interface implementation for making statistics.
-    
+
     :ivar total_tests: The total tests seen.
     :ivar passed_tests: The tests that passed.
     :ivar failed_tests: The tests that failed.
@@ -1124,20 +1204,44 @@ class TestResultStats(testresult.TestResult):
 
 def get_default_formatter():
     """Obtain the default formatter to write to.
-    
+
     :return: A file-like object.
     """
     formatter = os.getenv("SUBUNIT_FORMATTER")
     if formatter:
         return os.popen(formatter, "w")
     else:
-        return sys.stdout
+        stream = sys.stdout
+        if sys.version_info > (3, 0):
+            stream = stream.buffer
+        return stream
+
+
+if sys.version_info > (3, 0):
+    from io import UnsupportedOperation as _NoFilenoError
+else:
+    _NoFilenoError = AttributeError
+
+def read_test_list(path):
+    """Read a list of test ids from a file on disk.
+
+    :param path: Path to the file
+    :return: Sequence of test ids
+    """
+    f = open(path, 'rb')
+    try:
+        return [l.rstrip("\n") for l in f.readlines()]
+    finally:
+        f.close()
 
 
 def _make_stream_binary(stream):
     """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
-    if getattr(stream, 'fileno', None) is not None:
-        _make_binary_on_windows(stream.fileno())
+    try:
+        fileno = stream.fileno()
+    except _NoFilenoError:
+        return
+    _make_binary_on_windows(fileno)
 
 def _make_binary_on_windows(fileno):
     """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
index 82e4b0ddfc53a744acc45a09e611e511d6a64323..b9921291ea2d25a4e2a0c45ab7ea560241783507 100644 (file)
@@ -1,12 +1,13 @@
 #
 #  subunit: extensions to python unittest to get test results from subprocesses.
 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
+#  Copyright (C) 2011  Martin Pool <mbp@sourcefrog.net>
 #
 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
 #  license at the users choice. A copy of both licenses are available in the
 #  project source as Apache-2.0 and BSD. You may not use this file except in
 #  compliance with one of these two licences.
-#  
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 
 """Encoder/decoder for http style chunked encoding."""
 
+from testtools.compat import _b
+
+empty = _b('')
+
 class Decoder(object):
     """Decode chunked content to a byte stream."""
 
-    def __init__(self, output):
+    def __init__(self, output, strict=True):
         """Create a decoder decoding to output.
 
         :param output: A file-like object. Bytes written to the Decoder are
             decoded to strip off the chunking and written to the output.
-            Up to a full write worth of data or a single control line  may be
+            Up to a full write worth of data or a single control line may be
             buffered (whichever is larger). The close method should be called
             when no more data is available, to detect short streams; the
             write method will return none-None when the end of a stream is
-            detected.
+            detected. The output object must accept bytes objects.
+
+        :param strict: If True (the default), the decoder will not knowingly
+            accept input that is not conformant to the HTTP specification.
+            (This does not imply that it will catch every nonconformance.)
+            If False, it will accept incorrect input that is still
+            unambiguous.
         """
         self.output = output
         self.buffered_bytes = []
         self.state = self._read_length
         self.body_length = 0
+        self.strict = strict
+        self._match_chars = _b("0123456789abcdefABCDEF\r\n")
+        self._slash_n = _b('\n')
+        self._slash_r = _b('\r')
+        self._slash_rn = _b('\r\n')
+        self._slash_nr = _b('\n\r')
 
     def close(self):
         """Close the decoder.
@@ -48,7 +65,7 @@ class Decoder(object):
         if self.buffered_bytes:
             buffered_bytes = self.buffered_bytes
             self.buffered_bytes = []
-            return ''.join(buffered_bytes)
+            return empty.join(buffered_bytes)
         else:
             raise ValueError("stream is finished")
 
@@ -72,22 +89,26 @@ class Decoder(object):
 
     def _read_length(self):
         """Try to decode a length from the bytes."""
-        count = -1
-        match_chars = "0123456789abcdefABCDEF\r\n"
         count_chars = []
         for bytes in self.buffered_bytes:
-            for byte in bytes:
-                if byte not in match_chars:
+            for pos in range(len(bytes)):
+                byte = bytes[pos:pos+1]
+                if byte not in self._match_chars:
                     break
                 count_chars.append(byte)
-                if byte == '\n':
+                if byte == self._slash_n:
                     break
         if not count_chars:
             return
-        if count_chars[-1][-1] != '\n':
+        if count_chars[-1] != self._slash_n:
             return
-        count_str = ''.join(count_chars)
-        self.body_length = int(count_str[:-2], 16)
+        count_str = empty.join(count_chars)
+        if self.strict:
+            if count_str[-2:] != self._slash_rn:
+                raise ValueError("chunk header invalid: %r" % count_str)
+            if self._slash_r in count_str[:-2]:
+                raise ValueError("too many CRs in chunk header %r" % count_str)
+        self.body_length = int(count_str.rstrip(self._slash_nr), 16)
         excess_bytes = len(count_str)
         while excess_bytes:
             if excess_bytes >= len(self.buffered_bytes[0]):
@@ -100,14 +121,14 @@ class Decoder(object):
             self.state = self._finished
             if not self.buffered_bytes:
                 # May not call into self._finished with no buffered data.
-                return ''
+                return empty
         else:
             self.state = self._read_body
         return self.state()
 
     def write(self, bytes):
         """Decode bytes to the output stream.
-        
+
         :raises ValueError: If the stream has already seen the end of file
             marker.
         :returns: None, or the excess bytes beyond the end of file marker.
@@ -133,7 +154,7 @@ class Encoder(object):
 
     def flush(self, extra_len=0):
         """Flush the encoder to the output stream.
-        
+
         :param extra_len: Increase the size of the chunk by this many bytes
             to allow for a subsequent write.
         """
@@ -143,9 +164,9 @@ class Encoder(object):
         buffer_size = self.buffer_size
         self.buffered_bytes = []
         self.buffer_size = 0
-        self.output.write("%X\r\n" % (buffer_size + extra_len))
+        self.output.write(_b("%X\r\n" % (buffer_size + extra_len)))
         if buffer_size:
-            self.output.write(''.join(buffered_bytes))
+            self.output.write(empty.join(buffered_bytes))
         return True
 
     def write(self, bytes):
@@ -161,4 +182,4 @@ class Encoder(object):
     def close(self):
         """Finish the stream. This does not close the output stream."""
         self.flush()
-        self.output.write("0\r\n")
+        self.output.write(_b("0\r\n"))
index a37b2acb932dd958baa29ec5be7e5d92cba8c3e1..9e5e005864c907d01de6bc497534c7f6553ff263 100644 (file)
 
 """Handlers for outcome details."""
 
-from cStringIO import StringIO
-
 from testtools import content, content_type
+from testtools.compat import _b, BytesIO
+
+from subunit import chunked
 
-import chunked
+end_marker = _b("]\n")
+quoted_marker = _b(" ]")
+empty = _b('')
 
 
 class DetailsParser(object):
@@ -31,14 +34,14 @@ class SimpleDetailsParser(DetailsParser):
     """Parser for single-part [] delimited details."""
 
     def __init__(self, state):
-        self._message = ""
+        self._message = _b("")
         self._state = state
 
     def lineReceived(self, line):
-        if line == "]\n":
+        if line == end_marker:
             self._state.endDetails()
             return
-        if line[0:2] == " ]":
+        if line[0:2] == quoted_marker:
             # quoted ] start
             self._message += line[1:]
         else:
@@ -77,18 +80,21 @@ class MultipartDetailsParser(DetailsParser):
         self._parse_state = self._look_for_content
 
     def _look_for_content(self, line):
-        if line == "]\n":
+        if line == end_marker:
             self._state.endDetails()
             return
         # TODO error handling
-        field, value = line[:-1].split(' ', 1)
-        main, sub = value.split('/')
+        field, value = line[:-1].decode('utf8').split(' ', 1)
+        try:
+            main, sub = value.split('/')
+        except ValueError:
+            raise ValueError("Invalid MIME type %r" % value)
         self._content_type = content_type.ContentType(main, sub)
         self._parse_state = self._get_name
 
     def _get_name(self, line):
-        self._name = line[:-1]
-        self._body = StringIO()
+        self._name = line[:-1].decode('utf8')
+        self._body = BytesIO()
         self._chunk_parser = chunked.Decoder(self._body)
         self._parse_state = self._feed_chunks
 
@@ -96,7 +102,7 @@ class MultipartDetailsParser(DetailsParser):
         residue = self._chunk_parser.write(line)
         if residue is not None:
             # Line based use always ends on no residue.
-            assert residue == '', 'residue: %r' % (residue,)
+            assert residue == empty, 'residue: %r' % (residue,)
             body = self._body
             self._details[self._name] = content.Content(
                 self._content_type, lambda:[body.getvalue()])
index 93c92fb516327833e1794fd955b1bbf543265c83..cbe9a3b3ebd9e28e78936ffa115a8d5b601e910f 100644 (file)
@@ -31,15 +31,25 @@ datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>)
 
 from datetime import datetime, timedelta, tzinfo
 import re
+import sys
 
 __all__ = ["parse_date", "ParseError"]
 
 # Adapted from http://delete.me.uk/2005/03/iso8601.html
-ISO8601_REGEX = re.compile(r"(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})"
+ISO8601_REGEX_PATTERN = (r"(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})"
     r"((?P<separator>.)(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?"
     r"(?P<timezone>Z|(([-+])([0-9]{2}):([0-9]{2})))?)?)?)?"
 )
-TIMEZONE_REGEX = re.compile("(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})")
+TIMEZONE_REGEX_PATTERN = "(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})"
+ISO8601_REGEX = re.compile(ISO8601_REGEX_PATTERN.encode('utf8'))
+TIMEZONE_REGEX = re.compile(TIMEZONE_REGEX_PATTERN.encode('utf8'))
+
+zulu = "Z".encode('latin-1')
+minus = "-".encode('latin-1')
+
+if sys.version_info < (3, 0):
+    bytes = str
+
 
 class ParseError(Exception):
     """Raised when there is a problem parsing a date string"""
@@ -84,7 +94,7 @@ def parse_timezone(tzstring, default_timezone=UTC):
     """Parses ISO 8601 time zone specs into tzinfo offsets
     
     """
-    if tzstring == "Z":
+    if tzstring == zulu:
         return default_timezone
     # This isn't strictly correct, but it's common to encounter dates without
     # timezones so I'll assume the default (which defaults to UTC).
@@ -94,7 +104,7 @@ def parse_timezone(tzstring, default_timezone=UTC):
     m = TIMEZONE_REGEX.match(tzstring)
     prefix, hours, minutes = m.groups()
     hours, minutes = int(hours), int(minutes)
-    if prefix == "-":
+    if prefix == minus:
         hours = -hours
         minutes = -minutes
     return FixedOffset(hours, minutes, tzstring)
@@ -107,8 +117,8 @@ def parse_date(datestring, default_timezone=UTC):
     default timezone specified in default_timezone is used. This is UTC by
     default.
     """
-    if not isinstance(datestring, basestring):
-        raise ParseError("Expecting a string %r" % datestring)
+    if not isinstance(datestring, bytes):
+        raise ParseError("Expecting bytes %r" % datestring)
     m = ISO8601_REGEX.match(datestring)
     if not m:
         raise ParseError("Unable to parse date string %r" % datestring)
index b390de33f783f90f532716235f1d26f6ec4c64df..51d6837aab71c9661b2ff7b8479a7b2eceb0a6e6 100755 (executable)
@@ -49,7 +49,7 @@ class SubunitTestProgram(TestProgram):
 
     def usageExit(self, msg=None):
         if msg:
-            print msg
+            print (msg)
         usage = {'progName': self.progName, 'catchbreak': '', 'failfast': '',
                  'buffer': ''}
         if self.failfast != False:
index 1c91daadc6a470dce78be0cfd89eb1dc20e715f1..33fb50e07306ad60a27739473d2cbe9bf579c4c8 100644 (file)
 
 import datetime
 
-import iso8601
 import testtools
 
+from subunit import iso8601
+
 
 # NOT a TestResult, because we are implementing the interface, not inheriting
 # it.
@@ -81,8 +82,12 @@ class TestResultDecorator(object):
     def stop(self):
         return self.decorated.stop()
 
+    @property
+    def testsRun(self):
+        return self.decorated.testsRun
+
     def tags(self, new_tags, gone_tags):
-        return self.decorated.time(new_tags, gone_tags)
+        return self.decorated.tags(new_tags, gone_tags)
 
     def time(self, a_datetime):
         return self.decorated.time(a_datetime)
@@ -195,6 +200,87 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
         return self.decorated.time(a_datetime)
 
 
+class TagCollapsingDecorator(TestResultDecorator):
+    """Collapses many 'tags' calls into one where possible."""
+
+    def __init__(self, result):
+        super(TagCollapsingDecorator, self).__init__(result)
+        # The (new, gone) tags for the current test.
+        self._current_test_tags = None
+
+    def startTest(self, test):
+        """Start a test.
+
+        Not directly passed to the client, but used for handling of tags
+        correctly.
+        """
+        self.decorated.startTest(test)
+        self._current_test_tags = set(), set()
+
+    def stopTest(self, test):
+        """Stop a test.
+
+        Not directly passed to the client, but used for handling of tags
+        correctly.
+        """
+        # Tags to output for this test.
+        if self._current_test_tags[0] or self._current_test_tags[1]:
+            self.decorated.tags(*self._current_test_tags)
+        self.decorated.stopTest(test)
+        self._current_test_tags = None
+
+    def tags(self, new_tags, gone_tags):
+        """Handle tag instructions.
+
+        Adds and removes tags as appropriate. If a test is currently running,
+        tags are not affected for subsequent tests.
+
+        :param new_tags: Tags to add,
+        :param gone_tags: Tags to remove.
+        """
+        if self._current_test_tags is not None:
+            # gather the tags until the test stops.
+            self._current_test_tags[0].update(new_tags)
+            self._current_test_tags[0].difference_update(gone_tags)
+            self._current_test_tags[1].update(gone_tags)
+            self._current_test_tags[1].difference_update(new_tags)
+        else:
+            return self.decorated.tags(new_tags, gone_tags)
+
+
+class TimeCollapsingDecorator(HookedTestResultDecorator):
+    """Only pass on the first and last of a consecutive sequence of times."""
+
+    def __init__(self, decorated):
+        super(TimeCollapsingDecorator, self).__init__(decorated)
+        self._last_received_time = None
+        self._last_sent_time = None
+
+    def _before_event(self):
+        if self._last_received_time is None:
+            return
+        if self._last_received_time != self._last_sent_time:
+            self.decorated.time(self._last_received_time)
+            self._last_sent_time = self._last_received_time
+        self._last_received_time = None
+
+    def time(self, a_time):
+        # Don't upcall, because we don't want to call _before_event, it's only
+        # for non-time events.
+        if self._last_received_time is None:
+            self.decorated.time(a_time)
+            self._last_sent_time = a_time
+        self._last_received_time = a_time
+
+
+def all_true(bools):
+    """Return True if all of 'bools' are True. False otherwise."""
+    for b in bools:
+        if not b:
+            return False
+    return True
+
+
 class TestResultFilter(TestResultDecorator):
     """A pyunit TestResult interface implementation which filters tests.
 
@@ -208,82 +294,110 @@ class TestResultFilter(TestResultDecorator):
     """
 
     def __init__(self, result, filter_error=False, filter_failure=False,
-        filter_success=True, filter_skip=False,
-        filter_predicate=None):
+        filter_success=True, filter_skip=False, filter_xfail=False,
+        filter_predicate=None, fixup_expected_failures=None):
         """Create a FilterResult object filtering to result.
 
         :param filter_error: Filter out errors.
         :param filter_failure: Filter out failures.
         :param filter_success: Filter out successful tests.
         :param filter_skip: Filter out skipped tests.
+        :param filter_xfail: Filter out expected failure tests.
         :param filter_predicate: A callable taking (test, outcome, err,
             details) and returning True if the result should be passed
             through.  err and details may be none if no error or extra
             metadata is available. outcome is the name of the outcome such
             as 'success' or 'failure'.
+        :param fixup_expected_failures: Set of test ids to consider known
+            failing.
         """
-        TestResultDecorator.__init__(self, result)
-        self._filter_error = filter_error
-        self._filter_failure = filter_failure
-        self._filter_success = filter_success
-        self._filter_skip = filter_skip
-        if filter_predicate is None:
-            filter_predicate = lambda test, outcome, err, details: True
-        self.filter_predicate = filter_predicate
+        super(TestResultFilter, self).__init__(result)
+        self.decorated = TimeCollapsingDecorator(
+            TagCollapsingDecorator(self.decorated))
+        predicates = []
+        if filter_error:
+            predicates.append(lambda t, outcome, e, d: outcome != 'error')
+        if filter_failure:
+            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
+        if filter_success:
+            predicates.append(lambda t, outcome, e, d: outcome != 'success')
+        if filter_skip:
+            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
+        if filter_xfail:
+            predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
+        if filter_predicate is not None:
+            predicates.append(filter_predicate)
+        self.filter_predicate = (
+            lambda test, outcome, err, details:
+                all_true(p(test, outcome, err, details) for p in predicates))
         # The current test (for filtering tags)
         self._current_test = None
         # Has the current test been filtered (for outputting test tags)
         self._current_test_filtered = None
-        # The (new, gone) tags for the current test.
-        self._current_test_tags = None
+        # Calls to this result that we don't know whether to forward on yet.
+        self._buffered_calls = []
+        if fixup_expected_failures is None:
+            self._fixup_expected_failures = frozenset()
+        else:
+            self._fixup_expected_failures = fixup_expected_failures
 
     def addError(self, test, err=None, details=None):
-        if (not self._filter_error and
-            self.filter_predicate(test, 'error', err, details)):
-            self.decorated.startTest(test)
-            self.decorated.addError(test, err, details=details)
+        if (self.filter_predicate(test, 'error', err, details)):
+            if self._failure_expected(test):
+                self._buffered_calls.append(
+                    ('addExpectedFailure', [test, err], {'details': details}))
+            else:
+                self._buffered_calls.append(
+                    ('addError', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addFailure(self, test, err=None, details=None):
-        if (not self._filter_failure and
-            self.filter_predicate(test, 'failure', err, details)):
-            self.decorated.startTest(test)
-            self.decorated.addFailure(test, err, details=details)
+        if (self.filter_predicate(test, 'failure', err, details)):
+            if self._failure_expected(test):
+                self._buffered_calls.append(
+                    ('addExpectedFailure', [test, err], {'details': details}))
+            else:
+                self._buffered_calls.append(
+                    ('addFailure', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addSkip(self, test, reason=None, details=None):
-        if (not self._filter_skip and
-            self.filter_predicate(test, 'skip', reason, details)):
-            self.decorated.startTest(test)
-            self.decorated.addSkip(test, reason, details=details)
+        if (self.filter_predicate(test, 'skip', reason, details)):
+            self._buffered_calls.append(
+                ('addSkip', [test, reason], {'details': details}))
         else:
             self._filtered()
 
     def addSuccess(self, test, details=None):
-        if (not self._filter_success and
-            self.filter_predicate(test, 'success', None, details)):
-            self.decorated.startTest(test)
-            self.decorated.addSuccess(test, details=details)
+        if (self.filter_predicate(test, 'success', None, details)):
+            if self._failure_expected(test):
+                self._buffered_calls.append(
+                    ('addUnexpectedSuccess', [test], {'details': details}))
+            else:
+                self._buffered_calls.append(
+                    ('addSuccess', [test], {'details': details}))
         else:
             self._filtered()
 
     def addExpectedFailure(self, test, err=None, details=None):
         if self.filter_predicate(test, 'expectedfailure', err, details):
-            self.decorated.startTest(test)
-            return self.decorated.addExpectedFailure(test, err,
-                details=details)
+            self._buffered_calls.append(
+                ('addExpectedFailure', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addUnexpectedSuccess(self, test, details=None):
-        self.decorated.startTest(test)
-        return self.decorated.addUnexpectedSuccess(test, details=details)
+        self._buffered_calls.append(
+            ('addUnexpectedSuccess', [test], {'details': details}))
 
     def _filtered(self):
         self._current_test_filtered = True
 
+    def _failure_expected(self, test):
+        return (test.id() in self._fixup_expected_failures)
+
     def startTest(self, test):
         """Start a test.
 
@@ -292,7 +406,7 @@ class TestResultFilter(TestResultDecorator):
         """
         self._current_test = test
         self._current_test_filtered = False
-        self._current_test_tags = set(), set()
+        self._buffered_calls.append(('startTest', [test], {}))
 
     def stopTest(self, test):
         """Stop a test.
@@ -302,29 +416,18 @@ class TestResultFilter(TestResultDecorator):
         """
         if not self._current_test_filtered:
             # Tags to output for this test.
-            if self._current_test_tags[0] or self._current_test_tags[1]:
-                self.decorated.tags(*self._current_test_tags)
+            for method, args, kwargs in self._buffered_calls:
+                getattr(self.decorated, method)(*args, **kwargs)
             self.decorated.stopTest(test)
         self._current_test = None
         self._current_test_filtered = None
-        self._current_test_tags = None
+        self._buffered_calls = []
 
-    def tags(self, new_tags, gone_tags):
-        """Handle tag instructions.
-
-        Adds and removes tags as appropriate. If a test is currently running,
-        tags are not affected for subsequent tests.
-
-        :param new_tags: Tags to add,
-        :param gone_tags: Tags to remove.
-        """
+    def time(self, a_time):
         if self._current_test is not None:
-            # gather the tags until the test stops.
-            self._current_test_tags[0].update(new_tags)
-            self._current_test_tags[0].difference_update(gone_tags)
-            self._current_test_tags[1].update(gone_tags)
-            self._current_test_tags[1].difference_update(new_tags)
-        return self.decorated.tags(new_tags, gone_tags)
+            self._buffered_calls.append(('time', [a_time], {}))
+        else:
+            return self.decorated.time(a_time)
 
     def id_to_orig_id(self, id):
         if id.startswith("subunit.RemotedTestCase."):
@@ -336,10 +439,10 @@ class TestIdPrintingResult(testtools.TestResult):
 
     def __init__(self, stream, show_times=False):
         """Create a FilterResult object outputting to stream."""
-        testtools.TestResult.__init__(self)
+        super(TestIdPrintingResult, self).__init__()
         self._stream = stream
         self.failed_tests = 0
-        self.__time = 0
+        self.__time = None
         self.show_times = show_times
         self._test = None
         self._test_duration = 0
@@ -355,6 +458,16 @@ class TestIdPrintingResult(testtools.TestResult):
     def addSuccess(self, test):
         self._test = test
 
+    def addSkip(self, test, reason=None, details=None):
+        self._test = test
+
+    def addUnexpectedSuccess(self, test, details=None):
+        self.failed_tests += 1
+        self._test = test
+
+    def addExpectedFailure(self, test, err=None, details=None):
+        self._test = test
+
     def reportTest(self, test, duration):
         if self.show_times:
             seconds = duration.seconds
index 1b5ba9c29372ec458ffda36e45c610e254546b91..39d901e0a9b2b3a854b23334b277713d72298fae 100644 (file)
@@ -53,7 +53,7 @@ def visitTests(suite, visitor):
                 visitor.visitSuite(test)
                 visitTests(test, visitor)
             else:
-                print "unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__)
+                print ("unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__))
 
 
 class TestSuite(unittest.TestSuite):
index 0ee019ae4aef40411f8903d47969fb0acc4555af..618e4952d7938c36dc0df5b41dd1be118155bdf7 100755 (executable)
@@ -1,5 +1,8 @@
 #!/usr/bin/env python
 import sys
+if sys.platform == "win32":
+    import msvcrt, os
+    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
 if len(sys.argv) == 2:
     # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args 
     # uses this code path to be sure that the arguments were passed to
index a24e31e0c2cf10c286afc6eaa16f55d7926bc8c2..e0742f1af3648ff6e110c33a8a31e5a288947764 100644 (file)
@@ -1,6 +1,7 @@
 #
 #  subunit: extensions to python unittest to get test results from subprocesses.
 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
+#  Copyright (C) 2011  Martin Pool <mbp@sourcefrog.net>
 #
 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
 #  license at the users choice. A copy of both licenses are available in the
 #  limitations under that license.
 #
 
-from cStringIO import StringIO
 import unittest
 
+from testtools.compat import _b, BytesIO
+
 import subunit.chunked
 
 
@@ -30,98 +32,121 @@ class TestDecode(unittest.TestCase):
 
     def setUp(self):
         unittest.TestCase.setUp(self)
-        self.output = StringIO()
+        self.output = BytesIO()
         self.decoder = subunit.chunked.Decoder(self.output)
 
     def test_close_read_length_short_errors(self):
         self.assertRaises(ValueError, self.decoder.close)
 
     def test_close_body_short_errors(self):
-        self.assertEqual(None, self.decoder.write('2\r\na'))
+        self.assertEqual(None, self.decoder.write(_b('2\r\na')))
         self.assertRaises(ValueError, self.decoder.close)
 
     def test_close_body_buffered_data_errors(self):
-        self.assertEqual(None, self.decoder.write('2\r'))
+        self.assertEqual(None, self.decoder.write(_b('2\r')))
         self.assertRaises(ValueError, self.decoder.close)
 
     def test_close_after_finished_stream_safe(self):
-        self.assertEqual(None, self.decoder.write('2\r\nab'))
-        self.assertEqual('', self.decoder.write('0\r\n'))
+        self.assertEqual(None, self.decoder.write(_b('2\r\nab')))
+        self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
         self.decoder.close()
 
     def test_decode_nothing(self):
-        self.assertEqual('', self.decoder.write('0\r\n'))
-        self.assertEqual('', self.output.getvalue())
+        self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
+        self.assertEqual(_b(''), self.output.getvalue())
 
     def test_decode_serialised_form(self):
-        self.assertEqual(None, self.decoder.write("F\r\n"))
-        self.assertEqual(None, self.decoder.write("serialised\n"))
-        self.assertEqual('', self.decoder.write("form0\r\n"))
+        self.assertEqual(None, self.decoder.write(_b("F\r\n")))
+        self.assertEqual(None, self.decoder.write(_b("serialised\n")))
+        self.assertEqual(_b(''), self.decoder.write(_b("form0\r\n")))
 
     def test_decode_short(self):
-        self.assertEqual('', self.decoder.write('3\r\nabc0\r\n'))
-        self.assertEqual('abc', self.output.getvalue())
+        self.assertEqual(_b(''), self.decoder.write(_b('3\r\nabc0\r\n')))
+        self.assertEqual(_b('abc'), self.output.getvalue())
 
     def test_decode_combines_short(self):
-        self.assertEqual('', self.decoder.write('6\r\nabcdef0\r\n'))
-        self.assertEqual('abcdef', self.output.getvalue())
+        self.assertEqual(_b(''), self.decoder.write(_b('6\r\nabcdef0\r\n')))
+        self.assertEqual(_b('abcdef'), self.output.getvalue())
 
     def test_decode_excess_bytes_from_write(self):
-        self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
-        self.assertEqual('abc', self.output.getvalue())
+        self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
+        self.assertEqual(_b('abc'), self.output.getvalue())
 
     def test_decode_write_after_finished_errors(self):
-        self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
-        self.assertRaises(ValueError, self.decoder.write, '')
+        self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
+        self.assertRaises(ValueError, self.decoder.write, _b(''))
 
     def test_decode_hex(self):
-        self.assertEqual('', self.decoder.write('A\r\n12345678900\r\n'))
-        self.assertEqual('1234567890', self.output.getvalue())
+        self.assertEqual(_b(''), self.decoder.write(_b('A\r\n12345678900\r\n')))
+        self.assertEqual(_b('1234567890'), self.output.getvalue())
 
     def test_decode_long_ranges(self):
-        self.assertEqual(None, self.decoder.write('10000\r\n'))
-        self.assertEqual(None, self.decoder.write('1' * 65536))
-        self.assertEqual(None, self.decoder.write('10000\r\n'))
-        self.assertEqual(None, self.decoder.write('2' * 65536))
-        self.assertEqual('', self.decoder.write('0\r\n'))
-        self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue())
+        self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
+        self.assertEqual(None, self.decoder.write(_b('1' * 65536)))
+        self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
+        self.assertEqual(None, self.decoder.write(_b('2' * 65536)))
+        self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
+        self.assertEqual(_b('1' * 65536 + '2' * 65536), self.output.getvalue())
+
+    def test_decode_newline_nonstrict(self):
+        """Tolerate chunk markers with no CR character."""
+        # From <http://pad.lv/505078>
+        self.decoder = subunit.chunked.Decoder(self.output, strict=False)
+        self.assertEqual(None, self.decoder.write(_b('a\n')))
+        self.assertEqual(None, self.decoder.write(_b('abcdeabcde')))
+        self.assertEqual(_b(''), self.decoder.write(_b('0\n')))
+        self.assertEqual(_b('abcdeabcde'), self.output.getvalue())
+
+    def test_decode_strict_newline_only(self):
+        """Reject chunk markers with no CR character in strict mode."""
+        # From <http://pad.lv/505078>
+        self.assertRaises(ValueError,
+            self.decoder.write, _b('a\n'))
+
+    def test_decode_strict_multiple_crs(self):
+        self.assertRaises(ValueError,
+            self.decoder.write, _b('a\r\r\n'))
+
+    def test_decode_short_header(self):
+        self.assertRaises(ValueError,
+            self.decoder.write, _b('\n'))
 
 
 class TestEncode(unittest.TestCase):
 
     def setUp(self):
         unittest.TestCase.setUp(self)
-        self.output = StringIO()
+        self.output = BytesIO()
         self.encoder = subunit.chunked.Encoder(self.output)
 
     def test_encode_nothing(self):
         self.encoder.close()
-        self.assertEqual('0\r\n', self.output.getvalue())
+        self.assertEqual(_b('0\r\n'), self.output.getvalue())
 
     def test_encode_empty(self):
-        self.encoder.write('')
+        self.encoder.write(_b(''))
         self.encoder.close()
-        self.assertEqual('0\r\n', self.output.getvalue())
+        self.assertEqual(_b('0\r\n'), self.output.getvalue())
 
     def test_encode_short(self):
-        self.encoder.write('abc')
+        self.encoder.write(_b('abc'))
         self.encoder.close()
-        self.assertEqual('3\r\nabc0\r\n', self.output.getvalue())
+        self.assertEqual(_b('3\r\nabc0\r\n'), self.output.getvalue())
 
     def test_encode_combines_short(self):
-        self.encoder.write('abc')
-        self.encoder.write('def')
+        self.encoder.write(_b('abc'))
+        self.encoder.write(_b('def'))
         self.encoder.close()
-        self.assertEqual('6\r\nabcdef0\r\n', self.output.getvalue())
+        self.assertEqual(_b('6\r\nabcdef0\r\n'), self.output.getvalue())
 
     def test_encode_over_9_is_in_hex(self):
-        self.encoder.write('1234567890')
+        self.encoder.write(_b('1234567890'))
         self.encoder.close()
-        self.assertEqual('A\r\n12345678900\r\n', self.output.getvalue())
+        self.assertEqual(_b('A\r\n12345678900\r\n'), self.output.getvalue())
 
     def test_encode_long_ranges_not_combined(self):
-        self.encoder.write('1' * 65536)
-        self.encoder.write('2' * 65536)
+        self.encoder.write(_b('1' * 65536))
+        self.encoder.write(_b('2' * 65536))
         self.encoder.close()
-        self.assertEqual('10000\r\n' + '1' * 65536 + '10000\r\n' +
-            '2' * 65536 + '0\r\n', self.output.getvalue())
+        self.assertEqual(_b('10000\r\n' + '1' * 65536 + '10000\r\n' +
+            '2' * 65536 + '0\r\n'), self.output.getvalue())
index 41c32129d04f5c02bce69b8d136a84ced6e571a2..746aa041e59cb3753e1dd8abad822c44593aafab 100644 (file)
 #  limitations under that license.
 #
 
-from cStringIO import StringIO
 import unittest
 
+from testtools.compat import _b, StringIO
+
 import subunit.tests
 from subunit import content, content_type, details
 
@@ -31,20 +32,20 @@ class TestSimpleDetails(unittest.TestCase):
 
     def test_lineReceived(self):
         parser = details.SimpleDetailsParser(None)
-        parser.lineReceived("foo\n")
-        parser.lineReceived("bar\n")
-        self.assertEqual("foo\nbar\n", parser._message)
+        parser.lineReceived(_b("foo\n"))
+        parser.lineReceived(_b("bar\n"))
+        self.assertEqual(_b("foo\nbar\n"), parser._message)
 
     def test_lineReceived_escaped_bracket(self):
         parser = details.SimpleDetailsParser(None)
-        parser.lineReceived("foo\n")
-        parser.lineReceived(" ]are\n")
-        parser.lineReceived("bar\n")
-        self.assertEqual("foo\n]are\nbar\n", parser._message)
+        parser.lineReceived(_b("foo\n"))
+        parser.lineReceived(_b(" ]are\n"))
+        parser.lineReceived(_b("bar\n"))
+        self.assertEqual(_b("foo\n]are\nbar\n"), parser._message)
 
     def test_get_message(self):
         parser = details.SimpleDetailsParser(None)
-        self.assertEqual("", parser.get_message())
+        self.assertEqual(_b(""), parser.get_message())
 
     def test_get_details(self):
         parser = details.SimpleDetailsParser(None)
@@ -53,13 +54,13 @@ class TestSimpleDetails(unittest.TestCase):
         expected['traceback'] = content.Content(
             content_type.ContentType("text", "x-traceback",
                 {'charset': 'utf8'}),
-            lambda:[""])
+            lambda:[_b("")])
         found = parser.get_details()
         self.assertEqual(expected.keys(), found.keys())
         self.assertEqual(expected['traceback'].content_type,
             found['traceback'].content_type)
-        self.assertEqual(''.join(expected['traceback'].iter_bytes()),
-            ''.join(found['traceback'].iter_bytes()))
+        self.assertEqual(_b('').join(expected['traceback'].iter_bytes()),
+            _b('').join(found['traceback'].iter_bytes()))
 
     def test_get_details_skip(self):
         parser = details.SimpleDetailsParser(None)
@@ -67,7 +68,7 @@ class TestSimpleDetails(unittest.TestCase):
         expected = {}
         expected['reason'] = content.Content(
             content_type.ContentType("text", "plain"),
-            lambda:[""])
+            lambda:[_b("")])
         found = parser.get_details("skip")
         self.assertEqual(expected, found)
 
@@ -77,7 +78,7 @@ class TestSimpleDetails(unittest.TestCase):
         expected = {}
         expected['message'] = content.Content(
             content_type.ContentType("text", "plain"),
-            lambda:[""])
+            lambda:[_b("")])
         found = parser.get_details("success")
         self.assertEqual(expected, found)
 
@@ -94,18 +95,18 @@ class TestMultipartDetails(unittest.TestCase):
 
     def test_parts(self):
         parser = details.MultipartDetailsParser(None)
-        parser.lineReceived("Content-Type: text/plain\n")
-        parser.lineReceived("something\n")
-        parser.lineReceived("F\r\n")
-        parser.lineReceived("serialised\n")
-        parser.lineReceived("form0\r\n")
+        parser.lineReceived(_b("Content-Type: text/plain\n"))
+        parser.lineReceived(_b("something\n"))
+        parser.lineReceived(_b("F\r\n"))
+        parser.lineReceived(_b("serialised\n"))
+        parser.lineReceived(_b("form0\r\n"))
         expected = {}
         expected['something'] = content.Content(
             content_type.ContentType("text", "plain"),
-            lambda:["serialised\nform"])
+            lambda:[_b("serialised\nform")])
         found = parser.get_details()
         self.assertEqual(expected.keys(), found.keys())
         self.assertEqual(expected['something'].content_type,
             found['something'].content_type)
-        self.assertEqual(''.join(expected['something'].iter_bytes()),
-            ''.join(found['something'].iter_bytes()))
+        self.assertEqual(_b('').join(expected['something'].iter_bytes()),
+            _b('').join(found['something'].iter_bytes()))
index 3c65ed3afc10b3b70c9967182a6c1596b2d76e7b..06754840eb94735db420c75b06b33b2646e1a311 100644 (file)
@@ -6,7 +6,7 @@
 #  license at the users choice. A copy of both licenses are available in the
 #  project source as Apache-2.0 and BSD. You may not use this file except in
 #  compliance with one of these two licences.
-#  
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 
 """Tests for subunit.TestResultFilter."""
 
+from datetime import datetime
+from subunit import iso8601
 import unittest
-from StringIO import StringIO
+
+from testtools import TestCase
+from testtools.compat import _b, BytesIO, StringIO
+from testtools.testresult.doubles import ExtendedTestResult
 
 import subunit
 from subunit.test_results import TestResultFilter
 
 
-class TestTestResultFilter(unittest.TestCase):
+class TestTestResultFilter(TestCase):
     """Test for TestResultFilter, a TestResult object which filters tests."""
 
-    def _setUp(self):
-        self.output = StringIO()
+    # While TestResultFilter works on python objects, using a subunit stream
+    # is an easy pithy way of getting a series of test objects to call into
+    # the TestResult, and as TestResultFilter is intended for use with subunit
+    # also has the benefit of detecting any interface skew issues.
+    example_subunit_stream = _b("""\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+
+    def run_tests(self, result_filter, input_stream=None):
+        """Run tests through the given filter.
+
+        :param result_filter: A filtering TestResult object.
+        :param input_stream: Bytes of subunit stream data. If not provided,
+            uses TestTestResultFilter.example_subunit_stream.
+        """
+        if input_stream is None:
+            input_stream = self.example_subunit_stream
+        test = subunit.ProtocolTestCase(BytesIO(input_stream))
+        test.run(result_filter)
 
     def test_default(self):
         """The default is to exclude success and include everything else."""
-        self.filtered_result = unittest.TestResult()
-        self.filter = TestResultFilter(self.filtered_result)
-        self.run_tests()
+        filtered_result = unittest.TestResult()
+        result_filter = TestResultFilter(filtered_result)
+        self.run_tests(result_filter)
         # skips are seen as success by default python TestResult.
         self.assertEqual(['error'],
-            [error[0].id() for error in self.filtered_result.errors])
+            [error[0].id() for error in filtered_result.errors])
         self.assertEqual(['failed'],
             [failure[0].id() for failure in
-            self.filtered_result.failures])
-        self.assertEqual(4, self.filtered_result.testsRun)
+            filtered_result.failures])
+        self.assertEqual(4, filtered_result.testsRun)
 
     def test_exclude_errors(self):
-        self.filtered_result = unittest.TestResult()
-        self.filter = TestResultFilter(self.filtered_result,
-            filter_error=True)
-        self.run_tests()
+        filtered_result = unittest.TestResult()
+        result_filter = TestResultFilter(filtered_result, filter_error=True)
+        self.run_tests(result_filter)
         # skips are seen as errors by default python TestResult.
-        self.assertEqual([], self.filtered_result.errors)
+        self.assertEqual([], filtered_result.errors)
         self.assertEqual(['failed'],
             [failure[0].id() for failure in
-            self.filtered_result.failures])
-        self.assertEqual(3, self.filtered_result.testsRun)
+            filtered_result.failures])
+        self.assertEqual(3, filtered_result.testsRun)
+
+    def test_fixup_expected_failures(self):
+        filtered_result = unittest.TestResult()
+        result_filter = TestResultFilter(filtered_result,
+            fixup_expected_failures=set(["failed"]))
+        self.run_tests(result_filter)
+        self.assertEqual(['failed', 'todo'],
+            [failure[0].id() for failure in filtered_result.expectedFailures])
+        self.assertEqual([], filtered_result.failures)
+        self.assertEqual(4, filtered_result.testsRun)
+
+    def test_fixup_expected_errors(self):
+        filtered_result = unittest.TestResult()
+        result_filter = TestResultFilter(filtered_result,
+            fixup_expected_failures=set(["error"]))
+        self.run_tests(result_filter)
+        self.assertEqual(['error', 'todo'],
+            [failure[0].id() for failure in filtered_result.expectedFailures])
+        self.assertEqual([], filtered_result.errors)
+        self.assertEqual(4, filtered_result.testsRun)
+
+    def test_fixup_unexpected_success(self):
+        filtered_result = unittest.TestResult()
+        result_filter = TestResultFilter(filtered_result, filter_success=False,
+            fixup_expected_failures=set(["passed"]))
+        self.run_tests(result_filter)
+        self.assertEqual(['passed'],
+            [passed.id() for passed in filtered_result.unexpectedSuccesses])
+        self.assertEqual(5, filtered_result.testsRun)
 
     def test_exclude_failure(self):
-        self.filtered_result = unittest.TestResult()
-        self.filter = TestResultFilter(self.filtered_result,
-            filter_failure=True)
-        self.run_tests()
+        filtered_result = unittest.TestResult()
+        result_filter = TestResultFilter(filtered_result, filter_failure=True)
+        self.run_tests(result_filter)
         self.assertEqual(['error'],
-            [error[0].id() for error in self.filtered_result.errors])
+            [error[0].id() for error in filtered_result.errors])
         self.assertEqual([],
             [failure[0].id() for failure in
-            self.filtered_result.failures])
-        self.assertEqual(3, self.filtered_result.testsRun)
+            filtered_result.failures])
+        self.assertEqual(3, filtered_result.testsRun)
 
     def test_exclude_skips(self):
-        self.filtered_result = subunit.TestResultStats(None)
-        self.filter = TestResultFilter(self.filtered_result,
-            filter_skip=True)
-        self.run_tests()
-        self.assertEqual(0, self.filtered_result.skipped_tests)
-        self.assertEqual(2, self.filtered_result.failed_tests)
-        self.assertEqual(3, self.filtered_result.testsRun)
+        filtered_result = subunit.TestResultStats(None)
+        result_filter = TestResultFilter(filtered_result, filter_skip=True)
+        self.run_tests(result_filter)
+        self.assertEqual(0, filtered_result.skipped_tests)
+        self.assertEqual(2, filtered_result.failed_tests)
+        self.assertEqual(3, filtered_result.testsRun)
 
     def test_include_success(self):
-        """Success's can be included if requested."""
-        self.filtered_result = unittest.TestResult()
-        self.filter = TestResultFilter(self.filtered_result,
+        """Successes can be included if requested."""
+        filtered_result = unittest.TestResult()
+        result_filter = TestResultFilter(filtered_result,
             filter_success=False)
-        self.run_tests()
+        self.run_tests(result_filter)
         self.assertEqual(['error'],
-            [error[0].id() for error in self.filtered_result.errors])
+            [error[0].id() for error in filtered_result.errors])
         self.assertEqual(['failed'],
             [failure[0].id() for failure in
-            self.filtered_result.failures])
-        self.assertEqual(5, self.filtered_result.testsRun)
+            filtered_result.failures])
+        self.assertEqual(5, filtered_result.testsRun)
 
     def test_filter_predicate(self):
         """You can filter by predicate callbacks"""
-        self.filtered_result = unittest.TestResult()
+        filtered_result = unittest.TestResult()
         def filter_cb(test, outcome, err, details):
             return outcome == 'success'
-        self.filter = TestResultFilter(self.filtered_result,
+        result_filter = TestResultFilter(filtered_result,
             filter_predicate=filter_cb,
             filter_success=False)
-        self.run_tests()
+        self.run_tests(result_filter)
         # Only success should pass
-        self.assertEqual(1, self.filtered_result.testsRun)
-
-    def run_tests(self):
-        self.setUpTestStream()
-        self.test = subunit.ProtocolTestCase(self.input_stream)
-        self.test.run(self.filter)
-
-    def setUpTestStream(self):
-        # While TestResultFilter works on python objects, using a subunit
-        # stream is an easy pithy way of getting a series of test objects to
-        # call into the TestResult, and as TestResultFilter is intended for
-        # use with subunit also has the benefit of detecting any interface
-        # skew issues.
-        self.input_stream = StringIO()
-        self.input_stream.write("""tags: global
-test passed
-success passed
-test failed
-tags: local
-failure failed
-test error
-error error [
-error details
-]
-test skipped
-skip skipped
-test todo
-xfail todo
-""")
-        self.input_stream.seek(0)
-    
+        self.assertEqual(1, filtered_result.testsRun)
+
+    def test_time_ordering_preserved(self):
+        # Passing a subunit stream through TestResultFilter preserves the
+        # relative ordering of 'time' directives and any other subunit
+        # directives that are still included.
+        date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
+        date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
+        date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
+        subunit_stream = _b('\n'.join([
+            "time: %s",
+            "test: foo",
+            "time: %s",
+            "error: foo",
+            "time: %s",
+            ""]) % (date_a, date_b, date_c))
+        result = ExtendedTestResult()
+        result_filter = TestResultFilter(result)
+        self.run_tests(result_filter, subunit_stream)
+        foo = subunit.RemotedTestCase('foo')
+        self.assertEquals(
+            [('time', date_a),
+             ('startTest', foo),
+             ('time', date_b),
+             ('addError', foo, {}),
+             ('stopTest', foo),
+             ('time', date_c)], result._events)
+
+    def test_skip_preserved(self):
+        subunit_stream = _b('\n'.join([
+            "test: foo",
+            "skip: foo",
+            ""]))
+        result = ExtendedTestResult()
+        result_filter = TestResultFilter(result)
+        self.run_tests(result_filter, subunit_stream)
+        foo = subunit.RemotedTestCase('foo')
+        self.assertEquals(
+            [('startTest', foo),
+             ('addSkip', foo, {}),
+             ('stopTest', foo), ], result._events)
+
 
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
index a7f8fca675fbf5c3734e5100a1413943a0bd52af..6fd33010601461c06a5e49dbe899114e5375b121 100644 (file)
@@ -17,7 +17,8 @@
 """Tests for subunit.TestResultStats."""
 
 import unittest
-from StringIO import StringIO
+
+from testtools.compat import _b, BytesIO, StringIO
 
 import subunit
 
@@ -28,7 +29,7 @@ class TestTestResultStats(unittest.TestCase):
     def setUp(self):
         self.output = StringIO()
         self.result = subunit.TestResultStats(self.output)
-        self.input_stream = StringIO()
+        self.input_stream = BytesIO()
         self.test = subunit.ProtocolTestCase(self.input_stream)
 
     def test_stats_empty(self):
@@ -39,7 +40,7 @@ class TestTestResultStats(unittest.TestCase):
         self.assertEqual(set(), self.result.seen_tags)
 
     def setUpUsedStream(self):
-        self.input_stream.write("""tags: global
+        self.input_stream.write(_b("""tags: global
 test passed
 success passed
 test failed
@@ -51,7 +52,7 @@ test skipped
 skip skipped
 test todo
 xfail todo
-""")
+"""))
         self.input_stream.seek(0)
         self.test.run(self.result)
     
index 227e2b7475159bf137ccef265bb447bc1b229aff..c98506a737ce4c95a94c03d4f77fc8ecc461a2e5 100644 (file)
@@ -17,7 +17,8 @@
 """Tests for subunit.tag_stream."""
 
 import unittest
-from StringIO import StringIO
+
+from testtools.compat import StringIO
 
 import subunit
 import subunit.test_results
index c4ca4cdb3af3f4e4c1320dc3403e0a3adebe6329..11bc1916b34a147608eecd549dc1b3a1d5c2029f 100644 (file)
@@ -17,7 +17,9 @@
 """Tests for TAP2SubUnit."""
 
 import unittest
-from StringIO import StringIO
+
+from testtools.compat import StringIO
+
 import subunit
 
 
index e1287b6c8111549f180568ac97004671993ef4da..03d921abf11ab3871778a68b081a5a107d1da7b8 100644 (file)
@@ -6,7 +6,7 @@
 #  license at the users choice. A copy of both licenses are available in the
 #  project source as Apache-2.0 and BSD. You may not use this file except in
 #  compliance with one of these two licences.
-#  
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 
 import datetime
 import unittest
-from StringIO import StringIO
 import os
-import sys
 
+from testtools import skipIf, TestCase
+from testtools.compat import _b, _u, BytesIO, StringIO
 from testtools.content import Content, TracebackContent
 from testtools.content_type import ContentType
 from testtools.tests.helpers import (
@@ -29,7 +29,7 @@ from testtools.tests.helpers import (
     )
 
 import subunit
-from subunit import _remote_exception_str
+from subunit import _remote_exception_str, _remote_exception_str_chunked
 import subunit.iso8601 as iso8601
 
 
@@ -56,41 +56,39 @@ class TestProtocolServerForward(unittest.TestCase):
 
     def test_story(self):
         client = unittest.TestResult()
-        out = StringIO()
+        out = BytesIO()
         protocol = subunit.TestProtocolServer(client, forward_stream=out)
-        pipe = StringIO("test old mcdonald\n"
-                        "success old mcdonald\n")
+        pipe = BytesIO(_b("test old mcdonald\n"
+                        "success old mcdonald\n"))
         protocol.readFrom(pipe)
-        mcdonald = subunit.RemotedTestCase("old mcdonald")
         self.assertEqual(client.testsRun, 1)
         self.assertEqual(pipe.getvalue(), out.getvalue())
 
     def test_not_command(self):
         client = unittest.TestResult()
-        out = StringIO()
+        out = BytesIO()
         protocol = subunit.TestProtocolServer(client,
             stream=subunit.DiscardStream(), forward_stream=out)
-        pipe = StringIO("success old mcdonald\n")
+        pipe = BytesIO(_b("success old mcdonald\n"))
         protocol.readFrom(pipe)
         self.assertEqual(client.testsRun, 0)
-        self.assertEqual("", out.getvalue())
-        
+        self.assertEqual(_b(""), out.getvalue())
+
 
 class TestTestProtocolServerPipe(unittest.TestCase):
 
     def test_story(self):
         client = unittest.TestResult()
         protocol = subunit.TestProtocolServer(client)
-        pipe = StringIO("test old mcdonald\n"
+        pipe = BytesIO(_b("test old mcdonald\n"
                         "success old mcdonald\n"
                         "test bing crosby\n"
                         "failure bing crosby [\n"
                         "foo.c:53:ERROR invalid state\n"
                         "]\n"
                         "test an error\n"
-                        "error an error\n")
+                        "error an error\n"))
         protocol.readFrom(pipe)
-        mcdonald = subunit.RemotedTestCase("old mcdonald")
         bing = subunit.RemotedTestCase("bing crosby")
         an_error = subunit.RemotedTestCase("an error")
         self.assertEqual(client.errors,
@@ -110,29 +108,32 @@ class TestTestProtocolServerStartTest(unittest.TestCase):
 
     def setUp(self):
         self.client = Python26TestResult()
-        self.protocol = subunit.TestProtocolServer(self.client)
+        self.stream = BytesIO()
+        self.protocol = subunit.TestProtocolServer(self.client, self.stream)
 
     def test_start_test(self):
-        self.protocol.lineReceived("test old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
         self.assertEqual(self.client._events,
             [('startTest', subunit.RemotedTestCase("old mcdonald"))])
 
     def test_start_testing(self):
-        self.protocol.lineReceived("testing old mcdonald\n")
+        self.protocol.lineReceived(_b("testing old mcdonald\n"))
         self.assertEqual(self.client._events,
             [('startTest', subunit.RemotedTestCase("old mcdonald"))])
 
     def test_start_test_colon(self):
-        self.protocol.lineReceived("test: old mcdonald\n")
+        self.protocol.lineReceived(_b("test: old mcdonald\n"))
         self.assertEqual(self.client._events,
             [('startTest', subunit.RemotedTestCase("old mcdonald"))])
 
     def test_indented_test_colon_ignored(self):
-        self.protocol.lineReceived(" test: old mcdonald\n")
+        ignored_line = _b(" test: old mcdonald\n")
+        self.protocol.lineReceived(ignored_line)
         self.assertEqual([], self.client._events)
+        self.assertEqual(self.stream.getvalue(), ignored_line)
 
     def test_start_testing_colon(self):
-        self.protocol.lineReceived("testing: old mcdonald\n")
+        self.protocol.lineReceived(_b("testing: old mcdonald\n"))
         self.assertEqual(self.client._events,
             [('startTest', subunit.RemotedTestCase("old mcdonald"))])
 
@@ -140,22 +141,22 @@ class TestTestProtocolServerStartTest(unittest.TestCase):
 class TestTestProtocolServerPassThrough(unittest.TestCase):
 
     def setUp(self):
-        self.stdout = StringIO()
+        self.stdout = BytesIO()
         self.test = subunit.RemotedTestCase("old mcdonald")
         self.client = ExtendedTestResult()
         self.protocol = subunit.TestProtocolServer(self.client, self.stdout)
 
     def keywords_before_test(self):
-        self.protocol.lineReceived("failure a\n")
-        self.protocol.lineReceived("failure: a\n")
-        self.protocol.lineReceived("error a\n")
-        self.protocol.lineReceived("error: a\n")
-        self.protocol.lineReceived("success a\n")
-        self.protocol.lineReceived("success: a\n")
-        self.protocol.lineReceived("successful a\n")
-        self.protocol.lineReceived("successful: a\n")
-        self.protocol.lineReceived("]\n")
-        self.assertEqual(self.stdout.getvalue(), "failure a\n"
+        self.protocol.lineReceived(_b("failure a\n"))
+        self.protocol.lineReceived(_b("failure: a\n"))
+        self.protocol.lineReceived(_b("error a\n"))
+        self.protocol.lineReceived(_b("error: a\n"))
+        self.protocol.lineReceived(_b("success a\n"))
+        self.protocol.lineReceived(_b("success: a\n"))
+        self.protocol.lineReceived(_b("successful a\n"))
+        self.protocol.lineReceived(_b("successful: a\n"))
+        self.protocol.lineReceived(_b("]\n"))
+        self.assertEqual(self.stdout.getvalue(), _b("failure a\n"
                                                  "failure: a\n"
                                                  "error a\n"
                                                  "error: a\n"
@@ -163,15 +164,15 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
                                                  "success: a\n"
                                                  "successful a\n"
                                                  "successful: a\n"
-                                                 "]\n")
+                                                 "]\n"))
 
     def test_keywords_before_test(self):
         self.keywords_before_test()
         self.assertEqual(self.client._events, [])
 
     def test_keywords_after_error(self):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("error old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("error old mcdonald\n"))
         self.keywords_before_test()
         self.assertEqual([
             ('startTest', self.test),
@@ -180,8 +181,8 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
             ], self.client._events)
 
     def test_keywords_after_failure(self):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("failure old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("failure old mcdonald\n"))
         self.keywords_before_test()
         self.assertEqual(self.client._events, [
             ('startTest', self.test),
@@ -190,8 +191,8 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
             ])
 
     def test_keywords_after_success(self):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("success old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("success old mcdonald\n"))
         self.keywords_before_test()
         self.assertEqual([
             ('startTest', self.test),
@@ -200,19 +201,19 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
             ], self.client._events)
 
     def test_keywords_after_test(self):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("failure a\n")
-        self.protocol.lineReceived("failure: a\n")
-        self.protocol.lineReceived("error a\n")
-        self.protocol.lineReceived("error: a\n")
-        self.protocol.lineReceived("success a\n")
-        self.protocol.lineReceived("success: a\n")
-        self.protocol.lineReceived("successful a\n")
-        self.protocol.lineReceived("successful: a\n")
-        self.protocol.lineReceived("]\n")
-        self.protocol.lineReceived("failure old mcdonald\n")
-        self.assertEqual(self.stdout.getvalue(), "test old mcdonald\n"
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("failure a\n"))
+        self.protocol.lineReceived(_b("failure: a\n"))
+        self.protocol.lineReceived(_b("error a\n"))
+        self.protocol.lineReceived(_b("error: a\n"))
+        self.protocol.lineReceived(_b("success a\n"))
+        self.protocol.lineReceived(_b("success: a\n"))
+        self.protocol.lineReceived(_b("successful a\n"))
+        self.protocol.lineReceived(_b("successful: a\n"))
+        self.protocol.lineReceived(_b("]\n"))
+        self.protocol.lineReceived(_b("failure old mcdonald\n"))
+        self.assertEqual(self.stdout.getvalue(), _b("test old mcdonald\n"
                                                  "failure a\n"
                                                  "failure: a\n"
                                                  "error a\n"
@@ -221,7 +222,7 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
                                                  "success: a\n"
                                                  "successful a\n"
                                                  "successful: a\n"
-                                                 "]\n")
+                                                 "]\n"))
         self.assertEqual(self.client._events, [
             ('startTest', self.test),
             ('addFailure', self.test, {}),
@@ -231,24 +232,24 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
     def test_keywords_during_failure(self):
         # A smoke test to make sure that the details parsers have control
         # appropriately.
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("failure: old mcdonald [\n")
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("failure a\n")
-        self.protocol.lineReceived("failure: a\n")
-        self.protocol.lineReceived("error a\n")
-        self.protocol.lineReceived("error: a\n")
-        self.protocol.lineReceived("success a\n")
-        self.protocol.lineReceived("success: a\n")
-        self.protocol.lineReceived("successful a\n")
-        self.protocol.lineReceived("successful: a\n")
-        self.protocol.lineReceived(" ]\n")
-        self.protocol.lineReceived("]\n")
-        self.assertEqual(self.stdout.getvalue(), "")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("failure: old mcdonald [\n"))
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("failure a\n"))
+        self.protocol.lineReceived(_b("failure: a\n"))
+        self.protocol.lineReceived(_b("error a\n"))
+        self.protocol.lineReceived(_b("error: a\n"))
+        self.protocol.lineReceived(_b("success a\n"))
+        self.protocol.lineReceived(_b("success: a\n"))
+        self.protocol.lineReceived(_b("successful a\n"))
+        self.protocol.lineReceived(_b("successful: a\n"))
+        self.protocol.lineReceived(_b(" ]\n"))
+        self.protocol.lineReceived(_b("]\n"))
+        self.assertEqual(self.stdout.getvalue(), _b(""))
         details = {}
         details['traceback'] = Content(ContentType("text", "x-traceback",
             {'charset': 'utf8'}),
-            lambda:[
+            lambda:[_b(
             "test old mcdonald\n"
             "failure a\n"
             "failure: a\n"
@@ -258,7 +259,7 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
             "success: a\n"
             "successful a\n"
             "successful: a\n"
-            "]\n"])
+            "]\n")])
         self.assertEqual(self.client._events, [
             ('startTest', self.test),
             ('addFailure', self.test, details),
@@ -269,7 +270,7 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
         """Lines received which cannot be interpreted as any protocol action
         should be passed through to sys.stdout.
         """
-        bytes = "randombytes\n"
+        bytes = _b("randombytes\n")
         self.protocol.lineReceived(bytes)
         self.assertEqual(self.stdout.getvalue(), bytes)
 
@@ -286,10 +287,10 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
         self.assertEqual([], self.client._events)
 
     def test_lost_connection_after_start(self):
-        self.protocol.lineReceived("test old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
         self.protocol.lostConnection()
         failure = subunit.RemoteError(
-            u"lost connection during test 'old mcdonald'")
+            _u("lost connection during test 'old mcdonald'"))
         self.assertEqual([
             ('startTest', self.test),
             ('addError', self.test, failure),
@@ -297,21 +298,21 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
             ], self.client._events)
 
     def test_lost_connected_after_error(self):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("error old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("error old mcdonald\n"))
         self.protocol.lostConnection()
         self.assertEqual([
             ('startTest', self.test),
-            ('addError', self.test, subunit.RemoteError(u"")),
+            ('addError', self.test, subunit.RemoteError(_u(""))),
             ('stopTest', self.test),
             ], self.client._events)
 
     def do_connection_lost(self, outcome, opening):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("%s old mcdonald %s" % (outcome, opening))
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("%s old mcdonald %s" % (outcome, opening)))
         self.protocol.lostConnection()
         failure = subunit.RemoteError(
-            u"lost connection during %s report of test 'old mcdonald'" % 
+            _u("lost connection during %s report of test 'old mcdonald'") %
             outcome)
         self.assertEqual([
             ('startTest', self.test),
@@ -326,12 +327,12 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
         self.do_connection_lost("error", "[ multipart\n")
 
     def test_lost_connected_after_failure(self):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("failure old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("failure old mcdonald\n"))
         self.protocol.lostConnection()
         self.assertEqual([
             ('startTest', self.test),
-            ('addFailure', self.test, subunit.RemoteError(u"")),
+            ('addFailure', self.test, subunit.RemoteError(_u(""))),
             ('stopTest', self.test),
             ], self.client._events)
 
@@ -342,8 +343,8 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
         self.do_connection_lost("failure", "[ multipart\n")
 
     def test_lost_connection_after_success(self):
-        self.protocol.lineReceived("test old mcdonald\n")
-        self.protocol.lineReceived("success old mcdonald\n")
+        self.protocol.lineReceived(_b("test old mcdonald\n"))
+        self.protocol.lineReceived(_b("success old mcdonald\n"))
         self.protocol.lostConnection()
         self.assertEqual([
             ('startTest', self.test),
@@ -369,18 +370,24 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
     def test_lost_connection_during_xfail_details(self):
         self.do_connection_lost("xfail", "[ multipart\n")
 
+    def test_lost_connection_during_uxsuccess(self):
+        self.do_connection_lost("uxsuccess", "[\n")
+
+    def test_lost_connection_during_uxsuccess_details(self):
+        self.do_connection_lost("uxsuccess", "[ multipart\n")
+
 
 class TestInTestMultipart(unittest.TestCase):
 
     def setUp(self):
         self.client = ExtendedTestResult()
         self.protocol = subunit.TestProtocolServer(self.client)
-        self.protocol.lineReceived("test mcdonalds farm\n")
-        self.test = subunit.RemotedTestCase("mcdonalds farm")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
+        self.test = subunit.RemotedTestCase(_u("mcdonalds farm"))
 
     def test__outcome_sets_details_parser(self):
         self.protocol._reading_success_details.details_parser = None
-        self.protocol._state._outcome(0, "mcdonalds farm [ multipart\n",
+        self.protocol._state._outcome(0, _b("mcdonalds farm [ multipart\n"),
             None, self.protocol._reading_success_details)
         parser = self.protocol._reading_success_details.details_parser
         self.assertNotEqual(None, parser)
@@ -393,11 +400,11 @@ class TestTestProtocolServerAddError(unittest.TestCase):
     def setUp(self):
         self.client = ExtendedTestResult()
         self.protocol = subunit.TestProtocolServer(self.client)
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         self.test = subunit.RemotedTestCase("mcdonalds farm")
 
     def simple_error_keyword(self, keyword):
-        self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+        self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
         details = {}
         self.assertEqual([
             ('startTest', self.test),
@@ -412,11 +419,11 @@ class TestTestProtocolServerAddError(unittest.TestCase):
         self.simple_error_keyword("error:")
 
     def test_error_empty_message(self):
-        self.protocol.lineReceived("error mcdonalds farm [\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("error mcdonalds farm [\n"))
+        self.protocol.lineReceived(_b("]\n"))
         details = {}
         details['traceback'] = Content(ContentType("text", "x-traceback",
-            {'charset': 'utf8'}), lambda:[""])
+            {'charset': 'utf8'}), lambda:[_b("")])
         self.assertEqual([
             ('startTest', self.test),
             ('addError', self.test, details),
@@ -424,12 +431,12 @@ class TestTestProtocolServerAddError(unittest.TestCase):
             ], self.client._events)
 
     def error_quoted_bracket(self, keyword):
-        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
-        self.protocol.lineReceived(" ]\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+        self.protocol.lineReceived(_b(" ]\n"))
+        self.protocol.lineReceived(_b("]\n"))
         details = {}
         details['traceback'] = Content(ContentType("text", "x-traceback",
-            {'charset': 'utf8'}), lambda:["]\n"])
+            {'charset': 'utf8'}), lambda:[_b("]\n")])
         self.assertEqual([
             ('startTest', self.test),
             ('addError', self.test, details),
@@ -448,7 +455,7 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):
     def setUp(self):
         self.client = ExtendedTestResult()
         self.protocol = subunit.TestProtocolServer(self.client)
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         self.test = subunit.RemotedTestCase("mcdonalds farm")
 
     def assertFailure(self, details):
@@ -459,7 +466,7 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):
             ], self.client._events)
 
     def simple_failure_keyword(self, keyword):
-        self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+        self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
         details = {}
         self.assertFailure(details)
 
@@ -470,20 +477,20 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):
         self.simple_failure_keyword("failure:")
 
     def test_failure_empty_message(self):
-        self.protocol.lineReceived("failure mcdonalds farm [\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("failure mcdonalds farm [\n"))
+        self.protocol.lineReceived(_b("]\n"))
         details = {}
         details['traceback'] = Content(ContentType("text", "x-traceback",
-            {'charset': 'utf8'}), lambda:[""])
+            {'charset': 'utf8'}), lambda:[_b("")])
         self.assertFailure(details)
 
     def failure_quoted_bracket(self, keyword):
-        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
-        self.protocol.lineReceived(" ]\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+        self.protocol.lineReceived(_b(" ]\n"))
+        self.protocol.lineReceived(_b("]\n"))
         details = {}
         details['traceback'] = Content(ContentType("text", "x-traceback",
-            {'charset': 'utf8'}), lambda:["]\n"])
+            {'charset': 'utf8'}), lambda:[_b("]\n")])
         self.assertFailure(details)
 
     def test_failure_quoted_bracket(self):
@@ -521,11 +528,11 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
     def setup_protocol(self):
         """Setup the protocol based on self.client."""
         self.protocol = subunit.TestProtocolServer(self.client)
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         self.test = self.client._events[-1][-1]
 
     def simple_xfail_keyword(self, keyword, as_success):
-        self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+        self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
         self.check_success_or_xfail(as_success)
 
     def check_success_or_xfail(self, as_success, error_message=None):
@@ -540,13 +547,14 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
             if error_message is not None:
                 details['traceback'] = Content(
                     ContentType("text", "x-traceback", {'charset': 'utf8'}),
-                    lambda:[error_message])
+                    lambda:[_b(error_message)])
             if isinstance(self.client, ExtendedTestResult):
                 value = details
             else:
                 if error_message is not None:
-                    value = subunit.RemoteError(u'Text attachment: traceback\n'
-                        '------------\n' + error_message + '------------\n')
+                    value = subunit.RemoteError(_u("Text attachment: traceback\n"
+                        "------------\n") + _u(error_message) +
+                        _u("------------\n"))
                 else:
                     value = subunit.RemoteError()
             self.assertEqual([
@@ -580,16 +588,16 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
         self.empty_message(False, error_message="")
 
     def empty_message(self, as_success, error_message="\n"):
-        self.protocol.lineReceived("xfail mcdonalds farm [\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("xfail mcdonalds farm [\n"))
+        self.protocol.lineReceived(_b("]\n"))
         self.check_success_or_xfail(as_success, error_message)
 
     def xfail_quoted_bracket(self, keyword, as_success):
         # This tests it is accepted, but cannot test it is used today, because
         # of not having a way to expose it in Python so far.
-        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
-        self.protocol.lineReceived(" ]\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+        self.protocol.lineReceived(_b(" ]\n"))
+        self.protocol.lineReceived(_b("]\n"))
         self.check_success_or_xfail(as_success, "]\n")
 
     def test_xfail_quoted_bracket(self):
@@ -609,6 +617,121 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
         self.xfail_quoted_bracket("xfail:", False)
 
 
+class TestTestProtocolServerAddunexpectedSuccess(TestCase):
+    """Tests for the uxsuccess keyword."""
+
+    def capture_expected_failure(self, test, err):
+        self._events.append((test, err))
+
+    def setup_python26(self):
+        """Setup a test object ready to be xfailed and thunk to success."""
+        self.client = Python26TestResult()
+        self.setup_protocol()
+
+    def setup_python27(self):
+        """Setup a test object ready to be xfailed."""
+        self.client = Python27TestResult()
+        self.setup_protocol()
+
+    def setup_python_ex(self):
+        """Setup a test object ready to be xfailed with details."""
+        self.client = ExtendedTestResult()
+        self.setup_protocol()
+
+    def setup_protocol(self):
+        """Setup the protocol based on self.client."""
+        self.protocol = subunit.TestProtocolServer(self.client)
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
+        self.test = self.client._events[-1][-1]
+
+    def simple_uxsuccess_keyword(self, keyword, as_fail):
+        self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
+        self.check_fail_or_uxsuccess(as_fail)
+
+    def check_fail_or_uxsuccess(self, as_fail, error_message=None):
+        details = {}
+        if error_message is not None:
+            details['traceback'] = Content(
+                ContentType("text", "x-traceback", {'charset': 'utf8'}),
+                lambda:[_b(error_message)])
+        if isinstance(self.client, ExtendedTestResult):
+            value = details
+        else:
+            value = None
+        if as_fail:
+            self.client._events[1] = self.client._events[1][:2]
+            # The value is generated within the extended to original decorator:
+            # todo use the testtools matcher to check on this.
+            self.assertEqual([
+                ('startTest', self.test),
+                ('addFailure', self.test),
+                ('stopTest', self.test),
+                ], self.client._events)
+        elif value:
+            self.assertEqual([
+                ('startTest', self.test),
+                ('addUnexpectedSuccess', self.test, value),
+                ('stopTest', self.test),
+                ], self.client._events)
+        else:
+            self.assertEqual([
+                ('startTest', self.test),
+                ('addUnexpectedSuccess', self.test),
+                ('stopTest', self.test),
+                ], self.client._events)
+
+    def test_simple_uxsuccess(self):
+        self.setup_python26()
+        self.simple_uxsuccess_keyword("uxsuccess", True)
+        self.setup_python27()
+        self.simple_uxsuccess_keyword("uxsuccess",  False)
+        self.setup_python_ex()
+        self.simple_uxsuccess_keyword("uxsuccess",  False)
+
+    def test_simple_uxsuccess_colon(self):
+        self.setup_python26()
+        self.simple_uxsuccess_keyword("uxsuccess:", True)
+        self.setup_python27()
+        self.simple_uxsuccess_keyword("uxsuccess:", False)
+        self.setup_python_ex()
+        self.simple_uxsuccess_keyword("uxsuccess:", False)
+
+    def test_uxsuccess_empty_message(self):
+        self.setup_python26()
+        self.empty_message(True)
+        self.setup_python27()
+        self.empty_message(False)
+        self.setup_python_ex()
+        self.empty_message(False, error_message="")
+
+    def empty_message(self, as_fail, error_message="\n"):
+        self.protocol.lineReceived(_b("uxsuccess mcdonalds farm [\n"))
+        self.protocol.lineReceived(_b("]\n"))
+        self.check_fail_or_uxsuccess(as_fail, error_message)
+
+    def uxsuccess_quoted_bracket(self, keyword, as_fail):
+        self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+        self.protocol.lineReceived(_b(" ]\n"))
+        self.protocol.lineReceived(_b("]\n"))
+        self.check_fail_or_uxsuccess(as_fail, "]\n")
+
+    def test_uxsuccess_quoted_bracket(self):
+        self.setup_python26()
+        self.uxsuccess_quoted_bracket("uxsuccess", True)
+        self.setup_python27()
+        self.uxsuccess_quoted_bracket("uxsuccess", False)
+        self.setup_python_ex()
+        self.uxsuccess_quoted_bracket("uxsuccess", False)
+
+    def test_uxsuccess_colon_quoted_bracket(self):
+        self.setup_python26()
+        self.uxsuccess_quoted_bracket("uxsuccess:", True)
+        self.setup_python27()
+        self.uxsuccess_quoted_bracket("uxsuccess:", False)
+        self.setup_python_ex()
+        self.uxsuccess_quoted_bracket("uxsuccess:", False)
+
+
 class TestTestProtocolServerAddSkip(unittest.TestCase):
     """Tests for the skip keyword.
 
@@ -620,7 +743,7 @@ class TestTestProtocolServerAddSkip(unittest.TestCase):
         """Setup a test object ready to be skipped."""
         self.client = ExtendedTestResult()
         self.protocol = subunit.TestProtocolServer(self.client)
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         self.test = self.client._events[-1][-1]
 
     def assertSkip(self, reason):
@@ -635,7 +758,7 @@ class TestTestProtocolServerAddSkip(unittest.TestCase):
             ], self.client._events)
 
     def simple_skip_keyword(self, keyword):
-        self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+        self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
         self.assertSkip(None)
 
     def test_simple_skip(self):
@@ -645,17 +768,17 @@ class TestTestProtocolServerAddSkip(unittest.TestCase):
         self.simple_skip_keyword("skip:")
 
     def test_skip_empty_message(self):
-        self.protocol.lineReceived("skip mcdonalds farm [\n")
-        self.protocol.lineReceived("]\n")
-        self.assertSkip("")
+        self.protocol.lineReceived(_b("skip mcdonalds farm [\n"))
+        self.protocol.lineReceived(_b("]\n"))
+        self.assertSkip(_b(""))
 
     def skip_quoted_bracket(self, keyword):
         # This tests it is accepted, but cannot test it is used today, because
         # of not having a way to expose it in Python so far.
-        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
-        self.protocol.lineReceived(" ]\n")
-        self.protocol.lineReceived("]\n")
-        self.assertSkip("]\n")
+        self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+        self.protocol.lineReceived(_b(" ]\n"))
+        self.protocol.lineReceived(_b("]\n"))
+        self.assertSkip(_b("]\n"))
 
     def test_skip_quoted_bracket(self):
         self.skip_quoted_bracket("skip")
@@ -669,23 +792,17 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase):
     def setUp(self):
         self.client = ExtendedTestResult()
         self.protocol = subunit.TestProtocolServer(self.client)
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         self.test = subunit.RemotedTestCase("mcdonalds farm")
 
     def simple_success_keyword(self, keyword):
-        self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+        self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
         self.assertEqual([
             ('startTest', self.test),
             ('addSuccess', self.test),
             ('stopTest', self.test),
             ], self.client._events)
 
-    def test_simple_success(self):
-        self.simple_success_keyword("failure")
-
-    def test_simple_success_colon(self):
-        self.simple_success_keyword("failure:")
-
     def test_simple_success(self):
         self.simple_success_keyword("successful")
 
@@ -700,22 +817,22 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase):
             ], self.client._events)
 
     def test_success_empty_message(self):
-        self.protocol.lineReceived("success mcdonalds farm [\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("success mcdonalds farm [\n"))
+        self.protocol.lineReceived(_b("]\n"))
         details = {}
         details['message'] = Content(ContentType("text", "plain"),
-            lambda:[""])
+            lambda:[_b("")])
         self.assertSuccess(details)
 
     def success_quoted_bracket(self, keyword):
         # This tests it is accepted, but cannot test it is used today, because
         # of not having a way to expose it in Python so far.
-        self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
-        self.protocol.lineReceived(" ]\n")
-        self.protocol.lineReceived("]\n")
+        self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+        self.protocol.lineReceived(_b(" ]\n"))
+        self.protocol.lineReceived(_b("]\n"))
         details = {}
         details['message'] = Content(ContentType("text", "plain"),
-            lambda:["]\n"])
+            lambda:[_b("]\n")])
         self.assertSuccess(details)
 
     def test_success_quoted_bracket(self):
@@ -730,26 +847,26 @@ class TestTestProtocolServerProgress(unittest.TestCase):
 
     def test_progress_accepted_stdlib(self):
         self.result = Python26TestResult()
-        self.stream = StringIO()
+        self.stream = BytesIO()
         self.protocol = subunit.TestProtocolServer(self.result,
             stream=self.stream)
-        self.protocol.lineReceived("progress: 23")
-        self.protocol.lineReceived("progress: -2")
-        self.protocol.lineReceived("progress: +4")
-        self.assertEqual("", self.stream.getvalue())
+        self.protocol.lineReceived(_b("progress: 23"))
+        self.protocol.lineReceived(_b("progress: -2"))
+        self.protocol.lineReceived(_b("progress: +4"))
+        self.assertEqual(_b(""), self.stream.getvalue())
 
     def test_progress_accepted_extended(self):
         # With a progress capable TestResult, progress events are emitted.
         self.result = ExtendedTestResult()
-        self.stream = StringIO()
+        self.stream = BytesIO()
         self.protocol = subunit.TestProtocolServer(self.result,
             stream=self.stream)
-        self.protocol.lineReceived("progress: 23")
-        self.protocol.lineReceived("progress: push")
-        self.protocol.lineReceived("progress: -2")
-        self.protocol.lineReceived("progress: pop")
-        self.protocol.lineReceived("progress: +4")
-        self.assertEqual("", self.stream.getvalue())
+        self.protocol.lineReceived(_b("progress: 23"))
+        self.protocol.lineReceived(_b("progress: push"))
+        self.protocol.lineReceived(_b("progress: -2"))
+        self.protocol.lineReceived(_b("progress: pop"))
+        self.protocol.lineReceived(_b("progress: +4"))
+        self.assertEqual(_b(""), self.stream.getvalue())
         self.assertEqual([
             ('progress', 23, subunit.PROGRESS_SET),
             ('progress', None, subunit.PROGRESS_PUSH),
@@ -767,33 +884,33 @@ class TestTestProtocolServerStreamTags(unittest.TestCase):
         self.protocol = subunit.TestProtocolServer(self.client)
 
     def test_initial_tags(self):
-        self.protocol.lineReceived("tags: foo bar:baz  quux\n")
+        self.protocol.lineReceived(_b("tags: foo bar:baz  quux\n"))
         self.assertEqual([
             ('tags', set(["foo", "bar:baz", "quux"]), set()),
             ], self.client._events)
 
     def test_minus_removes_tags(self):
-        self.protocol.lineReceived("tags: -bar quux\n")
+        self.protocol.lineReceived(_b("tags: -bar quux\n"))
         self.assertEqual([
             ('tags', set(["quux"]), set(["bar"])),
             ], self.client._events)
 
     def test_tags_do_not_get_set_on_test(self):
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         test = self.client._events[0][-1]
         self.assertEqual(None, getattr(test, 'tags', None))
 
     def test_tags_do_not_get_set_on_global_tags(self):
-        self.protocol.lineReceived("tags: foo bar\n")
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("tags: foo bar\n"))
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         test = self.client._events[-1][-1]
         self.assertEqual(None, getattr(test, 'tags', None))
 
     def test_tags_get_set_on_test_tags(self):
-        self.protocol.lineReceived("test mcdonalds farm\n")
+        self.protocol.lineReceived(_b("test mcdonalds farm\n"))
         test = self.client._events[-1][-1]
-        self.protocol.lineReceived("tags: foo bar\n")
-        self.protocol.lineReceived("success mcdonalds farm\n")
+        self.protocol.lineReceived(_b("tags: foo bar\n"))
+        self.protocol.lineReceived(_b("success mcdonalds farm\n"))
         self.assertEqual(None, getattr(test, 'tags', None))
 
 
@@ -802,19 +919,19 @@ class TestTestProtocolServerStreamTime(unittest.TestCase):
 
     def test_time_accepted_stdlib(self):
         self.result = Python26TestResult()
-        self.stream = StringIO()
+        self.stream = BytesIO()
         self.protocol = subunit.TestProtocolServer(self.result,
             stream=self.stream)
-        self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n")
-        self.assertEqual("", self.stream.getvalue())
+        self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n"))
+        self.assertEqual(_b(""), self.stream.getvalue())
 
     def test_time_accepted_extended(self):
         self.result = ExtendedTestResult()
-        self.stream = StringIO()
+        self.stream = BytesIO()
         self.protocol = subunit.TestProtocolServer(self.result,
             stream=self.stream)
-        self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n")
-        self.assertEqual("", self.stream.getvalue())
+        self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n"))
+        self.assertEqual(_b(""), self.stream.getvalue())
         self.assertEqual([
             ('time', datetime.datetime(2001, 12, 12, 12, 59, 59, 0,
             iso8601.Utc()))
@@ -850,15 +967,15 @@ class TestRemotedTestCase(unittest.TestCase):
 class TestRemoteError(unittest.TestCase):
 
     def test_eq(self):
-        error = subunit.RemoteError(u"Something went wrong")
-        another_error = subunit.RemoteError(u"Something went wrong")
-        different_error = subunit.RemoteError(u"boo!")
+        error = subunit.RemoteError(_u("Something went wrong"))
+        another_error = subunit.RemoteError(_u("Something went wrong"))
+        different_error = subunit.RemoteError(_u("boo!"))
         self.assertEqual(error, another_error)
         self.assertNotEqual(error, different_error)
         self.assertNotEqual(different_error, another_error)
 
     def test_empty_constructor(self):
-        self.assertEqual(subunit.RemoteError(), subunit.RemoteError(u""))
+        self.assertEqual(subunit.RemoteError(), subunit.RemoteError(_u("")))
 
 
 class TestExecTestCase(unittest.TestCase):
@@ -893,7 +1010,7 @@ class TestExecTestCase(unittest.TestCase):
         bing = subunit.RemotedTestCase("bing crosby")
         bing_details = {}
         bing_details['traceback'] = Content(ContentType("text", "x-traceback",
-            {'charset': 'utf8'}), lambda:["foo.c:53:ERROR invalid state\n"])
+            {'charset': 'utf8'}), lambda:[_b("foo.c:53:ERROR invalid state\n")])
         an_error = subunit.RemotedTestCase("an error")
         error_details = {}
         self.assertEqual([
@@ -917,7 +1034,8 @@ class TestExecTestCase(unittest.TestCase):
 
     def test_join_dir(self):
         sibling = subunit.join_dir(__file__, 'foo')
-        expected = '%s/foo' % (os.path.split(__file__)[0],)
+        filedir = os.path.abspath(os.path.dirname(__file__))
+        expected = os.path.join(filedir, 'foo')
         self.assertEqual(sibling, expected)
 
 
@@ -927,7 +1045,7 @@ class DoExecTestCase(subunit.ExecTestCase):
         """sample-two-script.py"""
 
 
-class TestIsolatedTestCase(unittest.TestCase):
+class TestIsolatedTestCase(TestCase):
 
     class SampleIsolatedTestCase(subunit.IsolatedTestCase):
 
@@ -946,8 +1064,9 @@ class TestIsolatedTestCase(unittest.TestCase):
 
 
     def test_construct(self):
-        test = self.SampleIsolatedTestCase("test_sets_global_state")
+        self.SampleIsolatedTestCase("test_sets_global_state")
 
+    @skipIf(os.name != "posix", "Need a posix system for forking tests")
     def test_run(self):
         result = unittest.TestResult()
         test = self.SampleIsolatedTestCase("test_sets_global_state")
@@ -963,7 +1082,7 @@ class TestIsolatedTestCase(unittest.TestCase):
         #test.debug()
 
 
-class TestIsolatedTestSuite(unittest.TestCase):
+class TestIsolatedTestSuite(TestCase):
 
     class SampleTestToIsolate(unittest.TestCase):
 
@@ -982,8 +1101,9 @@ class TestIsolatedTestSuite(unittest.TestCase):
 
 
     def test_construct(self):
-        suite = subunit.IsolatedTestSuite()
+        subunit.IsolatedTestSuite()
 
+    @skipIf(os.name != "posix", "Need a posix system for forking tests")
     def test_run(self):
         result = unittest.TestResult()
         suite = subunit.IsolatedTestSuite()
@@ -1002,48 +1122,48 @@ class TestIsolatedTestSuite(unittest.TestCase):
 class TestTestProtocolClient(unittest.TestCase):
 
     def setUp(self):
-        self.io = StringIO()
+        self.io = BytesIO()
         self.protocol = subunit.TestProtocolClient(self.io)
         self.test = TestTestProtocolClient("test_start_test")
         self.sample_details = {'something':Content(
-            ContentType('text', 'plain'), lambda:['serialised\nform'])}
+            ContentType('text', 'plain'), lambda:[_b('serialised\nform')])}
         self.sample_tb_details = dict(self.sample_details)
         self.sample_tb_details['traceback'] = TracebackContent(
-            subunit.RemoteError(u"boo qux"), self.test)
+            subunit.RemoteError(_u("boo qux")), self.test)
 
     def test_start_test(self):
         """Test startTest on a TestProtocolClient."""
         self.protocol.startTest(self.test)
-        self.assertEqual(self.io.getvalue(), "test: %s\n" % self.test.id())
+        self.assertEqual(self.io.getvalue(), _b("test: %s\n" % self.test.id()))
 
     def test_stop_test(self):
         # stopTest doesn't output anything.
         self.protocol.stopTest(self.test)
-        self.assertEqual(self.io.getvalue(), "")
+        self.assertEqual(self.io.getvalue(), _b(""))
 
     def test_add_success(self):
         """Test addSuccess on a TestProtocolClient."""
         self.protocol.addSuccess(self.test)
         self.assertEqual(
-            self.io.getvalue(), "successful: %s\n" % self.test.id())
+            self.io.getvalue(), _b("successful: %s\n" % self.test.id()))
 
     def test_add_success_details(self):
         """Test addSuccess on a TestProtocolClient with details."""
         self.protocol.addSuccess(self.test, details=self.sample_details)
         self.assertEqual(
-            self.io.getvalue(), "successful: %s [ multipart\n"
+            self.io.getvalue(), _b("successful: %s [ multipart\n"
                 "Content-Type: text/plain\n"
                 "something\n"
-                "F\r\nserialised\nform0\r\n]\n" % self.test.id())
+                "F\r\nserialised\nform0\r\n]\n" % self.test.id()))
 
     def test_add_failure(self):
         """Test addFailure on a TestProtocolClient."""
         self.protocol.addFailure(
-            self.test, subunit.RemoteError(u"boo qux"))
+            self.test, subunit.RemoteError(_u("boo qux")))
         self.assertEqual(
             self.io.getvalue(),
-            ('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n')
-            % self.test.id())
+            _b(('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n')
+            % self.test.id()))
 
     def test_add_failure_details(self):
         """Test addFailure on a TestProtocolClient with details."""
@@ -1051,24 +1171,23 @@ class TestTestProtocolClient(unittest.TestCase):
             self.test, details=self.sample_tb_details)
         self.assertEqual(
             self.io.getvalue(),
-            ("failure: %s [ multipart\n"
+            _b(("failure: %s [ multipart\n"
             "Content-Type: text/plain\n"
             "something\n"
             "F\r\nserialised\nform0\r\n"
             "Content-Type: text/x-traceback;charset=utf8,language=python\n"
-            "traceback\n"
-            "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n"
-            "]\n") % self.test.id())
+            "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
+            "]\n") % self.test.id()))
 
     def test_add_error(self):
         """Test stopTest on a TestProtocolClient."""
         self.protocol.addError(
-            self.test, subunit.RemoteError(u"phwoar crikey"))
+            self.test, subunit.RemoteError(_u("phwoar crikey")))
         self.assertEqual(
             self.io.getvalue(),
-            ('error: %s [\n' +
+            _b(('error: %s [\n' +
             _remote_exception_str + ": phwoar crikey\n"
-            "]\n") % self.test.id())
+            "]\n") % self.test.id()))
 
     def test_add_error_details(self):
         """Test stopTest on a TestProtocolClient with details."""
@@ -1076,24 +1195,23 @@ class TestTestProtocolClient(unittest.TestCase):
             self.test, details=self.sample_tb_details)
         self.assertEqual(
             self.io.getvalue(),
-            ("error: %s [ multipart\n"
+            _b(("error: %s [ multipart\n"
             "Content-Type: text/plain\n"
             "something\n"
             "F\r\nserialised\nform0\r\n"
             "Content-Type: text/x-traceback;charset=utf8,language=python\n"
-            "traceback\n"
-            "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n"
-            "]\n") % self.test.id())
+            "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
+            "]\n") % self.test.id()))
 
     def test_add_expected_failure(self):
         """Test addExpectedFailure on a TestProtocolClient."""
         self.protocol.addExpectedFailure(
-            self.test, subunit.RemoteError(u"phwoar crikey"))
+            self.test, subunit.RemoteError(_u("phwoar crikey")))
         self.assertEqual(
             self.io.getvalue(),
-            ('xfail: %s [\n' +
+            _b(('xfail: %s [\n' +
             _remote_exception_str + ": phwoar crikey\n"
-            "]\n") % self.test.id())
+            "]\n") % self.test.id()))
 
     def test_add_expected_failure_details(self):
         """Test addExpectedFailure on a TestProtocolClient with details."""
@@ -1101,14 +1219,14 @@ class TestTestProtocolClient(unittest.TestCase):
             self.test, details=self.sample_tb_details)
         self.assertEqual(
             self.io.getvalue(),
-            ("xfail: %s [ multipart\n"
+            _b(("xfail: %s [ multipart\n"
             "Content-Type: text/plain\n"
             "something\n"
             "F\r\nserialised\nform0\r\n"
             "Content-Type: text/x-traceback;charset=utf8,language=python\n"
-            "traceback\n"
-            "1A\r\n"+ _remote_exception_str + ": boo qux\n0\r\n"
-            "]\n") % self.test.id())
+            "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
+            "]\n") % self.test.id()))
+
 
     def test_add_skip(self):
         """Test addSkip on a TestProtocolClient."""
@@ -1116,64 +1234,63 @@ class TestTestProtocolClient(unittest.TestCase):
             self.test, "Has it really?")
         self.assertEqual(
             self.io.getvalue(),
-            'skip: %s [\nHas it really?\n]\n' % self.test.id())
-    
+            _b('skip: %s [\nHas it really?\n]\n' % self.test.id()))
+
     def test_add_skip_details(self):
         """Test addSkip on a TestProtocolClient with details."""
         details = {'reason':Content(
-            ContentType('text', 'plain'), lambda:['Has it really?'])}
-        self.protocol.addSkip(
-            self.test, details=details)
+            ContentType('text', 'plain'), lambda:[_b('Has it really?')])}
+        self.protocol.addSkip(self.test, details=details)
         self.assertEqual(
             self.io.getvalue(),
-            "skip: %s [ multipart\n"
+            _b("skip: %s [ multipart\n"
             "Content-Type: text/plain\n"
             "reason\n"
             "E\r\nHas it really?0\r\n"
-            "]\n" % self.test.id())
+            "]\n" % self.test.id()))
 
     def test_progress_set(self):
         self.protocol.progress(23, subunit.PROGRESS_SET)
-        self.assertEqual(self.io.getvalue(), 'progress: 23\n')
+        self.assertEqual(self.io.getvalue(), _b('progress: 23\n'))
 
     def test_progress_neg_cur(self):
         self.protocol.progress(-23, subunit.PROGRESS_CUR)
-        self.assertEqual(self.io.getvalue(), 'progress: -23\n')
+        self.assertEqual(self.io.getvalue(), _b('progress: -23\n'))
 
     def test_progress_pos_cur(self):
         self.protocol.progress(23, subunit.PROGRESS_CUR)
-        self.assertEqual(self.io.getvalue(), 'progress: +23\n')
+        self.assertEqual(self.io.getvalue(), _b('progress: +23\n'))
 
     def test_progress_pop(self):
         self.protocol.progress(1234, subunit.PROGRESS_POP)
-        self.assertEqual(self.io.getvalue(), 'progress: pop\n')
+        self.assertEqual(self.io.getvalue(), _b('progress: pop\n'))
 
     def test_progress_push(self):
         self.protocol.progress(1234, subunit.PROGRESS_PUSH)
-        self.assertEqual(self.io.getvalue(), 'progress: push\n')
+        self.assertEqual(self.io.getvalue(), _b('progress: push\n'))
 
     def test_time(self):
         # Calling time() outputs a time signal immediately.
         self.protocol.time(
             datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc()))
         self.assertEqual(
-            "time: 2009-10-11 12:13:14.000015Z\n",
+            _b("time: 2009-10-11 12:13:14.000015Z\n"),
             self.io.getvalue())
 
     def test_add_unexpected_success(self):
         """Test addUnexpectedSuccess on a TestProtocolClient."""
         self.protocol.addUnexpectedSuccess(self.test)
         self.assertEqual(
-            self.io.getvalue(), "successful: %s\n" % self.test.id())
+            self.io.getvalue(), _b("uxsuccess: %s\n" % self.test.id()))
 
     def test_add_unexpected_success_details(self):
         """Test addUnexpectedSuccess on a TestProtocolClient with details."""
         self.protocol.addUnexpectedSuccess(self.test, details=self.sample_details)
         self.assertEqual(
-            self.io.getvalue(), "successful: %s [ multipart\n"
+            self.io.getvalue(), _b("uxsuccess: %s [ multipart\n"
                 "Content-Type: text/plain\n"
                 "something\n"
-                "F\r\nserialised\nform0\r\n]\n" % self.test.id())
+                "F\r\nserialised\nform0\r\n]\n" % self.test.id()))
 
 
 def test_suite():
index fe82c04b065d81502de6977ad60fc25fd63b4895..94d22748e80889376bf639d6f361492d38138f03 100644 (file)
@@ -6,7 +6,7 @@
 #  license at the users choice. A copy of both licenses are available in the
 #  project source as Apache-2.0 and BSD. You may not use this file except in
 #  compliance with one of these two licences.
-#  
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 
 import datetime
 import unittest
-from StringIO import StringIO
-import os
-import sys
 
-from testtools.content_type import ContentType
-from testtools.content import Content
+from testtools import TestCase
+from testtools.testresult.doubles import ExtendedTestResult
 
 import subunit
 import subunit.iso8601 as iso8601
@@ -82,22 +79,22 @@ class TestHookedTestResultDecorator(unittest.TestCase):
 
     def test_startTest(self):
         self.result.startTest(self)
-        
+
     def test_startTestRun(self):
         self.result.startTestRun()
-        
+
     def test_stopTest(self):
         self.result.stopTest(self)
-        
+
     def test_stopTestRun(self):
         self.result.stopTestRun()
 
     def test_addError(self):
         self.result.addError(self, subunit.RemoteError())
-        
+
     def test_addError_details(self):
         self.result.addError(self, details={})
-        
+
     def test_addFailure(self):
         self.result.addFailure(self, subunit.RemoteError())
 
@@ -142,7 +139,7 @@ class TestHookedTestResultDecorator(unittest.TestCase):
 
     def test_time(self):
         self.result.time(None)
+
 
 class TestAutoTimingTestResultDecorator(unittest.TestCase):
 
@@ -193,6 +190,110 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
         self.assertNotEqual(None, self.decorated._calls[2])
 
 
+class TestTagCollapsingDecorator(TestCase):
+
+    def test_tags_forwarded_outside_of_tests(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.tags(set(['a', 'b']), set())
+        self.assertEquals(
+            [('tags', set(['a', 'b']), set([]))], result._events)
+
+    def test_tags_collapsed_inside_of_tests(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        test = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(test)
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set(['a']))
+        tag_collapser.tags(set(['c']), set())
+        tag_collapser.stopTest(test)
+        self.assertEquals(
+            [('startTest', test),
+             ('tags', set(['b', 'c']), set(['a'])),
+             ('stopTest', test)],
+            result._events)
+
+    def test_tags_collapsed_inside_of_tests_different_ordering(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        test = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(test)
+        tag_collapser.tags(set(), set(['a']))
+        tag_collapser.tags(set(['a', 'b']), set())
+        tag_collapser.tags(set(['c']), set())
+        tag_collapser.stopTest(test)
+        self.assertEquals(
+            [('startTest', test),
+             ('tags', set(['a', 'b', 'c']), set()),
+             ('stopTest', test)],
+            result._events)
+
+
+class TestTimeCollapsingDecorator(TestCase):
+
+    def make_time(self):
+        # Heh heh.
+        return datetime.datetime(
+            2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
+
+    def test_initial_time_forwarded(self):
+        # We always forward the first time event we see.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        tag_collapser.time(a_time)
+        self.assertEquals([('time', a_time)], result._events)
+
+    def test_time_collapsed_to_first_and_last(self):
+        # If there are many consecutive time events, only the first and last
+        # are sent through.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        times = [self.make_time() for i in range(5)]
+        for a_time in times:
+            tag_collapser.time(a_time)
+        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+        self.assertEquals(
+            [('time', times[0]), ('time', times[-1])], result._events[:-1])
+
+    def test_only_one_time_sent(self):
+        # If we receive a single time event followed by a non-time event, we
+        # send exactly one time event.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        tag_collapser.time(a_time)
+        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+        self.assertEquals([('time', a_time)], result._events[:-1])
+
+    def test_duplicate_times_not_sent(self):
+        # Many time events with the exact same time are collapsed into one
+        # time event.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        for i in range(5):
+            tag_collapser.time(a_time)
+        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+        self.assertEquals([('time', a_time)], result._events[:-1])
+
+    def test_no_times_inserted(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        tag_collapser.time(a_time)
+        foo = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(foo)
+        tag_collapser.addSuccess(foo)
+        tag_collapser.stopTest(foo)
+        self.assertEquals(
+            [('time', a_time),
+             ('startTest', foo),
+             ('addSuccess', foo),
+             ('stopTest', foo)], result._events)
+
+
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
     result = loader.loadTestsFromName(__name__)