2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``subunit.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc.
58 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
59 remove tags in the test run that is currently executing. If called when no
60 test is in progress (that is, if called outside of the ``startTest``,
61 ``stopTest`` pair), then the tags apply to all subsequent tests. If called
62 when a test is in progress, then the tags only apply to that test.
64 The ``time(a_datetime)`` method is called (if present) when a ``time:``
65 directive is encountered in a Subunit stream. This is used to tell a TestResult
66 about the time that events in the stream occurred at, to allow reconstructing
67 test timing from a stream.
69 The ``progress(offset, whence)`` method controls progress data for a stream.
70 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
71 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
72 ignore the offset parameter.
78 ``subunit.run`` is a convenience wrapper to run a Python test suite via
79 the command line, reporting via Subunit::
81 $ python -m subunit.run mylib.tests.test_suite
83 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
84 tests, allowing isolation between the test runner and some tests.
86 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
87 tests that will fork() before that individual test is run.
89 ``ExecTestCase`` is a convenience wrapper for running an external
90 program to get a Subunit stream and then report that back to an arbitrary
93 class AggregateTests(subunit.ExecTestCase):
95 def test_script_one(self):
98 def test_script_two(self):
101 # Normally your normal test loading would take care of this automatically;
102 # it is only spelt out in detail here for clarity.
103 suite = unittest.TestSuite([AggregateTests("test_script_one"),
104 AggregateTests("test_script_two")])
105 # Create any TestResult class you like.
106 result = unittest._TextTestResult(sys.stdout)
107 # And run your suite as normal, Subunit will exec each external script as
108 # needed and report to your result object.
114 * subunit.chunked contains HTTP chunked encoding/decoding logic.
115 * subunit.content contains a minimal-assumptions MIME content representation.
116 * subunit.content_type contains a MIME Content-Type representation.
122 from StringIO import StringIO
129 import chunked, content, content_type
140 return subunit.tests.test_suite()
def join_dir(base_path, path):
    """Resolve ``path`` against the directory that contains ``base_path``.

    Returns an absolute path to C{path}, calculated relative to the parent
    directory of C{base_path}. ``base_path`` is first made absolute with
    ``os.path.abspath`` and its final component is removed with
    ``os.path.dirname``; ``path`` is then joined onto that directory with
    ``os.path.join``, so an absolute ``path`` is returned unchanged.

    @param base_path: A path to a file or directory.
    @param path: An absolute path, or a path relative to the containing
        directory of C{base_path}.

    @return: An absolute path to C{path}.
    """
    # abspath() also normalises a trailing separator, so "/a/b/" is treated
    # as the entry "b" inside "/a" rather than as the directory itself.
    containing_dir = os.path.dirname(os.path.abspath(base_path))
    return os.path.join(containing_dir, path)
