subunit: Update to latest upstream snapshot.
[nivanova/samba-autobuild/.git] / lib / subunit / python / subunit / test_results.py
index 1c91daadc6a470dce78be0cfd89eb1dc20e715f1..33fb50e07306ad60a27739473d2cbe9bf579c4c8 100644 (file)
 
 import datetime
 
-import iso8601
 import testtools
 
+from subunit import iso8601
+
 
 # NOT a TestResult, because we are implementing the interface, not inheriting
 # it.
@@ -81,8 +82,12 @@ class TestResultDecorator(object):
     def stop(self):
         return self.decorated.stop()
 
+    @property
+    def testsRun(self):
+        return self.decorated.testsRun
+
     def tags(self, new_tags, gone_tags):
-        return self.decorated.time(new_tags, gone_tags)
+        return self.decorated.tags(new_tags, gone_tags)
 
     def time(self, a_datetime):
         return self.decorated.time(a_datetime)
@@ -195,6 +200,87 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
         return self.decorated.time(a_datetime)
 
 
+class TagCollapsingDecorator(TestResultDecorator):
+    """Collapses many 'tags' calls into one where possible."""
+
+    def __init__(self, result):
+        super(TagCollapsingDecorator, self).__init__(result)
+        # The (new, gone) tags for the current test.
+        self._current_test_tags = None
+
+    def startTest(self, test):
+        """Start a test.
+
+        Not directly passed to the client, but used for handling of tags
+        correctly.
+        """
+        self.decorated.startTest(test)
+        self._current_test_tags = set(), set()
+
+    def stopTest(self, test):
+        """Stop a test.
+
+        Not directly passed to the client, but used for handling of tags
+        correctly.
+        """
+        # Tags to output for this test.
+        if self._current_test_tags[0] or self._current_test_tags[1]:
+            self.decorated.tags(*self._current_test_tags)
+        self.decorated.stopTest(test)
+        self._current_test_tags = None
+
+    def tags(self, new_tags, gone_tags):
+        """Handle tag instructions.
+
+        Adds and removes tags as appropriate. If a test is currently running,
+        tags are not affected for subsequent tests.
+
+        :param new_tags: Tags to add,
+        :param gone_tags: Tags to remove.
+        """
+        if self._current_test_tags is not None:
+            # gather the tags until the test stops.
+            self._current_test_tags[0].update(new_tags)
+            self._current_test_tags[0].difference_update(gone_tags)
+            self._current_test_tags[1].update(gone_tags)
+            self._current_test_tags[1].difference_update(new_tags)
+        else:
+            return self.decorated.tags(new_tags, gone_tags)
+
+
+class TimeCollapsingDecorator(HookedTestResultDecorator):
+    """Only pass on the first and last of a consecutive sequence of times."""
+
+    def __init__(self, decorated):
+        super(TimeCollapsingDecorator, self).__init__(decorated)
+        self._last_received_time = None
+        self._last_sent_time = None
+
+    def _before_event(self):
+        if self._last_received_time is None:
+            return
+        if self._last_received_time != self._last_sent_time:
+            self.decorated.time(self._last_received_time)
+            self._last_sent_time = self._last_received_time
+        self._last_received_time = None
+
+    def time(self, a_time):
+        # Don't upcall, because we don't want to call _before_event, it's only
+        # for non-time events.
+        if self._last_received_time is None:
+            self.decorated.time(a_time)
+            self._last_sent_time = a_time
+        self._last_received_time = a_time
+
+
+def all_true(bools):
+    """Return True if all of 'bools' are True. False otherwise."""
+    for b in bools:
+        if not b:
+            return False
+    return True
+
+
 class TestResultFilter(TestResultDecorator):
     """A pyunit TestResult interface implementation which filters tests.
 
@@ -208,82 +294,110 @@ class TestResultFilter(TestResultDecorator):
     """
 
     def __init__(self, result, filter_error=False, filter_failure=False,
-        filter_success=True, filter_skip=False,
-        filter_predicate=None):
+        filter_success=True, filter_skip=False, filter_xfail=False,
+        filter_predicate=None, fixup_expected_failures=None):
         """Create a FilterResult object filtering to result.
 
         :param filter_error: Filter out errors.
         :param filter_failure: Filter out failures.
         :param filter_success: Filter out successful tests.
         :param filter_skip: Filter out skipped tests.
+        :param filter_xfail: Filter out expected failure tests.
         :param filter_predicate: A callable taking (test, outcome, err,
             details) and returning True if the result should be passed
             through.  err and details may be none if no error or extra
             metadata is available. outcome is the name of the outcome such
             as 'success' or 'failure'.
+        :param fixup_expected_failures: Set of test ids to consider known
+            failing.
         """
