subunit: Update to latest upstream version.
authorJelmer Vernooij <jelmer@samba.org>
Wed, 14 Nov 2012 08:47:16 +0000 (09:47 +0100)
committerJelmer Vernooij <jelmer@samba.org>
Wed, 14 Nov 2012 11:11:57 +0000 (12:11 +0100)
Autobuild-User(master): Jelmer Vernooij <jelmer@samba.org>
Autobuild-Date(master): Wed Nov 14 12:11:58 CET 2012 on sn-devel-104

21 files changed:
lib/subunit/Makefile.am
lib/subunit/NEWS
lib/subunit/configure.ac
lib/subunit/filters/subunit-filter
lib/subunit/filters/subunit-notify
lib/subunit/filters/subunit2csv [new file with mode: 0755]
lib/subunit/filters/subunit2junitxml
lib/subunit/perl/Makefile.PL.in
lib/subunit/python/subunit/__init__.py
lib/subunit/python/subunit/filters.py [new file with mode: 0644]
lib/subunit/python/subunit/iso8601.py
lib/subunit/python/subunit/test_results.py
lib/subunit/python/subunit/tests/sample-script.py
lib/subunit/python/subunit/tests/sample-two-script.py
lib/subunit/python/subunit/tests/test_run.py
lib/subunit/python/subunit/tests/test_subunit_filter.py
lib/subunit/python/subunit/tests/test_test_protocol.py
lib/subunit/python/subunit/tests/test_test_results.py
lib/subunit/runtests.py
lib/subunit/setup.py
lib/subunit/shell/tests/test_function_output.sh

index 716fa0fe211bdf0e05d9cf571255a140f7c60ce8..310c0426ae500b5a39295672db4703354fbd2a7f 100644 (file)
@@ -28,6 +28,7 @@ EXTRA_DIST =  \
        python/subunit/tests/test_details.py \
        python/subunit/tests/test_progress_model.py \
        python/subunit/tests/test_subunit_filter.py \
+       python/subunit/tests/test_run.py \
        python/subunit/tests/test_subunit_stats.py \
        python/subunit/tests/test_subunit_tags.py \
        python/subunit/tests/test_tap2subunit.py \
@@ -67,6 +68,7 @@ pkgpython_PYTHON = \
        python/subunit/__init__.py \
        python/subunit/chunked.py \
        python/subunit/details.py \
+       python/subunit/filters.py \
        python/subunit/iso8601.py \
        python/subunit/progress_model.py \
        python/subunit/run.py \
index 713d272bfaa9186cffc1b8e56096043d2f5adf54..f28ec5a6df9811b698d255b67bc7b988c4411570 100644 (file)
@@ -5,12 +5,30 @@ subunit release notes
 NEXT (In development)
 ---------------------
 
+BUG FIXES
+~~~~~~~~~
+
+* ``python/subunit/tests/test_run.py`` and ``python/subunit/filters.py`` were
+  not included in the 0.0.8 tarball. (Robert Collins)
+
+0.0.8
+-----
+
 IMPROVEMENTS
 ~~~~~~~~~~~~
 
 * Perl module now correctly outputs "failure" instead of "fail".  (Stewart Smith)
 
-* Shell functions now output timestamps. (Stewart Smith)
+* Shell functions now output timestamps. (Stewart Smith, Robert Collins)
+
+* 'subunit2csv' script that converts subunit output to CSV format.
+  (Jonathan Lange)
+
+* ``TagCollapsingDecorator`` now correctly distinguishes between local and
+  global tags.  (Jonathan Lange)
+
+* ``TestResultFilter`` always forwards ``time:`` events.
+  (Benji York, Brad Crittenden)
 
 BUG FIXES
 ~~~~~~~~~
@@ -22,6 +40,23 @@ BUG FIXES
   '--no-xfail', '--no-passthrough, '--no-success', and gives you just the
   failure stream. (John Arbash Meinel)
 
