2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), then the tags apply to all subsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occurred at, to allow reconstructing
68 test timing from a stream.
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
82 $ python -m subunit.run mylib.tests.test_suite
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
90 ``ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
94 class AggregateTests(subunit.ExecTestCase):
96 def test_script_one(self):
99 def test_script_two(self):
102 # Normally your normal test loading would take care of this automatically,
103 # It is only spelt out in detail here for clarity.
104 suite = unittest.TestSuite([AggregateTests("test_script_one"),
105 AggregateTests("test_script_two")])
106 # Create any TestResult class you like.
107 result = unittest._TextTestResult(sys.stdout)
108 # And run your suite as normal, Subunit will exec each external script as
109 # needed and report to your result object.
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
125 from io import UnsupportedOperation as _UnsupportedOperation
127 _UnsupportedOperation = AttributeError
129 from extras import safe_hasattr
130 from testtools import content, content_type, ExtendedToOriginalDecorator
131 from testtools.content import TracebackContent
132 from testtools.compat import _b, _u, BytesIO, StringIO
134 from testtools.testresult.real import _StringException
135 RemoteException = _StringException
136 # For testing: different pythons have different str() implementations.
137 if sys.version_info > (3, 0):
138 _remote_exception_str = "testtools.testresult.real._StringException"
139 _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
141 _remote_exception_str = "_StringException"
142 _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
144 raise ImportError ("testtools.testresult.real does not contain "
145 "_StringException, check your version.")
146 from testtools import testresult, CopyStreamResult
148 from subunit import chunked, details, iso8601, test_results
149 from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes
151 # same format as sys.version_info: "A tuple containing the five components of
152 # the version number: major, minor, micro, releaselevel, and serial. All
153 # values except releaselevel are integers; the release level is 'alpha',
154 # 'beta', 'candidate', or 'final'. The version_info value corresponding to the
155 # Python version 2.0 is (2, 0, 0, 'final', 0)." Additionally we use a
156 # releaselevel of 'dev' for unreleased under-development code.
158 # If the releaselevel is 'alpha' then the major/minor/micro components are not
159 # established at this point, and setup.py will use a version of next-$(revno).
160 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
161 # Otherwise it is major.minor.micro~$(revno).
163 __version__ = (0, 0, 13, 'final', 0)
173 return subunit.tests.test_suite()
def join_dir(base_path, path):
    """Resolve C{path} against the directory that contains C{base_path}.

    @param base_path: A path to a file or directory. Only its containing
        directory (made absolute) is used as the anchor.
    @param path: An absolute path, or a path relative to the containing
        directory of C{base_path}.

    @return: C{path} joined onto the absolute containing directory of
        C{base_path}.
    """
    anchor = os.path.abspath(base_path)
    containing_dir = os.path.dirname(anchor)
    return os.path.join(containing_dir, path)
def tags_to_new_gone(tags):
    """Split a list of tags into a new_set and a gone_set.

    A tag whose first character is '-' marks a removal and is recorded in
    the gone set without that leading '-'. Every other tag is recorded
    unchanged in the new set.

    :param tags: An iterable of tag strings.
    :return: A ``(new_tags, gone_tags)`` tuple of sets.
    """
    new_tags, gone_tags = set(), set()
    for tag in tags:
        if tag[0] != '-':
            new_tags.add(tag)
            continue
        # Drop the '-' marker; only the tag name itself is reported.
        gone_tags.add(tag[1:])
    return new_tags, gone_tags
