Merge updates from tag-collapsing-rigor.
authorJonathan Lange <jml@mumak.net>
Fri, 20 Apr 2012 11:34:31 +0000 (12:34 +0100)
committerJonathan Lange <jml@mumak.net>
Fri, 20 Apr 2012 11:34:31 +0000 (12:34 +0100)
filters/subunit-filter
python/subunit/test_results.py
python/subunit/tests/test_subunit_filter.py

index 7f5620f151d0bae68d5424dd99b1f6f344ca2772..945f80d40a5789d825bcbb9009e95601ac7deb66 100755 (executable)
@@ -36,7 +36,11 @@ from subunit import (
     TestProtocolClient,
     read_test_list,
     )
-from subunit.test_results import TestResultFilter
+from subunit.test_results import (
+    and_predicates,
+    _make_tag_filter,
+    TestResultFilter,
+    )
 
 parser = OptionParser(description=__doc__)
 parser.add_option("--error", action="store_false",
@@ -61,6 +65,12 @@ parser.add_option("--xfail", action="store_false",
     help="include expected falures", default=True, dest="xfail")
 parser.add_option("--no-xfail", action="store_true",
     help="exclude expected falures", default=True, dest="xfail")
+parser.add_option(
+    "--with-tag", type=str,
+    help="include tests with these tags", action="append", dest="with_tags")
+parser.add_option(
+    "--without-tag", type=str,
+    help="exclude tests with these tags", action="append", dest="without_tags")
 parser.add_option("-m", "--with", type=str,
     help="regexp to include (case-sensitive by default)",
     action="append", dest="with_regexps")
@@ -97,7 +107,7 @@ def _make_regexp_filter(with_regexps, without_regexps):
     with_re = with_regexps and _compile_re_from_list(with_regexps)
     without_re = without_regexps and _compile_re_from_list(without_regexps)
 
-    def check_regexps(test, outcome, err, details):
+    def check_regexps(test, outcome, err, details, tags):
         """Check if this test and error match the regexp filters."""
         test_str = str(test) + outcome + str(err) + str(details)
         if with_re and not with_re.search(test_str):
@@ -108,21 +118,37 @@ def _make_regexp_filter(with_regexps, without_regexps):
     return check_regexps
 
 
+
+def make_result(output, options, predicate):
+    """Make the result that we'll send the test outcomes to."""
+    fixup_expected_failures = set()
+    for path in options.fixup_expected_failures or ():
+        fixup_expected_failures.update(read_test_list(path))
+    return TestResultFilter(
+        TestProtocolClient(output),
+        filter_error=options.error,
+        filter_failure=options.failure,
+        filter_success=options.success,
+        filter_skip=options.skip,
+        filter_xfail=options.xfail,
+        filter_predicate=predicate,
+        fixup_expected_failures=fixup_expected_failures)
+
+
 regexp_filter = _make_regexp_filter(options.with_regexps,
         options.without_regexps)
-fixup_expected_failures = set()
-for path in options.fixup_expected_failures or ():
-    fixup_expected_failures.update(read_test_list(path))
-result = TestProtocolClient(sys.stdout)
-result = TestResultFilter(result, filter_error=options.error,
-    filter_failure=options.failure, filter_success=options.success,
-    filter_skip=options.skip, filter_xfail=options.xfail,
-    filter_predicate=regexp_filter,
-    fixup_expected_failures=fixup_expected_failures)
+tag_filter = _make_tag_filter(options.with_tags, options.without_tags)
+
+filter_predicate = and_predicates([regexp_filter, tag_filter])
+
 if options.no_passthrough:
     passthrough_stream = DiscardStream()
 else:
     passthrough_stream = None
+
+result = make_result(sys.stdout, options, tag_filter)
 test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream)
+result.startTestRun()
 test.run(result)
+result.stopTestRun()
 sys.exit(0)
index 8db40e15a8d31536794b81879c7374abb1f4f648..fea3b0780be3c9e1944453f30959c63cdd762983 100644 (file)
@@ -20,6 +20,7 @@ import csv
 import datetime
 
 import testtools