+* Python2.6 support was broken by the fixup feature.
+  (Arfrever Frehtes Taifersar Arahesis, #987490)
+
+* Python3 support regressed in trunk.
+  (Arfrever Frehtes Taifersar Arahesis, #987514)
+
+* Python3 support was insufficiently robust in detecting unicode streams.
+  (Robert Collins, Arfrever Frehtes Taifersar Arahesis)
+
+* Tag support has been implemented for TestProtocolClient.
+  (Robert Collins, #518016)
+
+* Tags can now be filtered. (Jonathan Lange, #664171)
+
+* Test suite works with latest testtools (but not older ones - formatting
+  changes only). (Robert Collins)
+
 0.0.7
 -----
 
index 4375c379a3cbf0eb0ca4e9e8b8046e5b04c7b734..223b3c9b4997b2e0fdbb0c0459066ee550c210bc 100644 (file)
@@ -1,6 +1,6 @@
 m4_define([SUBUNIT_MAJOR_VERSION], [0])
 m4_define([SUBUNIT_MINOR_VERSION], [0])
-m4_define([SUBUNIT_MICRO_VERSION], [7])
+m4_define([SUBUNIT_MICRO_VERSION], [8])
 m4_define([SUBUNIT_VERSION],
 m4_defn([SUBUNIT_MAJOR_VERSION]).m4_defn([SUBUNIT_MINOR_VERSION]).m4_defn([SUBUNIT_MICRO_VERSION]))
 AC_PREREQ([2.59])
index 7f5620f151d0bae68d5424dd99b1f6f344ca2772..6a1ecc9a010118915403a70bcb923abf66d4d189 100755 (executable)
@@ -36,41 +36,59 @@ from subunit import (
     TestProtocolClient,
     read_test_list,
     )
-from subunit.test_results import TestResultFilter
-
-parser = OptionParser(description=__doc__)
-parser.add_option("--error", action="store_false",
-    help="include errors", default=False, dest="error")
-parser.add_option("-e", "--no-error", action="store_true",
-    help="exclude errors", dest="error")
-parser.add_option("--failure", action="store_false",
-    help="include failures", default=False, dest="failure")
-parser.add_option("-f", "--no-failure", action="store_true",
-    help="exclude failures", dest="failure")
-parser.add_option("--passthrough", action="store_false",
-    help="Show all non subunit input.", default=False, dest="no_passthrough")
-parser.add_option("--no-passthrough", action="store_true",
-    help="Hide all non subunit input.", default=False, dest="no_passthrough")
-parser.add_option("-s", "--success", action="store_false",
-    help="include successes", dest="success")
-parser.add_option("--no-success", action="store_true",
-    help="exclude successes", default=True, dest="success")
-parser.add_option("--no-skip", action="store_true",
-    help="exclude skips", dest="skip")
-parser.add_option("--xfail", action="store_false",
-    help="include expected falures", default=True, dest="xfail")
-parser.add_option("--no-xfail", action="store_true",
-    help="exclude expected falures", default=True, dest="xfail")
-parser.add_option("-m", "--with", type=str,
-    help="regexp to include (case-sensitive by default)",
-    action="append", dest="with_regexps")
-parser.add_option("--fixup-expected-failures", type=str,
-    help="File with list of test ids that are expected to fail; on failure "
-         "their result will be changed to xfail; on success they will be "
-         "changed to error.", dest="fixup_expected_failures", action="append")
-parser.add_option("--without", type=str,
-    help="regexp to exclude (case-sensitive by default)",
-    action="append", dest="without_regexps")
+from subunit.filters import filter_by_result
+from subunit.test_results import (
+    and_predicates,
+    make_tag_filter,
+    TestResultFilter,
+    )
+
+
+def make_options(description):
+    parser = OptionParser(description=__doc__)
+    parser.add_option("--error", action="store_false",
+        help="include errors", default=False, dest="error")
+    parser.add_option("-e", "--no-error", action="store_true",
+        help="exclude errors", dest="error")
+    parser.add_option("--failure", action="store_false",
+        help="include failures", default=False, dest="failure")
+    parser.add_option("-f", "--no-failure", action="store_true",
+        help="exclude failures", dest="failure")
+    parser.add_option("--passthrough", action="store_false",
+        help="Show all non subunit input.", default=False, dest="no_passthrough")
+    parser.add_option("--no-passthrough", action="store_true",
+        help="Hide all non subunit input.", default=False, dest="no_passthrough")
+    parser.add_option("-s", "--success", action="store_false",
+        help="include successes", dest="success")
+    parser.add_option("--no-success", action="store_true",
+        help="exclude successes", default=True, dest="success")
+    parser.add_option("--no-skip", action="store_true",
+        help="exclude skips", dest="skip")
+    parser.add_option("--xfail", action="store_false",
+        help="include expected falures", default=True, dest="xfail")
+    parser.add_option("--no-xfail", action="store_true",
+        help="exclude expected falures", default=True, dest="xfail")
+    parser.add_option(
+        "--with-tag", type=str,
+        help="include tests with these tags", action="append", dest="with_tags")
+    parser.add_option(
+        "--without-tag", type=str,
+        help="exclude tests with these tags", action="append", dest="without_tags")
+    parser.add_option("-m", "--with", type=str,
+        help="regexp to include (case-sensitive by default)",
+        action="append", dest="with_regexps")
+    parser.add_option("--fixup-expected-failures", type=str,
+        help="File with list of test ids that are expected to fail; on failure "
+             "their result will be changed to xfail; on success they will be "
+             "changed to error.", dest="fixup_expected_failures", action="append")
+    parser.add_option("--without", type=str,
+        help="regexp to exclude (case-sensitive by default)",
+        action="append", dest="without_regexps")
+    parser.add_option("-F", "--only-genuine-failures", action="callback",
+        callback=only_genuine_failures_callback,
+        help="Only pass through failures and exceptions.")
+    return parser
+
 
 def only_genuine_failures_callback(option, opt, value, parser):
     parser.rargs.insert(0, '--no-passthrough')
@@ -78,11 +96,6 @@ def only_genuine_failures_callback(option, opt, value, parser):
     parser.rargs.insert(0, '--no-skip')
     parser.rargs.insert(0, '--no-success')
 
-parser.add_option("-F", "--only-genuine-failures", action="callback",
-    callback=only_genuine_failures_callback,
-    help="Only pass through failures and exceptions.")
-
-(options, args) = parser.parse_args()
 
 def _compile_re_from_list(l):
     return re.compile("|".join(l), re.MULTILINE)
@@ -97,7 +110,7 @@ def _make_regexp_filter(with_regexps, without_regexps):
     with_re = with_regexps and _compile_re_from_list(with_regexps)
     without_re = without_regexps and _compile_re_from_list(without_regexps)
 
-    def check_regexps(test, outcome, err, details):
+    def check_regexps(test, outcome, err, details, tags):
         """Check if this test and error match the regexp filters."""
         test_str = str(test) + outcome + str(err) + str(details)
         if with_re and not with_re.search(test_str):
@@ -108,21 +121,38 @@ def _make_regexp_filter(with_regexps, without_regexps):
     return check_regexps
 
 
-regexp_filter = _make_regexp_filter(options.with_regexps,
-        options.without_regexps)
-fixup_expected_failures = set()
-for path in options.fixup_expected_failures or ():
-    fixup_expected_failures.update(read_test_list(path))
-result = TestProtocolClient(sys.stdout)
-result = TestResultFilter(result, filter_error=options.error,
-    filter_failure=options.failure, filter_success=options.success,
-    filter_skip=options.skip, filter_xfail=options.xfail,
-    filter_predicate=regexp_filter,
-    fixup_expected_failures=fixup_expected_failures)
-if options.no_passthrough:
-    passthrough_stream = DiscardStream()
-else:
-    passthrough_stream = None
-test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream)
-test.run(result)
-sys.exit(0)
+def _make_result(output, options, predicate):
+    """Make the result that we'll send the test outcomes to."""
+    fixup_expected_failures = set()
+    for path in options.fixup_expected_failures or ():
+        fixup_expected_failures.update(read_test_list(path))
+    return TestResultFilter(
+        TestProtocolClient(output),
+        filter_error=options.error,
+        filter_failure=options.failure,
+        filter_success=options.success,
+        filter_skip=options.skip,
+        filter_xfail=options.xfail,
+        filter_predicate=predicate,
+        fixup_expected_failures=fixup_expected_failures)
+
+
+def main():
+    parser = make_options(__doc__)
+    (options, args) = parser.parse_args()
+
+    regexp_filter = _make_regexp_filter(
+        options.with_regexps, options.without_regexps)
+    tag_filter = make_tag_filter(options.with_tags, options.without_tags)
+    filter_predicate = and_predicates([regexp_filter, tag_filter])
+
+    filter_by_result(
+        lambda output_to: _make_result(sys.stdout, options, filter_predicate),
+        output_path=None,
+        passthrough=(not options.no_passthrough),
+        forward=False)
+    sys.exit(0)
+
+
+if __name__ == '__main__':
+    main()
index 758e7fc8fff196d53ca1c26df75acd146d249b76..8cce2d16096b3f433d103fac9a4a9f3dc8c4c02f 100755 (executable)
 
 """Notify the user of a finished test run."""
 
-from optparse import OptionParser
-import sys
-
 import pygtk
 pygtk.require('2.0')
 import pynotify
 
-from subunit import DiscardStream, ProtocolTestCase, TestResultStats
+from subunit import TestResultStats
+from subunit.filters import run_filter_script
 
 if not pynotify.init("Subunit-notify"):
     sys.exit(1)
 
-parser = OptionParser(description=__doc__)
-parser.add_option("--no-passthrough", action="store_true",
-    help="Hide all non subunit input.", default=False, dest="no_passthrough")
-parser.add_option("-f", "--forward", action="store_true", default=False,
-    help="Forward subunit stream on stdout.")
-(options, args) = parser.parse_args()
-result = TestResultStats(sys.stdout)
-if options.no_passthrough:
-    passthrough_stream = DiscardStream()
-else:
-    passthrough_stream = None
-if options.forward:
-    forward_stream = sys.stdout
-else:
-    forward_stream = None
-test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream,
-                        forward=forward_stream)
-test.run(result)
-if result.failed_tests > 0:
-    summary = "Test run failed"
-else:
-    summary = "Test run successful"
-body = "Total tests: %d; Passed: %d; Failed: %d" % (
-    result.total_tests,
-    result.passed_tests,
-    result.failed_tests,
+
+def notify_of_result(result):
+    if result.failed_tests > 0:
+        summary = "Test run failed"
+    else:
+        summary = "Test run successful"
+    body = "Total tests: %d; Passed: %d; Failed: %d" % (
+        result.total_tests,
+        result.passed_tests,
+        result.failed_tests,
     )
-nw = pynotify.Notification(summary, body)
-nw.show()
+    nw = pynotify.Notification(summary, body)
+    nw.show()
+
 
-if result.wasSuccessful():
-    exit_code = 0
-else:
-    exit_code = 1
-sys.exit(exit_code)
+run_filter_script(TestResultStats, __doc__, notify_of_result)
diff --git a/lib/subunit/filters/subunit2csv b/lib/subunit/filters/subunit2csv
new file mode 100755 (executable)
index 0000000..14620ff
--- /dev/null
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+#  subunit: extensions to python unittest to get test results from subprocesses.
+#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
+#
+#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+#  license at the users choice. A copy of both licenses are available in the
+#  project source as Apache-2.0 and BSD. You may not use this file except in
+#  compliance with one of these two licences.
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under these licenses is d on an "AS IS" BASIS, WITHOUT
+#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+#  license you chose for the specific language governing permissions and
+#  limitations under that license.
+#
+
+"""Turn a subunit stream into a CSV"""
+
+from subunit.filters import run_filter_script
+from subunit.test_results import CsvResult
+
+
+run_filter_script(CsvResult, __doc__)
index bea795d2bdd801f546c41e1d2b6d6dbcefd79ccc..d568c71dd4a668cb0857d6d6ee0456bdd3dba252 100755 (executable)
 
 """Filter a subunit stream to get aggregate statistics."""
 
-from optparse import OptionParser
+
 import sys
-import unittest
+from subunit.filters import run_filter_script
 
-from subunit import DiscardStream, ProtocolTestCase
 try:
     from junitxml import JUnitXmlResult
 except ImportError:
@@ -28,38 +27,5 @@ except ImportError:
         "http://pypi.python.org/pypi/junitxml) is required for this filter.")
     raise
 
-parser = OptionParser(description=__doc__)
-parser.add_option("--no-passthrough", action="store_true",
-    help="Hide all non subunit input.", default=False, dest="no_passthrough")
-parser.add_option("-o", "--output-to",
-    help="Output the XML to this path rather than stdout.")
-parser.add_option("-f", "--forward", action="store_true", default=False,
-    help="Forward subunit stream on stdout.")
-(options, args) = parser.parse_args()
-if options.output_to is None:
-    output_to = sys.stdout
-else:
-    output_to = file(options.output_to, 'wb')
-try:
-    result = JUnitXmlResult(output_to)
-    if options.no_passthrough:
-        passthrough_stream = DiscardStream()
-    else:
-        passthrough_stream = None
-    if options.forward:
-        forward_stream = sys.stdout
-    else:
-        forward_stream = None
-    test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream,
-       forward=forward_stream)
-    result.startTestRun()
-    test.run(result)
-    result.stopTestRun()
-finally:
-    if options.output_to is not None:
-        output_to.close()
-if result.wasSuccessful():
-    exit_code = 0
-else:
-    exit_code = 1
-sys.exit(exit_code)
+
+run_filter_script(JUnitXmlResult, __doc__)
index 26e1c181f0e0bc9f48e45c0a62c7479064821187..cf5e6c4c76b509d130880c10c64b6eeffd9287fe 100755 (executable)
@@ -13,6 +13,7 @@ check: # test
 
 uninstall_distcheck:
        rm -fr $(DESTINSTALLARCHLIB)
+       rm MYMETA.yml
 
 VPATH = @srcdir@
 .PHONY: uninstall_distcheck
index b4c939756f268277e07ff92fff000eed7b69d424..6015c0e68ca050237178c42efa0b8094668091ea 100644 (file)
@@ -121,8 +121,14 @@ import re
 import subprocess
 import sys
 import unittest
+if sys.version_info > (3, 0):
+    from io import UnsupportedOperation as _UnsupportedOperation
+else:
+    _UnsupportedOperation = AttributeError
+
 
 from testtools import content, content_type, ExtendedToOriginalDecorator
+from testtools.content import TracebackContent
 from testtools.compat import _b, _u, BytesIO, StringIO
 try:
     from testtools.testresult.real import _StringException
@@ -182,9 +188,15 @@ def tags_to_new_gone(tags):
 class DiscardStream(object):
     """A filelike object which discards what is written to it."""
 
+    def fileno(self):
+        raise _UnsupportedOperation()
+
     def write(self, bytes):
         pass
 
+    def read(self, len=0):
+        return _b('')
+
 
 class _ParserState(object):
     """State for the subunit parser."""
@@ -599,8 +611,8 @@ class TestProtocolClient(testresult.TestResult):
 
     def __init__(self, stream):
         testresult.TestResult.__init__(self)
+        stream = _make_stream_binary(stream)
         self._stream = stream
-        _make_stream_binary(stream)
         self._progress_fmt = _b("progress: ")
         self._bytes_eol = _b("\n")
         self._progress_plus = _b("+")
@@ -682,10 +694,9 @@ class TestProtocolClient(testresult.TestResult):
                 raise ValueError
         if error is not None:
             self._stream.write(self._start_simple)
-            # XXX: this needs to be made much stricter, along the lines of
-            # Martin[gz]'s work in testtools. Perhaps subunit can use that?
-            for line in self._exc_info_to_unicode(error, test).splitlines():
-                self._stream.write(("%s\n" % line).encode('utf8'))
+            tb_content = TracebackContent(error, test)
+            for bytes in tb_content.iter_bytes():
+                self._stream.write(bytes)
         elif details is not None:
             self._write_details(details)
         else:
@@ -755,6 +766,15 @@ class TestProtocolClient(testresult.TestResult):
         self._stream.write(self._progress_fmt + prefix + offset +
             self._bytes_eol)
 
+    def tags(self, new_tags, gone_tags):
+        """Inform the client about tags added/removed from the stream."""
+        if not new_tags and not gone_tags:
+            return
+        tags = set([tag.encode('utf8') for tag in new_tags])
+        tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
+        tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
+        self._stream.write(tag_line)
+
     def time(self, a_datetime):
         """Inform the client of the time.
 
@@ -1122,7 +1142,7 @@ class ProtocolTestCase(object):
     :seealso: TestProtocolServer (the subunit wire protocol parser).
     """
 
-    def __init__(self, stream, passthrough=None, forward=False):
+    def __init__(self, stream, passthrough=None, forward=None):
         """Create a ProtocolTestCase reading from stream.
 
         :param stream: A filelike object which a subunit stream can be read
@@ -1132,9 +1152,11 @@ class ProtocolTestCase(object):
         :param forward: A stream to pass subunit input on to. If not supplied
             subunit input is not forwarded.
         """
+        stream = _make_stream_binary(stream)
         self._stream = stream
-        _make_stream_binary(stream)
         self._passthrough = passthrough
+        if forward is not None:
+            forward = _make_stream_binary(forward)
         self._forward = forward
 
     def __call__(self, result=None):
@@ -1217,11 +1239,6 @@ def get_default_formatter():
         return stream
 
 
-if sys.version_info > (3, 0):
-    from io import UnsupportedOperation as _NoFilenoError
-else:
-    _NoFilenoError = AttributeError
-
 def read_test_list(path):
     """Read a list of test ids from a file on disk.
 
@@ -1236,15 +1253,37 @@ def read_test_list(path):
 
 
 def _make_stream_binary(stream):
-    """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
+    """Ensure that a stream will be binary safe. See _make_binary_on_windows.
+    
+    :return: A binary version of the same stream (some streams cannot be
+        'fixed' but can be unwrapped).
+    """
     try:
         fileno = stream.fileno()
-    except _NoFilenoError:
-        return
-    _make_binary_on_windows(fileno)
+    except _UnsupportedOperation:
+        pass
+    else:
+        _make_binary_on_windows(fileno)
+    return _unwrap_text(stream)
 
 def _make_binary_on_windows(fileno):
     """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
     if sys.platform == "win32":
         import msvcrt
         msvcrt.setmode(fileno, os.O_BINARY)
+
+
+def _unwrap_text(stream):
+    """Unwrap stream if it is a text stream to get the original buffer."""
+    if sys.version_info > (3, 0):
+        try:
+            # Read streams
+            if type(stream.read(0)) is str:
+                return stream.buffer
+        except (_UnsupportedOperation, IOError):
+            # Cannot read from the stream: try via writes
+            try:
+                stream.write(_b(''))
+            except TypeError:
+                return stream.buffer
+    return stream
diff --git a/lib/subunit/python/subunit/filters.py b/lib/subunit/python/subunit/filters.py
new file mode 100644 (file)
index 0000000..dc3fd8a
--- /dev/null
@@ -0,0 +1,125 @@
+#  subunit: extensions to python unittest to get test results from subprocesses.
+#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
+#
+#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+#  license at the users choice. A copy of both licenses are available in the
+#  project source as Apache-2.0 and BSD. You may not use this file except in
+#  compliance with one of these two licences.
+#  
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+#  license you chose for the specific language governing permissions and
+#  limitations under that license.
+#
+
+
+from optparse import OptionParser
+import sys
+
+from subunit import DiscardStream, ProtocolTestCase
+
+
+def make_options(description):
+    parser = OptionParser(description=description)
+    parser.add_option(
+        "--no-passthrough", action="store_true",
+        help="Hide all non subunit input.", default=False,
+        dest="no_passthrough")
+    parser.add_option(
+        "-o", "--output-to",
+        help="Send the output to this path rather than stdout.")
+    parser.add_option(
+        "-f", "--forward", action="store_true", default=False,
+        help="Forward subunit stream on stdout.")
+    return parser
+
+
+def run_tests_from_stream(input_stream, result, passthrough_stream=None,
+                          forward_stream=None):
+    """Run tests from a subunit input stream through 'result'.
+
+    :param input_stream: A stream containing subunit input.
+    :param result: A TestResult that will receive the test events.
+    :param passthrough_stream: All non-subunit input received will be
+        sent to this stream.  If not provided, uses the ``TestProtocolServer``
+        default, which is ``sys.stdout``.
+    :param forward_stream: All subunit input received will be forwarded
+        to this stream.  If not provided, uses the ``TestProtocolServer``
+        default, which is to not forward any input.
+    """
+    test = ProtocolTestCase(
+        input_stream, passthrough=passthrough_stream,
+        forward=forward_stream)
+    result.startTestRun()
+    test.run(result)
+    result.stopTestRun()
+
+
+def filter_by_result(result_factory, output_path, passthrough, forward,
+                     input_stream=sys.stdin):
+    """Filter an input stream using a test result.
+
+    :param result_factory: A callable that when passed an output stream
+        returns a TestResult.  It is expected that this result will output
+        to the given stream.
+    :param output_path: A path send output to.  If None, output will be go
+        to ``sys.stdout``.
+    :param passthrough: If True, all non-subunit input will be sent to
+        ``sys.stdout``.  If False, that input will be discarded.
+    :param forward: If True, all subunit input will be forwarded directly to
+        ``sys.stdout`` as well as to the ``TestResult``.
+    :param input_stream: The source of subunit input.  Defaults to
+        ``sys.stdin``.
+    :return: A test result with the resultts of the run.
+    """
+    if passthrough:
+        passthrough_stream = sys.stdout
+    else:
+        passthrough_stream = DiscardStream()
+
+    if forward:
+        forward_stream = sys.stdout
+    else:
+        forward_stream = DiscardStream()
+
+    if output_path is None:
+        output_to = sys.stdout
+    else:
+        output_to = file(output_path, 'wb')
+
+    try:
+        result = result_factory(output_to)
+        run_tests_from_stream(
+            input_stream, result, passthrough_stream, forward_stream)
+    finally:
+        if output_path:
+            output_to.close()
+    return result
+
+
+def run_filter_script(result_factory, description, post_run_hook=None):
+    """Main function for simple subunit filter scripts.
+
+    Many subunit filter scripts take a stream of subunit input and use a
+    TestResult to handle the events generated by that stream.  This function
+    wraps a lot of the boiler-plate around that by making a script with
+    options for handling passthrough information and stream forwarding, and
+    that will exit with a successful return code (i.e. 0) if the input stream
+    represents a successful test run.
+
+    :param result_factory: A callable that takes an output stream and returns
+        a test result that outputs to that stream.
+    :param description: A description of the filter script.
+    """
+    parser = make_options(description)
+    (options, args) = parser.parse_args()
+    result = filter_by_result(
+        result_factory, options.output_to, not options.no_passthrough,
+        options.forward)
+    if post_run_hook:
+        post_run_hook(result)
+    if result.wasSuccessful():
+        sys.exit(0)
+    else:
+        sys.exit(1)
index cbe9a3b3ebd9e28e78936ffa115a8d5b601e910f..07855d0975c944a054c707f515f40402fcd2ea72 100644 (file)
@@ -127,7 +127,7 @@ def parse_date(datestring, default_timezone=UTC):
     if groups["fraction"] is None:
         groups["fraction"] = 0
     else:
-        groups["fraction"] = int(float("0.%s" % groups["fraction"]) * 1e6)
+        groups["fraction"] = int(float("0.%s" % groups["fraction"].decode()) * 1e6)
     return datetime(int(groups["year"]), int(groups["month"]), int(groups["day"]),
         int(groups["hour"]), int(groups["minute"]), int(groups["second"]),
         int(groups["fraction"]), tz)
index 33fb50e07306ad60a27739473d2cbe9bf579c4c8..c00a2d3e9706cf3c2011b1c35856e63ff08402b7 100644 (file)
 
 """TestResult helper classes used to by subunit."""
 
+import csv
 import datetime
 
 import testtools
+from testtools.compat import all
+from testtools.content import (
+    text_content,
+    TracebackContent,
+    )
 
 from subunit import iso8601
 
@@ -34,6 +40,9 @@ class TestResultDecorator(object):
     or features by degrading them.
     """
 
+    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
+    # we should gut this and just use that.
+
     def __init__(self, decorated):
         """Create a TestResultDecorator forwarding to decorated."""
         # Make every decorator degrade gracefully.
@@ -200,34 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
         return self.decorated.time(a_datetime)
 
 
-class TagCollapsingDecorator(TestResultDecorator):
-    """Collapses many 'tags' calls into one where possible."""
+class TagsMixin(object):
 
-    def __init__(self, result):
-        super(TagCollapsingDecorator, self).__init__(result)
-        # The (new, gone) tags for the current test.
-        self._current_test_tags = None
+    def __init__(self):
+        self._clear_tags()
 
-    def startTest(self, test):
-        """Start a test.
+    def _clear_tags(self):
+        self._global_tags = set(), set()
+        self._test_tags = None
 
-        Not directly passed to the client, but used for handling of tags
-        correctly.
-        """
-        self.decorated.startTest(test)
-        self._current_test_tags = set(), set()
+    def _get_active_tags(self):
+        global_new, global_gone = self._global_tags
+        if self._test_tags is None:
+            return set(global_new)
+        test_new, test_gone = self._test_tags
+        return global_new.difference(test_gone).union(test_new)
 
-    def stopTest(self, test):
-        """Stop a test.
+    def _get_current_scope(self):
+        if self._test_tags:
+            return self._test_tags
+        return self._global_tags
 
-        Not directly passed to the client, but used for handling of tags
-        correctly.
-        """
-        # Tags to output for this test.
-        if self._current_test_tags[0] or self._current_test_tags[1]:
-            self.decorated.tags(*self._current_test_tags)
-        self.decorated.stopTest(test)
-        self._current_test_tags = None
+    def _flush_current_scope(self, tag_receiver):
+        new_tags, gone_tags = self._get_current_scope()
+        if new_tags or gone_tags:
+            tag_receiver.tags(new_tags, gone_tags)
+        if self._test_tags:
+            self._test_tags = set(), set()
+        else:
+            self._global_tags = set(), set()
+
+    def startTestRun(self):
+        self._clear_tags()
+
+    def startTest(self, test):
+        self._test_tags = set(), set()
+
+    def stopTest(self, test):
+        self._test_tags = None
 
     def tags(self, new_tags, gone_tags):
         """Handle tag instructions.
@@ -238,14 +257,25 @@ class TagCollapsingDecorator(TestResultDecorator):
         :param new_tags: Tags to add,
         :param gone_tags: Tags to remove.
         """
-        if self._current_test_tags is not None:
-            # gather the tags until the test stops.
-            self._current_test_tags[0].update(new_tags)
-            self._current_test_tags[0].difference_update(gone_tags)
-            self._current_test_tags[1].update(gone_tags)
-            self._current_test_tags[1].difference_update(new_tags)
-        else:
-            return self.decorated.tags(new_tags, gone_tags)
+        current_new_tags, current_gone_tags = self._get_current_scope()
+        current_new_tags.update(new_tags)
+        current_new_tags.difference_update(gone_tags)
+        current_gone_tags.update(gone_tags)
+        current_gone_tags.difference_update(new_tags)
+
+
+class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
+    """Collapses many 'tags' calls into one where possible."""
+
+    def __init__(self, result):
+        super(TagCollapsingDecorator, self).__init__(result)
+        self._clear_tags()
+
+    def _before_event(self):
+        self._flush_current_scope(self.decorated)
+
+    def tags(self, new_tags, gone_tags):
+        TagsMixin.tags(self, new_tags, gone_tags)
 
 
 class TimeCollapsingDecorator(HookedTestResultDecorator):
@@ -273,93 +303,58 @@ class TimeCollapsingDecorator(HookedTestResultDecorator):
         self._last_received_time = a_time
 
 
-def all_true(bools):
-    """Return True if all of 'bools' are True. False otherwise."""
-    for b in bools:
-        if not b:
-            return False
-    return True
+def and_predicates(predicates):
+    """Return a predicate that is true iff all predicates are true."""
+    # XXX: Should probably be in testtools to be better used by matchers. jml
+    return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates)
 
 
-class TestResultFilter(TestResultDecorator):
-    """A pyunit TestResult interface implementation which filters tests.
+def make_tag_filter(with_tags, without_tags):
+    """Make a callback that checks tests against tags."""
 
-    Tests that pass the filter are handed on to another TestResult instance
-    for further processing/reporting. To obtain the filtered results,
-    the other instance must be interrogated.
+    with_tags = with_tags and set(with_tags) or None
+    without_tags = without_tags and set(without_tags) or None
 
-    :ivar result: The result that tests are passed to after filtering.
-    :ivar filter_predicate: The callback run to decide whether to pass
-        a result.
-    """
+    def check_tags(test, outcome, err, details, tags):
+        if with_tags and not with_tags <= tags:
+            return False
+        if without_tags and bool(without_tags & tags):
+            return False
+        return True
 