202 class DiscardStream(object):
203 """A filelike object which discards what is written to it."""
206 raise _UnsupportedOperation()
208 def write(self, bytes):
211 def read(self, len=0):
class _ParserState(object):
    """State for the subunit parser.

    Base class for the parser's state objects. ``lineReceived`` recognises
    the directive at the start of a line and dispatches to a handler method.
    The handlers defined here treat every test-related directive as ordinary
    output and pass the line to the parser's ``stdOutLineReceived``;
    subclasses override the handlers that are meaningful in their state.
    """

    def __init__(self, parser):
        """Create a state bound to parser.

        :param parser: The parser object whose ``stdOutLineReceived``,
            ``subunitLineReceived`` and ``_handle*`` methods this state
            calls.
        """
        self.parser = parser
        # Directive names are matched with ``cmd in <symbols>``, so every
        # symbol set must be a tuple, including the one-element ones.
        self._test_sym = (_b('test'), _b('testing'))
        self._colon_sym = _b(':')
        self._error_sym = (_b('error'),)
        self._failure_sym = (_b('failure'),)
        self._progress_sym = (_b('progress'),)
        # Must be a tuple: with a bare bytes value the ``in`` test becomes a
        # substring search, so b'ski' or b'' would be accepted as 'skip'.
        self._skip_sym = (_b('skip'),)
        self._success_sym = (_b('success'), _b('successful'))
        self._tags_sym = (_b('tags'),)
        self._time_sym = (_b('time'),)
        self._xfail_sym = (_b('xfail'),)
        self._uxsuccess_sym = (_b('uxsuccess'),)
        self._start_simple = _u(" [")
        self._start_multipart = _u(" [ multipart")

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read.

        lineReceived dispatches here for every state that inherits it, so
        the base class needs this default just like the other outcomes.
        """
        self.parser.stdOutLineReceived(line)

    def lineReceived(self, line):
        """A line has been received.

        The line is split on whitespace into a directive and the rest. A
        line that has no second part, or that starts with whitespace, is
        passed to ``stdOutLineReceived`` untouched, as is a line whose
        directive is not recognised.

        :param line: The raw line (bytes) as read from the stream.
        """
        parts = line.split(None, 1)
        if len(parts) == 2 and line.startswith(parts[0]):
            cmd = parts[0]
            # The argument starts after the directive and one separator.
            offset = len(cmd) + 1
            cmd = cmd.rstrip(self._colon_sym)
            if cmd in self._test_sym:
                self.startTest(offset, line)
            elif cmd in self._error_sym:
                self.addError(offset, line)
            elif cmd in self._failure_sym:
                self.addFailure(offset, line)
            elif cmd in self._progress_sym:
                self.parser._handleProgress(offset, line)
            elif cmd in self._skip_sym:
                self.addSkip(offset, line)
            elif cmd in self._success_sym:
                self.addSuccess(offset, line)
            elif cmd in self._tags_sym:
                self.parser._handleTags(offset, line)
                self.parser.subunitLineReceived(line)
            elif cmd in self._time_sym:
                self.parser._handleTime(offset, line)
                self.parser.subunitLineReceived(line)
            elif cmd in self._xfail_sym:
                self.addExpectedFail(offset, line)
            elif cmd in self._uxsuccess_sym:
                self.addUnexpectedSuccess(offset, line)
            else:
                self.parser.stdOutLineReceived(line)
        else:
            self.parser.stdOutLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        self.parser._lostConnectionInTest(_u('unknown state of '))

    def startTest(self, offset, line):
        """A test start command received."""
        self.parser.stdOutLineReceived(line)
297 class _InTest(_ParserState):
298 """State for the subunit parser after reading a test: directive."""
300 def _outcome(self, offset, line, no_details, details_state):
301 """An outcome directive has been read.
303 :param no_details: Callable to call when no details are presented.
304 :param details_state: The state to switch to for details
305 processing of this outcome.
307 test_name = line[offset:-1].decode('utf8')
308 if self.parser.current_test_description == test_name:
309 self.parser._state = self.parser._outside_test
310 self.parser.current_test_description = None
312 self.parser.client.stopTest(self.parser._current_test)
313 self.parser._current_test = None
314 self.parser.subunitLineReceived(line)
315 elif self.parser.current_test_description + self._start_simple == \
317 self.parser._state = details_state
318 details_state.set_simple()
319 self.parser.subunitLineReceived(line)
320 elif self.parser.current_test_description + self._start_multipart == \
322 self.parser._state = details_state
323 details_state.set_multipart()
324 self.parser.subunitLineReceived(line)
326 self.parser.stdOutLineReceived(line)
329 self.parser.client.addError(self.parser._current_test,
332 def addError(self, offset, line):
333 """An 'error:' directive has been read."""
334 self._outcome(offset, line, self._error,
335 self.parser._reading_error_details)
338 self.parser.client.addExpectedFailure(self.parser._current_test,
341 def addExpectedFail(self, offset, line):
342 """An 'xfail:' directive has been read."""
343 self._outcome(offset, line, self._xfail,
344 self.parser._reading_xfail_details)
346 def _uxsuccess(self):
347 self.parser.client.addUnexpectedSuccess(self.parser._current_test)
349 def addUnexpectedSuccess(self, offset, line):
350 """A 'uxsuccess:' directive has been read."""
351 self._outcome(offset, line, self._uxsuccess,
352 self.parser._reading_uxsuccess_details)
355 self.parser.client.addFailure(self.parser._current_test, details={})
357 def addFailure(self, offset, line):
358 """A 'failure:' directive has been read."""
359 self._outcome(offset, line, self._failure,
360 self.parser._reading_failure_details)
363 self.parser.client.addSkip(self.parser._current_test, details={})
365 def addSkip(self, offset, line):
366 """A 'skip:' directive has been read."""
367 self._outcome(offset, line, self._skip,
368 self.parser._reading_skip_details)
371 self.parser.client.addSuccess(self.parser._current_test, details={})
373 def addSuccess(self, offset, line):
374 """A 'success:' directive has been read."""
375 self._outcome(offset, line, self._succeed,
376 self.parser._reading_success_details)
378 def lostConnection(self):
379 """Connection lost."""
380 self.parser._lostConnectionInTest(_u(''))
class _OutSideTest(_ParserState):
    """Parser state used while no test is in progress."""

    def lostConnection(self):
        """Connection lost; this state reports nothing."""

    def startTest(self, offset, line):
        """A test start command received.

        Switches the parser to its in-test state, creates a RemotedTestCase
        named after the text between ``offset`` and the final character of
        the line (decoded as UTF-8), announces it to the client and forwards
        the line as a subunit line.
        """
        parser = self.parser
        parser._state = parser._in_test
        name = line[offset:-1].decode('utf8')
        remote_test = RemotedTestCase(name)
        parser._current_test = remote_test
        parser.current_test_description = name
        parser.client.startTest(remote_test)
        parser.subunitLineReceived(line)
class _ReadingDetails(_ParserState):
    """Common logic for the states that read the details of an outcome.

    Subclasses provide ``_report_outcome`` (called by endDetails) and
    ``_outcome_label`` (used when the connection is lost).
    """

    def endDetails(self):
        """The end of a details section has been reached.

        Returns the parser to its outside-test state, reports the outcome
        and then stops the current test on the client.
        """
        parser = self.parser
        parser._state = parser._outside_test
        parser.current_test_description = None
        self._report_outcome()
        parser.client.stopTest(parser._current_test)

    def lineReceived(self, line):
        """A line has been received: feed it to the active details parser."""
        self.details_parser.lineReceived(line)
        self.parser.subunitLineReceived(line)

    def lostConnection(self):
        """Connection lost while the details were still being read."""
        state_description = _u('%s report of ') % self._outcome_label()
        self.parser._lostConnectionInTest(state_description)

    def _outcome_label(self):
        """The label to describe this outcome; subclasses must override."""
        raise NotImplementedError(self._outcome_label)

    def set_simple(self):
        """Start a simple details parser."""
        simple_parser = details.SimpleDetailsParser(self)
        self.details_parser = simple_parser

    def set_multipart(self):
        """Start a multipart details parser."""
        multipart_parser = details.MultipartDetailsParser(self)
        self.details_parser = multipart_parser
432 class _ReadingFailureDetails(_ReadingDetails):
433 """State for the subunit parser when reading failure details."""
435 def _report_outcome(self):
436 self.parser.client.addFailure(self.parser._current_test,
437 details=self.details_parser.get_details())
439 def _outcome_label(self):
443 class _ReadingErrorDetails(_ReadingDetails):
444 """State for the subunit parser when reading error details."""
446 def _report_outcome(self):
447 self.parser.client.addError(self.parser._current_test,
448 details=self.details_parser.get_details())
450 def _outcome_label(self):
454 class _ReadingExpectedFailureDetails(_ReadingDetails):
455 """State for the subunit parser when reading xfail details."""
457 def _report_outcome(self):
458 self.parser.client.addExpectedFailure(self.parser._current_test,
459 details=self.details_parser.get_details())
461 def _outcome_label(self):
465 class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
466 """State for the subunit parser when reading uxsuccess details."""
468 def _report_outcome(self):
469 self.parser.client.addUnexpectedSuccess(self.parser._current_test,
470 details=self.details_parser.get_details())
472 def _outcome_label(self):
476 class _ReadingSkipDetails(_ReadingDetails):
477 """State for the subunit parser when reading skip details."""
479 def _report_outcome(self):
480 self.parser.client.addSkip(self.parser._current_test,
481 details=self.details_parser.get_details("skip"))
483 def _outcome_label(self):
487 class _ReadingSuccessDetails(_ReadingDetails):
488 """State for the subunit parser when reading success details."""
490 def _report_outcome(self):
491 self.parser.client.addSuccess(self.parser._current_test,
492 details=self.details_parser.get_details("success"))
494 def _outcome_label(self):
498 class TestProtocolServer(object):
499 """A parser for subunit.
501 :ivar tags: The current tags associated with the protocol stream.
504 def __init__(self, client, stream=None, forward_stream=None):
505 """Create a TestProtocolServer instance.
507 :param client: An object meeting the unittest.TestResult protocol.
508 :param stream: The stream that lines received which are not part of the
509 subunit protocol should be written to. This allows custom handling
510 of mixed protocols. By default, sys.stdout will be used for
511 convenience. It should accept bytes to its write() method.
512 :param forward_stream: A stream to forward subunit lines to. This
513 allows a filter to forward the entire stream while still parsing
514 and acting on it. By default forward_stream is set to
515 DiscardStream() and no forwarding happens.
517 self.client = ExtendedToOriginalDecorator(client)
520 if sys.version_info > (3, 0):
521 stream = stream.buffer
522 self._stream = stream
523 self._forward_stream = forward_stream or DiscardStream()
524 # state objects we can switch too
525 self._in_test = _InTest(self)
526 self._outside_test = _OutSideTest(self)
527 self._reading_error_details = _ReadingErrorDetails(self)
528 self._reading_failure_details = _ReadingFailureDetails(self)
529 self._reading_skip_details = _ReadingSkipDetails(self)
530 self._reading_success_details = _ReadingSuccessDetails(self)
531 self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
532 self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
533 # start with outside test.
534 self._state = self._outside_test
535 # Avoid casts on every call
536 self._plusminus = _b('+-')
537 self._push_sym = _b('push')
538 self._pop_sym = _b('pop')
540 def _handleProgress(self, offset, line):
541 """Process a progress directive."""
542 line = line[offset:].strip()
543 if line[0] in self._plusminus:
544 whence = PROGRESS_CUR
546 elif line == self._push_sym:
547 whence = PROGRESS_PUSH
549 elif line == self._pop_sym:
550 whence = PROGRESS_POP
553 whence = PROGRESS_SET
555 self.client.progress(delta, whence)
def _handleTags(self, offset, line):
    """Process a tags command.

    The text after ``offset`` is decoded as UTF-8 and split on whitespace;
    tags_to_new_gone sorts the result into added and removed tags, which
    are then passed to the client's ``tags`` method.
    """
    tag_text = line[offset:].decode('utf8')
    added, removed = tags_to_new_gone(tag_text.split())
    self.client.tags(added, removed)
def _handleTime(self, offset, line):
    """Process a time directive.

    The text between ``offset`` and the final character of the line is
    parsed with ``iso8601.parse_date`` and the result is passed to the
    client's ``time`` method.

    :param offset: Index in line at which the timestamp starts.
    :param line: The complete directive line.
    :raises TypeError: If parsing fails with a TypeError; the new message
        names the offending line and the original error.
    """
    try:
        event_time = iso8601.parse_date(line[offset:-1])
    # NOTE(review): the except clause is not visible in the reviewed source;
    # TypeError is assumed from the exception re-raised below -- confirm.
    except TypeError:
        # sys.exc_info() is a function; the former ``sys.exec_info[1]``
        # raised AttributeError and masked the real parse failure.
        raise TypeError(_u("Failed to parse %r, got %r")
            % (line, sys.exc_info()[1]))
    self.client.time(event_time)
572 def lineReceived(self, line):
573 """Call the appropriate local method for the received line."""
574 self._state.lineReceived(line)
def _lostConnectionInTest(self, state_string):
    """Report the current test as an error because the connection was lost.

    :param state_string: Text placed directly before the word "test" in
        the error message, describing what was being read at the time.
    """
    template = _u("lost connection during %stest '%s'")
    message = template % (state_string, self.current_test_description)
    failure = RemoteError(message)
    self.client.addError(self._current_test, failure)
    self.client.stopTest(self._current_test)
582 def lostConnection(self):
583 """The input connection has finished."""
584 self._state.lostConnection()
def readFrom(self, pipe):
    """Blocking convenience API to parse an entire stream.

    All lines are read from the pipe up front, handed to lineReceived()
    one at a time in order, and lostConnection() is called afterwards.

    :param pipe: A file-like object supporting readlines().
    :return: None.
    """
    received = pipe.readlines()
    for chunk in received:
        self.lineReceived(chunk)
    self.lostConnection()
596 def _startTest(self, offset, line):
597 """Internal call to change state machine. Override startTest()."""
598 self._state.startTest(offset, line)
600 def subunitLineReceived(self, line):
601 self._forward_stream.write(line)
603 def stdOutLineReceived(self, line):
604 self._stream.write(line)
607 class TestProtocolClient(testresult.TestResult):
608 """A TestResult which generates a subunit stream for a test run.
610 # Get a TestSuite or TestCase to run
612 # Create a stream (any object with a 'write' method). This should accept
613 # bytes not strings: subunit is a byte orientated protocol.
614 stream = file('tests.log', 'wb')
615 # Create a subunit result object which will output to the stream
616 result = subunit.TestProtocolClient(stream)
617 # Optionally, to get timing data for performance analysis, wrap the
618 # serialiser with a timing decorator
619 result = subunit.test_results.AutoTimingTestResultDecorator(result)
620 # Run the test suite reporting to the subunit result object
626 def __init__(self, stream):
627 testresult.TestResult.__init__(self)
628 stream = make_stream_binary(stream)
629 self._stream = stream
630 self._progress_fmt = _b("progress: ")
631 self._bytes_eol = _b("\n")
632 self._progress_plus = _b("+")
633 self._progress_push = _b("push")
634 self._progress_pop = _b("pop")
635 self._empty_bytes = _b("")
636 self._start_simple = _b(" [\n")
637 self._end_simple = _b("]\n")
639 def addError(self, test, error=None, details=None):
640 """Report an error in test test.
642 Only one of error and details should be provided: conceptually there
643 are two separate methods:
644 addError(self, test, error)
645 addError(self, test, details)
647 :param error: Standard unittest positional argument form - an
649 :param details: New Testing-in-python drafted API; a dict from string
650 to subunit.Content objects.
652 self._addOutcome("error", test, error=error, details=details)
656 def addExpectedFailure(self, test, error=None, details=None):
657 """Report an expected failure in test test.
659 Only one of error and details should be provided: conceptually there
660 are two separate methods:
661 addError(self, test, error)
662 addError(self, test, details)
664 :param error: Standard unittest positional argument form - an
666 :param details: New Testing-in-python drafted API; a dict from string
667 to subunit.Content objects.
669 self._addOutcome("xfail", test, error=error, details=details)
671 def addFailure(self, test, error=None, details=None):
672 """Report a failure in test test.
674 Only one of error and details should be provided: conceptually there
675 are two separate methods:
676 addFailure(self, test, error)
677 addFailure(self, test, details)
679 :param error: Standard unittest positional argument form - an
681 :param details: New Testing-in-python drafted API; a dict from string
682 to subunit.Content objects.
684 self._addOutcome("failure", test, error=error, details=details)
688 def _addOutcome(self, outcome, test, error=None, details=None,
689 error_permitted=True):
690 """Report a failure in test test.
692 Only one of error and details should be provided: conceptually there
693 are two separate methods:
694 addOutcome(self, test, error)
695 addOutcome(self, test, details)
697 :param outcome: A string describing the outcome - used as the
698 event name in the subunit stream.
699 :param error: Standard unittest positional argument form - an
701 :param details: New Testing-in-python drafted API; a dict from string
702 to subunit.Content objects.
703 :param error_permitted: If True then one and only one of error or
704 details must be supplied. If False then error must not be supplied
705 and details is still optional. """
706 self._stream.write(_b("%s: " % outcome) + self._test_id(test))
708 if error is None and details is None:
711 if error is not None:
713 if error is not None:
714 self._stream.write(self._start_simple)
715 tb_content = TracebackContent(error, test)
716 for bytes in tb_content.iter_bytes():
717 self._stream.write(bytes)
718 elif details is not None:
719 self._write_details(details)
721 self._stream.write(_b("\n"))
722 if details is not None or error is not None:
723 self._stream.write(self._end_simple)
725 def addSkip(self, test, reason=None, details=None):
726 """Report a skipped test."""
728 self._addOutcome("skip", test, error=None, details=details)
730 self._stream.write(_b("skip: %s [\n" % test.id()))
731 self._stream.write(_b("%s\n" % reason))
732 self._stream.write(self._end_simple)
734 def addSuccess(self, test, details=None):
735 """Report a success in a test."""
736 self._addOutcome("successful", test, details=details, error_permitted=False)
738 def addUnexpectedSuccess(self, test, details=None):
739 """Report an unexpected success in test test.
741 Details can optionally be provided: conceptually there
742 are two separate methods:
744 addError(self, test, details)
746 :param details: New Testing-in-python drafted API; a dict from string
747 to subunit.Content objects.
749 self._addOutcome("uxsuccess", test, details=details,
750 error_permitted=False)
754 def _test_id(self, test):
756 if type(result) is not bytes:
757 result = result.encode('utf8')
760 def startTest(self, test):
761 """Mark a test as starting its test run."""
762 super(TestProtocolClient, self).startTest(test)
763 self._stream.write(_b("test: ") + self._test_id(test) + _b("\n"))
766 def stopTest(self, test):
767 super(TestProtocolClient, self).stopTest(test)
770 def progress(self, offset, whence):
771 """Provide indication about the progress/length of the test run.
773 :param offset: Information about the number of tests remaining. If
774 whence is PROGRESS_CUR, then offset increases/decreases the
775 remaining test count. If whence is PROGRESS_SET, then offset
776 specifies exactly the remaining test count.
777 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
780 if whence == PROGRESS_CUR and offset > -1:
781 prefix = self._progress_plus
782 offset = _b(str(offset))
783 elif whence == PROGRESS_PUSH:
784 prefix = self._empty_bytes
785 offset = self._progress_push
786 elif whence == PROGRESS_POP:
787 prefix = self._empty_bytes
788 offset = self._progress_pop
790 prefix = self._empty_bytes
791 offset = _b(str(offset))
792 self._stream.write(self._progress_fmt + prefix + offset +
795 def tags(self, new_tags, gone_tags):
796 """Inform the client about tags added/removed from the stream."""
797 if not new_tags and not gone_tags:
799 tags = set([tag.encode('utf8') for tag in new_tags])
800 tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
801 tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
802 self._stream.write(tag_line)
def time(self, a_datetime):
    """Inform the client of the time.

    Writes a ``time:`` directive with microsecond precision to the stream.

    :param a_datetime: A datetime.datetime object; it is converted with
        ``astimezone(iso8601.Utc())`` before being formatted.
    """
    utc = a_datetime.astimezone(iso8601.Utc())
    fields = (
        utc.year, utc.month, utc.day,
        utc.hour, utc.minute, utc.second, utc.microsecond,
    )
    directive = "time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % fields
    self._stream.write(_b(directive))
814 def _write_details(self, details):
815 """Output details to the stream.
817 :param details: An extended details dict for a test outcome.
819 self._stream.write(_b(" [ multipart\n"))
820 for name, content in sorted(details.items()):
821 self._stream.write(_b("Content-Type: %s/%s" %
822 (content.content_type.type, content.content_type.subtype)))
823 parameters = content.content_type.parameters
825 self._stream.write(_b(";"))
827 for param, value in parameters.items():
828 param_strs.append("%s=%s" % (param, value))
829 self._stream.write(_b(",".join(param_strs)))
830 self._stream.write(_b("\n%s\n" % name))
831 encoder = chunked.Encoder(self._stream)
832 list(map(encoder.write, content.iter_bytes()))
836 """Obey the testtools result.done() interface."""
def RemoteError(description=_u("")):
    """Build a three-item error tuple for a remotely reported problem.

    :param description: Text used to construct the _StringException.
    :return: A tuple of the _StringException class, a _StringException
        instance created from description, and None.
    """
    exc_value = _StringException(description)
    return (_StringException, exc_value, None)
843 class RemotedTestCase(unittest.TestCase):
844 """A class to represent test cases run in child processes.
846 Instances of this class are used to provide the Python test API a TestCase
847 that can be printed to the screen, introspected for metadata and so on.
848 However, as they are a simply a memoisation of a test that was actually
849 run in the past by a separate process, they cannot perform any interactive
853 def __eq__ (self, other):
855 return self.__description == other.__description
856 except AttributeError:
859 def __init__(self, description):
860 """Create a psuedo test case with description description."""
861 self.__description = description
863 def error(self, label):
864 raise NotImplementedError("%s on RemotedTestCases is not permitted." %
871 self.error("tearDown")
873 def shortDescription(self):
874 return self.__description
877 return "%s" % (self.__description,)
880 return "%s (%s)" % (self.__description, self._strclass())
883 return "<%s description='%s'>" % \
884 (self._strclass(), self.__description)
886 def run(self, result=None):
887 if result is None: result = self.defaultTestResult()
888 result.startTest(self)
889 result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
890 result.stopTest(self)
894 return "%s.%s" % (cls.__module__, cls.__name__)
897 class ExecTestCase(unittest.TestCase):
898 """A test case which runs external scripts for test fixtures."""
900 def __init__(self, methodName='runTest'):
901 """Create an instance of the class that will use the named test
902 method when executed. Raises a ValueError if the instance does
903 not have a method with the specified name.
905 unittest.TestCase.__init__(self, methodName)
906 testMethod = getattr(self, methodName)
907 self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
910 def countTestCases(self):
913 def run(self, result=None):
914 if result is None: result = self.defaultTestResult()
918 """Run the test without collecting errors in a TestResult"""
919 self._run(testresult.TestResult())
921 def _run(self, result):
922 protocol = TestProtocolServer(result)
923 process = subprocess.Popen(self.script, shell=True,
924 stdout=subprocess.PIPE)
925 make_stream_binary(process.stdout)
926 output = process.communicate()[0]
927 protocol.readFrom(BytesIO(output))
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Every test runs in a process of its own. That costs some performance
    but gives excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        """Run this test through run_isolated, reporting to result.

        :param result: The result object to report to; when None, the one
            returned by defaultTestResult() is used.
        """
        target = self.defaultTestResult() if result is None else result
        run_isolated(unittest.TestCase, self, target)
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    The suite fork()s before running its tests and the results are reported
    from the child process using a Subunit stream. This is useful for
    handling tests that mutate global state.
    """

    def run(self, result=None):
        """Run the suite through run_isolated, reporting to result.

        :param result: The result object to report to; when None, a new
            testresult.TestResult is created.
        """
        target = testresult.TestResult() if result is None else result
        run_isolated(unittest.TestSuite, self, target)
957 def run_isolated(klass, self, result):
958 """Run a test suite or case in a subprocess, using the run method on klass.
960 c2pread, c2pwrite = os.pipe()
961 # fixme - error -> result
966 # Close parent's pipe ends
973 # at this point, sys.stdin is redirected, now we want
974 # to filter it to escape ]'s.
975 ### XXX: test and write that bit.
976 stream = os.fdopen(1, 'wb')
977 result = TestProtocolClient(stream)
978 klass.run(self, result)
981 # exit HARD, exit NOW.
985 # Close child pipe ends
987 # hookup a protocol engine
988 protocol = TestProtocolServer(result)
989 fileobj = os.fdopen(c2pread, 'rb')
990 protocol.readFrom(fileobj)
992 # TODO return code evaluation.
def TAP2SubUnit(tap, output_stream):
    """Filter a TAP pipe into a subunit pipe.

    This should be invoked once per TAP script, as TAP scripts get
    mapped to a single runnable case with multiple components.

    :param tap: A tap pipe/stream/file object - should emit unicode strings.
    :param output_stream: A pipe/stream/file object to write subunit results
        to.
    :return: The exit code to exit with.
    """
    output = StreamResultToBytes(output_stream)
    UTF8_TEXT = 'text/plain; charset=UTF8'
    # Raw strings keep the regex escapes (\d, \s, \#, \!) out of Python's own
    # string-escape processing; the patterns are compiled once, not per line.
    plan_re = re.compile(r"(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n")
    test_re = re.compile(
        r"(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?"
        r"(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n")
    bail_re = re.compile(r"Bail out\!(?:\s*(.*))?\n")
    comment_re = re.compile(r"\#.*\n")
    BEFORE_PLAN = 0
    AFTER_PLAN = 1
    SKIP_STREAM = 2
    state = BEFORE_PLAN
    plan_start = 1
    plan_stop = 0
    # Test data for the next test to emit
    test_name = None
    log = []
    result = None

    def missing_test(plan_start):
        """Emit a failure for a test number absent from the TAP output."""
        output.status(test_id='test %d' % plan_start,
            test_status='fail', runnable=False,
            mime_type=UTF8_TEXT, eof=True, file_name="tap meta",
            file_bytes=b"test missing from TAP output")

    def _emit_test():
        """Write out the pending test, if any, with its collected log."""
        if test_name is None:
            return
        if log:
            log_bytes = b'\n'.join(log_line.encode('utf8') for log_line in log)
            mime_type = UTF8_TEXT
            file_name = 'tap comment'
        else:
            log_bytes = None
            mime_type = None
            file_name = None
        eof = True
        del log[:]
        output.status(test_id=test_name, test_status=result,
            file_bytes=log_bytes, mime_type=mime_type, eof=eof,
            file_name=file_name, runnable=False)

    for line in tap:
        if state == BEFORE_PLAN:
            match = plan_re.match(line)
            if match:
                state = AFTER_PLAN
                _, plan_stop, comment = match.groups()
                plan_stop = int(plan_stop)
                if plan_start > plan_stop and plan_stop == 0:
                    # skipped file
                    state = SKIP_STREAM
                    # The comment after the plan is optional, so the group
                    # may be None; treat that as an empty comment.
                    output.status(test_id='file skip', test_status='skip',
                        file_bytes=(comment or '').encode('utf8'), eof=True,
                        file_name='tap comment')
                continue
        # not a plan line, or have seen one before
        match = test_re.match(line)
        if match:
            # new test, emit current one.
            _emit_test()
            status, number, description, directive, directive_comment = match.groups()
            if status == 'ok':
                result = 'success'
            else:
                result = "fail"
            if description is None:
                description = ''
            else:
                description = ' ' + description
            if directive is not None:
                if directive.upper() == 'TODO':
                    result = 'xfail'
                elif directive.upper() == 'SKIP':
                    result = 'skip'
                if directive_comment is not None:
                    log.append(directive_comment)
            if number is not None:
                number = int(number)
                # Numbers skipped over by the script are reported as missing.
                while plan_start < number:
                    missing_test(plan_start)
                    plan_start += 1
            test_name = "test %d%s" % (plan_start, description)
            plan_start += 1
            continue
        match = bail_re.match(line)
        if match:
            reason, = match.groups()
            if reason is None:
                extra = ''
            else:
                extra = ' %s' % reason
            _emit_test()
            test_name = "Bail out!%s" % extra
            result = "fail"
            state = SKIP_STREAM
            continue
        match = comment_re.match(line)
        if match:
            # Comment lines are attached to the pending test, minus newline.
            log.append(line[:-1])
            continue
        # Should look at buffering status and binding this to the prior result.
        output.status(file_bytes=line.encode('utf8'), file_name='stdout',
            mime_type=UTF8_TEXT)
    _emit_test()
    while plan_start <= plan_stop:
        # record missed tests
        missing_test(plan_start)
        plan_start += 1
    return 0
def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.
        Specifically:
         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.
        Specifically:
         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.
        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)
    source = ByteStreamToStreamResult(original, non_subunit_name='stdout')

    class Tagger(CopyStreamResult):
        """Forward every status event with its tags rewritten."""

        def status(self, **kwargs):
            # Work on a copy: the set handed in belongs to the caller and
            # must not be modified behind its back.
            tags = set(kwargs.get('test_tags') or ())
            tags.update(new_tags)
            tags.difference_update(gone_tags)
            # An empty tag set is passed on as None.
            kwargs['test_tags'] = tags or None
            super(Tagger, self).status(**kwargs)

    output = Tagger([StreamResultToBytes(filtered)])
    source.run(output)
    return 0
class ProtocolTestCase(object):
    """Subunit wire protocol to unittest.TestCase adapter.

    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
    calling a ProtocolTestCase or invoking the run() method will make a 'test
    run' happen. The 'test run' will simply be a replay of the test activity
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
    and ``countTestCases`` methods are not supported because there isn't a
    sensible mapping for those methods.

    Sample usage::

        # Get a stream (any object with a readline() method), in this case the
        # stream output by the example from ``subunit.TestProtocolClient``.
        stream = open('tests.log', 'rb')
        # Create a parser which will read from the stream and emit
        # activity to a unittest.TestResult when run() is called.
        suite = subunit.ProtocolTestCase(stream)
        # Create a result object to accept the contents of that stream.
        result = unittest.TestResult()
        # 'run' the tests - process the stream and feed its contents to result.
        suite.run(result)
        stream.close()

    :seealso: TestProtocolServer (the subunit wire protocol parser).
    """

    def __init__(self, stream, passthrough=None, forward=None):
        """Create a ProtocolTestCase reading from stream.

        :param stream: A filelike object which a subunit stream can be read
            from.
        :param passthrough: A stream pass non subunit input on to. If not
            supplied, the TestProtocolServer default is used.
        :param forward: A stream to pass subunit input on to. If not supplied
            subunit input is not forwarded.
        """
        stream = make_stream_binary(stream)
        self._stream = stream
        self._passthrough = passthrough
        if forward is not None:
            forward = make_stream_binary(forward)
        self._forward = forward

    def __call__(self, result=None):
        """Replay the stream into ``result``; equivalent to ``run(result)``."""
        return self.run(result)

    def run(self, result=None):
        """Replay the stream, line by line, into ``result``.

        :param result: The result object to feed. When omitted, a new
            ``testresult.TestResult`` is created.
        :return: The result object that was fed.
        """
        if result is None:
            # This class derives from object, so there is no inherited
            # defaultTestResult() to call; create the default directly.
            result = testresult.TestResult()
        protocol = TestProtocolServer(result, self._passthrough, self._forward)
        line = self._stream.readline()
        while line:
            protocol.lineReceived(line)
            line = self._stream.readline()
        protocol.lostConnection()
        return result
class TestResultStats(testresult.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        testresult.TestResult.__init__(self)
        self._stream = stream
        self.seen_tags = set()
        self.failed_tests = 0
        self.skipped_tests = 0

    @property
    def total_tests(self):
        """The number of tests run, as counted by ``testsRun``."""
        return self.testsRun

    @property
    def passed_tests(self):
        """Tests that were neither counted as failed nor as skipped."""
        not_passed = self.failed_tests + self.skipped_tests
        return self.total_tests - not_passed

    def addError(self, test, err, details=None):
        """Count an error as a failed test."""
        self.failed_tests += 1

    def addFailure(self, test, err, details=None):
        """Count a failure as a failed test."""
        self.failed_tests += 1

    def addSkip(self, test, reason, details=None):
        """Count a skipped test."""
        self.skipped_tests += 1

    def formatStats(self):
        """Write the collected statistics to the stream."""
        emit = self._stream.write
        emit("Total tests: %5d\n" % self.total_tests)
        emit("Passed tests: %5d\n" % self.passed_tests)
        emit("Failed tests: %5d\n" % self.failed_tests)
        emit("Skipped tests: %5d\n" % self.skipped_tests)
        emit("Seen tags: %s\n" % (", ".join(sorted(self.seen_tags))))

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags."""
        self.seen_tags.update(new_tags)

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return not self.failed_tests != 0
def get_default_formatter():
    """Obtain the default formatter to write to.

    When the ``SUBUNIT_FORMATTER`` environment variable is set, its value is
    started with ``os.popen`` and the write pipe to it is returned.
    Otherwise standard output is returned; on Python 3 its ``buffer``
    attribute is returned instead when it has one.

    :return: A file-like object.
    """
    command = os.getenv("SUBUNIT_FORMATTER")
    if command:
        return os.popen(command, "w")
    stream = sys.stdout
    if sys.version_info > (3, 0) and safe_hasattr(stream, 'buffer'):
        return stream.buffer
    return stream
def read_test_list(path):
    """Read a list of test ids from a file on disk.

    The file holds one test id per line; the newline ending each line is
    removed.

    :param path: Path to the file
    :return: Sequence of test ids
    """
    # Read as text: lines read in binary mode are bytes on Python 3, and
    # stripping the str "\n" from bytes raises TypeError there.
    with open(path) as f:
        return [line.rstrip("\n") for line in f]
def make_stream_binary(stream):
    """Ensure that a stream will be binary safe. See _make_binary_on_windows.

    :param stream: The stream to make binary safe.
    :return: A binary version of the same stream (some streams cannot be
        'fixed' but can be unwrapped).
    """
    try:
        descriptor = stream.fileno()
    except (_UnsupportedOperation, AttributeError):
        # No file descriptor to adjust; unwrapping is all that can be done.
        descriptor = None
        has_descriptor = False
    else:
        has_descriptor = True
    if has_descriptor:
        _make_binary_on_windows(descriptor)
    return _unwrap_text(stream)
def _make_binary_on_windows(fileno):
    r"""Put the file descriptor into binary mode when running on win32.

    Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078.
    On any other platform this does nothing.

    :param fileno: The file descriptor to switch to binary mode.
    """
    if sys.platform != "win32":
        return
    # msvcrt only exists on Windows, hence the import at the point of use.
    import msvcrt
    msvcrt.setmode(fileno, os.O_BINARY)
1311 def _unwrap_text(stream):
1312 """Unwrap stream if it is a text stream to get the original buffer."""
1313 if sys.version_info > (3, 0):
1316 unicode_type = unicode
1319 if type(stream.read(0)) is unicode_type:
1320 return stream.buffer
1321 except (_UnsupportedOperation, IOError):
1322 # Cannot read from the stream: try via writes
1324 stream.write(_b(''))
1326 return stream.buffer