2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``subunit.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc.
58 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
59 remove tags in the test run that is currently executing. If called when no
60 test is in progress (that is, if called outside of the ``startTest``,
61 ``stopTest`` pair), then the tags apply to all subsequent tests. If called
62 when a test is in progress, then the tags only apply to that test.
64 The ``time(a_datetime)`` method is called (if present) when a ``time:``
65 directive is encountered in a Subunit stream. This is used to tell a TestResult
66 about the time that events in the stream occurred at, to allow reconstructing
67 test timing from a stream.
69 The ``progress(offset, whence)`` method controls progress data for a stream.
70 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
71 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
72 ignore the offset parameter.
78 ``subunit.run`` is a convenience wrapper to run a Python test suite via
79 the command line, reporting via Subunit::
81 $ python -m subunit.run mylib.tests.test_suite
83 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
84 tests, allowing isolation between the test runner and some tests.
86 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
87 tests that will fork() before that individual test is run.
89 ``ExecTestCase`` is a convenience wrapper for running an external
90 program to get a Subunit stream and then report that back to an arbitrary
93 class AggregateTests(subunit.ExecTestCase):
95 def test_script_one(self):
98 def test_script_two(self):
101 # Normally your normal test loading would take care of this automatically,
102 # It is only spelt out in detail here for clarity.
103 suite = unittest.TestSuite([AggregateTests("test_script_one"),
104 AggregateTests("test_script_two")])
105 # Create any TestResult class you like.
106 result = unittest._TextTestResult(sys.stdout)
107 # And run your suite as normal, Subunit will exec each external script as
108 # needed and report to your result object.
114 * subunit.chunked contains HTTP chunked encoding/decoding logic.
115 * subunit.content contains a minimal assumptions MIME content representation.
116 * subunit.content_type contains a MIME Content-Type representation.
117 * subunit.test_results contains TestResult helper classes.
123 from StringIO import StringIO
130 import chunked, content, content_type, details, test_results
141 return subunit.tests.test_suite()
def join_dir(base_path, path):
    """Resolve C{path} against the directory that contains C{base_path}.

    @param base_path: A path to a file or directory; only the directory
        that contains it is used.
    @param path: An absolute path, or a path relative to the containing
        directory of C{base_path}.

    @return: An absolute path to C{path}.
    """
    # Make base_path absolute first so the result does not depend on whether
    # the caller passed a relative location.
    anchor = os.path.abspath(base_path)
    containing_dir = os.path.dirname(anchor)
    # os.path.join returns ``path`` unchanged when it is already absolute.
    return os.path.join(containing_dir, path)
def tags_to_new_gone(tags):
    """Split a list of tags into a new_set and a gone_set.

    A tag written with a leading ``-`` is a removal: its name, without the
    dash, is placed in the gone set. Every other tag is placed in the new
    set.

    :param tags: An iterable of tag strings, e.g. ``['a', '-b']``.
    :return: A ``(new_tags, gone_tags)`` tuple of sets.
    """
    new_tags = set()
    gone_tags = set()
    for tag in tags:
        # startswith() instead of indexing the first character, so that an
        # empty string cannot raise IndexError.
        if tag.startswith('-'):
            gone_tags.add(tag[1:])
        else:
            new_tags.add(tag)
    return new_tags, gone_tags
class DiscardStream(object):
    """A filelike object which discards what is written to it."""

    def write(self, bytes):
        """Accept ``bytes`` and drop it; nothing is stored or forwarded."""
        return None