-    def __init__(self, result, filter_error=False, filter_failure=False,
-        filter_success=True, filter_skip=False, filter_xfail=False,
-        filter_predicate=None, fixup_expected_failures=None):
-        """Create a FilterResult object filtering to result.
+    return check_tags
 
-        :param filter_error: Filter out errors.
-        :param filter_failure: Filter out failures.
-        :param filter_success: Filter out successful tests.
-        :param filter_skip: Filter out skipped tests.
-        :param filter_xfail: Filter out expected failure tests.
-        :param filter_predicate: A callable taking (test, outcome, err,
-            details) and returning True if the result should be passed
-            through.  err and details may be none if no error or extra
-            metadata is available. outcome is the name of the outcome such
-            as 'success' or 'failure'.
-        :param fixup_expected_failures: Set of test ids to consider known
-            failing.
-        """
-        super(TestResultFilter, self).__init__(result)
+
+class _PredicateFilter(TestResultDecorator, TagsMixin):
+
+    def __init__(self, result, predicate):
+        super(_PredicateFilter, self).__init__(result)
+        self._clear_tags()
         self.decorated = TimeCollapsingDecorator(
             TagCollapsingDecorator(self.decorated))
-        predicates = []
-        if filter_error:
-            predicates.append(lambda t, outcome, e, d: outcome != 'error')
-        if filter_failure:
-            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
-        if filter_success:
-            predicates.append(lambda t, outcome, e, d: outcome != 'success')
-        if filter_skip:
-            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
-        if filter_xfail:
-            predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
-        if filter_predicate is not None:
-            predicates.append(filter_predicate)
-        self.filter_predicate = (
-            lambda test, outcome, err, details:
-                all_true(p(test, outcome, err, details) for p in predicates))
+        self._predicate = predicate
         # The current test (for filtering tags)
         self._current_test = None
         # Has the current test been filtered (for outputting test tags)
         self._current_test_filtered = None
         # Calls to this result that we don't know whether to forward on yet.
         self._buffered_calls = []