+from testtools.compat import all
 from testtools.content import (
     text_content,
     TracebackContent,
@@ -39,6 +40,9 @@ class TestResultDecorator(object):
     or features by degrading them.
     """
 
+    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
+    # we should gut this and just use that.
+
     def __init__(self, decorated):
         """Create a TestResultDecorator forwarding to decorated."""
         # Make every decorator degrade gracefully.
@@ -288,93 +292,60 @@ class TimeCollapsingDecorator(HookedTestResultDecorator):
         self._last_received_time = a_time
 
 
-def all_true(bools):
-    """Return True if all of 'bools' are True. False otherwise."""
-    for b in bools:
-        if not b:
-            return False
-    return True
+def and_predicates(predicates):
+    """Return a predicate that is true iff all predicates are true."""
+    # XXX: Should probably be in testtools to be better used by matchers. jml
+    return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates)
 
 
-class TestResultFilter(TestResultDecorator):
-    """A pyunit TestResult interface implementation which filters tests.
+def _make_tag_filter(with_tags, without_tags):
+    """Make a callback that checks tests against tags."""
 
-    Tests that pass the filter are handed on to another TestResult instance
-    for further processing/reporting. To obtain the filtered results,
-    the other instance must be interrogated.
+    with_tags = with_tags and set(with_tags) or None
+    without_tags = without_tags and set(without_tags) or None
 
-    :ivar result: The result that tests are passed to after filtering.
-    :ivar filter_predicate: The callback run to decide whether to pass
-        a result.
-    """
+    def check_tags(test, outcome, err, details, tags):
+        if with_tags and not with_tags <= tags:
+            return False
+        if without_tags and bool(without_tags & tags):
+            return False
+        return True
 
-    def __init__(self, result, filter_error=False, filter_failure=False,
-        filter_success=True, filter_skip=False, filter_xfail=False,
-        filter_predicate=None, fixup_expected_failures=None):
-        """Create a FilterResult object filtering to result.
+    return check_tags
 
-        :param filter_error: Filter out errors.
-        :param filter_failure: Filter out failures.
-        :param filter_success: Filter out successful tests.
-        :param filter_skip: Filter out skipped tests.
-        :param filter_xfail: Filter out expected failure tests.
-        :param filter_predicate: A callable taking (test, outcome, err,
-            details) and returning True if the result should be passed
-            through.  err and details may be none if no error or extra
-            metadata is available. outcome is the name of the outcome such
-            as 'success' or 'failure'.
-        :param fixup_expected_failures: Set of test ids to consider known
-            failing.
-        """
-        super(TestResultFilter, self).__init__(result)
+
+class _PredicateFilter(TestResultDecorator):
+
+    def __init__(self, result, predicate):
+        super(_PredicateFilter, self).__init__(result)
         self.decorated = TimeCollapsingDecorator(
             TagCollapsingDecorator(self.decorated))
-        predicates = []
-        if filter_error:
-            predicates.append(lambda t, outcome, e, d: outcome != 'error')
-        if filter_failure:
-            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
-        if filter_success:
-            predicates.append(lambda t, outcome, e, d: outcome != 'success')
-        if filter_skip:
-            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
-        if filter_xfail:
-            predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
-        if filter_predicate is not None:
-            predicates.append(filter_predicate)
-        self.filter_predicate = (
-            lambda test, outcome, err, details:
-                all_true(p(test, outcome, err, details) for p in predicates))
+        self._predicate = predicate
+        self._current_tags = set()
         # The current test (for filtering tags)
         self._current_test = None
         # Has the current test been filtered (for outputting test tags)
         self._current_test_filtered = None
         # Calls to this result that we don't know whether to forward on yet.
         self._buffered_calls = []
-        if fixup_expected_failures is None:
-            self._fixup_expected_failures = frozenset()
-        else:
-            self._fixup_expected_failures = fixup_expected_failures
+
+    def filter_predicate(self, test, outcome, error, details):
+        # XXX: ExtendedToOriginalDecorator doesn't properly wrap current_tags.
+        # https://bugs.launchpad.net/testtools/+bug/978027
+        return self._predicate(
+            test, outcome, error, details, self._current_tags)
 
     def addError(self, test, err=None, details=None):
         if (self.filter_predicate(test, 'error', err, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addExpectedFailure', [test, err], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addError', [test, err], {'details': details}))
+            self._buffered_calls.append(
+                ('addError', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addFailure(self, test, err=None, details=None):
         if (self.filter_predicate(test, 'failure', err, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addExpectedFailure', [test, err], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addFailure', [test, err], {'details': details}))
+            self._buffered_calls.append(
+                ('addFailure', [test, err], {'details': details}))
         else:
             self._filtered()
 
@@ -385,17 +356,6 @@ class TestResultFilter(TestResultDecorator):
         else:
             self._filtered()
 
-    def addSuccess(self, test, details=None):
-        if (self.filter_predicate(test, 'success', None, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addUnexpectedSuccess', [test], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addSuccess', [test], {'details': details}))
-        else:
-            self._filtered()
-
     def addExpectedFailure(self, test, err=None, details=None):
         if self.filter_predicate(test, 'expectedfailure', err, details):
             self._buffered_calls.append(
@@ -407,12 +367,16 @@ class TestResultFilter(TestResultDecorator):
         self._buffered_calls.append(
             ('addUnexpectedSuccess', [test], {'details': details}))
 
+    def addSuccess(self, test, details=None):
+        if (self.filter_predicate(test, 'success', None, details)):
+            self._buffered_calls.append(
+                ('addSuccess', [test], {'details': details}))
+        else:
+            self._filtered()
+
     def _filtered(self):
         self._current_test_filtered = True
 
-    def _failure_expected(self, test):
-        return (test.id() in self._fixup_expected_failures)
-
     def startTest(self, test):
         """Start a test.
 
@@ -430,7 +394,6 @@ class TestResultFilter(TestResultDecorator):
         correctly.
         """
         if not self._current_test_filtered:
-            # Tags to output for this test.
             for method, args, kwargs in self._buffered_calls:
                 getattr(self.decorated, method)(*args, **kwargs)
             self.decorated.stopTest(test)
@@ -438,6 +401,15 @@ class TestResultFilter(TestResultDecorator):
         self._current_test_filtered = None
         self._buffered_calls = []
 
+    def tags(self, new_tags, gone_tags):
+        new_tags, gone_tags = set(new_tags), set(gone_tags)
+        self._current_tags.update(new_tags)
+        self._current_tags.difference_update(gone_tags)
+        if self._current_test is not None:
+            self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
+        else:
+            return super(_PredicateFilter, self).tags(new_tags, gone_tags)
+
     def time(self, a_time):
         if self._current_test is not None:
             self._buffered_calls.append(('time', [a_time], {}))
@@ -450,6 +422,93 @@ class TestResultFilter(TestResultDecorator):
         return id
 
 
+class TestResultFilter(TestResultDecorator):
+    """A pyunit TestResult interface implementation which filters tests.
+
+    Tests that pass the filter are handed on to another TestResult instance
+    for further processing/reporting. To obtain the filtered results,
+    the other instance must be interrogated.
+
+    :ivar result: The result that tests are passed to after filtering.
+    :ivar filter_predicate: The callback run to decide whether to pass
+        a result.
+    """
+
+    def __init__(self, result, filter_error=False, filter_failure=False,
+        filter_success=True, filter_skip=False, filter_xfail=False,
+        filter_predicate=None, fixup_expected_failures=None):
+        """Create a FilterResult object filtering to result.
+
+        :param filter_error: Filter out errors.
+        :param filter_failure: Filter out failures.
+        :param filter_success: Filter out successful tests.
+        :param filter_skip: Filter out skipped tests.
+        :param filter_xfail: Filter out expected failure tests.
+        :param filter_predicate: A callable taking (test, outcome, err,
+            details) and returning True if the result should be passed
+            through.  err and details may be none if no error or extra
+            metadata is available. outcome is the name of the outcome such
+            as 'success' or 'failure'.
+        :param fixup_expected_failures: Set of test ids to consider known
+            failing.
+        """
+        predicates = []
+        if filter_error:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'error')
+        if filter_failure:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'failure')
+        if filter_success:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'success')
+        if filter_skip:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'skip')
+        if filter_xfail:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'expectedfailure')
+        if filter_predicate is not None:
+            def compat(test, outcome, error, details, tags):
+                # 0.0.7 and earlier did not support the 'tags' parameter.
+                try:
+                    return filter_predicate(
+                        test, outcome, error, details, tags)
+                except TypeError:
+                    return filter_predicate(test, outcome, error, details)
+            predicates.append(compat)
+        predicate = and_predicates(predicates)
+        super(TestResultFilter, self).__init__(
+            _PredicateFilter(result, predicate))
+        if fixup_expected_failures is None:
+            self._fixup_expected_failures = frozenset()
+        else:
+            self._fixup_expected_failures = fixup_expected_failures
+
+    def addError(self, test, err=None, details=None):
+        if self._failure_expected(test):
+            self.addExpectedFailure(test, err=err, details=details)
+        else:
+            super(TestResultFilter, self).addError(
+                test, err=err, details=details)
+
+    def addFailure(self, test, err=None, details=None):
+        if self._failure_expected(test):
+            self.addExpectedFailure(test, err=err, details=details)
+        else:
+            super(TestResultFilter, self).addFailure(
+                test, err=err, details=details)
+
+    def addSuccess(self, test, details=None):
+        if self._failure_expected(test):
+            self.addUnexpectedSuccess(test, details=details)
+        else:
+            super(TestResultFilter, self).addSuccess(test, details=details)
+
+    def _failure_expected(self, test):
+        return (test.id() in self._fixup_expected_failures)
+
+
 class TestIdPrintingResult(testtools.TestResult):
 
     def __init__(self, stream, show_times=False):
@@ -513,7 +572,7 @@ class TestIdPrintingResult(testtools.TestResult):
 class TestByTestResult(testtools.TestResult):
     """Call something every time a test completes."""
 
-    # XXX: Arguably belongs in testtools.
+# XXX: Arguably belongs in testtools.
 
     def __init__(self, on_test):
         """Construct a ``TestByTestResult``.
index 06754840eb94735db420c75b06b33b2646e1a311..35d4603563f8126a9e346a8206249f896ac065bd 100644 (file)
 """Tests for subunit.TestResultFilter."""
 
 from datetime import datetime
+import os
+import subprocess
+import sys
 from subunit import iso8601
 import unittest
 
 from testtools import TestCase
-from testtools.compat import _b, BytesIO, StringIO
+from testtools.compat import _b, BytesIO
 from testtools.testresult.doubles import ExtendedTestResult
 
 import subunit
-from subunit.test_results import TestResultFilter
+from subunit.test_results import _make_tag_filter, TestResultFilter
 
 
 class TestTestResultFilter(TestCase):
@@ -77,6 +80,21 @@ xfail todo
             filtered_result.failures])
         self.assertEqual(4, filtered_result.testsRun)
 