177 class _ParserState(object):
178 """State for the subunit parser."""
180 def __init__(self, parser):
183 def addError(self, offset, line):
184 """An 'error:' directive has been read."""
185 self.parser.stdOutLineReceived(line)
187 def addExpectedFail(self, offset, line):
188 """An 'xfail:' directive has been read."""
189 self.parser.stdOutLineReceived(line)
191 def addFailure(self, offset, line):
192 """A 'failure:' directive has been read."""
193 self.parser.stdOutLineReceived(line)
195 def addSkip(self, offset, line):
196 """A 'skip:' directive has been read."""
197 self.parser.stdOutLineReceived(line)
199 def addSuccess(self, offset, line):
200 """A 'success:' directive has been read."""
201 self.parser.stdOutLineReceived(line)
203 def lineReceived(self, line):
204 """a line has been received."""
205 parts = line.split(None, 1)
208 offset = len(cmd) + 1
210 if cmd in ('test', 'testing'):
211 self.startTest(offset, line)
213 self.addError(offset, line)
214 elif cmd == 'failure':
215 self.addFailure(offset, line)
216 elif cmd == 'progress':
217 self.parser._handleProgress(offset, line)
219 self.addSkip(offset, line)
220 elif cmd in ('success', 'successful'):
221 self.addSuccess(offset, line)
222 elif cmd in ('tags',):
223 self.parser._handleTags(offset, line)
224 elif cmd in ('time',):
225 self.parser._handleTime(offset, line)
227 self.addExpectedFail(offset, line)
229 self.parser.stdOutLineReceived(line)
231 self.parser.stdOutLineReceived(line)
233 def lostConnection(self):
234 """Connection lost."""
235 self.parser._lostConnectionInTest('unknown state of ')
237 def startTest(self, offset, line):
238 """A test start command received."""
239 self.parser.stdOutLineReceived(line)
242 class _InTest(_ParserState):
243 """State for the subunit parser after reading a test: directive."""
245 def _outcome(self, offset, line, no_details, details_state):
246 """An outcome directive has been read.
248 :param no_details: Callable to call when no details are presented.
249 :param details_state: The state to switch to for details
250 processing of this outcome.
252 if self.parser.current_test_description == line[offset:-1]:
253 self.parser._state = self.parser._outside_test
254 self.parser.current_test_description = None
256 self.parser.client.stopTest(self.parser._current_test)
257 self.parser._current_test = None
258 elif self.parser.current_test_description + " [" == line[offset:-1]:
259 self.parser._state = details_state
260 details_state.set_simple()
261 elif self.parser.current_test_description + " [ multipart" == \
263 self.parser._state = details_state
264 details_state.set_multipart()
266 self.parser.stdOutLineReceived(line)
269 self.parser.client.addError(self.parser._current_test,
272 def addError(self, offset, line):
273 """An 'error:' directive has been read."""
274 self._outcome(offset, line, self._error,
275 self.parser._reading_error_details)
278 self.parser.client.addExpectedFailure(self.parser._current_test,
281 def addExpectedFail(self, offset, line):
282 """An 'xfail:' directive has been read."""
283 self._outcome(offset, line, self._xfail,
284 self.parser._reading_xfail_details)
287 self.parser.client.addFailure(self.parser._current_test, details={})
289 def addFailure(self, offset, line):
290 """A 'failure:' directive has been read."""
291 self._outcome(offset, line, self._failure,
292 self.parser._reading_failure_details)
295 self.parser.client.addSkip(self.parser._current_test, details={})
297 def addSkip(self, offset, line):
298 """A 'skip:' directive has been read."""
299 self._outcome(offset, line, self._skip,
300 self.parser._reading_skip_details)
303 self.parser.client.addSuccess(self.parser._current_test, details={})
305 def addSuccess(self, offset, line):
306 """A 'success:' directive has been read."""
307 self._outcome(offset, line, self._succeed,
308 self.parser._reading_success_details)
310 def lostConnection(self):
311 """Connection lost."""
312 self.parser._lostConnectionInTest('')
class _OutSideTest(_ParserState):
    """State for the subunit parser outside of a test context."""

    def lostConnection(self):
        """Connection lost."""
        # No test is in progress in this state, so nothing is reported.

    def startTest(self, offset, line):
        """A test start command received.

        :param offset: Index in ``line`` at which the test description starts.
        :param line: The directive line; its last character is dropped
            (presumably the trailing newline).
        """
        parser = self.parser
        description = line[offset:-1]
        # Switch state first, then record the test, then notify the client.
        parser._state = parser._in_test
        parser._current_test = RemotedTestCase(description)
        parser.current_test_description = description
        parser.client.startTest(parser._current_test)
class _ReadingDetails(_ParserState):
    """Common logic for reading state details."""

    def endDetails(self):
        """The end of a details section has been reached."""
        parser = self.parser
        # Leave the details state before reporting, so the parser is already
        # outside the test when the client is notified.
        parser._state = parser._outside_test
        parser.current_test_description = None
        self._report_outcome()
        parser.client.stopTest(parser._current_test)

    def lineReceived(self, line):
        """a line has been received."""
        # While reading details every line belongs to the details parser.
        self.details_parser.lineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        state_string = '%s report of ' % self._outcome_label()
        self.parser._lostConnectionInTest(state_string)

    def _outcome_label(self):
        """The label to describe this outcome."""
        raise NotImplementedError(self._outcome_label)

    def set_simple(self):
        """Start a simple details parser."""
        self.details_parser = details.SimpleDetailsParser(self)

    def set_multipart(self):
        """Start a multipart details parser."""
        self.details_parser = details.MultipartDetailsParser(self)
361 class _ReadingFailureDetails(_ReadingDetails):
362 """State for the subunit parser when reading failure details."""
364 def _report_outcome(self):
365 self.parser.client.addFailure(self.parser._current_test,
366 details=self.details_parser.get_details())
368 def _outcome_label(self):
372 class _ReadingErrorDetails(_ReadingDetails):
373 """State for the subunit parser when reading error details."""
375 def _report_outcome(self):
376 self.parser.client.addError(self.parser._current_test,
377 details=self.details_parser.get_details())
379 def _outcome_label(self):
383 class _ReadingExpectedFailureDetails(_ReadingDetails):
384 """State for the subunit parser when reading xfail details."""
386 def _report_outcome(self):
387 self.parser.client.addExpectedFailure(self.parser._current_test,
388 details=self.details_parser.get_details())
390 def _outcome_label(self):
394 class _ReadingSkipDetails(_ReadingDetails):
395 """State for the subunit parser when reading skip details."""
397 def _report_outcome(self):
398 self.parser.client.addSkip(self.parser._current_test,
399 details=self.details_parser.get_details("skip"))
401 def _outcome_label(self):
405 class _ReadingSuccessDetails(_ReadingDetails):
406 """State for the subunit parser when reading success details."""
408 def _report_outcome(self):
409 self.parser.client.addSuccess(self.parser._current_test,
410 details=self.details_parser.get_details("success"))
412 def _outcome_label(self):
416 class TestProtocolServer(object):
417 """A parser for subunit.
419 :ivar tags: The current tags associated with the protocol stream.
422 def __init__(self, client, stream=None):
423 """Create a TestProtocolServer instance.
425 :param client: An object meeting the unittest.TestResult protocol.
426 :param stream: The stream that lines received which are not part of the
427 subunit protocol should be written to. This allows custom handling
428 of mixed protocols. By default, sys.stdout will be used for
431 self.client = test_results.ExtendedToOriginalDecorator(client)
434 self._stream = stream
435 # state objects we can switch to
436 self._in_test = _InTest(self)
437 self._outside_test = _OutSideTest(self)
438 self._reading_error_details = _ReadingErrorDetails(self)
439 self._reading_failure_details = _ReadingFailureDetails(self)
440 self._reading_skip_details = _ReadingSkipDetails(self)
441 self._reading_success_details = _ReadingSuccessDetails(self)
442 self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
443 # start with outside test.
444 self._state = self._outside_test
446 def _handleProgress(self, offset, line):
447 """Process a progress directive."""
448 line = line[offset:].strip()
450 whence = PROGRESS_CUR
453 whence = PROGRESS_PUSH
456 whence = PROGRESS_POP
459 whence = PROGRESS_SET
461 self.client.progress(delta, whence)
463 def _handleTags(self, offset, line):
464 """Process a tags command."""
465 tags = line[offset:].split()
466 new_tags, gone_tags = tags_to_new_gone(tags)
467 self.client.tags(new_tags, gone_tags)
469 def _handleTime(self, offset, line):
470 # Parse the timestamp and pass it on to the client.
472 event_time = iso8601.parse_date(line[offset:-1])
474 raise TypeError("Failed to parse %r, got %r" % (line, e))
475 self.client.time(event_time)
477 def lineReceived(self, line):
478 """Call the appropriate local method for the received line."""
479 self._state.lineReceived(line)
481 def _lostConnectionInTest(self, state_string):
482 error_string = "lost connection during %stest '%s'" % (
483 state_string, self.current_test_description)
484 self.client.addError(self._current_test, RemoteError(error_string))
485 self.client.stopTest(self._current_test)
487 def lostConnection(self):
488 """The input connection has finished."""
489 self._state.lostConnection()
491 def readFrom(self, pipe):
492 """Blocking convenience API to parse an entire stream.
494 :param pipe: A file-like object supporting readlines().
497 for line in pipe.readlines():
498 self.lineReceived(line)
499 self.lostConnection()
501 def _startTest(self, offset, line):
502 """Internal call to change state machine. Override startTest()."""
503 self._state.startTest(offset, line)
505 def stdOutLineReceived(self, line):
506 self._stream.write(line)
class RemoteException(Exception):
    """An exception that occurred remotely to Python.

    Two instances compare equal when their ``args`` are equal. Comparison
    with an object that has no ``args`` attribute is simply unequal.
    """

    def __eq__(self, other):
        """Return True when ``other`` carries the same ``args``."""
        try:
            return self.args == other.args
        except AttributeError:
            # ``other`` is not exception-like, so it cannot be equal.
            return False

    def __ne__(self, other):
        """Return the negation of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``, so without this
        two equal instances would also compare as ``!=``.
        """
        return not self.__eq__(other)

    def __hash__(self):
        """Hash on ``args`` so that equal instances hash equally.

        :raises TypeError: If ``args`` contains an unhashable value.
        """
        return hash(self.args)