-        if fixup_expected_failures is None:
-            self._fixup_expected_failures = frozenset()
-        else:
-            self._fixup_expected_failures = fixup_expected_failures
+
+    def filter_predicate(self, test, outcome, error, details):
+        return self._predicate(
+            test, outcome, error, details, self._get_active_tags())
 
     def addError(self, test, err=None, details=None):
         if (self.filter_predicate(test, 'error', err, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addExpectedFailure', [test, err], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addError', [test, err], {'details': details}))
+            self._buffered_calls.append(
+                ('addError', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addFailure(self, test, err=None, details=None):
         if (self.filter_predicate(test, 'failure', err, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addExpectedFailure', [test, err], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addFailure', [test, err], {'details': details}))
+            self._buffered_calls.append(
+                ('addFailure', [test, err], {'details': details}))
         else:
             self._filtered()
 
@@ -370,17 +365,6 @@ class TestResultFilter(TestResultDecorator):
         else:
             self._filtered()
 
-    def addSuccess(self, test, details=None):
-        if (self.filter_predicate(test, 'success', None, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addUnexpectedSuccess', [test], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addSuccess', [test], {'details': details}))
-        else:
-            self._filtered()
-
     def addExpectedFailure(self, test, err=None, details=None):
         if self.filter_predicate(test, 'expectedfailure', err, details):
             self._buffered_calls.append(
@@ -392,18 +376,23 @@ class TestResultFilter(TestResultDecorator):
         self._buffered_calls.append(
             ('addUnexpectedSuccess', [test], {'details': details}))
 
+    def addSuccess(self, test, details=None):
+        if (self.filter_predicate(test, 'success', None, details)):
+            self._buffered_calls.append(
+                ('addSuccess', [test], {'details': details}))
+        else:
+            self._filtered()
+
     def _filtered(self):
         self._current_test_filtered = True
 
-    def _failure_expected(self, test):
-        return (test.id() in self._fixup_expected_failures)
-
     def startTest(self, test):
         """Start a test.
 
         Not directly passed to the client, but used for handling of tags
         correctly.
         """
+        TagsMixin.startTest(self, test)
         self._current_test = test
         self._current_test_filtered = False
         self._buffered_calls.append(('startTest', [test], {}))
@@ -415,19 +404,23 @@ class TestResultFilter(TestResultDecorator):
         correctly.
         """
         if not self._current_test_filtered:
-            # Tags to output for this test.
             for method, args, kwargs in self._buffered_calls:
                 getattr(self.decorated, method)(*args, **kwargs)
             self.decorated.stopTest(test)
         self._current_test = None
         self._current_test_filtered = None
         self._buffered_calls = []
+        TagsMixin.stopTest(self, test)
 
-    def time(self, a_time):
+    def tags(self, new_tags, gone_tags):
+        TagsMixin.tags(self, new_tags, gone_tags)
         if self._current_test is not None:
-            self._buffered_calls.append(('time', [a_time], {}))
+            self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
         else:
-            return self.decorated.time(a_time)
+            return super(_PredicateFilter, self).tags(new_tags, gone_tags)
+
+    def time(self, a_time):
+        return self.decorated.time(a_time)
 
     def id_to_orig_id(self, id):
         if id.startswith("subunit.RemotedTestCase."):
@@ -435,6 +428,95 @@ class TestResultFilter(TestResultDecorator):
         return id
 
 
+class TestResultFilter(TestResultDecorator):
+    """A pyunit TestResult interface implementation which filters tests.
+
+    Tests that pass the filter are handed on to another TestResult instance
+    for further processing/reporting. To obtain the filtered results,
+    the other instance must be interrogated.
+
+    :ivar result: The result that tests are passed to after filtering.
+    :ivar filter_predicate: The callback run to decide whether to pass
+        a result.
+    """
+
+    def __init__(self, result, filter_error=False, filter_failure=False,
+        filter_success=True, filter_skip=False, filter_xfail=False,
+        filter_predicate=None, fixup_expected_failures=None):
+        """Create a FilterResult object filtering to result.
+
+        :param filter_error: Filter out errors.
+        :param filter_failure: Filter out failures.
+        :param filter_success: Filter out successful tests.
+        :param filter_skip: Filter out skipped tests.
+        :param filter_xfail: Filter out expected failure tests.
+        :param filter_predicate: A callable taking (test, outcome, err,
+            details, tags) and returning True if the result should be passed
+            through.  err and details may be none if no error or extra
+            metadata is available. outcome is the name of the outcome such
+            as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
+            are still supported but should be updated to accept the tags
+            parameter for efficiency.
+        :param fixup_expected_failures: Set of test ids to consider known
+            failing.
+        """
+        predicates = []
+        if filter_error:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'error')
+        if filter_failure:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'failure')
+        if filter_success:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'success')
+        if filter_skip:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'skip')
+        if filter_xfail:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'expectedfailure')
+        if filter_predicate is not None:
+            def compat(test, outcome, error, details, tags):
+                # 0.0.7 and earlier did not support the 'tags' parameter.
+                try:
+                    return filter_predicate(
+                        test, outcome, error, details, tags)
+                except TypeError:
+                    return filter_predicate(test, outcome, error, details)
+            predicates.append(compat)
+        predicate = and_predicates(predicates)
+        super(TestResultFilter, self).__init__(
+            _PredicateFilter(result, predicate))
+        if fixup_expected_failures is None:
+            self._fixup_expected_failures = frozenset()
+        else:
+            self._fixup_expected_failures = fixup_expected_failures
+
+    def addError(self, test, err=None, details=None):
+        if self._failure_expected(test):
+            self.addExpectedFailure(test, err=err, details=details)
+        else:
+            super(TestResultFilter, self).addError(
+                test, err=err, details=details)
+
+    def addFailure(self, test, err=None, details=None):
+        if self._failure_expected(test):
+            self.addExpectedFailure(test, err=err, details=details)
+        else:
+            super(TestResultFilter, self).addFailure(
+                test, err=err, details=details)
+
+    def addSuccess(self, test, details=None):
+        if self._failure_expected(test):
+            self.addUnexpectedSuccess(test, details=details)
+        else:
+            super(TestResultFilter, self).addSuccess(test, details=details)
+
+    def _failure_expected(self, test):
+        return (test.id() in self._fixup_expected_failures)
+
+
 class TestIdPrintingResult(testtools.TestResult):
 
     def __init__(self, stream, show_times=False):
@@ -493,3 +575,97 @@ class TestIdPrintingResult(testtools.TestResult):
     def wasSuccessful(self):
         "Tells whether or not this result was a success"
         return self.failed_tests == 0
+
+
+class TestByTestResult(testtools.TestResult):
+    """Call something every time a test completes."""
+
+# XXX: In testtools since lp:testtools r249.  Once that's released, just
+# import that.
+
+    def __init__(self, on_test):
+        """Construct a ``TestByTestResult``.
+
+        :param on_test: A callable that take a test case, a status (one of
+            "success", "failure", "error", "skip", or "xfail"), a start time
+            (a ``datetime`` with timezone), a stop time, an iterable of tags,
+            and a details dict. Is called at the end of each test (i.e. on
+            ``stopTest``) with the accumulated values for that test.
+        """
+        super(TestByTestResult, self).__init__()
+        self._on_test = on_test
+
+    def startTest(self, test):
+        super(TestByTestResult, self).startTest(test)
+        self._start_time = self._now()
+        # There's no supported (i.e. tested) behaviour that relies on these
+        # being set, but it makes me more comfortable all the same. -- jml
+        self._status = None
+        self._details = None
+        self._stop_time = None
+
+    def stopTest(self, test):
+        self._stop_time = self._now()
+        super(TestByTestResult, self).stopTest(test)
+        self._on_test(
+            test=test,
+            status=self._status,
+            start_time=self._start_time,
+            stop_time=self._stop_time,
+            # current_tags is new in testtools 0.9.13.
+            tags=getattr(self, 'current_tags', None),
+            details=self._details)
+
+    def _err_to_details(self, test, err, details):
+        if details:
+            return details
+        return {'traceback': TracebackContent(err, test)}
+
+    def addSuccess(self, test, details=None):
+        super(TestByTestResult, self).addSuccess(test)
+        self._status = 'success'
+        self._details = details
+
+    def addFailure(self, test, err=None, details=None):
+        super(TestByTestResult, self).addFailure(test, err, details)
+        self._status = 'failure'
+        self._details = self._err_to_details(test, err, details)
+
+    def addError(self, test, err=None, details=None):
+        super(TestByTestResult, self).addError(test, err, details)
+        self._status = 'error'
+        self._details = self._err_to_details(test, err, details)
+
+    def addSkip(self, test, reason=None, details=None):
+        super(TestByTestResult, self).addSkip(test, reason, details)
+        self._status = 'skip'
+        if details is None:
+            details = {'reason': text_content(reason)}
+        elif reason:
+            # XXX: What if details already has 'reason' key?
+            details['reason'] = text_content(reason)
+        self._details = details
+
+    def addExpectedFailure(self, test, err=None, details=None):
+        super(TestByTestResult, self).addExpectedFailure(test, err, details)
+        self._status = 'xfail'
+        self._details = self._err_to_details(test, err, details)
+
+    def addUnexpectedSuccess(self, test, details=None):
+        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
+        self._status = 'success'
+        self._details = details
+
+
+class CsvResult(TestByTestResult):
+
+    def __init__(self, stream):
+        super(CsvResult, self).__init__(self._on_test)
+        self._write_row = csv.writer(stream).writerow
+
+    def _on_test(self, test, status, start_time, stop_time, tags, details):
+        self._write_row([test.id(), status, start_time, stop_time])
+
+    def startTestRun(self):
+        super(CsvResult, self).startTestRun()
+        self._write_row(['test', 'status', 'start_time', 'stop_time'])
index 618e4952d7938c36dc0df5b41dd1be118155bdf7..91838f6d6fb3d24356f63947bb598294c0b4f11d 100755 (executable)
@@ -7,15 +7,15 @@ if len(sys.argv) == 2:
     # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args 
     # uses this code path to be sure that the arguments were passed to
     # sample-script.py
-    print "test fail"
-    print "error fail"
+    print("test fail")
+    print("error fail")
     sys.exit(0)
-print "test old mcdonald"
-print "success old mcdonald"
-print "test bing crosby"
-print "failure bing crosby ["
-print "foo.c:53:ERROR invalid state"
-print "]"
-print "test an error"
-print "error an error"
+print("test old mcdonald")
+print("success old mcdonald")
+print("test bing crosby")
+print("failure bing crosby [")
+print("foo.c:53:ERROR invalid state")
+print("]")
+print("test an error")
+print("error an error")
 sys.exit(0)
index d5550842bf27ba4244c3f17ca6de16a2a2303e04..fc73dfc409d169c3c927c96131ab0c36e0e09c08 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 import sys
-print "test old mcdonald"
-print "success old mcdonald"
-print "test bing crosby"
-print "success bing crosby"
+print("test old mcdonald")
+print("success old mcdonald")
+print("test bing crosby")
+print("success bing crosby")
 sys.exit(0)
index 5a96bcf30e7cf5f10e1ee8f2374e4b903a3be373..10519ed086883e0ead6aa7c636e35665f59f01b4 100644 (file)
@@ -14,7 +14,7 @@
 #  limitations under that license.
 #
 
-from cStringIO import StringIO
+from testtools.compat import BytesIO
 import unittest
 
 from testtools import PlaceHolder
@@ -42,7 +42,7 @@ class TimeCollectingTestResult(unittest.TestResult):
 class TestSubunitTestRunner(unittest.TestCase):
 
     def test_includes_timing_output(self):
-        io = StringIO()
+        io = BytesIO()
         runner = SubunitTestRunner(stream=io)
         test = PlaceHolder('name')
         runner.run(test)
index 06754840eb94735db420c75b06b33b2646e1a311..33b924824d32453f2385fdff7888c62e836dfe0a 100644 (file)
 """Tests for subunit.TestResultFilter."""
 
 from datetime import datetime
+import os
+import subprocess
+import sys
 from subunit import iso8601
 import unittest
 
 from testtools import TestCase
-from testtools.compat import _b, BytesIO, StringIO
+from testtools.compat import _b, BytesIO
 from testtools.testresult.doubles import ExtendedTestResult
 
 import subunit
-from subunit.test_results import TestResultFilter
+from subunit.test_results import make_tag_filter, TestResultFilter
 
 
 class TestTestResultFilter(TestCase):
@@ -77,6 +80,40 @@ xfail todo
             filtered_result.failures])
         self.assertEqual(4, filtered_result.testsRun)
 
+    def test_tag_filter(self):
+        tag_filter = make_tag_filter(['global'], ['local'])
+        result = ExtendedTestResult()
+        result_filter = TestResultFilter(
+            result, filter_success=False, filter_predicate=tag_filter)
+        self.run_tests(result_filter)
+        tests_included = [
+            event[1] for event in result._events if event[0] == 'startTest']
+        tests_expected = list(map(
+            subunit.RemotedTestCase,
+            ['passed', 'error', 'skipped', 'todo']))
+        self.assertEquals(tests_expected, tests_included)
+
+    def test_tags_tracked_correctly(self):
+        tag_filter = make_tag_filter(['a'], [])
+        result = ExtendedTestResult()
+        result_filter = TestResultFilter(
+            result, filter_success=False, filter_predicate=tag_filter)
+        input_stream = _b(
+            "test: foo\n"
+            "tags: a\n"
+            "successful: foo\n"
+            "test: bar\n"
+            "successful: bar\n")
+        self.run_tests(result_filter, input_stream)
+        foo = subunit.RemotedTestCase('foo')
+        self.assertEquals(
+            [('startTest', foo),
+             ('tags', set(['a']), set()),
+             ('addSuccess', foo),
+             ('stopTest', foo),
+             ],
+            result._events)
+
     def test_exclude_errors(self):
         filtered_result = unittest.TestResult()
         result_filter = TestResultFilter(filtered_result, filter_error=True)
@@ -151,6 +188,8 @@ xfail todo
 
     def test_filter_predicate(self):
         """You can filter by predicate callbacks"""
+        # 0.0.7 and earlier did not support the 'tags' parameter, so we need
+        # to test that we still support behaviour without it.
         filtered_result = unittest.TestResult()
         def filter_cb(test, outcome, err, details):
             return outcome == 'success'
@@ -161,6 +200,18 @@ xfail todo
         # Only success should pass
         self.assertEqual(1, filtered_result.testsRun)
 
+    def test_filter_predicate_with_tags(self):
+        """You can filter by predicate callbacks that accept tags"""
+        filtered_result = unittest.TestResult()
+        def filter_cb(test, outcome, err, details, tags):
+            return outcome == 'success'
+        result_filter = TestResultFilter(filtered_result,
+            filter_predicate=filter_cb,
+            filter_success=False)
+        self.run_tests(result_filter)
+        # Only success should pass
+        self.assertEqual(1, filtered_result.testsRun)
+
     def test_time_ordering_preserved(self):
         # Passing a subunit stream through TestResultFilter preserves the
         # relative ordering of 'time' directives and any other subunit
@@ -179,14 +230,41 @@ xfail todo
         result_filter = TestResultFilter(result)
         self.run_tests(result_filter, subunit_stream)
         foo = subunit.RemotedTestCase('foo')
-        self.assertEquals(
+        self.maxDiff = None
+        self.assertEqual(
             [('time', date_a),
-             ('startTest', foo),
              ('time', date_b),
+             ('startTest', foo),
              ('addError', foo, {}),
              ('stopTest', foo),
              ('time', date_c)], result._events)
 
+    def test_time_passes_through_filtered_tests(self):
+        # Passing a subunit stream through TestResultFilter preserves 'time'
+        # directives even if a specific test is filtered out.
+        date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
+        date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
+        date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
+        subunit_stream = _b('\n'.join([
+            "time: %s",
+            "test: foo",
+            "time: %s",
+            "success: foo",
+            "time: %s",
+            ""]) % (date_a, date_b, date_c))
+        result = ExtendedTestResult()
+        result_filter = TestResultFilter(result)
+        result_filter.startTestRun()
+        self.run_tests(result_filter, subunit_stream)
+        result_filter.stopTestRun()
+        foo = subunit.RemotedTestCase('foo')
+        self.maxDiff = None
+        self.assertEqual(
+            [('startTestRun',),
+             ('time', date_a),
+             ('time', date_c),
+             ('stopTestRun',),], result._events)
+
     def test_skip_preserved(self):
         subunit_stream = _b('\n'.join([
             "test: foo",
@@ -201,6 +279,90 @@ xfail todo
              ('addSkip', foo, {}),
              ('stopTest', foo), ], result._events)
 
+    if sys.version_info < (2, 7):
+        # These tests require Python >=2.7.
+        del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success
+
+
+class TestFilterCommand(TestCase):
+
+    example_subunit_stream = _b("""\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+
+    def run_command(self, args, stream):
+        root = os.path.dirname(
+            os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+        script_path = os.path.join(root, 'filters', 'subunit-filter')
+        command = [sys.executable, script_path] + list(args)
+        ps = subprocess.Popen(
+            command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE)
+        out, err = ps.communicate(stream)
+        if ps.returncode != 0:
+            raise RuntimeError("%s failed: %s" % (command, err))
+        return out
+
+    def to_events(self, stream):
+        test = subunit.ProtocolTestCase(BytesIO(stream))
+        result = ExtendedTestResult()
+        test.run(result)
+        return result._events
+
+    def test_default(self):
+        output = self.run_command([], _b(
+                "test: foo\n"
+                "skip: foo\n"
+                ))
+        events = self.to_events(output)
+        foo = subunit.RemotedTestCase('foo')
+        self.assertEqual(
+            [('startTest', foo),
+             ('addSkip', foo, {}),
+             ('stopTest', foo)],
+            events)
+
+    def test_tags(self):
+        output = self.run_command(['-s', '--with-tag', 'a'], _b(
+                "tags: a\n"
+                "test: foo\n"
+                "success: foo\n"
+                "tags: -a\n"
+                "test: bar\n"
+                "success: bar\n"
+                "test: baz\n"
+                "tags: a\n"
+                "success: baz\n"
+                ))
+        events = self.to_events(output)
+        foo = subunit.RemotedTestCase('foo')
+        baz = subunit.RemotedTestCase('baz')
+        self.assertEqual(
+            [('tags', set(['a']), set()),
+             ('startTest', foo),
+             ('addSuccess', foo),
+             ('stopTest', foo),
+             ('tags', set(), set(['a'])),
+             ('startTest', baz),
+             ('tags', set(['a']), set()),
+             ('addSuccess', baz),
+             ('stopTest', baz),
+             ],
+            events)
+
 
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
index c93aabd80cdd25e5f9337a99c1ef05306c78c62b..ec6830d03b82333d6a0ac4af9a068e21368d4410 100644 (file)
@@ -18,9 +18,9 @@ import datetime
 import unittest
 import os
 
-from testtools import skipIf, TestCase
-from testtools.compat import _b, _u, BytesIO, StringIO
-from testtools.content import Content, TracebackContent
+from testtools import skipIf, TestCase, TestResult
+from testtools.compat import _b, _u, BytesIO
+from testtools.content import Content, TracebackContent, text_content
 from testtools.content_type import ContentType
 try:
     from testtools.testresult.doubles import (
@@ -40,6 +40,10 @@ from subunit import _remote_exception_str, _remote_exception_str_chunked
 import subunit.iso8601 as iso8601
 
 
+def details_to_str(details):
+    return TestResult()._err_details_to_string(None, details=details)
+
+
 class TestTestImports(unittest.TestCase):
 
     def test_imports(self):
@@ -87,11 +91,12 @@ class TestTestProtocolServerPipe(unittest.TestCase):
     def test_story(self):
         client = unittest.TestResult()
         protocol = subunit.TestProtocolServer(client)
+        traceback = "foo.c:53:ERROR invalid state\n"
         pipe = BytesIO(_b("test old mcdonald\n"
                         "success old mcdonald\n"
                         "test bing crosby\n"
                         "failure bing crosby [\n"
-                        "foo.c:53:ERROR invalid state\n"
+                        +  traceback +
                         "]\n"
                         "test an error\n"
                         "error an error\n"))
@@ -102,9 +107,8 @@ class TestTestProtocolServerPipe(unittest.TestCase):
                          [(an_error, _remote_exception_str + '\n')])
         self.assertEqual(
             client.failures,
-            [(bing, _remote_exception_str + ": Text attachment: traceback\n"
-                "------------\nfoo.c:53:ERROR invalid state\n"
-                "------------\n\n")])
+            [(bing, _remote_exception_str + ": "
+              + details_to_str({'traceback': text_content(traceback)}) + "\n")])
         self.assertEqual(client.testsRun, 3)
 
     def test_non_test_characters_forwarded_immediately(self):
@@ -559,9 +563,7 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
                 value = details
             else:
                 if error_message is not None:
-                    value = subunit.RemoteError(_u("Text attachment: traceback\n"
-                        "------------\n") + _u(error_message) +
-                        _u("------------\n"))
+                    value = subunit.RemoteError(details_to_str(details))
                 else:
                     value = subunit.RemoteError()
             self.assertEqual([
@@ -1299,6 +1301,22 @@ class TestTestProtocolClient(unittest.TestCase):
                 "something\n"
                 "F\r\nserialised\nform0\r\n]\n" % self.test.id()))
 
+    def test_tags_empty(self):
+        self.protocol.tags(set(), set())
+        self.assertEqual(_b(""), self.io.getvalue())
+
+    def test_tags_add(self):
+        self.protocol.tags(set(['foo']), set())
+        self.assertEqual(_b("tags: foo\n"), self.io.getvalue())
+
+    def test_tags_both(self):
+        self.protocol.tags(set(['quux']), set(['bar']))
+        self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue())
+
+    def test_tags_gone(self):
+        self.protocol.tags(set(), set(['bar']))
+        self.assertEqual(_b("tags: -bar\n"), self.io.getvalue())
+
 
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
index 94d22748e80889376bf639d6f361492d38138f03..236dfa22e51f0b15474ff5aae236b7feef86c48a 100644 (file)
 #  limitations under that license.
 #
 
+import csv
 import datetime
+import sys
 import unittest
 
 from testtools import TestCase
+from testtools.compat import StringIO
+from testtools.content import (
+    text_content,
+    TracebackContent,
+    )
 from testtools.testresult.doubles import ExtendedTestResult
 
 import subunit
 import subunit.iso8601 as iso8601
 import subunit.test_results
 
+import testtools
+
 
 class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
 
@@ -192,12 +201,55 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
 
 class TestTagCollapsingDecorator(TestCase):
 
-    def test_tags_forwarded_outside_of_tests(self):
+    def test_tags_collapsed_outside_of_tests(self):
         result = ExtendedTestResult()
         tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
-        tag_collapser.tags(set(['a', 'b']), set())
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set())
+        tag_collapser.startTest(self)
         self.assertEquals(
-            [('tags', set(['a', 'b']), set([]))], result._events)
+            [('tags', set(['a', 'b']), set([])),
+             ('startTest', self),
+             ], result._events)
+
+    def test_tags_collapsed_outside_of_tests_are_flushed(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.startTestRun()
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set())
+        tag_collapser.startTest(self)
+        tag_collapser.addSuccess(self)
+        tag_collapser.stopTest(self)
+        tag_collapser.stopTestRun()
+        self.assertEquals(
+            [('startTestRun',),
+             ('tags', set(['a', 'b']), set([])),
+             ('startTest', self),
+             ('addSuccess', self),
+             ('stopTest', self),
+             ('stopTestRun',),
+             ], result._events)
+
+    def test_tags_forwarded_after_tests(self):
+        test = subunit.RemotedTestCase('foo')
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.startTestRun()
+        tag_collapser.startTest(test)
+        tag_collapser.addSuccess(test)
+        tag_collapser.stopTest(test)
+        tag_collapser.tags(set(['a']), set(['b']))
+        tag_collapser.stopTestRun()
+        self.assertEqual(
+            [('startTestRun',),
+             ('startTest', test),
+             ('addSuccess', test),
+             ('stopTest', test),
+             ('tags', set(['a']), set(['b'])),
+             ('stopTestRun',),
+             ],
+            result._events)
 
     def test_tags_collapsed_inside_of_tests(self):
         result = ExtendedTestResult()