-        TestResultDecorator.__init__(self, result)
-        self._filter_error = filter_error
-        self._filter_failure = filter_failure
-        self._filter_success = filter_success
-        self._filter_skip = filter_skip
-        if filter_predicate is None:
-            filter_predicate = lambda test, outcome, err, details: True
-        self.filter_predicate = filter_predicate
+        super(TestResultFilter, self).__init__(result)
+        self.decorated = TimeCollapsingDecorator(
+            TagCollapsingDecorator(self.decorated))
+        predicates = []
+        if filter_error:
+            predicates.append(lambda t, outcome, e, d: outcome != 'error')
+        if filter_failure:
+            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
+        if filter_success:
+            predicates.append(lambda t, outcome, e, d: outcome != 'success')
+        if filter_skip:
+            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
+        if filter_xfail:
+            predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
+        if filter_predicate is not None:
+            predicates.append(filter_predicate)
+        self.filter_predicate = (
+            lambda test, outcome, err, details:
+                all_true(p(test, outcome, err, details) for p in predicates))
         # The current test (for filtering tags)
         self._current_test = None
         # Has the current test been filtered (for outputting test tags)
         self._current_test_filtered = None
-        # The (new, gone) tags for the current test.
-        self._current_test_tags = None
+        # Calls to this result that we don't know whether to forward on yet.
+        self._buffered_calls = []
+        if fixup_expected_failures is None:
+            self._fixup_expected_failures = frozenset()
+        else:
+            self._fixup_expected_failures = fixup_expected_failures
 
     def addError(self, test, err=None, details=None):
