2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``subunit.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc.
58 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
59 remove tags in the test run that is currently executing. If called when no
60 test is in progress (that is, if called outside of the ``startTest``,
61 ``stopTest`` pair), then the tags apply to all subsequent tests. If called
62 when a test is in progress, then the tags only apply to that test.
64 The ``time(a_datetime)`` method is called (if present) when a ``time:``
65 directive is encountered in a Subunit stream. This is used to tell a TestResult
66 about the time that events in the stream occurred at, to allow reconstructing
67 test timing from a stream.
69 The ``progress(offset, whence)`` method controls progress data for a stream.
70 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
71 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
72 ignore the offset parameter.
78 ``subunit.run`` is a convenience wrapper to run a Python test suite via
79 the command line, reporting via Subunit::
81 $ python -m subunit.run mylib.tests.test_suite
83 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
84 tests, allowing isolation between the test runner and some tests.
86 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
87 tests that will fork() before that individual test is run.
89 ``ExecTestCase`` is a convenience wrapper for running an external
90 program to get a Subunit stream and then report that back to an arbitrary
93 class AggregateTests(subunit.ExecTestCase):
95 def test_script_one(self):
98 def test_script_two(self):
101 # Normally your normal test loading would take care of this automatically,
102 # It is only spelt out in detail here for clarity.
103 suite = unittest.TestSuite([AggregateTests("test_script_one"),
104 AggregateTests("test_script_two")])
105 # Create any TestResult class you like.
106 result = unittest._TextTestResult(sys.stdout)
107 # And run your suite as normal, Subunit will exec each external script as
108 # needed and report to your result object.
114 * subunit.chunked contains HTTP chunked encoding/decoding logic.
115 * subunit.content contains a minimal-assumptions MIME content representation.
116 * subunit.content_type contains a MIME Content-Type representation.
117 * subunit.test_results contains TestResult helper classes.
123 from StringIO import StringIO
130 import chunked, content, content_type, details, test_results
141 return subunit.tests.test_suite()
def join_dir(base_path, path):
    """Resolve ``path`` against the directory that contains ``base_path``.

    @param base_path: A path to a file or directory. It is made absolute and
        its final component is dropped to find the anchor directory.
    @param path: An absolute path, or a path relative to the containing
        directory of C{base_path}.

    @return: C{path} joined onto the anchor directory. When C{path} is
        itself absolute, ``os.path.join`` returns it unchanged.
    """
    anchor_dir = os.path.dirname(os.path.abspath(base_path))
    return os.path.join(anchor_dir, path)