@@ -229,6 +281,25 @@ class TestTagCollapsingDecorator(TestCase):
              ('stopTest', test)],
             result._events)
 
+    def test_tags_sent_before_result(self):
+        # Because addSuccess and friends tend to send subunit output
+        # immediately, and because 'tags:' before a result line means
+        # something different to 'tags:' after a result line, we need to be
+        # sure that tags are emitted before 'addSuccess' (or whatever).
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        test = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(test)
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.addSuccess(test)
+        tag_collapser.stopTest(test)
+        self.assertEquals(
+            [('startTest', test),
+             ('tags', set(['a']), set()),
+             ('addSuccess', test),
+             ('stopTest', test)],
+            result._events)
+
 
 class TestTimeCollapsingDecorator(TestCase):
 
@@ -294,6 +365,201 @@ class TestTimeCollapsingDecorator(TestCase):
              ('stopTest', foo)], result._events)
 
 
+class TestByTestResultTests(testtools.TestCase):
+
+    def setUp(self):
+        super(TestByTestResultTests, self).setUp()
+        self.log = []
+        self.result = subunit.test_results.TestByTestResult(self.on_test)
+        if sys.version_info >= (3, 0):
+            self.result._now = iter(range(5)).__next__
+        else:
+            self.result._now = iter(range(5)).next
+
+    def assertCalled(self, **kwargs):
+        defaults = {
+            'test': self,
+            'tags': set(),
+            'details': None,
+            'start_time': 0,
+            'stop_time': 1,
+            }
+        defaults.update(kwargs)
+        self.assertEqual([defaults], self.log)
+
+    def on_test(self, **kwargs):
+        self.log.append(kwargs)
+
+    def test_no_tests_nothing_reported(self):
+        self.result.startTestRun()
+        self.result.stopTestRun()
+        self.assertEqual([], self.log)
+
+    def test_add_success(self):
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertCalled(status='success')
+
+    def test_add_success_details(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addSuccess(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', details=details)
+
+    def test_tags(self):
+        if not getattr(self.result, 'tags', None):
+            self.skipTest("No tags in testtools")
+        self.result.tags(['foo'], [])
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', tags=set(['foo']))
+
+    def test_add_error(self):
+        self.result.startTest(self)
+        try:
+            1/0
+        except ZeroDivisionError:
+            error = sys.exc_info()
+        self.result.addError(self, error)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='error',
+            details={'traceback': TracebackContent(error, self)})
+
+    def test_add_error_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addError(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='error', details=details)
+
+    def test_add_failure(self):
+        self.result.startTest(self)
+        try:
+            self.fail("intentional failure")
+        except self.failureException:
+            failure = sys.exc_info()
+        self.result.addFailure(self, failure)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='failure',
+            details={'traceback': TracebackContent(failure, self)})
+
+    def test_add_failure_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addFailure(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='failure', details=details)
+
+    def test_add_xfail(self):
+        self.result.startTest(self)
+        try:
+            1/0
+        except ZeroDivisionError:
+            error = sys.exc_info()
+        self.result.addExpectedFailure(self, error)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='xfail',
+            details={'traceback': TracebackContent(error, self)})
+
+    def test_add_xfail_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addExpectedFailure(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='xfail', details=details)
+
+    def test_add_unexpected_success(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addUnexpectedSuccess(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', details=details)
+
+    def test_add_skip_reason(self):
+        self.result.startTest(self)
+        reason = self.getUniqueString()
+        self.result.addSkip(self, reason)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='skip', details={'reason': text_content(reason)})
+
+    def test_add_skip_details(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addSkip(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='skip', details=details)
+
+    def test_twice(self):
+        self.result.startTest(self)
+        self.result.addSuccess(self, details={'foo': 'bar'})
+        self.result.stopTest(self)
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertEqual(
+            [{'test': self,
+              'status': 'success',
+              'start_time': 0,
+              'stop_time': 1,
+              'tags': set(),
+              'details': {'foo': 'bar'}},
+             {'test': self,
+              'status': 'success',
+              'start_time': 2,
+              'stop_time': 3,
+              'tags': set(),
+              'details': None},
+             ],
+            self.log)
+
+
+class TestCsvResult(testtools.TestCase):
+
+    def parse_stream(self, stream):
+        stream.seek(0)
+        reader = csv.reader(stream)
+        return list(reader)
+
+    def test_csv_output(self):
+        stream = StringIO()
+        result = subunit.test_results.CsvResult(stream)
+        if sys.version_info >= (3, 0):
+            result._now = iter(range(5)).__next__
+        else:
+            result._now = iter(range(5)).next
+        result.startTestRun()
+        result.startTest(self)
+        result.addSuccess(self)
+        result.stopTest(self)
+        result.stopTestRun()
+        self.assertEqual(
+            [['test', 'status', 'start_time', 'stop_time'],
+             [self.id(), 'success', '0', '1'],
+             ],
+            self.parse_stream(stream))
+
+    def test_just_header_when_no_tests(self):
+        stream = StringIO()
+        result = subunit.test_results.CsvResult(stream)
+        result.startTestRun()
+        result.stopTestRun()
+        self.assertEqual(
+            [['test', 'status', 'start_time', 'stop_time']],
+            self.parse_stream(stream))
+
+    def test_no_output_before_events(self):
+        stream = StringIO()
+        subunit.test_results.CsvResult(stream)
+        self.assertEqual([], self.parse_stream(stream))
+
+
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
     result = loader.loadTestsFromName(__name__)
index 8ecc6cd3fb4c47f1d4237bf8a8392cb6d08bba62..691e3b34da81ac240cc544a7bd17ad9563212e91 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # -*- Mode: python -*-
 #
 # Copyright (C) 2004 Canonical.com
index bb51a24fccc5607b12511dcc88586be8a38f2ad2..a78eb999d7cf5262b125d26532a721ca5933e7fa 100755 (executable)
@@ -13,19 +13,22 @@ else:
         ]
     }
 
