``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
which can be used instead of the usual python unittest parameter.
When used the value of details should be a dict from ``string`` to
-``subunit.content.Content`` objects. This is a draft API being worked on with
+``testtools.content.Content`` objects. This is a draft API being worked on with
the Python Testing In Python mail list, with the goal of permitting a common
way to provide additional data beyond a traceback, such as captured data from
disk, logging messages etc.
---------------
* subunit.chunked contains HTTP chunked encoding/decoding logic.
-* subunit.content contains a minimal assumptions MIME content representation.
-* subunit.content_type contains a MIME Content-Type representation.
+* testtools.content contains a minimal assumptions MIME content representation.
+* testtools.content_type contains a MIME Content-Type representation.
* subunit.test_results contains TestResult helper classes.
"""
import unittest
import iso8601
+from testtools import content, content_type, ExtendedToOriginalDecorator
+from testtools.testresult import _StringException
-import chunked, content, content_type, details, test_results
+import chunked, details, test_results
PROGRESS_SET = 0
of mixed protocols. By default, sys.stdout will be used for
convenience.
"""
- self.client = test_results.ExtendedToOriginalDecorator(client)
+ self.client = ExtendedToOriginalDecorator(client)
if stream is None:
stream = sys.stdout
self._stream = stream
self._stream.write(line)
-class RemoteException(Exception):
- """An exception that occured remotely to Python."""
-
- def __eq__(self, other):
- try:
- return self.args == other.args
- except AttributeError:
- return False
+try:
+ from testtools.testresult import _StringException as RemoteException
+ _remote_exception_str = '_StringException' # For testing.
+except ImportError:
+ raise ImportError ("testtools.testresult does not contain _StringException, check your version.")
class TestProtocolClient(unittest.TestResult):
def RemoteError(description=""):
- if description == "":
- description = "\n"
- return (RemoteException, RemoteException(description), None)
+ return (_StringException, _StringException(description), None)
class RemotedTestCase(unittest.TestCase):
def wasSuccessful(self):
"""Tells whether or not this result was a success"""
return self.failed_tests == 0
-
-
-class TestResultFilter(unittest.TestResult):
- """A pyunit TestResult interface implementation which filters tests.
-
- Tests that pass the filter are handed on to another TestResult instance
- for further processing/reporting. To obtain the filtered results,
- the other instance must be interrogated.
-
- :ivar result: The result that tests are passed to after filtering.
- :ivar filter_predicate: The callback run to decide whether to pass
- a result.
- """
-
- def __init__(self, result, filter_error=False, filter_failure=False,
- filter_success=True, filter_skip=False,
- filter_predicate=None):
- """Create a FilterResult object filtering to result.
-
- :param filter_error: Filter out errors.
- :param filter_failure: Filter out failures.
- :param filter_success: Filter out successful tests.
- :param filter_skip: Filter out skipped tests.
- :param filter_predicate: A callable taking (test, err) and
- returning True if the result should be passed through.
- err is None for success.
- """
- unittest.TestResult.__init__(self)
- self.result = result
- self._filter_error = filter_error
- self._filter_failure = filter_failure
- self._filter_success = filter_success
- self._filter_skip = filter_skip
- if filter_predicate is None:
- filter_predicate = lambda test, err: True
- self.filter_predicate = filter_predicate
- # The current test (for filtering tags)
- self._current_test = None
- # Has the current test been filtered (for outputting test tags)
- self._current_test_filtered = None
- # The (new, gone) tags for the current test.
- self._current_test_tags = None
-
- def addError(self, test, err):
- if not self._filter_error and self.filter_predicate(test, err):
- self.result.startTest(test)
- self.result.addError(test, err)
-
- def addFailure(self, test, err):
- if not self._filter_failure and self.filter_predicate(test, err):
- self.result.startTest(test)
- self.result.addFailure(test, err)
-
- def addSkip(self, test, reason):
- if not self._filter_skip and self.filter_predicate(test, reason):
- self.result.startTest(test)
- # This is duplicated, it would be nice to have on a 'calls
- # TestResults' mixin perhaps.
- addSkip = getattr(self.result, 'addSkip', None)
- if not callable(addSkip):
- self.result.addError(test, RemoteError(reason))
- else:
- self.result.addSkip(test, reason)
-
- def addSuccess(self, test):
- if not self._filter_success and self.filter_predicate(test, None):
- self.result.startTest(test)
- self.result.addSuccess(test)
-
- def startTest(self, test):
- """Start a test.
-
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- self._current_test = test
- self._current_test_filtered = False
- self._current_test_tags = set(), set()
-
- def stopTest(self, test):
- """Stop a test.
-
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- if not self._current_test_filtered:
- # Tags to output for this test.
- if self._current_test_tags[0] or self._current_test_tags[1]:
- tags_method = getattr(self.result, 'tags', None)
- if callable(tags_method):
- self.result.tags(*self._current_test_tags)
- self.result.stopTest(test)
- self._current_test = None
- self._current_test_filtered = None
- self._current_test_tags = None
-
- def tags(self, new_tags, gone_tags):
- """Handle tag instructions.
-
- Adds and removes tags as appropriate. If a test is currently running,
- tags are not affected for subsequent tests.
-
- :param new_tags: Tags to add,
- :param gone_tags: Tags to remove.
- """
- if self._current_test is not None:
- # gather the tags until the test stops.
- self._current_test_tags[0].update(new_tags)
- self._current_test_tags[0].difference_update(gone_tags)
- self._current_test_tags[1].update(gone_tags)
- self._current_test_tags[1].difference_update(new_tags)
- tags_method = getattr(self.result, 'tags', None)
- if tags_method is None:
- return
- return tags_method(new_tags, gone_tags)
-
- def id_to_orig_id(self, id):
- if id.startswith("subunit.RemotedTestCase."):
- return id[len("subunit.RemotedTestCase."):]
- return id