2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occured at, to allow reconstructing
68 test timing from a stream.
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
82 $ python -m subunit.run mylib.tests.test_suite
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
90 `ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
94 class AggregateTests(subunit.ExecTestCase):
96 def test_script_one(self):
99 def test_script_two(self):
102 # Normally your normal test loading would take of this automatically,
103 # It is only spelt out in detail here for clarity.
104 suite = unittest.TestSuite([AggregateTests("test_script_one"),
105 AggregateTests("test_script_two")])
106 # Create any TestResult class you like.
107 result = unittest._TextTestResult(sys.stdout)
108 # And run your suite as normal, Subunit will exec each external script as
109 # needed and report to your result object.
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
122 from StringIO import StringIO
128 from testtools import content, content_type, ExtendedToOriginalDecorator
130 from testtools.testresult.real import _StringException
131 RemoteException = _StringException
132 _remote_exception_str = '_StringException' # For testing.
134 raise ImportError ("testtools.testresult.real does not contain "
135 "_StringException, check your version.")
138 from testtools.testresult.real import _StringException
140 import chunked, details, test_results
151 return subunit.tests.test_suite()
154 def join_dir(base_path, path):
156 Returns an absolute path to C{path}, calculated relative to the parent
159 @param base_path: A path to a file or directory.
160 @param path: An absolute path, or a path relative to the containing
161 directory of C{base_path}.
163 @return: An absolute path to C{path}.
165 return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
168 def tags_to_new_gone(tags):
169 """Split a list of tags into a new_set and a gone_set."""
174 gone_tags.add(tag[1:])
177 return new_tags, gone_tags
180 class DiscardStream(object):
181 """A filelike object which discards what is written to it."""
183 def write(self, bytes):
187 class _ParserState(object):
188 """State for the subunit parser."""
190 def __init__(self, parser):
193 def addError(self, offset, line):
194 """An 'error:' directive has been read."""
195 self.parser.stdOutLineReceived(line)
197 def addExpectedFail(self, offset, line):
198 """An 'xfail:' directive has been read."""
199 self.parser.stdOutLineReceived(line)
201 def addFailure(self, offset, line):
202 """A 'failure:' directive has been read."""
203 self.parser.stdOutLineReceived(line)
205 def addSkip(self, offset, line):
206 """A 'skip:' directive has been read."""
207 self.parser.stdOutLineReceived(line)
209 def addSuccess(self, offset, line):
210 """A 'success:' directive has been read."""
211 self.parser.stdOutLineReceived(line)
213 def lineReceived(self, line):
214 """a line has been received."""
215 parts = line.split(None, 1)
216 if len(parts) == 2 and line.startswith(parts[0]):
218 offset = len(cmd) + 1
219 cmd = cmd.rstrip(':')
220 if cmd in ('test', 'testing'):
221 self.startTest(offset, line)
223 self.addError(offset, line)
224 elif cmd == 'failure':
225 self.addFailure(offset, line)
226 elif cmd == 'progress':
227 self.parser._handleProgress(offset, line)
229 self.addSkip(offset, line)
230 elif cmd in ('success', 'successful'):
231 self.addSuccess(offset, line)
232 elif cmd in ('tags',):
233 self.parser._handleTags(offset, line)
234 self.parser.subunitLineReceived(line)
235 elif cmd in ('time',):
236 self.parser._handleTime(offset, line)
237 self.parser.subunitLineReceived(line)
239 self.addExpectedFail(offset, line)
241 self.parser.stdOutLineReceived(line)
243 self.parser.stdOutLineReceived(line)
245 def lostConnection(self):
246 """Connection lost."""
247 self.parser._lostConnectionInTest('unknown state of ')
249 def startTest(self, offset, line):
250 """A test start command received."""
251 self.parser.stdOutLineReceived(line)
254 class _InTest(_ParserState):
255 """State for the subunit parser after reading a test: directive."""
257 def _outcome(self, offset, line, no_details, details_state):
258 """An outcome directive has been read.
260 :param no_details: Callable to call when no details are presented.
261 :param details_state: The state to switch to for details
262 processing of this outcome.
264 if self.parser.current_test_description == line[offset:-1]:
265 self.parser._state = self.parser._outside_test
266 self.parser.current_test_description = None
268 self.parser.client.stopTest(self.parser._current_test)
269 self.parser._current_test = None
270 self.parser.subunitLineReceived(line)
271 elif self.parser.current_test_description + " [" == line[offset:-1]:
272 self.parser._state = details_state
273 details_state.set_simple()
274 self.parser.subunitLineReceived(line)
275 elif self.parser.current_test_description + " [ multipart" == \
277 self.parser._state = details_state
278 details_state.set_multipart()
279 self.parser.subunitLineReceived(line)
281 self.parser.stdOutLineReceived(line)
284 self.parser.client.addError(self.parser._current_test,
287 def addError(self, offset, line):
288 """An 'error:' directive has been read."""
289 self._outcome(offset, line, self._error,
290 self.parser._reading_error_details)
293 self.parser.client.addExpectedFailure(self.parser._current_test,
296 def addExpectedFail(self, offset, line):
297 """An 'xfail:' directive has been read."""
298 self._outcome(offset, line, self._xfail,
299 self.parser._reading_xfail_details)
302 self.parser.client.addFailure(self.parser._current_test, details={})
304 def addFailure(self, offset, line):
305 """A 'failure:' directive has been read."""
306 self._outcome(offset, line, self._failure,
307 self.parser._reading_failure_details)
310 self.parser.client.addSkip(self.parser._current_test, details={})
312 def addSkip(self, offset, line):
313 """A 'skip:' directive has been read."""
314 self._outcome(offset, line, self._skip,
315 self.parser._reading_skip_details)
318 self.parser.client.addSuccess(self.parser._current_test, details={})
320 def addSuccess(self, offset, line):
321 """A 'success:' directive has been read."""
322 self._outcome(offset, line, self._succeed,
323 self.parser._reading_success_details)
325 def lostConnection(self):
326 """Connection lost."""
327 self.parser._lostConnectionInTest('')
330 class _OutSideTest(_ParserState):
331 """State for the subunit parser outside of a test context."""
333 def lostConnection(self):
334 """Connection lost."""
336 def startTest(self, offset, line):
337 """A test start command received."""
338 self.parser._state = self.parser._in_test
339 self.parser._current_test = RemotedTestCase(line[offset:-1])
340 self.parser.current_test_description = line[offset:-1]
341 self.parser.client.startTest(self.parser._current_test)
342 self.parser.subunitLineReceived(line)
345 class _ReadingDetails(_ParserState):
346 """Common logic for readin state details."""
348 def endDetails(self):
349 """The end of a details section has been reached."""
350 self.parser._state = self.parser._outside_test
351 self.parser.current_test_description = None
352 self._report_outcome()
353 self.parser.client.stopTest(self.parser._current_test)
355 def lineReceived(self, line):
356 """a line has been received."""
357 self.details_parser.lineReceived(line)
358 self.parser.subunitLineReceived(line)
360 def lostConnection(self):
361 """Connection lost."""
362 self.parser._lostConnectionInTest('%s report of ' %
363 self._outcome_label())
365 def _outcome_label(self):
366 """The label to describe this outcome."""
367 raise NotImplementedError(self._outcome_label)
369 def set_simple(self):
370 """Start a simple details parser."""
371 self.details_parser = details.SimpleDetailsParser(self)
373 def set_multipart(self):
374 """Start a multipart details parser."""
375 self.details_parser = details.MultipartDetailsParser(self)
378 class _ReadingFailureDetails(_ReadingDetails):
379 """State for the subunit parser when reading failure details."""
381 def _report_outcome(self):
382 self.parser.client.addFailure(self.parser._current_test,
383 details=self.details_parser.get_details())
385 def _outcome_label(self):
389 class _ReadingErrorDetails(_ReadingDetails):
390 """State for the subunit parser when reading error details."""
392 def _report_outcome(self):
393 self.parser.client.addError(self.parser._current_test,
394 details=self.details_parser.get_details())
396 def _outcome_label(self):
400 class _ReadingExpectedFailureDetails(_ReadingDetails):
401 """State for the subunit parser when reading xfail details."""
403 def _report_outcome(self):
404 self.parser.client.addExpectedFailure(self.parser._current_test,
405 details=self.details_parser.get_details())
407 def _outcome_label(self):
411 class _ReadingSkipDetails(_ReadingDetails):
412 """State for the subunit parser when reading skip details."""
414 def _report_outcome(self):
415 self.parser.client.addSkip(self.parser._current_test,
416 details=self.details_parser.get_details("skip"))
418 def _outcome_label(self):
422 class _ReadingSuccessDetails(_ReadingDetails):
423 """State for the subunit parser when reading success details."""
425 def _report_outcome(self):
426 self.parser.client.addSuccess(self.parser._current_test,
427 details=self.details_parser.get_details("success"))
429 def _outcome_label(self):
433 class TestProtocolServer(object):
434 """A parser for subunit.
436 :ivar tags: The current tags associated with the protocol stream.
439 def __init__(self, client, stream=None, forward_stream=None):
440 """Create a TestProtocolServer instance.
442 :param client: An object meeting the unittest.TestResult protocol.
443 :param stream: The stream that lines received which are not part of the
444 subunit protocol should be written to. This allows custom handling
445 of mixed protocols. By default, sys.stdout will be used for
447 :param forward_stream: A stream to forward subunit lines to. This
448 allows a filter to forward the entire stream while still parsing
449 and acting on it. By default forward_stream is set to
450 DiscardStream() and no forwarding happens.
452 self.client = ExtendedToOriginalDecorator(client)
455 self._stream = stream
456 self._forward_stream = forward_stream or DiscardStream()
457 # state objects we can switch too
458 self._in_test = _InTest(self)
459 self._outside_test = _OutSideTest(self)
460 self._reading_error_details = _ReadingErrorDetails(self)
461 self._reading_failure_details = _ReadingFailureDetails(self)
462 self._reading_skip_details = _ReadingSkipDetails(self)
463 self._reading_success_details = _ReadingSuccessDetails(self)
464 self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
465 # start with outside test.
466 self._state = self._outside_test
468 def _handleProgress(self, offset, line):
469 """Process a progress directive."""
470 line = line[offset:].strip()
472 whence = PROGRESS_CUR
475 whence = PROGRESS_PUSH
478 whence = PROGRESS_POP
481 whence = PROGRESS_SET
483 self.client.progress(delta, whence)
485 def _handleTags(self, offset, line):
486 """Process a tags command."""
487 tags = line[offset:].split()
488 new_tags, gone_tags = tags_to_new_gone(tags)
489 self.client.tags(new_tags, gone_tags)
491 def _handleTime(self, offset, line):
492 # Accept it, but do not do anything with it yet.
494 event_time = iso8601.parse_date(line[offset:-1])
496 raise TypeError("Failed to parse %r, got %r" % (line, e))
497 self.client.time(event_time)
499 def lineReceived(self, line):
500 """Call the appropriate local method for the received line."""
501 self._state.lineReceived(line)
503 def _lostConnectionInTest(self, state_string):
504 error_string = "lost connection during %stest '%s'" % (
505 state_string, self.current_test_description)
506 self.client.addError(self._current_test, RemoteError(error_string))
507 self.client.stopTest(self._current_test)
509 def lostConnection(self):
510 """The input connection has finished."""
511 self._state.lostConnection()
513 def readFrom(self, pipe):
514 """Blocking convenience API to parse an entire stream.
516 :param pipe: A file-like object supporting readlines().
519 for line in pipe.readlines():
520 self.lineReceived(line)
521 self.lostConnection()
523 def _startTest(self, offset, line):
524 """Internal call to change state machine. Override startTest()."""
525 self._state.startTest(offset, line)
527 def subunitLineReceived(self, line):
528 self._forward_stream.write(line)
530 def stdOutLineReceived(self, line):
531 self._stream.write(line)
534 class TestProtocolClient(unittest.TestResult):
535 """A TestResult which generates a subunit stream for a test run.
537 # Get a TestSuite or TestCase to run
539 # Create a stream (any object with a 'write' method)
540 stream = file('tests.log', 'wb')
541 # Create a subunit result object which will output to the stream
542 result = subunit.TestProtocolClient(stream)
543 # Optionally, to get timing data for performance analysis, wrap the
544 # serialiser with a timing decorator
545 result = subunit.test_results.AutoTimingTestResultDecorator(result)
546 # Run the test suite reporting to the subunit result object
552 def __init__(self, stream):
553 unittest.TestResult.__init__(self)
554 self._stream = stream
555 _make_stream_binary(stream)
557 def addError(self, test, error=None, details=None):
558 """Report an error in test test.
560 Only one of error and details should be provided: conceptually there
561 are two separate methods:
562 addError(self, test, error)
563 addError(self, test, details)
565 :param error: Standard unittest positional argument form - an
567 :param details: New Testing-in-python drafted API; a dict from string
568 to subunit.Content objects.
570 self._addOutcome("error", test, error=error, details=details)
572 def addExpectedFailure(self, test, error=None, details=None):
573 """Report an expected failure in test test.
575 Only one of error and details should be provided: conceptually there
576 are two separate methods:
577 addError(self, test, error)
578 addError(self, test, details)
580 :param error: Standard unittest positional argument form - an
582 :param details: New Testing-in-python drafted API; a dict from string
583 to subunit.Content objects.
585 self._addOutcome("xfail", test, error=error, details=details)
587 def addFailure(self, test, error=None, details=None):
588 """Report a failure in test test.
590 Only one of error and details should be provided: conceptually there
591 are two separate methods:
592 addFailure(self, test, error)
593 addFailure(self, test, details)
595 :param error: Standard unittest positional argument form - an
597 :param details: New Testing-in-python drafted API; a dict from string
598 to subunit.Content objects.
600 self._addOutcome("failure", test, error=error, details=details)
602 def _addOutcome(self, outcome, test, error=None, details=None):
603 """Report a failure in test test.
605 Only one of error and details should be provided: conceptually there
606 are two separate methods:
607 addOutcome(self, test, error)
608 addOutcome(self, test, details)
610 :param outcome: A string describing the outcome - used as the
611 event name in the subunit stream.
612 :param error: Standard unittest positional argument form - an
614 :param details: New Testing-in-python drafted API; a dict from string
615 to subunit.Content objects.
617 self._stream.write("%s: %s" % (outcome, test.id()))
618 if error is None and details is None:
620 if error is not None:
621 self._stream.write(" [\n")
622 # XXX: this needs to be made much stricter, along the lines of
623 # Martin[gz]'s work in testtools. Perhaps subunit can use that?
624 for line in self._exc_info_to_string(error, test).splitlines():
625 self._stream.write("%s\n" % line)
627 self._write_details(details)
628 self._stream.write("]\n")
630 def addSkip(self, test, reason=None, details=None):
631 """Report a skipped test."""
633 self._addOutcome("skip", test, error=None, details=details)
635 self._stream.write("skip: %s [\n" % test.id())
636 self._stream.write("%s\n" % reason)
637 self._stream.write("]\n")
639 def addSuccess(self, test, details=None):
640 """Report a success in a test."""
641 self._stream.write("successful: %s" % test.id())
643 self._stream.write("\n")
645 self._write_details(details)
646 self._stream.write("]\n")
647 addUnexpectedSuccess = addSuccess
649 def startTest(self, test):
650 """Mark a test as starting its test run."""
651 self._stream.write("test: %s\n" % test.id())
653 def progress(self, offset, whence):
654 """Provide indication about the progress/length of the test run.
656 :param offset: Information about the number of tests remaining. If
657 whence is PROGRESS_CUR, then offset increases/decreases the
658 remaining test count. If whence is PROGRESS_SET, then offset
659 specifies exactly the remaining test count.
660 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
663 if whence == PROGRESS_CUR and offset > -1:
665 elif whence == PROGRESS_PUSH:
668 elif whence == PROGRESS_POP:
673 self._stream.write("progress: %s%s\n" % (prefix, offset))
675 def time(self, a_datetime):
676 """Inform the client of the time.
678 ":param datetime: A datetime.datetime object.
680 time = a_datetime.astimezone(iso8601.Utc())
681 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
682 time.year, time.month, time.day, time.hour, time.minute,
683 time.second, time.microsecond))
685 def _write_details(self, details):
686 """Output details to the stream.
688 :param details: An extended details dict for a test outcome.
690 self._stream.write(" [ multipart\n")
691 for name, content in sorted(details.iteritems()):
692 self._stream.write("Content-Type: %s/%s" %
693 (content.content_type.type, content.content_type.subtype))
694 parameters = content.content_type.parameters
696 self._stream.write(";")
698 for param, value in parameters.iteritems():
699 param_strs.append("%s=%s" % (param, value))
700 self._stream.write(",".join(param_strs))
701 self._stream.write("\n%s\n" % name)
702 encoder = chunked.Encoder(self._stream)
703 map(encoder.write, content.iter_bytes())
707 """Obey the testtools result.done() interface."""
710 def RemoteError(description=""):
711 return (_StringException, _StringException(description), None)
714 class RemotedTestCase(unittest.TestCase):
715 """A class to represent test cases run in child processes.
717 Instances of this class are used to provide the Python test API a TestCase
718 that can be printed to the screen, introspected for metadata and so on.
719 However, as they are a simply a memoisation of a test that was actually
720 run in the past by a separate process, they cannot perform any interactive
724 def __eq__ (self, other):
726 return self.__description == other.__description
727 except AttributeError:
730 def __init__(self, description):
731 """Create a psuedo test case with description description."""
732 self.__description = description
734 def error(self, label):
735 raise NotImplementedError("%s on RemotedTestCases is not permitted." %
742 self.error("tearDown")
744 def shortDescription(self):
745 return self.__description
748 return "%s" % (self.__description,)
751 return "%s (%s)" % (self.__description, self._strclass())
754 return "<%s description='%s'>" % \
755 (self._strclass(), self.__description)
757 def run(self, result=None):
758 if result is None: result = self.defaultTestResult()
759 result.startTest(self)
760 result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
761 result.stopTest(self)
765 return "%s.%s" % (cls.__module__, cls.__name__)
768 class ExecTestCase(unittest.TestCase):
769 """A test case which runs external scripts for test fixtures."""
771 def __init__(self, methodName='runTest'):
772 """Create an instance of the class that will use the named test
773 method when executed. Raises a ValueError if the instance does
774 not have a method with the specified name.
776 unittest.TestCase.__init__(self, methodName)
777 testMethod = getattr(self, methodName)
778 self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
781 def countTestCases(self):
784 def run(self, result=None):
785 if result is None: result = self.defaultTestResult()
789 """Run the test without collecting errors in a TestResult"""
790 self._run(unittest.TestResult())
792 def _run(self, result):
793 protocol = TestProtocolServer(result)
794 output = subprocess.Popen(self.script, shell=True,
795 stdout=subprocess.PIPE).communicate()[0]
796 protocol.readFrom(StringIO(output))
799 class IsolatedTestCase(unittest.TestCase):
800 """A TestCase which executes in a forked process.
802 Each test gets its own process, which has a performance overhead but will
803 provide excellent isolation from global state (such as django configs,
804 zope utilities and so on).
807 def run(self, result=None):
808 if result is None: result = self.defaultTestResult()
809 run_isolated(unittest.TestCase, self, result)
812 class IsolatedTestSuite(unittest.TestSuite):
813 """A TestSuite which runs its tests in a forked process.
815 This decorator that will fork() before running the tests and report the
816 results from the child process using a Subunit stream. This is useful for
817 handling tests that mutate global state, or are testing C extensions that
821 def run(self, result=None):
822 if result is None: result = unittest.TestResult()
823 run_isolated(unittest.TestSuite, self, result)
826 def run_isolated(klass, self, result):
827 """Run a test suite or case in a subprocess, using the run method on klass.
829 c2pread, c2pwrite = os.pipe()
830 # fixme - error -> result
835 # Close parent's pipe ends
842 # at this point, sys.stdin is redirected, now we want
843 # to filter it to escape ]'s.
844 ### XXX: test and write that bit.
846 result = TestProtocolClient(sys.stdout)
847 klass.run(self, result)
850 # exit HARD, exit NOW.
854 # Close child pipe ends
856 # hookup a protocol engine
857 protocol = TestProtocolServer(result)
858 protocol.readFrom(os.fdopen(c2pread, 'rU'))
860 # TODO return code evaluation.
864 def TAP2SubUnit(tap, subunit):
865 """Filter a TAP pipe into a subunit pipe.
867 :param tap: A tap pipe/stream/file object.
868 :param subunit: A pipe/stream/file object to write subunit results to.
869 :return: The exit code to exit with.
874 client = TestProtocolClient(subunit)
878 def _skipped_test(subunit, plan_start):
879 # Some tests were skipped.
880 subunit.write('test test %d\n' % plan_start)
881 subunit.write('error test %d [\n' % plan_start)
882 subunit.write('test missing from TAP output\n')
884 return plan_start + 1
885 # Test data for the next test to emit
891 if test_name is None:
893 subunit.write("test %s\n" % test_name)
895 subunit.write("%s %s\n" % (result, test_name))
897 subunit.write("%s %s [\n" % (result, test_name))
900 subunit.write("%s\n" % line)
904 if state == BEFORE_PLAN:
905 match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
908 _, plan_stop, comment = match.groups()
909 plan_stop = int(plan_stop)
910 if plan_start > plan_stop and plan_stop == 0:
913 subunit.write("test file skip\n")
914 subunit.write("skip file skip [\n")
915 subunit.write("%s\n" % comment)
918 # not a plan line, or have seen one before
919 match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
921 # new test, emit current one.
923 status, number, description, directive, directive_comment = match.groups()
928 if description is None:
931 description = ' ' + description
932 if directive is not None:
933 if directive.upper() == 'TODO':
935 elif directive.upper() == 'SKIP':
937 if directive_comment is not None:
938 log.append(directive_comment)
939 if number is not None:
941 while plan_start < number:
942 plan_start = _skipped_test(subunit, plan_start)
943 test_name = "test %d%s" % (plan_start, description)
946 match = re.match("Bail out\!(?:\s*(.*))?\n", line)
948 reason, = match.groups()
952 extra = ' %s' % reason
954 test_name = "Bail out!%s" % extra
958 match = re.match("\#.*\n", line)
960 log.append(line[:-1])
964 while plan_start <= plan_stop:
965 # record missed tests
966 plan_start = _skipped_test(subunit, plan_start)
970 def tag_stream(original, filtered, tags):
971 """Alter tags on a stream.
973 :param original: The input stream.
974 :param filtered: The output stream.
975 :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
978 A 'TAG' command will add the tag to the output stream,
979 and override any existing '-TAG' command in that stream.
981 * A global 'tags: TAG' will be added to the start of the stream.
982 * Any tags commands with -TAG will have the -TAG removed.
984 A '-TAG' command will remove the TAG command from the stream.
986 * A 'tags: -TAG' command will be added to the start of the stream.
987 * Any 'tags: TAG' command will have 'TAG' removed from it.
988 Additionally, any redundant tagging commands (adding a tag globally
989 present, or removing a tag globally removed) are stripped as a
990 by-product of the filtering.
993 new_tags, gone_tags = tags_to_new_gone(tags)
994 def write_tags(new_tags, gone_tags):
995 if new_tags or gone_tags:
996 filtered.write("tags: " + ' '.join(new_tags))
998 for tag in gone_tags:
999 filtered.write("-" + tag)
1000 filtered.write("\n")
1001 write_tags(new_tags, gone_tags)
1002 # TODO: use the protocol parser and thus don't mangle test comments.
1003 for line in original:
1004 if line.startswith("tags:"):
1005 line_tags = line[5:].split()
1006 line_new, line_gone = tags_to_new_gone(line_tags)
1007 line_new = line_new - gone_tags
1008 line_gone = line_gone - new_tags
1009 write_tags(line_new, line_gone)
1011 filtered.write(line)
1015 class ProtocolTestCase(object):
1016 """Subunit wire protocol to unittest.TestCase adapter.
1018 ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1019 calling a ProtocolTestCase or invoking the run() method will make a 'test
1020 run' happen. The 'test run' will simply be a replay of the test activity
1021 that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1022 and ``countTestCases`` methods are not supported because there isn't a
1023 sensible mapping for those methods.
1025 # Get a stream (any object with a readline() method), in this case the
1026 # stream output by the example from ``subunit.TestProtocolClient``.
1027 stream = file('tests.log', 'rb')
1028 # Create a parser which will read from the stream and emit
1029 # activity to a unittest.TestResult when run() is called.
1030 suite = subunit.ProtocolTestCase(stream)
1031 # Create a result object to accept the contents of that stream.
1032 result = unittest._TextTestResult(sys.stdout)
1033 # 'run' the tests - process the stream and feed its contents to result.
1037 :seealso: TestProtocolServer (the subunit wire protocol parser).
1040 def __init__(self, stream, passthrough=None, forward=False):
1041 """Create a ProtocolTestCase reading from stream.
1043 :param stream: A filelike object which a subunit stream can be read
1045 :param passthrough: A stream pass non subunit input on to. If not
1046 supplied, the TestProtocolServer default is used.
1047 :param forward: A stream to pass subunit input on to. If not supplied
1048 subunit input is not forwarded.
1050 self._stream = stream
1051 _make_stream_binary(stream)
1052 self._passthrough = passthrough
1053 self._forward = forward
1054 _make_stream_binary(forward)
1056 def __call__(self, result=None):
1057 return self.run(result)
1059 def run(self, result=None):
1061 result = self.defaultTestResult()
1062 protocol = TestProtocolServer(result, self._passthrough, self._forward)
1063 line = self._stream.readline()
1065 protocol.lineReceived(line)
1066 line = self._stream.readline()
1067 protocol.lostConnection()
1070 class TestResultStats(unittest.TestResult):
1071 """A pyunit TestResult interface implementation for making statistics.
1073 :ivar total_tests: The total tests seen.
1074 :ivar passed_tests: The tests that passed.
1075 :ivar failed_tests: The tests that failed.
1076 :ivar seen_tags: The tags seen across all tests.
1079 def __init__(self, stream):
1080 """Create a TestResultStats which outputs to stream."""
1081 unittest.TestResult.__init__(self)
1082 self._stream = stream
1083 self.failed_tests = 0
1084 self.skipped_tests = 0
1085 self.seen_tags = set()
1088 def total_tests(self):
1089 return self.testsRun
1091 def addError(self, test, err, details=None):
1092 self.failed_tests += 1
1094 def addFailure(self, test, err, details=None):
1095 self.failed_tests += 1
1097 def addSkip(self, test, reason, details=None):
1098 self.skipped_tests += 1
1100 def formatStats(self):
1101 self._stream.write("Total tests: %5d\n" % self.total_tests)
1102 self._stream.write("Passed tests: %5d\n" % self.passed_tests)
1103 self._stream.write("Failed tests: %5d\n" % self.failed_tests)
1104 self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1105 tags = sorted(self.seen_tags)
1106 self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1109 def passed_tests(self):
1110 return self.total_tests - self.failed_tests - self.skipped_tests
1112 def tags(self, new_tags, gone_tags):
1113 """Accumulate the seen tags."""
1114 self.seen_tags.update(new_tags)
1116 def wasSuccessful(self):
1117 """Tells whether or not this result was a success"""
1118 return self.failed_tests == 0
1121 def get_default_formatter():
1122 """Obtain the default formatter to write to.
1124 :return: A file-like object.
1126 formatter = os.getenv("SUBUNIT_FORMATTER")
1128 return os.popen(formatter, "w")
1133 def _make_stream_binary(stream):
1134 """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
1135 if getattr(stream, 'fileno', None) is not None:
1136 _make_binary_on_windows(stream.fileno())
1138 def _make_binary_on_windows(fileno):
1139 """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
1140 if sys.platform == "win32":
1142 msvcrt.setmode(fileno, os.O_BINARY)