+    def test_tag_filter(self):
+        tag_filter = _make_tag_filter(['global'], ['local'])
+        result = ExtendedTestResult()
+        result_filter = TestResultFilter(
+            result, filter_success=False, filter_predicate=tag_filter)
+        self.run_tests(result_filter)
+        test = subunit.RemotedTestCase('passed')
+        self.assertEquals(
+            [('tags', set(['global']), set()),
+             ('startTest', test),
+             ('addSuccess', test),
+             ('stopTest', test),
+             ],
+            result._events)
+
     def test_exclude_errors(self):
         filtered_result = unittest.TestResult()
         result_filter = TestResultFilter(filtered_result, filter_error=True)
@@ -151,6 +169,8 @@ xfail todo
 
     def test_filter_predicate(self):
         """You can filter by predicate callbacks"""
+        # 0.0.7 and earlier did not support the 'tags' parameter, so we need
+        # to test that we still support behaviour without it.
         filtered_result = unittest.TestResult()
         def filter_cb(test, outcome, err, details):
             return outcome == 'success'
@@ -161,6 +181,18 @@ xfail todo
         # Only success should pass
         self.assertEqual(1, filtered_result.testsRun)
 
+    def test_filter_predicate_with_tags(self):
+        """You can filter by predicate callbacks that accept tags"""
+        filtered_result = unittest.TestResult()
+        def filter_cb(test, outcome, err, details, tags):
+            return outcome == 'success'
+        result_filter = TestResultFilter(filtered_result,
+            filter_predicate=filter_cb,
+            filter_success=False)
+        self.run_tests(result_filter)
+        # Only success should pass
+        self.assertEqual(1, filtered_result.testsRun)
+
     def test_time_ordering_preserved(self):
         # Passing a subunit stream through TestResultFilter preserves the
         # relative ordering of 'time' directives and any other subunit