-try:
+
+def _get_version_from_file(filename, start_of_line, split_marker):
+    """Extract version from file, giving last matching value or None"""
+    try:
+        return [x for x in open(filename)
+            if x.startswith(start_of_line)][-1].split(split_marker)[1].strip()
+    except (IOError, IndexError):
+        return None
+
+
+VERSION = (
     # Assume we are in a distribution, which has PKG-INFO
-    version_lines = [x for x in open('PKG-INFO').readlines()
-                        if x.startswith('Version:')]
-    version_line = version_lines and version_lines[-1] or 'VERSION = 0.0'
-    VERSION = version_line.split(':')[1].strip()
-except IOError:
+    _get_version_from_file('PKG-INFO', 'Version:', ':')
     # Must be a development checkout, so use the Makefile
-    version_lines = [x for x in open('Makefile').readlines()
-                        if x.startswith('VERSION')]
-    version_line = version_lines and version_lines[-1] or 'VERSION = 0.0'
-    VERSION = version_line.split('=')[1].strip()
+    or _get_version_from_file('Makefile', 'VERSION', '=')
+    or "0.0")
 
 
 setup(
index b78eee6946e87ffc5dc478c79af4ee7e88b232db..00b0844ddaa4b09b3229a36d284f4de8ba8ddbec 100755 (executable)
@@ -27,7 +27,7 @@
 . ${SHELL_SHARE}subunit.sh
 
 echo 'test: subunit_start_test output'
-func_output=$(subunit_start_test "foo bar")
+func_output=$(subunit_start_test "foo bar"|grep -v 'time:')
 func_status=$?
 if [ $func_status == 0 -a "x$func_output" = "xtest: foo bar" ]; then
   echo 'success: subunit_start_test output'
@@ -40,7 +40,7 @@ else
 fi
 
 subunit_start_test "subunit_pass_test output"
-func_output=$(subunit_pass_test "foo bar")
+func_output=$(subunit_pass_test "foo bar"|grep -v 'time:')
 func_status=$?
 if [ $func_status == 0 -a "x$func_output" = "xsuccess: foo bar" ]; then
   subunit_pass_test "subunit_pass_test output"
@@ -53,12 +53,12 @@ else
 fi
 
 subunit_start_test "subunit_fail_test output"
-func_output=$(subunit_fail_test "foo bar" <<END
+func_output=$((subunit_fail_test "foo bar" <<END
 something
   wrong
 here
 END
-)
+)|grep -v 'time:')
 func_status=$?
 if [ $func_status == 0 -a "x$func_output" = "xfailure: foo bar [
 something
@@ -75,12 +75,12 @@ else
 fi
 
 subunit_start_test "subunit_error_test output"
-func_output=$(subunit_error_test "foo bar" <<END
+func_output=$((subunit_error_test "foo bar" <<END
 something
   died
 here
 END
-)
+)| grep -v 'time:')
 func_status=$?
 if [ $func_status == 0 -a "x$func_output" = "xerror: foo bar [
 something