158 def tags_to_new_gone(tags):
159 """Split a list of tags into a new_set and a gone_set."""
164 gone_tags.add(tag[1:])
167 return new_tags, gone_tags
170 class DiscardStream(object):
171 """A filelike object which discards what is written to it."""
173 def write(self, bytes):
177 class _ParserState(object):
178 """State for the subunit parser."""
180 def __init__(self, parser):
183 def addError(self, offset, line):
184 """An 'error:' directive has been read."""
185 self.parser.stdOutLineReceived(line)
187 def addExpectedFail(self, offset, line):
188 """An 'xfail:' directive has been read."""
189 self.parser.stdOutLineReceived(line)
191 def addFailure(self, offset, line):
192 """A 'failure:' directive has been read."""
193 self.parser.stdOutLineReceived(line)
195 def addSkip(self, offset, line):
196 """A 'skip:' directive has been read."""
197 self.parser.stdOutLineReceived(line)
199 def addSuccess(self, offset, line):
200 """A 'success:' directive has been read."""
201 self.parser.stdOutLineReceived(line)
203 def lineReceived(self, line):
204 """a line has been received."""
205 parts = line.split(None, 1)
208 offset = len(cmd) + 1
210 if cmd in ('test', 'testing'):
211 self.startTest(offset, line)
213 self.addError(offset, line)
214 elif cmd == 'failure':
215 self.addFailure(offset, line)
216 elif cmd == 'progress':
217 self.parser._handleProgress(offset, line)
219 self.addSkip(offset, line)
220 elif cmd in ('success', 'successful'):
221 self.addSuccess(offset, line)
222 elif cmd in ('tags',):
223 self.parser._handleTags(offset, line)
224 elif cmd in ('time',):
225 self.parser._handleTime(offset, line)
227 self.addExpectedFail(offset, line)
229 self.parser.stdOutLineReceived(line)
231 self.parser.stdOutLineReceived(line)
233 def lostConnection(self):
234 """Connection lost."""
235 self.parser._lostConnectionInTest('unknown state of ')
237 def startTest(self, offset, line):
238 """A test start command received."""
239 self.parser.stdOutLineReceived(line)
242 class _InTest(_ParserState):
243 """State for the subunit parser after reading a test: directive."""
245 def _outcome(self, offset, line, no_details, details_state):
246 """An outcome directive has been read.
248 :param no_details: Callable to call when no details are presented.
249 :param details_state: The state to switch to for details
250 processing of this outcome.
252 if self.parser.current_test_description == line[offset:-1]:
253 self.parser._state = self.parser._outside_test
254 self.parser.current_test_description = None
256 self.parser.client.stopTest(self.parser._current_test)
257 self.parser._current_test = None
258 elif self.parser.current_test_description + " [" == line[offset:-1]:
259 self.parser._state = details_state
260 details_state.set_simple()
261 elif self.parser.current_test_description + " [ multipart" == \
263 self.parser._state = details_state
264 details_state.set_multipart()
266 self.parser.stdOutLineReceived(line)
269 self.parser.client.addError(self.parser._current_test,
272 def addError(self, offset, line):
273 """An 'error:' directive has been read."""
274 self._outcome(offset, line, self._error,
275 self.parser._reading_error_details)
278 self.parser.client.addExpectedFailure(self.parser._current_test,
281 def addExpectedFail(self, offset, line):
282 """An 'xfail:' directive has been read."""
283 self._outcome(offset, line, self._xfail,
284 self.parser._reading_xfail_details)
287 self.parser.client.addFailure(self.parser._current_test,
290 def addFailure(self, offset, line):
291 """A 'failure:' directive has been read."""
292 self._outcome(offset, line, self._failure,
293 self.parser._reading_failure_details)
296 self.parser._skip_or_error()
298 def addSkip(self, offset, line):
299 """A 'skip:' directive has been read."""
300 self._outcome(offset, line, self._skip,
301 self.parser._reading_skip_details)
304 self.parser.client.addSuccess(self.parser._current_test)
306 def addSuccess(self, offset, line):
307 """A 'success:' directive has been read."""
308 self._outcome(offset, line, self._succeed,
309 self.parser._reading_success_details)
311 def lostConnection(self):
312 """Connection lost."""
313 self.parser._lostConnectionInTest('')
class _OutSideTest(_ParserState):
    """Parser state used while no test is open in the stream."""

    def lostConnection(self):
        """Handle the end of input; there is nothing to do in this state."""

    def startTest(self, offset, line):
        """Open the test named by a test start directive.

        :param offset: Index in ``line`` at which the test description starts.
        :param line: The full directive line. Its final character is dropped
            (NOTE(review): presumably the trailing newline - confirm).
        """
        parser = self.parser
        # The description is what follows the directive, minus the last
        # character of the line.
        description = line[offset:-1]
        parser._state = parser._in_test
        parser._current_test = RemotedTestCase(description)
        parser.current_test_description = description
        parser.client.startTest(parser._current_test)
class _ReadingDetails(_ParserState):
    """Shared behaviour for the states that read the details of an outcome.

    This class calls ``self._report_outcome()`` but does not define it, and
    its ``_outcome_label()`` raises; subclasses are expected to supply both.
    """

    def endDetails(self):
        """Finish the details section and close the current test.

        Switches the parser back to its outside-test state, clears the
        current test description, reports the outcome and then tells the
        client that the test has stopped.
        """
        parser = self.parser
        parser._state = parser._outside_test
        parser.current_test_description = None
        self._report_outcome()
        parser.client.stopTest(parser._current_test)

    def lineReceived(self, line):
        """Pass a received line to the active details parser."""
        active_parser = self.details_parser
        active_parser.lineReceived(line)

    def lostConnection(self):
        """Report that the connection was lost while reading details."""
        state_string = '%s report of ' % self._outcome_label()
        self.parser._lostConnectionInTest(state_string)

    def _outcome_label(self):
        """Return the label describing this outcome; must be overridden."""
        raise NotImplementedError(self._outcome_label)

    def set_simple(self):
        """Install a simple details parser that reports back to this state."""
        simple_parser = details.SimpleDetailsParser(self)
        self.details_parser = simple_parser

    def set_multipart(self):
        """Install a multipart details parser that reports back to this state."""
        multipart_parser = details.MultipartDetailsParser(self)
        self.details_parser = multipart_parser
362 class _ReadingFailureDetails(_ReadingDetails):
363 """State for the subunit parser when reading failure details."""
365 def _report_outcome(self):
366 self.parser.client.addFailure(self.parser._current_test,
367 details=self.details_parser.get_details())
369 def _outcome_label(self):
373 class _ReadingErrorDetails(_ReadingDetails):
374 """State for the subunit parser when reading error details."""
376 def _report_outcome(self):
377 self.parser.client.addError(self.parser._current_test,
378 details=self.details_parser.get_details())
380 def _outcome_label(self):
384 class _ReadingExpectedFailureDetails(_ReadingDetails):
385 """State for the subunit parser when reading xfail details."""
387 def _report_outcome(self):
388 self.parser.client.addExpectedFailure(self.parser._current_test,
389 details=self.details_parser.get_details())
391 def _outcome_label(self):
395 class _ReadingSkipDetails(_ReadingDetails):
396 """State for the subunit parser when reading skip details."""
398 def _report_outcome(self):
399 self.parser._skip_or_error(self.details_parser.get_message())
401 def _outcome_label(self):
405 class _ReadingSuccessDetails(_ReadingDetails):
406 """State for the subunit parser when reading success details."""
408 def _report_outcome(self):
409 self.parser.client.addSuccess(self.parser._current_test)
411 def _outcome_label(self):
415 class TestProtocolServer(object):
416 """A parser for subunit.
418 :ivar tags: The current tags associated with the protocol stream.
421 def __init__(self, client, stream=None):
422 """Create a TestProtocolServer instance.
424 :param client: An object meeting the unittest.TestResult protocol.
425 :param stream: The stream that lines received which are not part of the
426 subunit protocol should be written to. This allows custom handling
427 of mixed protocols. By default, sys.stdout will be used for
430 self.client = test_results.ExtendedToOriginalDecorator(client)
433 self._stream = stream
434 # state objects we can switch to
435 self._in_test = _InTest(self)
436 self._outside_test = _OutSideTest(self)
437 self._reading_error_details = _ReadingErrorDetails(self)
438 self._reading_failure_details = _ReadingFailureDetails(self)
439 self._reading_skip_details = _ReadingSkipDetails(self)
440 self._reading_success_details = _ReadingSuccessDetails(self)
441 self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
442 # start with outside test.
443 self._state = self._outside_test
445 def _skip_or_error(self, message=None):
446 """Report the current test as a skip if possible, or else an error."""
448 message = "No reason given"
449 self.client.addSkip(self._current_test, message)
451 def _handleProgress(self, offset, line):
452 """Process a progress directive."""
453 line = line[offset:].strip()
455 whence = PROGRESS_CUR
458 whence = PROGRESS_PUSH
461 whence = PROGRESS_POP
464 whence = PROGRESS_SET
466 self.client.progress(delta, whence)
468 def _handleTags(self, offset, line):
469 """Process a tags command."""
470 tags = line[offset:].split()
471 new_tags, gone_tags = tags_to_new_gone(tags)
472 self.client.tags(new_tags, gone_tags)
474 def _handleTime(self, offset, line):
475 # Parse the timestamp and pass it on to the client.
477 event_time = iso8601.parse_date(line[offset:-1])
479 raise TypeError("Failed to parse %r, got %r" % (line, e))
480 self.client.time(event_time)
482 def lineReceived(self, line):
483 """Call the appropriate local method for the received line."""
484 self._state.lineReceived(line)
486 def _lostConnectionInTest(self, state_string):
487 error_string = "lost connection during %stest '%s'" % (
488 state_string, self.current_test_description)
489 self.client.addError(self._current_test, RemoteError(error_string))
490 self.client.stopTest(self._current_test)
492 def lostConnection(self):
493 """The input connection has finished."""
494 self._state.lostConnection()
496 def readFrom(self, pipe):
497 """Blocking convenience API to parse an entire stream.
499 :param pipe: A file-like object supporting readlines().
502 for line in pipe.readlines():
503 self.lineReceived(line)
504 self.lostConnection()
506 def _startTest(self, offset, line):
507 """Internal call to change state machine. Override startTest()."""
508 self._state.startTest(offset, line)
510 def stdOutLineReceived(self, line):
511 self._stream.write(line)
514 class RemoteException(Exception):
515 """An exception that occurred remotely to Python."""
517 def __eq__(self, other):
519 return self.args == other.args
520 except AttributeError:
524 class TestProtocolClient(unittest.TestResult):
525 """A TestResult which generates a subunit stream for a test run.
527 # Get a TestSuite or TestCase to run
529 # Create a stream (any object with a 'write' method)
530 stream = file('tests.log', 'wb')
531 # Create a subunit result object which will output to the stream
532 result = subunit.TestProtocolClient(stream)
533 # Optionally, to get timing data for performance analysis, wrap the
534 # serialiser with a timing decorator
535 result = subunit.test_results.AutoTimingTestResultDecorator(result)
536 # Run the test suite reporting to the subunit result object
542 def __init__(self, stream):
543 unittest.TestResult.__init__(self)
544 self._stream = stream
546 def addError(self, test, error=None, details=None):
547 """Report an error in test test.
549 Only one of error and details should be provided: conceptually there
550 are two separate methods:
551 addError(self, test, error)
552 addError(self, test, details)
554 :param error: Standard unittest positional argument form - an
556 :param details: New Testing-in-python drafted API; a dict from string
557 to subunit.Content objects.
559 self._addOutcome("error", test, error=error, details=details)
561 def addExpectedFailure(self, test, error=None, details=None):
562 """Report an expected failure in test test.
564 Only one of error and details should be provided: conceptually there
565 are two separate methods:
566 addError(self, test, error)
567 addError(self, test, details)
569 :param error: Standard unittest positional argument form - an
571 :param details: New Testing-in-python drafted API; a dict from string
572 to subunit.Content objects.
574 self._addOutcome("xfail", test, error=error, details=details)
576 def addFailure(self, test, error=None, details=None):
577 """Report a failure in test test.
579 Only one of error and details should be provided: conceptually there
580 are two separate methods:
581 addFailure(self, test, error)
582 addFailure(self, test, details)
584 :param error: Standard unittest positional argument form - an
586 :param details: New Testing-in-python drafted API; a dict from string
587 to subunit.Content objects.
589 self._addOutcome("failure", test, error=error, details=details)
591 def _addOutcome(self, outcome, test, error=None, details=None):
592 """Report a failure in test test.
594 Only one of error and details should be provided: conceptually there
595 are two separate methods:
596 addOutcome(self, test, error)
597 addOutcome(self, test, details)
599 :param outcome: A string describing the outcome - used as the
600 event name in the subunit stream.
601 :param error: Standard unittest positional argument form - an
603 :param details: New Testing-in-python drafted API; a dict from string
604 to subunit.Content objects.
606 self._stream.write("%s: %s" % (outcome, test.id()))
607 if error is None and details is None:
609 if error is not None:
610 self._stream.write(" [\n")
611 for line in self._exc_info_to_string(error, test).splitlines():
612 self._stream.write("%s\n" % line)
614 self._write_details(details)
615 self._stream.write("]\n")
617 def addSkip(self, test, reason=None, details=None):
618 """Report a skipped test."""
620 self._addOutcome("skip", test, error=None, details=details)
622 self._stream.write("skip: %s [\n" % test.id())
623 self._stream.write("%s\n" % reason)
624 self._stream.write("]\n")
626 def addSuccess(self, test, details=None):
627 """Report a success in a test."""
628 self._stream.write("successful: %s" % test.id())
630 self._stream.write("\n")
632 self._write_details(details)
633 self._stream.write("]\n")
634 addUnexpectedSuccess = addSuccess
636 def startTest(self, test):
637 """Mark a test as starting its test run."""
638 self._stream.write("test: %s\n" % test.id())
640 def progress(self, offset, whence):
641 """Provide indication about the progress/length of the test run.
643 :param offset: Information about the number of tests remaining. If
644 whence is PROGRESS_CUR, then offset increases/decreases the
645 remaining test count. If whence is PROGRESS_SET, then offset
646 specifies exactly the remaining test count.
647 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
650 if whence == PROGRESS_CUR and offset > -1:
652 elif whence == PROGRESS_PUSH:
655 elif whence == PROGRESS_POP:
660 self._stream.write("progress: %s%s\n" % (prefix, offset))
662 def time(self, a_datetime):
663 """Inform the client of the time.
665 :param a_datetime: A datetime.datetime object.
667 time = a_datetime.astimezone(iso8601.Utc())
668 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
669 time.year, time.month, time.day, time.hour, time.minute,
670 time.second, time.microsecond))
672 def _write_details(self, details):
673 """Output details to the stream.
675 :param details: An extended details dict for a test outcome.
677 self._stream.write(" [ multipart\n")
678 for name, content in sorted(details.iteritems()):
679 self._stream.write("Content-Type: %s/%s" %
680 (content.content_type.type, content.content_type.subtype))
681 parameters = content.content_type.parameters
683 self._stream.write(";")
685 for param, value in parameters.iteritems():
686 param_strs.append("%s=%s" % (param, value))
687 self._stream.write(",".join(param_strs))
688 self._stream.write("\n%s\n" % name)
689 encoder = chunked.Encoder(self._stream)
690 map(encoder.write, content.iter_bytes())
694 """Obey the testtools result.done() interface."""
697 def RemoteError(description=""):
698 if description == "":
700 return (RemoteException, RemoteException(description), None)
703 class RemotedTestCase(unittest.TestCase):
704 """A class to represent test cases run in child processes.
706 Instances of this class are used to provide the Python test API a TestCase
707 that can be printed to the screen, introspected for metadata and so on.
708 However, as they are simply a memoisation of a test that was actually
709 run in the past by a separate process, they cannot perform any interactive
713 def __eq__ (self, other):
715 return self.__description == other.__description
716 except AttributeError:
719 def __init__(self, description):
720 """Create a pseudo test case with description description."""
721 self.__description = description
723 def error(self, label):
724 raise NotImplementedError("%s on RemotedTestCases is not permitted." %
731 self.error("tearDown")
733 def shortDescription(self):
734 return self.__description
737 return "%s" % (self.__description,)
740 return "%s (%s)" % (self.__description, self._strclass())
743 return "<%s description='%s'>" % \
744 (self._strclass(), self.__description)
746 def run(self, result=None):
747 if result is None: result = self.defaultTestResult()
748 result.startTest(self)
749 result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
750 result.stopTest(self)
754 return "%s.%s" % (cls.__module__, cls.__name__)
757 class ExecTestCase(unittest.TestCase):
758 """A test case which runs external scripts for test fixtures."""
760 def __init__(self, methodName='runTest'):
761 """Create an instance of the class that will use the named test
762 method when executed. Raises a ValueError if the instance does
763 not have a method with the specified name.
765 unittest.TestCase.__init__(self, methodName)
766 testMethod = getattr(self, methodName)
767 self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
770 def countTestCases(self):
773 def run(self, result=None):
774 if result is None: result = self.defaultTestResult()
778 """Run the test without collecting errors in a TestResult"""
779 self._run(unittest.TestResult())
781 def _run(self, result):
782 protocol = TestProtocolServer(result)
783 output = subprocess.Popen(self.script, shell=True,
784 stdout=subprocess.PIPE).communicate()[0]
785 protocol.readFrom(StringIO(output))
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Each test gets its own process, which has a performance overhead but will
    provide excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        """Run this test through ``run_isolated``.

        :param result: The TestResult to report to. When None, the result
            from ``self.defaultTestResult()`` is used instead.
        """
        target = self.defaultTestResult() if result is None else result
        run_isolated(unittest.TestCase, self, target)
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    The suite is handed to ``run_isolated``, which runs it via
    ``unittest.TestSuite.run`` and reports to the supplied result. This is
    useful for handling tests that mutate global state.
    """

    def run(self, result=None):
        """Run the whole suite through ``run_isolated``.

        :param result: The TestResult to report to. When None, a fresh
            ``unittest.TestResult`` is created.
        """
        target = unittest.TestResult() if result is None else result
        run_isolated(unittest.TestSuite, self, target)
815 def run_isolated(klass, self, result):
816 """Run a test suite or case in a subprocess, using the run method on klass.
818 c2pread, c2pwrite = os.pipe()
819 # fixme - error -> result
824 # Close parent's pipe ends
831 # at this point, sys.stdin is redirected, now we want
832 # to filter it to escape ]'s.
833 ### XXX: test and write that bit.
835 result = TestProtocolClient(sys.stdout)
836 klass.run(self, result)
839 # exit HARD, exit NOW.
843 # Close child pipe ends
845 # hookup a protocol engine
846 protocol = TestProtocolServer(result)
847 protocol.readFrom(os.fdopen(c2pread, 'rU'))
849 # TODO return code evaluation.
853 def TAP2SubUnit(tap, subunit):
854 """Filter a TAP pipe into a subunit pipe.
856 :param tap: A tap pipe/stream/file object.
857 :param subunit: A pipe/stream/file object to write subunit results to.
858 :return: The exit code to exit with.
863 client = TestProtocolClient(subunit)
867 def _skipped_test(subunit, plan_start):
868 # Some tests were skipped.
869 subunit.write('test test %d\n' % plan_start)
870 subunit.write('error test %d [\n' % plan_start)
871 subunit.write('test missing from TAP output\n')
873 return plan_start + 1
874 # Test data for the next test to emit
880 if test_name is None:
882 subunit.write("test %s\n" % test_name)
884 subunit.write("%s %s\n" % (result, test_name))
886 subunit.write("%s %s [\n" % (result, test_name))
889 subunit.write("%s\n" % line)
893 if state == BEFORE_PLAN:
894 match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
897 _, plan_stop, comment = match.groups()
898 plan_stop = int(plan_stop)
899 if plan_start > plan_stop and plan_stop == 0:
902 subunit.write("test file skip\n")
903 subunit.write("skip file skip [\n")
904 subunit.write("%s\n" % comment)
907 # not a plan line, or have seen one before
908 match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
910 # new test, emit current one.
912 status, number, description, directive, directive_comment = match.groups()
917 if description is None:
920 description = ' ' + description
921 if directive is not None:
922 if directive == 'TODO':
924 elif directive == 'SKIP':
926 if directive_comment is not None:
927 log.append(directive_comment)
928 if number is not None:
930 while plan_start < number:
931 plan_start = _skipped_test(subunit, plan_start)
932 test_name = "test %d%s" % (plan_start, description)
935 match = re.match("Bail out\!(?:\s*(.*))?\n", line)
937 reason, = match.groups()
941 extra = ' %s' % reason
943 test_name = "Bail out!%s" % extra
947 match = re.match("\#.*\n", line)
949 log.append(line[:-1])
953 while plan_start <= plan_stop:
954 # record missed tests
955 plan_start = _skipped_test(subunit, plan_start)
959 def tag_stream(original, filtered, tags):
960 """Alter tags on a stream.
962 :param original: The input stream.
963 :param filtered: The output stream.
964 :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
967 A 'TAG' command will add the tag to the output stream,
968 and override any existing '-TAG' command in that stream.
970 * A global 'tags: TAG' will be added to the start of the stream.
971 * Any tags commands with -TAG will have the -TAG removed.
973 A '-TAG' command will remove the TAG command from the stream.
975 * A 'tags: -TAG' command will be added to the start of the stream.
976 * Any 'tags: TAG' command will have 'TAG' removed from it.
977 Additionally, any redundant tagging commands (adding a tag globally
978 present, or removing a tag globally removed) are stripped as a
979 by-product of the filtering.
982 new_tags, gone_tags = tags_to_new_gone(tags)
983 def write_tags(new_tags, gone_tags):
984 if new_tags or gone_tags:
985 filtered.write("tags: " + ' '.join(new_tags))
987 for tag in gone_tags:
988 filtered.write("-" + tag)
990 write_tags(new_tags, gone_tags)
991 # TODO: use the protocol parser and thus don't mangle test comments.
992 for line in original:
993 if line.startswith("tags:"):
994 line_tags = line[5:].split()
995 line_new, line_gone = tags_to_new_gone(line_tags)
996 line_new = line_new - gone_tags
997 line_gone = line_gone - new_tags
998 write_tags(line_new, line_gone)
1000 filtered.write(line)
1004 class ProtocolTestCase(object):
1005 """Subunit wire protocol to unittest.TestCase adapter.
1007 ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1008 calling a ProtocolTestCase or invoking the run() method will make a 'test
1009 run' happen. The 'test run' will simply be a replay of the test activity
1010 that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1011 and ``countTestCases`` methods are not supported because there isn't a
1012 sensible mapping for those methods.
1014 # Get a stream (any object with a readline() method), in this case the
1015 # stream output by the example from ``subunit.TestProtocolClient``.
1016 stream = file('tests.log', 'rb')
1017 # Create a parser which will read from the stream and emit
1018 # activity to a unittest.TestResult when run() is called.
1019 suite = subunit.ProtocolTestCase(stream)
1020 # Create a result object to accept the contents of that stream.
1021 result = unittest._TextTestResult(sys.stdout)
1022 # 'run' the tests - process the stream and feed its contents to result.
1026 :seealso: TestProtocolServer (the subunit wire protocol parser).
1029 def __init__(self, stream, passthrough=None):
1030 """Create a ProtocolTestCase reading from stream.
1032 :param stream: A filelike object which a subunit stream can be read
1034 :param passthrough: A stream pass non subunit input on to. If not
1035 supplied, the TestProtocolServer default is used.
1037 self._stream = stream
1038 self._passthrough = passthrough
1040 def __call__(self, result=None):
1041 return self.run(result)
1043 def run(self, result=None):
1045 result = self.defaultTestResult()
1046 protocol = TestProtocolServer(result, self._passthrough)
1047 line = self._stream.readline()
1049 protocol.lineReceived(line)
1050 line = self._stream.readline()
1051 protocol.lostConnection()
class TestResultStats(unittest.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed.
    :ivar skipped_tests: The tests that were skipped.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        unittest.TestResult.__init__(self)
        self._stream = stream
        self.failed_tests = 0
        self.skipped_tests = 0
        self.seen_tags = set()

    # total_tests and passed_tests are read as plain attributes (see
    # formatStats), so they have to be properties rather than methods.
    @property
    def total_tests(self):
        """The number of tests run, as tracked in ``testsRun``."""
        return self.testsRun

    def addError(self, test, err):
        """Count an error as a failed test; the details are not kept."""
        self.failed_tests += 1

    def addFailure(self, test, err):
        """Count a failure as a failed test; the details are not kept."""
        self.failed_tests += 1

    def addSkip(self, test, reason):
        """Count a skipped test; the reason is not kept."""
        self.skipped_tests += 1

    def formatStats(self):
        """Write the collected counters and the sorted tags to the stream."""
        self._stream.write("Total tests: %5d\n" % self.total_tests)
        self._stream.write("Passed tests: %5d\n" % self.passed_tests)
        self._stream.write("Failed tests: %5d\n" % self.failed_tests)
        self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
        tags = sorted(self.seen_tags)
        self._stream.write("Seen tags: %s\n" % (", ".join(tags)))

    @property
    def passed_tests(self):
        """Tests that were neither counted as failed nor as skipped."""
        return self.total_tests - self.failed_tests - self.skipped_tests

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags."""
        # Removed tags are deliberately ignored: this records every tag
        # that was ever seen, not the currently active set.
        self.seen_tags.update(new_tags)

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return self.failed_tests == 0
class TestResultFilter(unittest.TestResult):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result through.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
                 filter_success=True, filter_skip=False,
                 filter_predicate=None):
        """Create a FilterResult object filtering to result.

        :param result: The TestResult that unfiltered outcomes are sent to.
        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_predicate: A callable taking (test, err) and
            returning True if the result should be passed through.
            err is None for success, and is the skip reason for skips.
        """
        unittest.TestResult.__init__(self)
        self.result = result
        self._filter_error = filter_error
        self._filter_failure = filter_failure
        self._filter_success = filter_success
        self._filter_skip = filter_skip
        if filter_predicate is None:
            filter_predicate = lambda test, err: True
        self.filter_predicate = filter_predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # The (new, gone) tags for the current test.
        self._current_test_tags = None

    def _mark_filtered(self):
        """Record that the current test's outcome was withheld."""
        # stopTest consults this flag so that a filtered test does not
        # produce a downstream stopTest (or tags) without a startTest.
        self._current_test_filtered = True

    def addError(self, test, err):
        """Forward an error unless errors or this test are filtered."""
        if not self._filter_error and self.filter_predicate(test, err):
            self.result.startTest(test)
            self.result.addError(test, err)
        else:
            self._mark_filtered()

    def addFailure(self, test, err):
        """Forward a failure unless failures or this test are filtered."""
        if not self._filter_failure and self.filter_predicate(test, err):
            self.result.startTest(test)
            self.result.addFailure(test, err)
        else:
            self._mark_filtered()

    def addSkip(self, test, reason):
        """Forward a skip unless skips or this test are filtered.

        If the downstream result has no callable ``addSkip`` the skip is
        reported to it as an error wrapping the reason instead.
        """
        if not self._filter_skip and self.filter_predicate(test, reason):
            self.result.startTest(test)
            # This is duplicated, it would be nice to have on a 'calls
            # TestResults' mixin perhaps.
            addSkip = getattr(self.result, 'addSkip', None)
            if not callable(addSkip):
                # RemoteError comes from elsewhere in this module.
                self.result.addError(test, RemoteError(reason))
            else:
                self.result.addSkip(test, reason)
        else:
            self._mark_filtered()

    def addSuccess(self, test):
        """Forward a success unless successes or this test are filtered."""
        if not self._filter_success and self.filter_predicate(test, None):
            self.result.startTest(test)
            self.result.addSuccess(test)
        else:
            self._mark_filtered()

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        self._current_test = test
        self._current_test_filtered = False
        self._current_test_tags = set(), set()

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly. Nothing is sent downstream for a test whose outcome was
        filtered out.
        """
        if not self._current_test_filtered:
            # Tags to output for this test.
            pending = self._current_test_tags
            # pending is None when stopTest arrives without a startTest.
            if pending is not None and (pending[0] or pending[1]):
                tags_method = getattr(self.result, 'tags', None)
                if callable(tags_method):
                    self.result.tags(*pending)
            self.result.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._current_test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        :return: The downstream result's ``tags`` return value, or None
            when the downstream result has no ``tags`` attribute.
        """
        if self._current_test is not None:
            # gather the tags until the test stops.
            self._current_test_tags[0].update(new_tags)
            self._current_test_tags[0].difference_update(gone_tags)
            self._current_test_tags[1].update(gone_tags)
            self._current_test_tags[1].difference_update(new_tags)
        # NOTE(review): the instruction is also forwarded immediately, so
        # in-test tags reach the downstream result here and again from
        # stopTest - confirm that this is intended.
        tags_method = getattr(self.result, 'tags', None)
        if tags_method is None:
            # Downstream result does not understand tags; nothing to do.
            return
        return tags_method(new_tags, gone_tags)
1219 def id_to_orig_id(self, id):
1220 if id.startswith("subunit.RemotedTestCase."):
1221 return id[len("subunit.RemotedTestCase."):]