@@ -202,6 +234,78 @@ xfail todo
              ('stopTest', foo), ], result._events)
 
 
+class TestFilterCommand(TestCase):
+
+    example_subunit_stream = _b("""\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+
+    def run_command(self, args, stream):
+        root = os.path.dirname(
+            os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+        script_path = os.path.join(root, 'filters', 'subunit-filter')
+        command = [sys.executable, script_path] + list(args)
+        ps = subprocess.Popen(
+            command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE)
+        out, err = ps.communicate(stream)
+        if ps.returncode != 0:
+            raise RuntimeError("%s failed: %s" % (command, err))
+        return out
+
+    def to_events(self, stream):
+        test = subunit.ProtocolTestCase(BytesIO(stream))
+        result = ExtendedTestResult()
+        test.run(result)
+        return result._events
+
+    def test_default(self):
+        output = self.run_command([], (
+                "test: foo\n"
+                "skip: foo\n"
+                ))
+        events = self.to_events(output)
+        foo = subunit.RemotedTestCase('foo')
+        self.assertEqual(
+            [('startTest', foo),
+             ('addSkip', foo, {}),
+             ('stopTest', foo)],
+            events)
+
+    def test_tags(self):
+        output = self.run_command(['-s', '--with-tag', 'a'], (
+                "tags: a\n"
+                "test: foo\n"
+                "success: foo\n"
+                "tags: -a\n"
+                "test: bar\n"
+                "success: bar\n"
+                ))
+        events = self.to_events(output)
+        foo = subunit.RemotedTestCase('foo')
+        self.assertEqual(
+            [('tags', set(['a']), set()),
+             ('startTest', foo),
+             ('addSuccess', foo),
+             ('stopTest', foo),
+             ('tags', set(), set(['a'])),
+             ],
+            events)
+
+
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
     result = loader.loadTestsFromName(__name__)