157 def tags_to_new_gone(tags):
158 """Split a list of tags into a new_set and a gone_set."""
163 gone_tags.add(tag[1:])
166 return new_tags, gone_tags
169 class DiscardStream(object):
170 """A filelike object which discards what is written to it."""
172 def write(self, bytes):
176 class _ParserState(object):
177 """State for the subunit parser."""
179 def __init__(self, parser):
182 def addError(self, offset, line):
183 """An 'error:' directive has been read."""
184 self.parser.stdOutLineReceived(line)
186 def addExpectedFail(self, offset, line):
187 """An 'xfail:' directive has been read."""
188 self.parser.stdOutLineReceived(line)
190 def addFailure(self, offset, line):
191 """A 'failure:' directive has been read."""
192 self.parser.stdOutLineReceived(line)
194 def addSkip(self, offset, line):
195 """A 'skip:' directive has been read."""
196 self.parser.stdOutLineReceived(line)
198 def addSuccess(self, offset, line):
199 """A 'success:' directive has been read."""
200 self.parser.stdOutLineReceived(line)
202 def startTest(self, offset, line):
203 """A test start command received."""
204 self.parser.stdOutLineReceived(line)
207 class _InTest(_ParserState):
208 """State for the subunit parser after reading a test: directive."""
210 def addError(self, offset, line):
211 """An 'error:' directive has been read."""
212 if self.parser.current_test_description == line[offset:-1]:
213 self.parser._state = self.parser._outside_test
214 self.parser.state = TestProtocolServer.STATE_OBJECT
215 self.parser.current_test_description = None
216 self.parser.client.addError(self.parser._current_test, RemoteError(""))
217 self.parser.client.stopTest(self.parser._current_test)
218 self.parser._current_test = None
219 elif self.parser.current_test_description + " [" == line[offset:-1]:
220 self.parser.state = TestProtocolServer.READING_ERROR
221 self.parser._message = ""
223 self.parser.stdOutLineReceived(line)
225 def addExpectedFail(self, offset, line):
226 """An 'xfail:' directive has been read."""
227 if self.parser.current_test_description == line[offset:-1]:
228 self.parser._state = self.parser._outside_test
229 self.parser.state = TestProtocolServer.STATE_OBJECT
230 self.parser.current_test_description = None
231 xfail = getattr(self.parser.client, 'addExpectedFailure', None)
233 xfail(self.parser._current_test, RemoteError())
235 self.parser.client.addSuccess(self.parser._current_test)
236 self.parser.client.stopTest(self.parser._current_test)
237 elif self.parser.current_test_description + " [" == line[offset:-1]:
238 self.parser.state = TestProtocolServer.READING_XFAIL
239 self.parser._message = ""
241 self.parser.stdOutLineReceived(line)
243 def addFailure(self, offset, line):
244 """A 'failure:' directive has been read."""
245 if self.parser.current_test_description == line[offset:-1]:
246 self.parser._state = self.parser._outside_test
247 self.parser.state = TestProtocolServer.STATE_OBJECT
248 self.parser.current_test_description = None
249 self.parser.client.addFailure(self.parser._current_test, RemoteError())
250 self.parser.client.stopTest(self.parser._current_test)
251 elif self.parser.current_test_description + " [" == line[offset:-1]:
252 self.parser.state = TestProtocolServer.READING_FAILURE
253 self.parser._message = ""
255 self.parser.stdOutLineReceived(line)
257 def addSkip(self, offset, line):
258 """A 'skip:' directive has been read."""
259 if self.parser.current_test_description == line[offset:-1]:
260 self.parser._state = self.parser._outside_test
261 self.parser.state = TestProtocolServer.STATE_OBJECT
262 self.parser.current_test_description = None
263 self.parser._skip_or_error()
264 self.parser.client.stopTest(self.parser._current_test)
265 elif self.parser.current_test_description + " [" == line[offset:-1]:
266 self.parser.state = TestProtocolServer.READING_SKIP
267 self.parser._message = ""
269 self.parser.stdOutLineReceived(line)
271 def addSuccess(self, offset, line):
272 """A 'success:' directive has been read."""
273 if self.parser.current_test_description == line[offset:-1]:
274 self.parser._succeedTest()
275 elif self.parser.current_test_description + " [" == line[offset:-1]:
276 self.parser.state = TestProtocolServer.READING_SUCCESS
277 self.parser._message = ""
279 self.parser.stdOutLineReceived(line)
281 def lostConnection(self):
282 """Connection lost."""
283 self.parser._lostConnectionInTest('')
286 class _OutSideTest(_ParserState):
287 """State for the subunit parser outside of a test context."""
289 def lostConnection(self):
290 """Connection lost."""
292 def startTest(self, offset, line):
293 """A test start command received."""
294 self.parser._state = self.parser._in_test
295 self.parser.state = TestProtocolServer.STATE_OBJECT
296 self.parser._current_test = RemotedTestCase(line[offset:-1])
297 self.parser.current_test_description = line[offset:-1]
298 self.parser.client.startTest(self.parser._current_test)
301 class TestProtocolServer(object):
302 """A parser for subunit.
304 :ivar tags: The current tags associated with the protocol stream.
315 def __init__(self, client, stream=None):
316 """Create a TestProtocolServer instance.
318 :param client: An object meeting the unittest.TestResult protocol.
319 :param stream: The stream that lines received which are not part of the
320 subunit protocol should be written to. This allows custom handling
321 of mixed protocols. By default, sys.stdout will be used for
327 self._stream = stream
328 # state objects we can switch to
329 self._in_test = _InTest(self)
330 self._outside_test = _OutSideTest(self)
331 # start with outside test.
332 self._state = self._outside_test
333 self.state = TestProtocolServer.STATE_OBJECT
335 def _addError(self, offset, line):
336 if self.state in TestProtocolServer.STATE_OBJECTS:
337 self._state.addError(offset, line)
339 self.stdOutLineReceived(line)
341 def _addExpectedFail(self, offset, line):
342 if self.state in TestProtocolServer.STATE_OBJECTS:
343 self._state.addExpectedFail(offset, line)
345 self.stdOutLineReceived(line)
347 def _addFailure(self, offset, line):
348 if self.state in TestProtocolServer.STATE_OBJECTS:
349 self._state.addFailure(offset, line)
351 self.stdOutLineReceived(line)
353 def _addSkip(self, offset, line):
354 if self.state in TestProtocolServer.STATE_OBJECTS:
355 self._state.addSkip(offset, line)
357 self.stdOutLineReceived(line)
359 def _skip_or_error(self, message=None):
360 """Report the current test as a skip if possible, or else an error."""
361 addSkip = getattr(self.client, 'addSkip', None)
362 if not callable(addSkip):
363 self.client.addError(self._current_test, RemoteError(message))
366 message = "No reason given"
367 addSkip(self._current_test, message)
369 def _addSuccess(self, offset, line):
370 if self.state in TestProtocolServer.STATE_OBJECTS:
371 self._state.addSuccess(offset, line)
373 self.stdOutLineReceived(line)
375 def _appendMessage(self, line):
376 if line[0:2] == " ]":
378 self._message += line[1:]
380 self._message += line
382 def endQuote(self, line):
383 if self.state == TestProtocolServer.READING_FAILURE:
384 self._state = self._outside_test
385 self.state = TestProtocolServer.STATE_OBJECT
386 self.current_test_description = None
387 self.client.addFailure(self._current_test,
388 RemoteError(self._message))
389 self.client.stopTest(self._current_test)
390 elif self.state == TestProtocolServer.READING_ERROR:
391 self._state = self._outside_test
392 self.state = TestProtocolServer.STATE_OBJECT
393 self.current_test_description = None
394 self.client.addError(self._current_test,
395 RemoteError(self._message))
396 self.client.stopTest(self._current_test)
397 elif self.state == TestProtocolServer.READING_SKIP:
398 self._state = self._outside_test
399 self.state = TestProtocolServer.STATE_OBJECT
400 self.current_test_description = None
401 self._skip_or_error(self._message)
402 self.client.stopTest(self._current_test)
403 elif self.state == TestProtocolServer.READING_XFAIL:
404 self._state = self._outside_test
405 self.state = TestProtocolServer.STATE_OBJECT
406 self.current_test_description = None
407 xfail = getattr(self.client, 'addExpectedFailure', None)
409 xfail(self._current_test, RemoteError(self._message))
411 self.client.addSuccess(self._current_test)
412 self.client.stopTest(self._current_test)
413 elif self.state == TestProtocolServer.READING_SUCCESS:
416 self.stdOutLineReceived(line)
418 def _handleProgress(self, offset, line):
419 """Process a progress directive."""
420 line = line[offset:].strip()
422 whence = PROGRESS_CUR
425 whence = PROGRESS_PUSH
428 whence = PROGRESS_POP
431 whence = PROGRESS_SET
433 progress_method = getattr(self.client, 'progress', None)
434 if callable(progress_method):
435 progress_method(delta, whence)
437 def _handleTags(self, offset, line):
438 """Process a tags command."""
439 tags = line[offset:].split()
440 new_tags, gone_tags = tags_to_new_gone(tags)
441 tags_method = getattr(self.client, 'tags', None)
442 if tags_method is not None:
443 tags_method(new_tags, gone_tags)
445 def _handleTime(self, offset, line):
446 # Parse the timestamp and pass it to the client's time() method, if any.
448 event_time = iso8601.parse_date(line[offset:-1])
450 raise TypeError("Failed to parse %r, got %r" % (line, e))
451 time_method = getattr(self.client, 'time', None)
452 if callable(time_method):
453 time_method(event_time)
455 def lineReceived(self, line):
456 """Call the appropriate local method for the received line."""
459 elif self.state in (TestProtocolServer.READING_FAILURE,
460 TestProtocolServer.READING_ERROR, TestProtocolServer.READING_SKIP,
461 TestProtocolServer.READING_SUCCESS,
462 TestProtocolServer.READING_XFAIL
464 self._appendMessage(line)
466 parts = line.split(None, 1)
469 offset = len(cmd) + 1
471 if cmd in ('test', 'testing'):
472 self._startTest(offset, line)
474 self._addError(offset, line)
475 elif cmd == 'failure':
476 self._addFailure(offset, line)
477 elif cmd == 'progress':
478 self._handleProgress(offset, line)
480 self._addSkip(offset, line)
481 elif cmd in ('success', 'successful'):
482 self._addSuccess(offset, line)
483 elif cmd in ('tags',):
484 self._handleTags(offset, line)
485 elif cmd in ('time',):
486 self._handleTime(offset, line)
488 self._addExpectedFail(offset, line)
490 self.stdOutLineReceived(line)
492 self.stdOutLineReceived(line)
494 def _lostConnectionInTest(self, state_string):
495 error_string = "lost connection during %stest '%s'" % (
496 state_string, self.current_test_description)
497 self.client.addError(self._current_test, RemoteError(error_string))
498 self.client.stopTest(self._current_test)
500 def lostConnection(self):
501 """The input connection has finished."""
502 if self.state in TestProtocolServer.STATE_OBJECTS:
503 self._state.lostConnection()
505 if self.state == TestProtocolServer.READING_ERROR:
506 self._lostConnectionInTest('error report of ')
507 elif self.state == TestProtocolServer.READING_FAILURE:
508 self._lostConnectionInTest('failure report of ')
509 elif self.state == TestProtocolServer.READING_SUCCESS:
510 self._lostConnectionInTest('success report of ')
511 elif self.state == TestProtocolServer.READING_SKIP:
512 self._lostConnectionInTest('skip report of ')
513 elif self.state == TestProtocolServer.READING_XFAIL:
514 self._lostConnectionInTest('xfail report of ')
516 self._lostConnectionInTest('unknown state of ')
518 def readFrom(self, pipe):
519 for line in pipe.readlines():
520 self.lineReceived(line)
521 self.lostConnection()
523 def _startTest(self, offset, line):
524 """Internal call to change state machine. Override startTest()."""
525 if self.state in TestProtocolServer.STATE_OBJECTS:
526 self._state.startTest(offset, line)
528 self.stdOutLineReceived(line)
530 def stdOutLineReceived(self, line):
531 self._stream.write(line)
533 def _succeedTest(self):
534 self.client.addSuccess(self._current_test)
535 self.client.stopTest(self._current_test)
536 self.current_test_description = None
537 self._current_test = None
538 self._state = self._outside_test
539 self.state = TestProtocolServer.STATE_OBJECT
542 class RemoteException(Exception):
543 """An exception that occurred remotely to Python."""
545 def __eq__(self, other):
547 return self.args == other.args
548 except AttributeError:
552 class TestProtocolClient(unittest.TestResult):
553 """A TestResult which generates a subunit stream for a test run.
555 # Get a TestSuite or TestCase to run
557 # Create a stream (any object with a 'write' method)
558 stream = file('tests.log', 'wb')
559 # Create a subunit result object which will output to the stream
560 result = subunit.TestProtocolClient(stream)
561 # Optionally, to get timing data for performance analysis, wrap the
562 # serialiser with a timing decorator
563 result = subunit.test_results.AutoTimingTestResultDecorator(result)
564 # Run the test suite reporting to the subunit result object
570 def __init__(self, stream):
571 unittest.TestResult.__init__(self)
572 self._stream = stream
574 def addError(self, test, error=None, details=None):
575 """Report an error in test test.
577 Only one of error and details should be provided: conceptually there
578 are two separate methods:
579 addError(self, test, error)
580 addError(self, test, details)
582 :param error: Standard unittest positional argument form - an
584 :param details: New Testing-in-python drafted API; a dict from string
585 to subunit.Content objects.
587 self._addOutcome("error", test, error=error, details=details)
589 def addExpectedFailure(self, test, error=None, details=None):
590 """Report an expected failure in test test.
592 Only one of error and details should be provided: conceptually there
593 are two separate methods:
594 addExpectedFailure(self, test, error)
595 addExpectedFailure(self, test, details)
597 :param error: Standard unittest positional argument form - an
599 :param details: New Testing-in-python drafted API; a dict from string
600 to subunit.Content objects.
602 self._addOutcome("xfail", test, error=error, details=details)
604 def addFailure(self, test, error=None, details=None):
605 """Report a failure in test test.
607 Only one of error and details should be provided: conceptually there
608 are two separate methods:
609 addFailure(self, test, error)
610 addFailure(self, test, details)
612 :param error: Standard unittest positional argument form - an
614 :param details: New Testing-in-python drafted API; a dict from string
615 to subunit.Content objects.
617 self._addOutcome("failure", test, error=error, details=details)
619 def _addOutcome(self, outcome, test, error=None, details=None):
620 """Report an outcome of test test.
622 Only one of error and details should be provided: conceptually there
623 are two separate methods:
624 _addOutcome(self, outcome, test, error)
625 _addOutcome(self, outcome, test, details)
627 :param outcome: A string describing the outcome - used as the
628 event name in the subunit stream.
629 :param error: Standard unittest positional argument form - an
631 :param details: New Testing-in-python drafted API; a dict from string
632 to subunit.Content objects.
634 self._stream.write("%s: %s" % (outcome, test.id()))
635 if error is None and details is None:
637 if error is not None:
638 self._stream.write(" [\n")
639 for line in self._exc_info_to_string(error, test).splitlines():
640 self._stream.write("%s\n" % line)
642 self._write_details(details)
643 self._stream.write("]\n")
645 def addSkip(self, test, reason=None, details=None):
646 """Report a skipped test."""
648 self._addOutcome("skip", test, error=None, details=details)
650 self._stream.write("skip: %s [\n" % test.id())
651 self._stream.write("%s\n" % reason)
652 self._stream.write("]\n")
654 def addSuccess(self, test, details=None):
655 """Report a success in a test."""
656 self._stream.write("successful: %s" % test.id())
658 self._stream.write("\n")
660 self._write_details(details)
661 self._stream.write("]\n")
662 addUnexpectedSuccess = addSuccess
664 def startTest(self, test):
665 """Mark a test as starting its test run."""
666 self._stream.write("test: %s\n" % test.id())
668 def progress(self, offset, whence):
669 """Provide indication about the progress/length of the test run.
671 :param offset: Information about the number of tests remaining. If
672 whence is PROGRESS_CUR, then offset increases/decreases the
673 remaining test count. If whence is PROGRESS_SET, then offset
674 specifies exactly the remaining test count.
675 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
678 if whence == PROGRESS_CUR and offset > -1:
680 elif whence == PROGRESS_PUSH:
683 elif whence == PROGRESS_POP:
688 self._stream.write("progress: %s%s\n" % (prefix, offset))
690 def time(self, a_datetime):
691 """Inform the client of the time.
693 :param a_datetime: A datetime.datetime object.
695 time = a_datetime.astimezone(iso8601.Utc())
696 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
697 time.year, time.month, time.day, time.hour, time.minute,
698 time.second, time.microsecond))
700 def _write_details(self, details):
701 """Output details to the stream.
703 :param details: An extended details dict for a test outcome.
705 self._stream.write(" [ multipart\n")
706 for name, content in sorted(details.iteritems()):
707 self._stream.write("Content-Type: %s/%s" %
708 (content.content_type.type, content.content_type.subtype))
709 parameters = content.content_type.parameters
711 self._stream.write(";")
713 for param, value in parameters.iteritems():
714 param_strs.append("%s=%s" % (param, value))
715 self._stream.write(",".join(param_strs))
716 self._stream.write("\n%s\n" % name)
717 encoder = chunked.Encoder(self._stream)
718 map(encoder.write, content.iter_bytes())
722 """Obey the testtools result.done() interface."""
725 def RemoteError(description=""):
726 if description == "":
728 return (RemoteException, RemoteException(description), None)
731 class RemotedTestCase(unittest.TestCase):
732 """A class to represent test cases run in child processes.
734 Instances of this class are used to provide the Python test API a TestCase
735 that can be printed to the screen, introspected for metadata and so on.
736 However, as they are simply a memoisation of a test that was actually
737 run in the past by a separate process, they cannot perform any interactive
741 def __eq__ (self, other):
743 return self.__description == other.__description
744 except AttributeError:
747 def __init__(self, description):
748 """Create a pseudo test case with description description."""
749 self.__description = description
751 def error(self, label):
752 raise NotImplementedError("%s on RemotedTestCases is not permitted." %
759 self.error("tearDown")
761 def shortDescription(self):
762 return self.__description
765 return "%s" % (self.__description,)
768 return "%s (%s)" % (self.__description, self._strclass())
771 return "<%s description='%s'>" % \
772 (self._strclass(), self.__description)
774 def run(self, result=None):
775 if result is None: result = self.defaultTestResult()
776 result.startTest(self)
777 result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
778 result.stopTest(self)
782 return "%s.%s" % (cls.__module__, cls.__name__)
785 class ExecTestCase(unittest.TestCase):
786 """A test case which runs external scripts for test fixtures."""
788 def __init__(self, methodName='runTest'):
789 """Create an instance of the class that will use the named test
790 method when executed. Raises a ValueError if the instance does
791 not have a method with the specified name.
793 unittest.TestCase.__init__(self, methodName)
794 testMethod = getattr(self, methodName)
795 self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
798 def countTestCases(self):
801 def run(self, result=None):
802 if result is None: result = self.defaultTestResult()
806 """Run the test without collecting errors in a TestResult"""
807 self._run(unittest.TestResult())
809 def _run(self, result):
810 protocol = TestProtocolServer(result)
811 output = subprocess.Popen(self.script, shell=True,
812 stdout=subprocess.PIPE).communicate()[0]
813 protocol.readFrom(StringIO(output))
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Each test gets its own process, which has a performance overhead but will
    provide excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        """Run this test through ``run_isolated``, reporting to ``result``.

        :param result: The TestResult to report to. When omitted, the one
            returned by ``self.defaultTestResult()`` is used.
        """
        target = self.defaultTestResult() if result is None else result
        # unittest.TestCase is passed as the class whose run() run_isolated
        # is asked to use for this test.
        run_isolated(unittest.TestCase, self, target)
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    This decorator will fork() before running the tests and report the
    results from the child process using a Subunit stream. This is useful for
    handling tests that mutate global state, or that exercise C extensions.
    """

    def run(self, result=None):
        """Run the suite through ``run_isolated``, reporting to ``result``.

        :param result: The TestResult to report to. When omitted, a fresh
            ``unittest.TestResult`` is created.
        """
        target = unittest.TestResult() if result is None else result
        # unittest.TestSuite is passed as the class whose run() run_isolated
        # is asked to use for this suite.
        run_isolated(unittest.TestSuite, self, target)
843 def run_isolated(klass, self, result):
844 """Run a test suite or case in a subprocess, using the run method on klass.
846 c2pread, c2pwrite = os.pipe()
847 # fixme - error -> result
852 # Close parent's pipe ends
859 # at this point, sys.stdin is redirected, now we want
860 # to filter it to escape ]'s.
861 ### XXX: test and write that bit.
863 result = TestProtocolClient(sys.stdout)
864 klass.run(self, result)
867 # exit HARD, exit NOW.
871 # Close child pipe ends
873 # hookup a protocol engine
874 protocol = TestProtocolServer(result)
875 protocol.readFrom(os.fdopen(c2pread, 'rU'))
877 # TODO return code evaluation.
881 def TAP2SubUnit(tap, subunit):
882 """Filter a TAP pipe into a subunit pipe.
884 :param tap: A tap pipe/stream/file object.
885 :param subunit: A pipe/stream/file object to write subunit results to.
886 :return: The exit code to exit with.
891 client = TestProtocolClient(subunit)
895 def _skipped_test(subunit, plan_start):
896 # Some tests were skipped.
897 subunit.write('test test %d\n' % plan_start)
898 subunit.write('error test %d [\n' % plan_start)
899 subunit.write('test missing from TAP output\n')
901 return plan_start + 1
902 # Test data for the next test to emit
908 if test_name is None:
910 subunit.write("test %s\n" % test_name)
912 subunit.write("%s %s\n" % (result, test_name))
914 subunit.write("%s %s [\n" % (result, test_name))
917 subunit.write("%s\n" % line)
921 if state == BEFORE_PLAN:
922 match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
925 _, plan_stop, comment = match.groups()
926 plan_stop = int(plan_stop)
927 if plan_start > plan_stop and plan_stop == 0:
930 subunit.write("test file skip\n")
931 subunit.write("skip file skip [\n")
932 subunit.write("%s\n" % comment)
935 # not a plan line, or have seen one before
936 match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
938 # new test, emit current one.
940 status, number, description, directive, directive_comment = match.groups()
945 if description is None:
948 description = ' ' + description
949 if directive is not None:
950 if directive == 'TODO':
952 elif directive == 'SKIP':
954 if directive_comment is not None:
955 log.append(directive_comment)
956 if number is not None:
958 while plan_start < number:
959 plan_start = _skipped_test(subunit, plan_start)
960 test_name = "test %d%s" % (plan_start, description)
963 match = re.match("Bail out\!(?:\s*(.*))?\n", line)
965 reason, = match.groups()
969 extra = ' %s' % reason
971 test_name = "Bail out!%s" % extra
975 match = re.match("\#.*\n", line)
977 log.append(line[:-1])
981 while plan_start <= plan_stop:
982 # record missed tests
983 plan_start = _skipped_test(subunit, plan_start)
987 def tag_stream(original, filtered, tags):
988 """Alter tags on a stream.
990 :param original: The input stream.
991 :param filtered: The output stream.
992 :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
995 A 'TAG' command will add the tag to the output stream,
996 and override any existing '-TAG' command in that stream.
998 * A global 'tags: TAG' will be added to the start of the stream.
999 * Any tags commands with -TAG will have the -TAG removed.
1001 A '-TAG' command will remove the TAG command from the stream.
1003 * A 'tags: -TAG' command will be added to the start of the stream.
1004 * Any 'tags: TAG' command will have 'TAG' removed from it.
1005 Additionally, any redundant tagging commands (adding a tag globally
1006 present, or removing a tag globally removed) are stripped as a
1007 by-product of the filtering.
1010 new_tags, gone_tags = tags_to_new_gone(tags)
1011 def write_tags(new_tags, gone_tags):
1012 if new_tags or gone_tags:
1013 filtered.write("tags: " + ' '.join(new_tags))
1015 for tag in gone_tags:
1016 filtered.write("-" + tag)
1017 filtered.write("\n")
1018 write_tags(new_tags, gone_tags)
1019 # TODO: use the protocol parser and thus don't mangle test comments.
1020 for line in original:
1021 if line.startswith("tags:"):
1022 line_tags = line[5:].split()
1023 line_new, line_gone = tags_to_new_gone(line_tags)
1024 line_new = line_new - gone_tags
1025 line_gone = line_gone - new_tags
1026 write_tags(line_new, line_gone)
1028 filtered.write(line)
class ProtocolTestCase(object):
    """Subunit wire protocol to unittest.TestCase adapter.

    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
    calling a ProtocolTestCase or invoking the run() method will make a 'test
    run' happen. The 'test run' will simply be a replay of the test activity
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
    and ``countTestCases`` methods are not supported because there isn't a
    sensible mapping for those methods.

        # Get a stream (any object with a readline() method), in this case the
        # stream output by the example from ``subunit.TestProtocolClient``.
        stream = open('tests.log', 'rb')
        # Create a parser which will read from the stream and emit
        # activity to a unittest.TestResult when run() is called.
        suite = subunit.ProtocolTestCase(stream)
        # Create a result object to accept the contents of that stream.
        result = unittest.TestResult()
        # 'run' the tests - process the stream and feed its contents to result.
        suite.run(result)

    :seealso: TestProtocolServer (the subunit wire protocol parser).
    """

    def __init__(self, stream, passthrough=None):
        """Create a ProtocolTestCase reading from stream.

        :param stream: A filelike object which a subunit stream can be read
            from. Only its readline() method is used.
        :param passthrough: A stream to pass non subunit input on to. If not
            supplied, the TestProtocolServer default is used.
        """
        self._stream = stream
        self._passthrough = passthrough

    def __call__(self, result=None):
        """Run the test; equivalent to ``run(result)``."""
        return self.run(result)

    def defaultTestResult(self):
        """Return the result object used when run() is given none.

        Mirrors ``unittest.TestCase.defaultTestResult``. This class derives
        from ``object``, so it has to provide the method itself.
        """
        return unittest.TestResult()

    def run(self, result=None):
        """Replay the stream into result.

        :param result: The TestResult to feed. If None, the object returned
            by defaultTestResult() is used.
        :return: The result that received the activity, so that a default
            result can be inspected by the caller.
        """
        if result is None:
            result = self.defaultTestResult()
        protocol = TestProtocolServer(result, self._passthrough)
        # readline() yields an empty (false) value once the stream is
        # exhausted; testing truthiness works for both text and byte streams.
        line = self._stream.readline()
        while line:
            protocol.lineReceived(line)
            line = self._stream.readline()
        # Tell the parser no more input is coming.
        protocol.lostConnection()
        return result
class TestResultStats(unittest.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed.
    :ivar skipped_tests: The tests that were skipped.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        unittest.TestResult.__init__(self)
        self._stream = stream
        self.failed_tests = 0
        self.skipped_tests = 0
        self.seen_tags = set()

    # total_tests and passed_tests are read as plain attributes (see
    # formatStats), so they must be properties rather than methods.
    @property
    def total_tests(self):
        """The number of tests run, as counted by the base TestResult."""
        return self.testsRun

    def addError(self, test, err):
        """Count an error as a failed test; err itself is not stored."""
        self.failed_tests += 1

    def addFailure(self, test, err):
        """Count a failure as a failed test; err itself is not stored."""
        self.failed_tests += 1

    def addSkip(self, test, reason):
        """Count a skipped test; the reason is not stored."""
        self.skipped_tests += 1

    def formatStats(self):
        """Write the collected statistics to the output stream."""
        self._stream.write("Total tests: %5d\n" % self.total_tests)
        self._stream.write("Passed tests: %5d\n" % self.passed_tests)
        self._stream.write("Failed tests: %5d\n" % self.failed_tests)
        self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
        # Sort so the report is stable regardless of set ordering.
        tags = sorted(self.seen_tags)
        self._stream.write("Seen tags: %s\n" % (", ".join(tags)))

    @property
    def passed_tests(self):
        """Tests that were run and neither failed nor were skipped."""
        return self.total_tests - self.failed_tests - self.skipped_tests

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags.

        Only new_tags are recorded; gone_tags is accepted and ignored.
        """
        self.seen_tags.update(new_tags)

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return self.failed_tests == 0
class TestResultFilter(unittest.TestResult):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result through.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
                 filter_success=True, filter_skip=False,
                 filter_predicate=None):
        """Create a FilterResult object filtering to result.

        :param result: The TestResult that unfiltered activity is sent to.
        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_predicate: A callable taking (test, err) and
            returning True if the result should be passed through.
            err is None for success. For skips the skip reason is passed
            in place of err.
        """
        unittest.TestResult.__init__(self)
        self.result = result
        self._filter_error = filter_error
        self._filter_failure = filter_failure
        self._filter_success = filter_success
        self._filter_skip = filter_skip
        if filter_predicate is None:
            filter_predicate = lambda test, err: True
        self.filter_predicate = filter_predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # The (new, gone) tags for the current test.
        self._current_test_tags = None

    def _filtered(self):
        """Record that the current test's outcome was filtered out.

        stopTest() consults this flag so that it does not forward stopTest
        (or the gathered tags) for a test whose startTest was never
        forwarded.
        """
        self._current_test_filtered = True

    def addError(self, test, err):
        """Forward an error unless errors or the predicate filter it."""
        if not self._filter_error and self.filter_predicate(test, err):
            self.result.startTest(test)
            self.result.addError(test, err)
        else:
            self._filtered()

    def addFailure(self, test, err):
        """Forward a failure unless failures or the predicate filter it."""
        if not self._filter_failure and self.filter_predicate(test, err):
            self.result.startTest(test)
            self.result.addFailure(test, err)
        else:
            self._filtered()

    def addSkip(self, test, reason):
        """Forward a skip unless skips or the predicate filter it.

        If the wrapped result has no callable addSkip, the skip is reported
        to it as an error wrapping the reason instead.
        """
        if not self._filter_skip and self.filter_predicate(test, reason):
            self.result.startTest(test)
            # This is duplicated, it would be nice to have on a 'calls
            # TestResults' mixin perhaps.
            addSkip = getattr(self.result, 'addSkip', None)
            if not callable(addSkip):
                self.result.addError(test, RemoteError(reason))
            else:
                self.result.addSkip(test, reason)
        else:
            self._filtered()

    def addSuccess(self, test):
        """Forward a success unless successes or the predicate filter it."""
        if not self._filter_success and self.filter_predicate(test, None):
            self.result.startTest(test)
            self.result.addSuccess(test)
        else:
            self._filtered()

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        self._current_test = test
        self._current_test_filtered = False
        self._current_test_tags = set(), set()

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        if not self._current_test_filtered:
            # Tags to output for this test.
            if self._current_test_tags[0] or self._current_test_tags[1]:
                tags_method = getattr(self.result, 'tags', None)
                if callable(tags_method):
                    self.result.tags(*self._current_test_tags)
            self.result.stopTest(test)
        # Reset the per-test state whether or not the test was forwarded.
        self._current_test = None
        self._current_test_filtered = None
        self._current_test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        The instruction is also forwarded straight away to the wrapped
        result when that result has a ``tags`` attribute.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        :return: Whatever the wrapped result's tags() returns, or None if it
            has no tags attribute.
        """
        if self._current_test is not None:
            # gather the tags until the test stops.
            self._current_test_tags[0].update(new_tags)
            self._current_test_tags[0].difference_update(gone_tags)
            self._current_test_tags[1].update(gone_tags)
            self._current_test_tags[1].difference_update(new_tags)
        tags_method = getattr(self.result, 'tags', None)
        if tags_method is None:
            return
        return tags_method(new_tags, gone_tags)
1247 def id_to_orig_id(self, id):
1248 if id.startswith("subunit.RemotedTestCase."):
1249 return id[len("subunit.RemotedTestCase."):]