2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occured at, to allow reconstructing
68 test timing from a stream.
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
82 $ python -m subunit.run mylib.tests.test_suite
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
90 `ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
94 class AggregateTests(subunit.ExecTestCase):
96 def test_script_one(self):
99 def test_script_two(self):
102 # Normally your normal test loading would take of this automatically,
103 # It is only spelt out in detail here for clarity.
104 suite = unittest.TestSuite([AggregateTests("test_script_one"),
105 AggregateTests("test_script_two")])
106 # Create any TestResult class you like.
107 result = unittest._TextTestResult(sys.stdout)
108 # And run your suite as normal, Subunit will exec each external script as
109 # needed and report to your result object.
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
122 from StringIO import StringIO
128 from testtools import content, content_type, ExtendedToOriginalDecorator
130 from testtools.testresult.real import _StringException
131 RemoteException = _StringException
132 _remote_exception_str = '_StringException' # For testing.
134 raise ImportError ("testtools.testresult.real does not contain "
135 "_StringException, check your version.")
138 from testtools.testresult.real import _StringException
140 import chunked, details, test_results
151 return subunit.tests.test_suite()
154 def join_dir(base_path, path):
156 Returns an absolute path to C{path}, calculated relative to the parent
159 @param base_path: A path to a file or directory.
160 @param path: An absolute path, or a path relative to the containing
161 directory of C{base_path}.
163 @return: An absolute path to C{path}.
165 return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
168 def tags_to_new_gone(tags):
169 """Split a list of tags into a new_set and a gone_set."""
174 gone_tags.add(tag[1:])
177 return new_tags, gone_tags
180 class DiscardStream(object):
181 """A filelike object which discards what is written to it."""
183 def write(self, bytes):
187 class _ParserState(object):
188 """State for the subunit parser."""
190 def __init__(self, parser):
193 def addError(self, offset, line):
194 """An 'error:' directive has been read."""
195 self.parser.stdOutLineReceived(line)
197 def addExpectedFail(self, offset, line):
198 """An 'xfail:' directive has been read."""
199 self.parser.stdOutLineReceived(line)
201 def addFailure(self, offset, line):
202 """A 'failure:' directive has been read."""
203 self.parser.stdOutLineReceived(line)
205 def addSkip(self, offset, line):
206 """A 'skip:' directive has been read."""
207 self.parser.stdOutLineReceived(line)
209 def addSuccess(self, offset, line):
210 """A 'success:' directive has been read."""
211 self.parser.stdOutLineReceived(line)
213 def lineReceived(self, line):
214 """a line has been received."""
215 parts = line.split(None, 1)
218 offset = len(cmd) + 1
220 if cmd in ('test', 'testing'):
221 self.startTest(offset, line)
223 self.addError(offset, line)
224 elif cmd == 'failure':
225 self.addFailure(offset, line)
226 elif cmd == 'progress':
227 self.parser._handleProgress(offset, line)
229 self.addSkip(offset, line)
230 elif cmd in ('success', 'successful'):
231 self.addSuccess(offset, line)
232 elif cmd in ('tags',):
233 self.parser._handleTags(offset, line)
234 self.parser.subunitLineReceived(line)
235 elif cmd in ('time',):
236 self.parser._handleTime(offset, line)
237 self.parser.subunitLineReceived(line)
239 self.addExpectedFail(offset, line)
241 self.parser.stdOutLineReceived(line)
243 self.parser.stdOutLineReceived(line)
245 def lostConnection(self):
246 """Connection lost."""
247 self.parser._lostConnectionInTest('unknown state of ')
249 def startTest(self, offset, line):
250 """A test start command received."""
251 self.parser.stdOutLineReceived(line)
254 class _InTest(_ParserState):
255 """State for the subunit parser after reading a test: directive."""
257 def _outcome(self, offset, line, no_details, details_state):
258 """An outcome directive has been read.
260 :param no_details: Callable to call when no details are presented.
261 :param details_state: The state to switch to for details
262 processing of this outcome.
264 if self.parser.current_test_description == line[offset:-1]:
265 self.parser._state = self.parser._outside_test
266 self.parser.current_test_description = None
268 self.parser.client.stopTest(self.parser._current_test)
269 self.parser._current_test = None
270 self.parser.subunitLineReceived(line)
271 elif self.parser.current_test_description + " [" == line[offset:-1]:
272 self.parser._state = details_state
273 details_state.set_simple()
274 self.parser.subunitLineReceived(line)
275 elif self.parser.current_test_description + " [ multipart" == \
277 self.parser._state = details_state
278 details_state.set_multipart()
279 self.parser.subunitLineReceived(line)
281 self.parser.stdOutLineReceived(line)
284 self.parser.client.addError(self.parser._current_test,
287 def addError(self, offset, line):
288 """An 'error:' directive has been read."""
289 self._outcome(offset, line, self._error,
290 self.parser._reading_error_details)
293 self.parser.client.addExpectedFailure(self.parser._current_test,
296 def addExpectedFail(self, offset, line):
297 """An 'xfail:' directive has been read."""
298 self._outcome(offset, line, self._xfail,
299 self.parser._reading_xfail_details)
302 self.parser.client.addFailure(self.parser._current_test, details={})
304 def addFailure(self, offset, line):
305 """A 'failure:' directive has been read."""
306 self._outcome(offset, line, self._failure,
307 self.parser._reading_failure_details)
310 self.parser.client.addSkip(self.parser._current_test, details={})
312 def addSkip(self, offset, line):
313 """A 'skip:' directive has been read."""
314 self._outcome(offset, line, self._skip,
315 self.parser._reading_skip_details)
318 self.parser.client.addSuccess(self.parser._current_test, details={})
320 def addSuccess(self, offset, line):
321 """A 'success:' directive has been read."""
322 self._outcome(offset, line, self._succeed,
323 self.parser._reading_success_details)
325 def lostConnection(self):
326 """Connection lost."""
327 self.parser._lostConnectionInTest('')
330 class _OutSideTest(_ParserState):
331 """State for the subunit parser outside of a test context."""
333 def lostConnection(self):
334 """Connection lost."""
336 def startTest(self, offset, line):
337 """A test start command received."""
338 self.parser._state = self.parser._in_test
339 self.parser._current_test = RemotedTestCase(line[offset:-1])
340 self.parser.current_test_description = line[offset:-1]
341 self.parser.client.startTest(self.parser._current_test)
342 self.parser.subunitLineReceived(line)
345 class _ReadingDetails(_ParserState):
346 """Common logic for readin state details."""
348 def endDetails(self):
349 """The end of a details section has been reached."""
350 self.parser._state = self.parser._outside_test
351 self.parser.current_test_description = None
352 self._report_outcome()
353 self.parser.client.stopTest(self.parser._current_test)
355 def lineReceived(self, line):
356 """a line has been received."""
357 self.details_parser.lineReceived(line)
358 self.parser.subunitLineReceived(line)
360 def lostConnection(self):
361 """Connection lost."""
362 self.parser._lostConnectionInTest('%s report of ' %
363 self._outcome_label())
365 def _outcome_label(self):
366 """The label to describe this outcome."""
367 raise NotImplementedError(self._outcome_label)
369 def set_simple(self):
370 """Start a simple details parser."""
371 self.details_parser = details.SimpleDetailsParser(self)
373 def set_multipart(self):
374 """Start a multipart details parser."""
375 self.details_parser = details.MultipartDetailsParser(self)
378 class _ReadingFailureDetails(_ReadingDetails):
379 """State for the subunit parser when reading failure details."""
381 def _report_outcome(self):
382 self.parser.client.addFailure(self.parser._current_test,
383 details=self.details_parser.get_details())
385 def _outcome_label(self):
389 class _ReadingErrorDetails(_ReadingDetails):
390 """State for the subunit parser when reading error details."""
392 def _report_outcome(self):
393 self.parser.client.addError(self.parser._current_test,
394 details=self.details_parser.get_details())
396 def _outcome_label(self):
400 class _ReadingExpectedFailureDetails(_ReadingDetails):
401 """State for the subunit parser when reading xfail details."""
403 def _report_outcome(self):
404 self.parser.client.addExpectedFailure(self.parser._current_test,
405 details=self.details_parser.get_details())
407 def _outcome_label(self):
411 class _ReadingSkipDetails(_ReadingDetails):
412 """State for the subunit parser when reading skip details."""
414 def _report_outcome(self):
415 self.parser.client.addSkip(self.parser._current_test,
416 details=self.details_parser.get_details("skip"))
418 def _outcome_label(self):
422 class _ReadingSuccessDetails(_ReadingDetails):
423 """State for the subunit parser when reading success details."""
425 def _report_outcome(self):
426 self.parser.client.addSuccess(self.parser._current_test,
427 details=self.details_parser.get_details("success"))
429 def _outcome_label(self):
433 class TestProtocolServer(object):
434 """A parser for subunit.
436 :ivar tags: The current tags associated with the protocol stream.
439 def __init__(self, client, stream=None, forward_stream=None):
440 """Create a TestProtocolServer instance.
442 :param client: An object meeting the unittest.TestResult protocol.
443 :param stream: The stream that lines received which are not part of the
444 subunit protocol should be written to. This allows custom handling
445 of mixed protocols. By default, sys.stdout will be used for
447 :param forward_stream: A stream to forward subunit lines to. This
448 allows a filter to forward the entire stream while still parsing
449 and acting on it. By default forward_stream is set to
450 DiscardStream() and no forwarding happens.
452 self.client = ExtendedToOriginalDecorator(client)
455 self._stream = stream
456 self._forward_stream = forward_stream or DiscardStream()
457 # state objects we can switch too
458 self._in_test = _InTest(self)
459 self._outside_test = _OutSideTest(self)
460 self._reading_error_details = _ReadingErrorDetails(self)
461 self._reading_failure_details = _ReadingFailureDetails(self)
462 self._reading_skip_details = _ReadingSkipDetails(self)
463 self._reading_success_details = _ReadingSuccessDetails(self)
464 self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
465 # start with outside test.
466 self._state = self._outside_test
468 def _handleProgress(self, offset, line):
469 """Process a progress directive."""
470 line = line[offset:].strip()
472 whence = PROGRESS_CUR
475 whence = PROGRESS_PUSH
478 whence = PROGRESS_POP
481 whence = PROGRESS_SET
483 self.client.progress(delta, whence)
485 def _handleTags(self, offset, line):
486 """Process a tags command."""
487 tags = line[offset:].split()
488 new_tags, gone_tags = tags_to_new_gone(tags)
489 self.client.tags(new_tags, gone_tags)
491 def _handleTime(self, offset, line):
492 # Accept it, but do not do anything with it yet.
494 event_time = iso8601.parse_date(line[offset:-1])
496 raise TypeError("Failed to parse %r, got %r" % (line, e))
497 self.client.time(event_time)
499 def lineReceived(self, line):
500 """Call the appropriate local method for the received line."""
501 self._state.lineReceived(line)
503 def _lostConnectionInTest(self, state_string):
504 error_string = "lost connection during %stest '%s'" % (
505 state_string, self.current_test_description)
506 self.client.addError(self._current_test, RemoteError(error_string))
507 self.client.stopTest(self._current_test)
509 def lostConnection(self):
510 """The input connection has finished."""
511 self._state.lostConnection()
513 def readFrom(self, pipe):
514 """Blocking convenience API to parse an entire stream.
516 :param pipe: A file-like object supporting readlines().
519 for line in pipe.readlines():
520 self.lineReceived(line)
521 self.lostConnection()
523 def _startTest(self, offset, line):
524 """Internal call to change state machine. Override startTest()."""
525 self._state.startTest(offset, line)
527 def subunitLineReceived(self, line):
528 self._forward_stream.write(line)
530 def stdOutLineReceived(self, line):
531 self._stream.write(line)
534 class TestProtocolClient(unittest.TestResult):
535 """A TestResult which generates a subunit stream for a test run.
537 # Get a TestSuite or TestCase to run
539 # Create a stream (any object with a 'write' method)
540 stream = file('tests.log', 'wb')
541 # Create a subunit result object which will output to the stream
542 result = subunit.TestProtocolClient(stream)
543 # Optionally, to get timing data for performance analysis, wrap the
544 # serialiser with a timing decorator
545 result = subunit.test_results.AutoTimingTestResultDecorator(result)
546 # Run the test suite reporting to the subunit result object
552 def __init__(self, stream):
553 unittest.TestResult.__init__(self)
554 self._stream = stream
556 def addError(self, test, error=None, details=None):
557 """Report an error in test test.
559 Only one of error and details should be provided: conceptually there
560 are two separate methods:
561 addError(self, test, error)
562 addError(self, test, details)
564 :param error: Standard unittest positional argument form - an
566 :param details: New Testing-in-python drafted API; a dict from string
567 to subunit.Content objects.
569 self._addOutcome("error", test, error=error, details=details)
571 def addExpectedFailure(self, test, error=None, details=None):
572 """Report an expected failure in test test.
574 Only one of error and details should be provided: conceptually there
575 are two separate methods:
576 addError(self, test, error)
577 addError(self, test, details)
579 :param error: Standard unittest positional argument form - an
581 :param details: New Testing-in-python drafted API; a dict from string
582 to subunit.Content objects.
584 self._addOutcome("xfail", test, error=error, details=details)
586 def addFailure(self, test, error=None, details=None):
587 """Report a failure in test test.
589 Only one of error and details should be provided: conceptually there
590 are two separate methods:
591 addFailure(self, test, error)
592 addFailure(self, test, details)
594 :param error: Standard unittest positional argument form - an
596 :param details: New Testing-in-python drafted API; a dict from string
597 to subunit.Content objects.
599 self._addOutcome("failure", test, error=error, details=details)
601 def _addOutcome(self, outcome, test, error=None, details=None):
602 """Report a failure in test test.
604 Only one of error and details should be provided: conceptually there
605 are two separate methods:
606 addOutcome(self, test, error)
607 addOutcome(self, test, details)
609 :param outcome: A string describing the outcome - used as the
610 event name in the subunit stream.
611 :param error: Standard unittest positional argument form - an
613 :param details: New Testing-in-python drafted API; a dict from string
614 to subunit.Content objects.
616 self._stream.write("%s: %s" % (outcome, test.id()))
617 if error is None and details is None:
619 if error is not None:
620 self._stream.write(" [\n")
621 for line in self._exc_info_to_string(error, test).splitlines():
622 self._stream.write("%s\n" % line)
624 self._write_details(details)
625 self._stream.write("]\n")
627 def addSkip(self, test, reason=None, details=None):
628 """Report a skipped test."""
630 self._addOutcome("skip", test, error=None, details=details)
632 self._stream.write("skip: %s [\n" % test.id())
633 self._stream.write("%s\n" % reason)
634 self._stream.write("]\n")
636 def addSuccess(self, test, details=None):
637 """Report a success in a test."""
638 self._stream.write("successful: %s" % test.id())
640 self._stream.write("\n")
642 self._write_details(details)
643 self._stream.write("]\n")
644 addUnexpectedSuccess = addSuccess
646 def startTest(self, test):
647 """Mark a test as starting its test run."""
648 self._stream.write("test: %s\n" % test.id())
650 def progress(self, offset, whence):
651 """Provide indication about the progress/length of the test run.
653 :param offset: Information about the number of tests remaining. If
654 whence is PROGRESS_CUR, then offset increases/decreases the
655 remaining test count. If whence is PROGRESS_SET, then offset
656 specifies exactly the remaining test count.
657 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
660 if whence == PROGRESS_CUR and offset > -1:
662 elif whence == PROGRESS_PUSH:
665 elif whence == PROGRESS_POP:
670 self._stream.write("progress: %s%s\n" % (prefix, offset))
672 def time(self, a_datetime):
673 """Inform the client of the time.
675 ":param datetime: A datetime.datetime object.
677 time = a_datetime.astimezone(iso8601.Utc())
678 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
679 time.year, time.month, time.day, time.hour, time.minute,
680 time.second, time.microsecond))
682 def _write_details(self, details):
683 """Output details to the stream.
685 :param details: An extended details dict for a test outcome.
687 self._stream.write(" [ multipart\n")
688 for name, content in sorted(details.iteritems()):
689 self._stream.write("Content-Type: %s/%s" %
690 (content.content_type.type, content.content_type.subtype))
691 parameters = content.content_type.parameters
693 self._stream.write(";")
695 for param, value in parameters.iteritems():
696 param_strs.append("%s=%s" % (param, value))
697 self._stream.write(",".join(param_strs))
698 self._stream.write("\n%s\n" % name)
699 encoder = chunked.Encoder(self._stream)
700 map(encoder.write, content.iter_bytes())
704 """Obey the testtools result.done() interface."""
707 def RemoteError(description=""):
708 return (_StringException, _StringException(description), None)
711 class RemotedTestCase(unittest.TestCase):
712 """A class to represent test cases run in child processes.
714 Instances of this class are used to provide the Python test API a TestCase
715 that can be printed to the screen, introspected for metadata and so on.
716 However, as they are a simply a memoisation of a test that was actually
717 run in the past by a separate process, they cannot perform any interactive
721 def __eq__ (self, other):
723 return self.__description == other.__description
724 except AttributeError:
727 def __init__(self, description):
728 """Create a psuedo test case with description description."""
729 self.__description = description
731 def error(self, label):
732 raise NotImplementedError("%s on RemotedTestCases is not permitted." %
739 self.error("tearDown")
741 def shortDescription(self):
742 return self.__description
745 return "%s" % (self.__description,)
748 return "%s (%s)" % (self.__description, self._strclass())
751 return "<%s description='%s'>" % \
752 (self._strclass(), self.__description)
754 def run(self, result=None):
755 if result is None: result = self.defaultTestResult()
756 result.startTest(self)
757 result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
758 result.stopTest(self)
762 return "%s.%s" % (cls.__module__, cls.__name__)
765 class ExecTestCase(unittest.TestCase):
766 """A test case which runs external scripts for test fixtures."""
768 def __init__(self, methodName='runTest'):
769 """Create an instance of the class that will use the named test
770 method when executed. Raises a ValueError if the instance does
771 not have a method with the specified name.
773 unittest.TestCase.__init__(self, methodName)
774 testMethod = getattr(self, methodName)
775 self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
778 def countTestCases(self):
781 def run(self, result=None):
782 if result is None: result = self.defaultTestResult()
786 """Run the test without collecting errors in a TestResult"""
787 self._run(unittest.TestResult())
789 def _run(self, result):
790 protocol = TestProtocolServer(result)
791 output = subprocess.Popen(self.script, shell=True,
792 stdout=subprocess.PIPE).communicate()[0]
793 protocol.readFrom(StringIO(output))
796 class IsolatedTestCase(unittest.TestCase):
797 """A TestCase which executes in a forked process.
799 Each test gets its own process, which has a performance overhead but will
800 provide excellent isolation from global state (such as django configs,
801 zope utilities and so on).
804 def run(self, result=None):
805 if result is None: result = self.defaultTestResult()
806 run_isolated(unittest.TestCase, self, result)
809 class IsolatedTestSuite(unittest.TestSuite):
810 """A TestSuite which runs its tests in a forked process.
812 This decorator that will fork() before running the tests and report the
813 results from the child process using a Subunit stream. This is useful for
814 handling tests that mutate global state, or are testing C extensions that
818 def run(self, result=None):
819 if result is None: result = unittest.TestResult()
820 run_isolated(unittest.TestSuite, self, result)
823 def run_isolated(klass, self, result):
824 """Run a test suite or case in a subprocess, using the run method on klass.
826 c2pread, c2pwrite = os.pipe()
827 # fixme - error -> result
832 # Close parent's pipe ends
839 # at this point, sys.stdin is redirected, now we want
840 # to filter it to escape ]'s.
841 ### XXX: test and write that bit.
843 result = TestProtocolClient(sys.stdout)
844 klass.run(self, result)
847 # exit HARD, exit NOW.
851 # Close child pipe ends
853 # hookup a protocol engine
854 protocol = TestProtocolServer(result)
855 protocol.readFrom(os.fdopen(c2pread, 'rU'))
857 # TODO return code evaluation.
861 def TAP2SubUnit(tap, subunit):
862 """Filter a TAP pipe into a subunit pipe.
864 :param tap: A tap pipe/stream/file object.
865 :param subunit: A pipe/stream/file object to write subunit results to.
866 :return: The exit code to exit with.
871 client = TestProtocolClient(subunit)
875 def _skipped_test(subunit, plan_start):
876 # Some tests were skipped.
877 subunit.write('test test %d\n' % plan_start)
878 subunit.write('error test %d [\n' % plan_start)
879 subunit.write('test missing from TAP output\n')
881 return plan_start + 1
882 # Test data for the next test to emit
888 if test_name is None:
890 subunit.write("test %s\n" % test_name)
892 subunit.write("%s %s\n" % (result, test_name))
894 subunit.write("%s %s [\n" % (result, test_name))
897 subunit.write("%s\n" % line)
901 if state == BEFORE_PLAN:
902 match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
905 _, plan_stop, comment = match.groups()
906 plan_stop = int(plan_stop)
907 if plan_start > plan_stop and plan_stop == 0:
910 subunit.write("test file skip\n")
911 subunit.write("skip file skip [\n")
912 subunit.write("%s\n" % comment)
915 # not a plan line, or have seen one before
916 match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
918 # new test, emit current one.
920 status, number, description, directive, directive_comment = match.groups()
925 if description is None:
928 description = ' ' + description
929 if directive is not None:
930 if directive == 'TODO':
932 elif directive == 'SKIP':
934 if directive_comment is not None:
935 log.append(directive_comment)
936 if number is not None:
938 while plan_start < number:
939 plan_start = _skipped_test(subunit, plan_start)
940 test_name = "test %d%s" % (plan_start, description)
943 match = re.match("Bail out\!(?:\s*(.*))?\n", line)
945 reason, = match.groups()
949 extra = ' %s' % reason
951 test_name = "Bail out!%s" % extra
955 match = re.match("\#.*\n", line)
957 log.append(line[:-1])
961 while plan_start <= plan_stop:
962 # record missed tests
963 plan_start = _skipped_test(subunit, plan_start)
967 def tag_stream(original, filtered, tags):
968 """Alter tags on a stream.
970 :param original: The input stream.
971 :param filtered: The output stream.
972 :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
975 A 'TAG' command will add the tag to the output stream,
976 and override any existing '-TAG' command in that stream.
978 * A global 'tags: TAG' will be added to the start of the stream.
979 * Any tags commands with -TAG will have the -TAG removed.
981 A '-TAG' command will remove the TAG command from the stream.
983 * A 'tags: -TAG' command will be added to the start of the stream.
984 * Any 'tags: TAG' command will have 'TAG' removed from it.
985 Additionally, any redundant tagging commands (adding a tag globally
986 present, or removing a tag globally removed) are stripped as a
987 by-product of the filtering.
990 new_tags, gone_tags = tags_to_new_gone(tags)
991 def write_tags(new_tags, gone_tags):
992 if new_tags or gone_tags:
993 filtered.write("tags: " + ' '.join(new_tags))
995 for tag in gone_tags:
996 filtered.write("-" + tag)
998 write_tags(new_tags, gone_tags)
999 # TODO: use the protocol parser and thus don't mangle test comments.
1000 for line in original:
1001 if line.startswith("tags:"):
1002 line_tags = line[5:].split()
1003 line_new, line_gone = tags_to_new_gone(line_tags)
1004 line_new = line_new - gone_tags
1005 line_gone = line_gone - new_tags
1006 write_tags(line_new, line_gone)
1008 filtered.write(line)
1012 class ProtocolTestCase(object):
1013 """Subunit wire protocol to unittest.TestCase adapter.
1015 ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1016 calling a ProtocolTestCase or invoking the run() method will make a 'test
1017 run' happen. The 'test run' will simply be a replay of the test activity
1018 that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1019 and ``countTestCases`` methods are not supported because there isn't a
1020 sensible mapping for those methods.
1022 # Get a stream (any object with a readline() method), in this case the
1023 # stream output by the example from ``subunit.TestProtocolClient``.
1024 stream = file('tests.log', 'rb')
1025 # Create a parser which will read from the stream and emit
1026 # activity to a unittest.TestResult when run() is called.
1027 suite = subunit.ProtocolTestCase(stream)
1028 # Create a result object to accept the contents of that stream.
1029 result = unittest._TextTestResult(sys.stdout)
1030 # 'run' the tests - process the stream and feed its contents to result.
1034 :seealso: TestProtocolServer (the subunit wire protocol parser).
1037 def __init__(self, stream, passthrough=None, forward=False):
1038 """Create a ProtocolTestCase reading from stream.
1040 :param stream: A filelike object which a subunit stream can be read
1042 :param passthrough: A stream pass non subunit input on to. If not
1043 supplied, the TestProtocolServer default is used.
1044 :param forward: A stream to pass subunit input on to. If not supplied
1045 subunit input is not forwarded.
1047 self._stream = stream
1048 self._passthrough = passthrough
1049 self._forward = forward
1051 def __call__(self, result=None):
1052 return self.run(result)
1054 def run(self, result=None):
1056 result = self.defaultTestResult()
1057 protocol = TestProtocolServer(result, self._passthrough, self._forward)
1058 line = self._stream.readline()
1060 protocol.lineReceived(line)
1061 line = self._stream.readline()
1062 protocol.lostConnection()
1065 class TestResultStats(unittest.TestResult):
1066 """A pyunit TestResult interface implementation for making statistics.
1068 :ivar total_tests: The total tests seen.
1069 :ivar passed_tests: The tests that passed.
1070 :ivar failed_tests: The tests that failed.
1071 :ivar seen_tags: The tags seen across all tests.
1074 def __init__(self, stream):
1075 """Create a TestResultStats which outputs to stream."""
1076 unittest.TestResult.__init__(self)
1077 self._stream = stream
1078 self.failed_tests = 0
1079 self.skipped_tests = 0
1080 self.seen_tags = set()
1083 def total_tests(self):
1084 return self.testsRun
1086 def addError(self, test, err, details=None):
1087 self.failed_tests += 1
1089 def addFailure(self, test, err, details=None):
1090 self.failed_tests += 1
1092 def addSkip(self, test, reason, details=None):
1093 self.skipped_tests += 1
1095 def formatStats(self):
1096 self._stream.write("Total tests: %5d\n" % self.total_tests)
1097 self._stream.write("Passed tests: %5d\n" % self.passed_tests)
1098 self._stream.write("Failed tests: %5d\n" % self.failed_tests)
1099 self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1100 tags = sorted(self.seen_tags)
1101 self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1104 def passed_tests(self):
1105 return self.total_tests - self.failed_tests - self.skipped_tests
1107 def tags(self, new_tags, gone_tags):
1108 """Accumulate the seen tags."""
1109 self.seen_tags.update(new_tags)
1111 def wasSuccessful(self):
1112 """Tells whether or not this result was a success"""
1113 return self.failed_tests == 0