-        if (not self._filter_error and
-            self.filter_predicate(test, 'error', err, details)):
-            self.decorated.startTest(test)
-            self.decorated.addError(test, err, details=details)
+        if (self.filter_predicate(test, 'error', err, details)):
+            if self._failure_expected(test):
+                self._buffered_calls.append(
+                    ('addExpectedFailure', [test, err], {'details': details}))
+            else:
+                self._buffered_calls.append(
+                    ('addError', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addFailure(self, test, err=None, details=None):
-        if (not self._filter_failure and
-            self.filter_predicate(test, 'failure', err, details)):
-            self.decorated.startTest(test)
-            self.decorated.addFailure(test, err, details=details)
+        if (self.filter_predicate(test, 'failure', err, details)):
+            if self._failure_expected(test):
+                self._buffered_calls.append(
+                    ('addExpectedFailure', [test, err], {'details': details}))
+            else:
+                self._buffered_calls.append(
+                    ('addFailure', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addSkip(self, test, reason=None, details=None):
-        if (not self._filter_skip and
-            self.filter_predicate(test, 'skip', reason, details)):
-            self.decorated.startTest(test)
-            self.decorated.addSkip(test, reason, details=details)
+        if (self.filter_predicate(test, 'skip', reason, details)):
+            self._buffered_calls.append(
+                ('addSkip', [test, reason], {'details': details}))
         else:
             self._filtered()
 
     def addSuccess(self, test, details=None):
-        if (not self._filter_success and
-            self.filter_predicate(test, 'success', None, details)):
-            self.decorated.startTest(test)
-            self.decorated.addSuccess(test, details=details)
+        if (self.filter_predicate(test, 'success', None, details)):
+            if self._failure_expected(test):
+                self._buffered_calls.append(
+                    ('addUnexpectedSuccess', [test], {'details': details}))
+            else:
+                self._buffered_calls.append(
+                    ('addSuccess', [test], {'details': details}))
         else:
             self._filtered()
 
     def addExpectedFailure(self, test, err=None, details=None):
         if self.filter_predicate(test, 'expectedfailure', err, details):
-            self.decorated.startTest(test)
-            return self.decorated.addExpectedFailure(test, err,
-                details=details)
+            self._buffered_calls.append(
+                ('addExpectedFailure', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addUnexpectedSuccess(self, test, details=None):
-        self.decorated.startTest(test)
-        return self.decorated.addUnexpectedSuccess(test, details=details)
+        self._buffered_calls.append(
+            ('addUnexpectedSuccess', [test], {'details': details}))
 
     def _filtered(self):
         self._current_test_filtered = True
 
+    def _failure_expected(self, test):
+        return (test.id() in self._fixup_expected_failures)
+
     def startTest(self, test):
         """Start a test.
 
@@ -292,7 +406,7 @@ class TestResultFilter(TestResultDecorator):
         """
         self._current_test = test
         self._current_test_filtered = False
-        self._current_test_tags = set(), set()
+        self._buffered_calls.append(('startTest', [test], {}))
 
     def stopTest(self, test):
         """Stop a test.
@@ -302,29 +416,18 @@ class TestResultFilter(TestResultDecorator):
         """
         if not self._current_test_filtered:
             # Tags to output for this test.
-            if self._current_test_tags[0] or self._current_test_tags[1]:
-                self.decorated.tags(*self._current_test_tags)
+            for method, args, kwargs in self._buffered_calls:
+                getattr(self.decorated, method)(*args, **kwargs)
             self.decorated.stopTest(test)
         self._current_test = None
         self._current_test_filtered = None
-        self._current_test_tags = None
+        self._buffered_calls = []
 
-    def tags(self, new_tags, gone_tags):
-        """Handle tag instructions.
-
-        Adds and removes tags as appropriate. If a test is currently running,
-        tags are not affected for subsequent tests.
-
-        :param new_tags: Tags to add,
-        :param gone_tags: Tags to remove.
-        """
+    def time(self, a_time):
         if self._current_test is not None:
-            # gather the tags until the test stops.
-            self._current_test_tags[0].update(new_tags)
-            self._current_test_tags[0].difference_update(gone_tags)
-            self._current_test_tags[1].update(gone_tags)
-            self._current_test_tags[1].difference_update(new_tags)
-        return self.decorated.tags(new_tags, gone_tags)
+            self._buffered_calls.append(('time', [a_time], {}))
+        else:
+            return self.decorated.time(a_time)
 
     def id_to_orig_id(self, id):
         if id.startswith("subunit.RemotedTestCase."):
@@ -336,10 +439,10 @@ class TestIdPrintingResult(testtools.TestResult):
 
     def __init__(self, stream, show_times=False):
         """Create a FilterResult object outputting to stream."""
-        testtools.TestResult.__init__(self)
+        super(TestIdPrintingResult, self).__init__()
         self._stream = stream
         self.failed_tests = 0
-        self.__time = 0
+        self.__time = None
         self.show_times = show_times
         self._test = None
         self._test_duration = 0
@@ -355,6 +458,16 @@ class TestIdPrintingResult(testtools.TestResult):
     def addSuccess(self, test):
         self._test = test
 
+    def addSkip(self, test, reason=None, details=None):
+        self._test = test
+
+    def addUnexpectedSuccess(self, test, details=None):
+        self.failed_tests += 1
+        self._test = test
+
+    def addExpectedFailure(self, test, err=None, details=None):
+        self._test = test
+
     def reportTest(self, test, duration):
         if self.show_times:
             seconds = duration.seconds