519 class TestProtocolClient(unittest.TestResult):
520 """A TestResult which generates a subunit stream for a test run.
522 # Get a TestSuite or TestCase to run
524 # Create a stream (any object with a 'write' method)
525 stream = file('tests.log', 'wb')
526 # Create a subunit result object which will output to the stream
527 result = subunit.TestProtocolClient(stream)
528 # Optionally, to get timing data for performance analysis, wrap the
529 # serialiser with a timing decorator
530 result = subunit.test_results.AutoTimingTestResultDecorator(result)
531 # Run the test suite reporting to the subunit result object
537 def __init__(self, stream):
538 unittest.TestResult.__init__(self)
539 self._stream = stream
541 def addError(self, test, error=None, details=None):
542 """Report an error in test test.
544 Only one of error and details should be provided: conceptually there
545 are two separate methods:
546 addError(self, test, error)
547 addError(self, test, details)
549 :param error: Standard unittest positional argument form - an
551 :param details: New Testing-in-python drafted API; a dict from string
552 to subunit.Content objects.
554 self._addOutcome("error", test, error=error, details=details)
556 def addExpectedFailure(self, test, error=None, details=None):
557 """Report an expected failure in test test.
559 Only one of error and details should be provided: conceptually there
560 are two separate methods:
561 addExpectedFailure(self, test, error)
562 addExpectedFailure(self, test, details)
564 :param error: Standard unittest positional argument form - an
566 :param details: New Testing-in-python drafted API; a dict from string
567 to subunit.Content objects.
569 self._addOutcome("xfail", test, error=error, details=details)
571 def addFailure(self, test, error=None, details=None):
572 """Report a failure in test test.
574 Only one of error and details should be provided: conceptually there
575 are two separate methods:
576 addFailure(self, test, error)
577 addFailure(self, test, details)
579 :param error: Standard unittest positional argument form - an
581 :param details: New Testing-in-python drafted API; a dict from string
582 to subunit.Content objects.
584 self._addOutcome("failure", test, error=error, details=details)
586 def _addOutcome(self, outcome, test, error=None, details=None):
587 """Report an outcome of test test.
589 Only one of error and details should be provided: conceptually there
590 are two separate methods:
591 addOutcome(self, test, error)
592 addOutcome(self, test, details)
594 :param outcome: A string describing the outcome - used as the
595 event name in the subunit stream.
596 :param error: Standard unittest positional argument form - an
598 :param details: New Testing-in-python drafted API; a dict from string
599 to subunit.Content objects.
601 self._stream.write("%s: %s" % (outcome, test.id()))
602 if error is None and details is None:
604 if error is not None:
605 self._stream.write(" [\n")
606 for line in self._exc_info_to_string(error, test).splitlines():
607 self._stream.write("%s\n" % line)
609 self._write_details(details)
610 self._stream.write("]\n")
612 def addSkip(self, test, reason=None, details=None):
613 """Report a skipped test."""
615 self._addOutcome("skip", test, error=None, details=details)
617 self._stream.write("skip: %s [\n" % test.id())
618 self._stream.write("%s\n" % reason)
619 self._stream.write("]\n")
621 def addSuccess(self, test, details=None):
622 """Report a success in a test."""
623 self._stream.write("successful: %s" % test.id())
625 self._stream.write("\n")
627 self._write_details(details)
628 self._stream.write("]\n")
629 addUnexpectedSuccess = addSuccess
631 def startTest(self, test):
632 """Mark a test as starting its test run."""
633 self._stream.write("test: %s\n" % test.id())
635 def progress(self, offset, whence):
636 """Provide indication about the progress/length of the test run.
638 :param offset: Information about the number of tests remaining. If
639 whence is PROGRESS_CUR, then offset increases/decreases the
640 remaining test count. If whence is PROGRESS_SET, then offset
641 specifies exactly the remaining test count.
642 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
645 if whence == PROGRESS_CUR and offset > -1:
647 elif whence == PROGRESS_PUSH:
650 elif whence == PROGRESS_POP:
655 self._stream.write("progress: %s%s\n" % (prefix, offset))
657 def time(self, a_datetime):
658 """Inform the client of the time.
660 :param a_datetime: A datetime.datetime object.
662 time = a_datetime.astimezone(iso8601.Utc())
663 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
664 time.year, time.month, time.day, time.hour, time.minute,
665 time.second, time.microsecond))
667 def _write_details(self, details):
668 """Output details to the stream.
670 :param details: An extended details dict for a test outcome.
672 self._stream.write(" [ multipart\n")
673 for name, content in sorted(details.iteritems()):
674 self._stream.write("Content-Type: %s/%s" %
675 (content.content_type.type, content.content_type.subtype))
676 parameters = content.content_type.parameters
678 self._stream.write(";")
680 for param, value in parameters.iteritems():
681 param_strs.append("%s=%s" % (param, value))
682 self._stream.write(",".join(param_strs))
683 self._stream.write("\n%s\n" % name)
684 encoder = chunked.Encoder(self._stream)
685 map(encoder.write, content.iter_bytes())
689 """Obey the testtools result.done() interface."""
692 def RemoteError(description=""):
693 if description == "":
695 return (RemoteException, RemoteException(description), None)
698 class RemotedTestCase(unittest.TestCase):
699 """A class to represent test cases run in child processes.
701 Instances of this class are used to provide the Python test API a TestCase
702 that can be printed to the screen, introspected for metadata and so on.
703 However, as they are simply a memoisation of a test that was actually
704 run in the past by a separate process, they cannot perform any interactive
708 def __eq__ (self, other):
710 return self.__description == other.__description
711 except AttributeError:
714 def __init__(self, description):
715 """Create a pseudo test case with description description."""
716 self.__description = description
718 def error(self, label):
719 raise NotImplementedError("%s on RemotedTestCases is not permitted." %
726 self.error("tearDown")
728 def shortDescription(self):
729 return self.__description
732 return "%s" % (self.__description,)
735 return "%s (%s)" % (self.__description, self._strclass())
738 return "<%s description='%s'>" % \
739 (self._strclass(), self.__description)
741 def run(self, result=None):
742 if result is None: result = self.defaultTestResult()
743 result.startTest(self)
744 result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
745 result.stopTest(self)
749 return "%s.%s" % (cls.__module__, cls.__name__)
752 class ExecTestCase(unittest.TestCase):
753 """A test case which runs external scripts for test fixtures."""
755 def __init__(self, methodName='runTest'):
756 """Create an instance of the class that will use the named test
757 method when executed. Raises a ValueError if the instance does
758 not have a method with the specified name.
760 unittest.TestCase.__init__(self, methodName)
761 testMethod = getattr(self, methodName)
762 self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
765 def countTestCases(self):
768 def run(self, result=None):
769 if result is None: result = self.defaultTestResult()
773 """Run the test without collecting errors in a TestResult"""
774 self._run(unittest.TestResult())
776 def _run(self, result):
777 protocol = TestProtocolServer(result)
778 output = subprocess.Popen(self.script, shell=True,
779 stdout=subprocess.PIPE).communicate()[0]
780 protocol.readFrom(StringIO(output))
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Each test gets its own process, which has a performance overhead but will
    provide excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        """Run this test through ``run_isolated``, reporting to ``result``.

        :param result: The TestResult to report to. When omitted, the one
            from ``defaultTestResult()`` is used.
        """
        target = self.defaultTestResult() if result is None else result
        # Pass the plain TestCase class so its run method does the real work.
        run_isolated(unittest.TestCase, self, target)
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    This decorator will fork() before running the tests and report the
    results from the child process using a Subunit stream. This is useful for
    handling tests that mutate global state, or that exercise C extensions.
    """

    def run(self, result=None):
        """Run the suite through ``run_isolated``, reporting to ``result``.

        :param result: The TestResult to report to. When omitted, a fresh
            ``unittest.TestResult`` is used.
        """
        target = unittest.TestResult() if result is None else result
        # Pass the plain TestSuite class so its run method does the real work.
        run_isolated(unittest.TestSuite, self, target)
810 def run_isolated(klass, self, result):
811 """Run a test suite or case in a subprocess, using the run method on klass.
813 c2pread, c2pwrite = os.pipe()
814 # fixme - error -> result
819 # Close parent's pipe ends
826 # at this point, sys.stdin is redirected, now we want
827 # to filter it to escape ]'s.
828 ### XXX: test and write that bit.
830 result = TestProtocolClient(sys.stdout)
831 klass.run(self, result)
834 # exit HARD, exit NOW.
838 # Close child pipe ends
840 # hookup a protocol engine
841 protocol = TestProtocolServer(result)
842 protocol.readFrom(os.fdopen(c2pread, 'rU'))
844 # TODO return code evaluation.
848 def TAP2SubUnit(tap, subunit):
849 """Filter a TAP pipe into a subunit pipe.
851 :param tap: A tap pipe/stream/file object.
852 :param subunit: A pipe/stream/file object to write subunit results to.
853 :return: The exit code to exit with.
858 client = TestProtocolClient(subunit)
862 def _skipped_test(subunit, plan_start):
863 # Some tests were skipped.
864 subunit.write('test test %d\n' % plan_start)
865 subunit.write('error test %d [\n' % plan_start)
866 subunit.write('test missing from TAP output\n')
868 return plan_start + 1
869 # Test data for the next test to emit
875 if test_name is None:
877 subunit.write("test %s\n" % test_name)
879 subunit.write("%s %s\n" % (result, test_name))
881 subunit.write("%s %s [\n" % (result, test_name))
884 subunit.write("%s\n" % line)
888 if state == BEFORE_PLAN:
889 match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
892 _, plan_stop, comment = match.groups()
893 plan_stop = int(plan_stop)
894 if plan_start > plan_stop and plan_stop == 0:
897 subunit.write("test file skip\n")
898 subunit.write("skip file skip [\n")
899 subunit.write("%s\n" % comment)
902 # not a plan line, or have seen one before
903 match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
905 # new test, emit current one.
907 status, number, description, directive, directive_comment = match.groups()
912 if description is None:
915 description = ' ' + description
916 if directive is not None:
917 if directive == 'TODO':
919 elif directive == 'SKIP':
921 if directive_comment is not None:
922 log.append(directive_comment)
923 if number is not None:
925 while plan_start < number:
926 plan_start = _skipped_test(subunit, plan_start)
927 test_name = "test %d%s" % (plan_start, description)
930 match = re.match("Bail out\!(?:\s*(.*))?\n", line)
932 reason, = match.groups()
936 extra = ' %s' % reason
938 test_name = "Bail out!%s" % extra
942 match = re.match("\#.*\n", line)
944 log.append(line[:-1])
948 while plan_start <= plan_stop:
949 # record missed tests
950 plan_start = _skipped_test(subunit, plan_start)
def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.

         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.

         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.

        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)

    def write_tags(new_tags, gone_tags):
        """Emit one ``tags:`` line, or nothing when both sets are empty."""
        if new_tags or gone_tags:
            # Build every word first and join with spaces. Writing each
            # '-TAG' on its own with no separator would glue the tags
            # together (e.g. "tags: a-b-c"). Sorted for stable output.
            words = sorted(new_tags)
            words.extend("-" + tag for tag in sorted(gone_tags))
            filtered.write("tags: " + ' '.join(words))
            filtered.write("\n")

    write_tags(new_tags, gone_tags)
    # TODO: use the protocol parser and thus don't mangle test comments.
    for line in original:
        if line.startswith("tags:"):
            line_tags = line[5:].split()
            line_new, line_gone = tags_to_new_gone(line_tags)
            # Drop whatever the global tags already decide.
            line_new = line_new - gone_tags
            line_gone = line_gone - new_tags
            write_tags(line_new, line_gone)
        else:
            # NOTE(review): the end of this function was not visible in the
            # reviewed excerpt. Passing other lines through unchanged and
            # returning 0 are reconstructed - confirm against the original.
            filtered.write(line)
    return 0
class ProtocolTestCase(object):
    """Subunit wire protocol to unittest.TestCase adapter.

    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
    calling a ProtocolTestCase or invoking the run() method will make a 'test
    run' happen. The 'test run' will simply be a replay of the test activity
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
    and ``countTestCases`` methods are not supported because there isn't a
    sensible mapping for those methods.

    # Get a stream (any object with a readline() method), in this case the
    # stream output by the example from ``subunit.TestProtocolClient``.
    stream = file('tests.log', 'rb')
    # Create a parser which will read from the stream and emit
    # activity to a unittest.TestResult when run() is called.
    suite = subunit.ProtocolTestCase(stream)
    # Create a result object to accept the contents of that stream.
    result = unittest.TestResult()
    # 'run' the tests - process the stream and feed its contents to result.
    suite.run(result)

    :seealso: TestProtocolServer (the subunit wire protocol parser).
    """

    def __init__(self, stream, passthrough=None):
        """Create a ProtocolTestCase reading from stream.

        :param stream: A filelike object which a subunit stream can be read
            from.
        :param passthrough: A stream to pass non subunit input on to. If not
            supplied, the TestProtocolServer default is used.
        """
        self._stream = stream
        self._passthrough = passthrough

    def __call__(self, result=None):
        """Calling the instance is the same as calling ``run(result)``."""
        return self.run(result)

    def defaultTestResult(self):
        """Return the result object used when ``run()`` is given none.

        This class derives from ``object``, not ``unittest.TestCase``, so it
        has to supply this method itself; ``run()`` relies on it.
        """
        return unittest.TestResult()

    def run(self, result=None):
        """Replay the stream into ``result``.

        :param result: A unittest.TestResult to receive the activity. When
            omitted, ``defaultTestResult()`` supplies one.
        """
        if result is None:
            result = self.defaultTestResult()
        protocol = TestProtocolServer(result, self._passthrough)
        # readline() returns an empty value at end of stream, ending the loop.
        line = self._stream.readline()
        while line:
            protocol.lineReceived(line)
            line = self._stream.readline()
        protocol.lostConnection()
class TestResultStats(unittest.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        unittest.TestResult.__init__(self)
        self._stream = stream
        # Counters start at zero; errors and failures share failed_tests.
        self.failed_tests = 0
        self.skipped_tests = 0
        self.seen_tags = set()

    @property
    def total_tests(self):
        """The number of tests run, as counted by unittest.TestResult."""
        return self.testsRun

    def addError(self, test, err):
        """Count an error as a failed test."""
        self.failed_tests += 1

    def addFailure(self, test, err):
        """Count a failure as a failed test."""
        self.failed_tests += 1

    def addSkip(self, test, reason):
        """Count a skipped test."""
        self.skipped_tests += 1

    def formatStats(self):
        """Write the collected statistics to the stream, one line each."""
        emit = self._stream.write
        rows = (
            ("Total tests: %5d\n", "total_tests"),
            ("Passed tests: %5d\n", "passed_tests"),
            ("Failed tests: %5d\n", "failed_tests"),
            ("Skipped tests: %5d\n", "skipped_tests"),
        )
        # Each value is read just before its line is written.
        for template, attribute in rows:
            emit(template % getattr(self, attribute))
        ordered_tags = sorted(self.seen_tags)
        emit("Seen tags: %s\n" % (", ".join(ordered_tags)))

    @property
    def passed_tests(self):
        """Tests that were neither failed (or errored) nor skipped."""
        not_passed = self.failed_tests + self.skipped_tests
        return self.total_tests - not_passed

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags."""
        # Only added tags are recorded; gone_tags is ignored.
        self.seen_tags.update(new_tags)

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return not self.failed_tests != 0
class TestResultFilter(unittest.TestResult):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
                 filter_success=True, filter_skip=False,
                 filter_predicate=None):
        """Create a FilterResult object filtering to result.

        :param result: The result that unfiltered outcomes are forwarded to.
        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_predicate: A callable taking (test, err) and
            returning True if the result should be passed through.
            err is None for success. For skips the skip reason is passed
            in place of err. Defaults to accepting everything.
        """
        unittest.TestResult.__init__(self)
        self.result = result
        self._filter_error = filter_error
        self._filter_failure = filter_failure
        self._filter_success = filter_success
        self._filter_skip = filter_skip
        if filter_predicate is None:
            def filter_predicate(test, err):
                return True
        self.filter_predicate = filter_predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # The (new, gone) tags for the current test.
        self._current_test_tags = None

    def _should_forward(self, kind_filtered, test, detail):
        """Decide whether an outcome is forwarded, recording the decision.

        The predicate is only consulted when the outcome kind itself is not
        filtered. A filtered outcome marks the current test as filtered so
        that stopTest does not forward an unmatched stopTest (or the
        gathered tags) to the wrapped result.
        """
        if not kind_filtered and self.filter_predicate(test, detail):
            return True
        self._current_test_filtered = True
        return False

    def addError(self, test, err):
        """Forward an error unless errors or this test are filtered out."""
        if self._should_forward(self._filter_error, test, err):
            self.result.startTest(test)
            self.result.addError(test, err)

    def addFailure(self, test, err):
        """Forward a failure unless failures or this test are filtered out."""
        if self._should_forward(self._filter_failure, test, err):
            self.result.startTest(test)
            self.result.addFailure(test, err)

    def addSkip(self, test, reason):
        """Forward a skip unless skips or this test are filtered out.

        If the wrapped result has no callable ``addSkip``, the skip is
        reported through ``addError`` with a ``RemoteError`` built from
        the reason.
        """
        if self._should_forward(self._filter_skip, test, reason):
            self.result.startTest(test)
            # This is duplicated, it would be nice to have on a 'calls
            # TestResults' mixin perhaps.
            addSkip = getattr(self.result, 'addSkip', None)
            if not callable(addSkip):
                self.result.addError(test, RemoteError(reason))
            else:
                addSkip(test, reason)

    def addSuccess(self, test):
        """Forward a success unless successes or this test are filtered out."""
        if self._should_forward(self._filter_success, test, None):
            self.result.startTest(test)
            self.result.addSuccess(test)

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        self._current_test = test
        self._current_test_filtered = False
        self._current_test_tags = set(), set()

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly. Unless the test was filtered, any tags gathered while
        it ran are sent to the wrapped result (when it has a callable
        ``tags``) followed by ``stopTest``.
        """
        if not self._current_test_filtered:
            # Tags to output for this test. Tolerate stopTest without a
            # preceding startTest, where no tag sets exist yet.
            new_tags, gone_tags = self._current_test_tags or ((), ())
            if new_tags or gone_tags:
                tags_method = getattr(self.result, 'tags', None)
                if callable(tags_method):
                    tags_method(new_tags, gone_tags)
            self.result.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._current_test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        While a test is running the changes are gathered in the per-test
        (new, gone) sets, which stopTest later emits. The instruction is
        also passed straight on to the wrapped result when it has a
        ``tags`` method.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        :return: Whatever the wrapped result's ``tags`` returns, or None
            if it has no such method.
        """
        if self._current_test is not None:
            # gather the tags until the test stops.
            current_new, current_gone = self._current_test_tags
            current_new.update(new_tags)
            current_new.difference_update(gone_tags)
            current_gone.update(gone_tags)
            current_gone.difference_update(new_tags)
        tags_method = getattr(self.result, 'tags', None)
        if tags_method is None:
            return None
        return tags_method(new_tags, gone_tags)

    def id_to_orig_id(self, id):
        """Strip a leading "subunit.RemotedTestCase." from a test id.

        Ids without that prefix are returned unchanged.
        """
        prefix = "subunit.RemotedTestCase."
        if id.startswith(prefix):
            return id[len(prefix):]
        return id