Support SUBUNIT_FORMATTER environment variable.
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ++++++++
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to 
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
57 and newer).
58
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``, 
62 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
64
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occured at, to allow reconstructing
68 test timing from a stream.
69
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
74
75
76 Python test support
77 -------------------
78
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
81
82   $ python -m subunit.run mylib.tests.test_suite
83
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
86
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
89
90 `ExecTestCase`` is a convenience wrapper for running an external 
91 program to get a Subunit stream and then report that back to an arbitrary
92 result object::
93
94  class AggregateTests(subunit.ExecTestCase):
95
96      def test_script_one(self):
97          './bin/script_one'
98
99      def test_script_two(self):
100          './bin/script_two'
101  
102  # Normally your normal test loading would take of this automatically,
103  # It is only spelt out in detail here for clarity.
104  suite = unittest.TestSuite([AggregateTests("test_script_one"),
105      AggregateTests("test_script_two")])
106  # Create any TestResult class you like.
107  result = unittest._TextTestResult(sys.stdout)
108  # And run your suite as normal, Subunit will exec each external script as
109  # needed and report to your result object.
110  suite.run(result)
111
112 Utility modules
113 ---------------
114
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
117 """
118
119 import datetime
120 import os
121 import re
122 from StringIO import StringIO
123 import subprocess
124 import sys
125 import unittest
126
127 import iso8601
128 from testtools import content, content_type, ExtendedToOriginalDecorator
129 try:
130     from testtools.testresult.real import _StringException
131     RemoteException = _StringException
132     _remote_exception_str = '_StringException' # For testing.
133 except ImportError:
134     raise ImportError ("testtools.testresult.real does not contain "
135         "_StringException, check your version.")
136
137
138 from testtools.testresult.real import _StringException
139
140 import chunked, details, test_results
141
142
143 PROGRESS_SET = 0
144 PROGRESS_CUR = 1
145 PROGRESS_PUSH = 2
146 PROGRESS_POP = 3
147
148
149 def test_suite():
150     import subunit.tests
151     return subunit.tests.test_suite()
152
153
154 def join_dir(base_path, path):
155     """
156     Returns an absolute path to C{path}, calculated relative to the parent
157     of C{base_path}.
158
159     @param base_path: A path to a file or directory.
160     @param path: An absolute path, or a path relative to the containing
161     directory of C{base_path}.
162
163     @return: An absolute path to C{path}.
164     """
165     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
166
167
168 def tags_to_new_gone(tags):
169     """Split a list of tags into a new_set and a gone_set."""
170     new_tags = set()
171     gone_tags = set()
172     for tag in tags:
173         if tag[0] == '-':
174             gone_tags.add(tag[1:])
175         else:
176             new_tags.add(tag)
177     return new_tags, gone_tags
178
179
180 class DiscardStream(object):
181     """A filelike object which discards what is written to it."""
182
183     def write(self, bytes):
184         pass
185
186
187 class _ParserState(object):
188     """State for the subunit parser."""
189
190     def __init__(self, parser):
191         self.parser = parser
192
193     def addError(self, offset, line):
194         """An 'error:' directive has been read."""
195         self.parser.stdOutLineReceived(line)
196
197     def addExpectedFail(self, offset, line):
198         """An 'xfail:' directive has been read."""
199         self.parser.stdOutLineReceived(line)
200
201     def addFailure(self, offset, line):
202         """A 'failure:' directive has been read."""
203         self.parser.stdOutLineReceived(line)
204
205     def addSkip(self, offset, line):
206         """A 'skip:' directive has been read."""
207         self.parser.stdOutLineReceived(line)
208
209     def addSuccess(self, offset, line):
210         """A 'success:' directive has been read."""
211         self.parser.stdOutLineReceived(line)
212
213     def lineReceived(self, line):
214         """a line has been received."""
215         parts = line.split(None, 1)
216         if len(parts) == 2:
217             cmd, rest = parts
218             offset = len(cmd) + 1
219             cmd = cmd.strip(':')
220             if cmd in ('test', 'testing'):
221                 self.startTest(offset, line)
222             elif cmd == 'error':
223                 self.addError(offset, line)
224             elif cmd == 'failure':
225                 self.addFailure(offset, line)
226             elif cmd == 'progress':
227                 self.parser._handleProgress(offset, line)
228             elif cmd == 'skip':
229                 self.addSkip(offset, line)
230             elif cmd in ('success', 'successful'):
231                 self.addSuccess(offset, line)
232             elif cmd in ('tags',):
233                 self.parser._handleTags(offset, line)
234                 self.parser.subunitLineReceived(line)
235             elif cmd in ('time',):
236                 self.parser._handleTime(offset, line)
237                 self.parser.subunitLineReceived(line)
238             elif cmd == 'xfail':
239                 self.addExpectedFail(offset, line)
240             else:
241                 self.parser.stdOutLineReceived(line)
242         else:
243             self.parser.stdOutLineReceived(line)
244
245     def lostConnection(self):
246         """Connection lost."""
247         self.parser._lostConnectionInTest('unknown state of ')
248
249     def startTest(self, offset, line):
250         """A test start command received."""
251         self.parser.stdOutLineReceived(line)
252
253
254 class _InTest(_ParserState):
255     """State for the subunit parser after reading a test: directive."""
256
257     def _outcome(self, offset, line, no_details, details_state):
258         """An outcome directive has been read.
259         
260         :param no_details: Callable to call when no details are presented.
261         :param details_state: The state to switch to for details
262             processing of this outcome.
263         """
264         if self.parser.current_test_description == line[offset:-1]:
265             self.parser._state = self.parser._outside_test
266             self.parser.current_test_description = None
267             no_details()
268             self.parser.client.stopTest(self.parser._current_test)
269             self.parser._current_test = None
270             self.parser.subunitLineReceived(line)
271         elif self.parser.current_test_description + " [" == line[offset:-1]:
272             self.parser._state = details_state
273             details_state.set_simple()
274             self.parser.subunitLineReceived(line)
275         elif self.parser.current_test_description + " [ multipart" == \
276             line[offset:-1]:
277             self.parser._state = details_state
278             details_state.set_multipart()
279             self.parser.subunitLineReceived(line)
280         else:
281             self.parser.stdOutLineReceived(line)
282
283     def _error(self):
284         self.parser.client.addError(self.parser._current_test,
285             details={})
286
287     def addError(self, offset, line):
288         """An 'error:' directive has been read."""
289         self._outcome(offset, line, self._error,
290             self.parser._reading_error_details)
291
292     def _xfail(self):
293         self.parser.client.addExpectedFailure(self.parser._current_test,
294             details={})
295
296     def addExpectedFail(self, offset, line):
297         """An 'xfail:' directive has been read."""
298         self._outcome(offset, line, self._xfail,
299             self.parser._reading_xfail_details)
300
301     def _failure(self):
302         self.parser.client.addFailure(self.parser._current_test, details={})
303
304     def addFailure(self, offset, line):
305         """A 'failure:' directive has been read."""
306         self._outcome(offset, line, self._failure,
307             self.parser._reading_failure_details)
308
309     def _skip(self):
310         self.parser.client.addSkip(self.parser._current_test, details={})
311
312     def addSkip(self, offset, line):
313         """A 'skip:' directive has been read."""
314         self._outcome(offset, line, self._skip,
315             self.parser._reading_skip_details)
316
317     def _succeed(self):
318         self.parser.client.addSuccess(self.parser._current_test, details={})
319
320     def addSuccess(self, offset, line):
321         """A 'success:' directive has been read."""
322         self._outcome(offset, line, self._succeed,
323             self.parser._reading_success_details)
324
325     def lostConnection(self):
326         """Connection lost."""
327         self.parser._lostConnectionInTest('')
328
329
330 class _OutSideTest(_ParserState):
331     """State for the subunit parser outside of a test context."""
332
333     def lostConnection(self):
334         """Connection lost."""
335
336     def startTest(self, offset, line):
337         """A test start command received."""
338         self.parser._state = self.parser._in_test
339         self.parser._current_test = RemotedTestCase(line[offset:-1])
340         self.parser.current_test_description = line[offset:-1]
341         self.parser.client.startTest(self.parser._current_test)
342         self.parser.subunitLineReceived(line)
343
344
345 class _ReadingDetails(_ParserState):
346     """Common logic for readin state details."""
347
348     def endDetails(self):
349         """The end of a details section has been reached."""
350         self.parser._state = self.parser._outside_test
351         self.parser.current_test_description = None
352         self._report_outcome()
353         self.parser.client.stopTest(self.parser._current_test)
354
355     def lineReceived(self, line):
356         """a line has been received."""
357         self.details_parser.lineReceived(line)
358         self.parser.subunitLineReceived(line)
359
360     def lostConnection(self):
361         """Connection lost."""
362         self.parser._lostConnectionInTest('%s report of ' %
363             self._outcome_label())
364
365     def _outcome_label(self):
366         """The label to describe this outcome."""
367         raise NotImplementedError(self._outcome_label)
368
369     def set_simple(self):
370         """Start a simple details parser."""
371         self.details_parser = details.SimpleDetailsParser(self)
372
373     def set_multipart(self):
374         """Start a multipart details parser."""
375         self.details_parser = details.MultipartDetailsParser(self)
376
377
378 class _ReadingFailureDetails(_ReadingDetails):
379     """State for the subunit parser when reading failure details."""
380
381     def _report_outcome(self):
382         self.parser.client.addFailure(self.parser._current_test,
383             details=self.details_parser.get_details())
384
385     def _outcome_label(self):
386         return "failure"
387  
388
389 class _ReadingErrorDetails(_ReadingDetails):
390     """State for the subunit parser when reading error details."""
391
392     def _report_outcome(self):
393         self.parser.client.addError(self.parser._current_test,
394             details=self.details_parser.get_details())
395
396     def _outcome_label(self):
397         return "error"
398
399
400 class _ReadingExpectedFailureDetails(_ReadingDetails):
401     """State for the subunit parser when reading xfail details."""
402
403     def _report_outcome(self):
404         self.parser.client.addExpectedFailure(self.parser._current_test,
405             details=self.details_parser.get_details())
406
407     def _outcome_label(self):
408         return "xfail"
409
410
411 class _ReadingSkipDetails(_ReadingDetails):
412     """State for the subunit parser when reading skip details."""
413
414     def _report_outcome(self):
415         self.parser.client.addSkip(self.parser._current_test,
416             details=self.details_parser.get_details("skip"))
417
418     def _outcome_label(self):
419         return "skip"
420
421
422 class _ReadingSuccessDetails(_ReadingDetails):
423     """State for the subunit parser when reading success details."""
424
425     def _report_outcome(self):
426         self.parser.client.addSuccess(self.parser._current_test,
427             details=self.details_parser.get_details("success"))
428
429     def _outcome_label(self):
430         return "success"
431
432
433 class TestProtocolServer(object):
434     """A parser for subunit.
435     
436     :ivar tags: The current tags associated with the protocol stream.
437     """
438
439     def __init__(self, client, stream=None, forward_stream=None):
440         """Create a TestProtocolServer instance.
441
442         :param client: An object meeting the unittest.TestResult protocol.
443         :param stream: The stream that lines received which are not part of the
444             subunit protocol should be written to. This allows custom handling
445             of mixed protocols. By default, sys.stdout will be used for
446             convenience.
447         :param forward_stream: A stream to forward subunit lines to. This 
448             allows a filter to forward the entire stream while still parsing
449             and acting on it. By default forward_stream is set to
450             DiscardStream() and no forwarding happens.
451         """
452         self.client = ExtendedToOriginalDecorator(client)
453         if stream is None:
454             stream = sys.stdout
455         self._stream = stream
456         self._forward_stream = forward_stream or DiscardStream()
457         # state objects we can switch too
458         self._in_test = _InTest(self)
459         self._outside_test = _OutSideTest(self)
460         self._reading_error_details = _ReadingErrorDetails(self)
461         self._reading_failure_details = _ReadingFailureDetails(self)
462         self._reading_skip_details = _ReadingSkipDetails(self)
463         self._reading_success_details = _ReadingSuccessDetails(self)
464         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
465         # start with outside test.
466         self._state = self._outside_test
467
468     def _handleProgress(self, offset, line):
469         """Process a progress directive."""
470         line = line[offset:].strip()
471         if line[0] in '+-':
472             whence = PROGRESS_CUR
473             delta = int(line)
474         elif line == "push":
475             whence = PROGRESS_PUSH
476             delta = None
477         elif line == "pop":
478             whence = PROGRESS_POP
479             delta = None
480         else:
481             whence = PROGRESS_SET
482             delta = int(line)
483         self.client.progress(delta, whence)
484
485     def _handleTags(self, offset, line):
486         """Process a tags command."""
487         tags = line[offset:].split()
488         new_tags, gone_tags = tags_to_new_gone(tags)
489         self.client.tags(new_tags, gone_tags)
490
491     def _handleTime(self, offset, line):
492         # Accept it, but do not do anything with it yet.
493         try:
494             event_time = iso8601.parse_date(line[offset:-1])
495         except TypeError, e:
496             raise TypeError("Failed to parse %r, got %r" % (line, e))
497         self.client.time(event_time)
498
499     def lineReceived(self, line):
500         """Call the appropriate local method for the received line."""
501         self._state.lineReceived(line)
502
503     def _lostConnectionInTest(self, state_string):
504         error_string = "lost connection during %stest '%s'" % (
505             state_string, self.current_test_description)
506         self.client.addError(self._current_test, RemoteError(error_string))
507         self.client.stopTest(self._current_test)
508
509     def lostConnection(self):
510         """The input connection has finished."""
511         self._state.lostConnection()
512
513     def readFrom(self, pipe):
514         """Blocking convenience API to parse an entire stream.
515         
516         :param pipe: A file-like object supporting readlines().
517         :return: None.
518         """
519         for line in pipe.readlines():
520             self.lineReceived(line)
521         self.lostConnection()
522
523     def _startTest(self, offset, line):
524         """Internal call to change state machine. Override startTest()."""
525         self._state.startTest(offset, line)
526
527     def subunitLineReceived(self, line):
528         self._forward_stream.write(line)
529
530     def stdOutLineReceived(self, line):
531         self._stream.write(line)
532
533
534 class TestProtocolClient(unittest.TestResult):
535     """A TestResult which generates a subunit stream for a test run.
536     
537     # Get a TestSuite or TestCase to run
538     suite = make_suite()
539     # Create a stream (any object with a 'write' method)
540     stream = file('tests.log', 'wb')
541     # Create a subunit result object which will output to the stream
542     result = subunit.TestProtocolClient(stream)
543     # Optionally, to get timing data for performance analysis, wrap the
544     # serialiser with a timing decorator
545     result = subunit.test_results.AutoTimingTestResultDecorator(result)
546     # Run the test suite reporting to the subunit result object
547     suite.run(result)
548     # Close the stream.
549     stream.close()
550     """
551
552     def __init__(self, stream):
553         unittest.TestResult.__init__(self)
554         self._stream = stream
555
556     def addError(self, test, error=None, details=None):
557         """Report an error in test test.
558         
559         Only one of error and details should be provided: conceptually there
560         are two separate methods:
561             addError(self, test, error)
562             addError(self, test, details)
563
564         :param error: Standard unittest positional argument form - an
565             exc_info tuple.
566         :param details: New Testing-in-python drafted API; a dict from string
567             to subunit.Content objects.
568         """
569         self._addOutcome("error", test, error=error, details=details)
570
571     def addExpectedFailure(self, test, error=None, details=None):
572         """Report an expected failure in test test.
573         
574         Only one of error and details should be provided: conceptually there
575         are two separate methods:
576             addError(self, test, error)
577             addError(self, test, details)
578
579         :param error: Standard unittest positional argument form - an
580             exc_info tuple.
581         :param details: New Testing-in-python drafted API; a dict from string
582             to subunit.Content objects.
583         """
584         self._addOutcome("xfail", test, error=error, details=details)
585
586     def addFailure(self, test, error=None, details=None):
587         """Report a failure in test test.
588         
589         Only one of error and details should be provided: conceptually there
590         are two separate methods:
591             addFailure(self, test, error)
592             addFailure(self, test, details)
593
594         :param error: Standard unittest positional argument form - an
595             exc_info tuple.
596         :param details: New Testing-in-python drafted API; a dict from string
597             to subunit.Content objects.
598         """
599         self._addOutcome("failure", test, error=error, details=details)
600
601     def _addOutcome(self, outcome, test, error=None, details=None):
602         """Report a failure in test test.
603         
604         Only one of error and details should be provided: conceptually there
605         are two separate methods:
606             addOutcome(self, test, error)
607             addOutcome(self, test, details)
608
609         :param outcome: A string describing the outcome - used as the
610             event name in the subunit stream.
611         :param error: Standard unittest positional argument form - an
612             exc_info tuple.
613         :param details: New Testing-in-python drafted API; a dict from string
614             to subunit.Content objects.
615         """
616         self._stream.write("%s: %s" % (outcome, test.id()))
617         if error is None and details is None:
618             raise ValueError
619         if error is not None:
620             self._stream.write(" [\n")
621             for line in self._exc_info_to_string(error, test).splitlines():
622                 self._stream.write("%s\n" % line)
623         else:
624             self._write_details(details)
625         self._stream.write("]\n")
626
627     def addSkip(self, test, reason=None, details=None):
628         """Report a skipped test."""
629         if reason is None:
630             self._addOutcome("skip", test, error=None, details=details)
631         else:
632             self._stream.write("skip: %s [\n" % test.id())
633             self._stream.write("%s\n" % reason)
634             self._stream.write("]\n")
635
636     def addSuccess(self, test, details=None):
637         """Report a success in a test."""
638         self._stream.write("successful: %s" % test.id())
639         if not details:
640             self._stream.write("\n")
641         else:
642             self._write_details(details)
643             self._stream.write("]\n")
644     addUnexpectedSuccess = addSuccess
645
646     def startTest(self, test):
647         """Mark a test as starting its test run."""
648         self._stream.write("test: %s\n" % test.id())
649
650     def progress(self, offset, whence):
651         """Provide indication about the progress/length of the test run.
652
653         :param offset: Information about the number of tests remaining. If
654             whence is PROGRESS_CUR, then offset increases/decreases the
655             remaining test count. If whence is PROGRESS_SET, then offset
656             specifies exactly the remaining test count.
657         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
658             PROGRESS_POP.
659         """
660         if whence == PROGRESS_CUR and offset > -1:
661             prefix = "+"
662         elif whence == PROGRESS_PUSH:
663             prefix = ""
664             offset = "push"
665         elif whence == PROGRESS_POP:
666             prefix = ""
667             offset = "pop"
668         else:
669             prefix = ""
670         self._stream.write("progress: %s%s\n" % (prefix, offset))
671
672     def time(self, a_datetime):
673         """Inform the client of the time.
674
675         ":param datetime: A datetime.datetime object.
676         """
677         time = a_datetime.astimezone(iso8601.Utc())
678         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
679             time.year, time.month, time.day, time.hour, time.minute,
680             time.second, time.microsecond))
681
682     def _write_details(self, details):
683         """Output details to the stream.
684
685         :param details: An extended details dict for a test outcome.
686         """
687         self._stream.write(" [ multipart\n")
688         for name, content in sorted(details.iteritems()):
689             self._stream.write("Content-Type: %s/%s" %
690                 (content.content_type.type, content.content_type.subtype))
691             parameters = content.content_type.parameters
692             if parameters:
693                 self._stream.write(";")
694                 param_strs = []
695                 for param, value in parameters.iteritems():
696                     param_strs.append("%s=%s" % (param, value))
697                 self._stream.write(",".join(param_strs))
698             self._stream.write("\n%s\n" % name)
699             encoder = chunked.Encoder(self._stream)
700             map(encoder.write, content.iter_bytes())
701             encoder.close()
702
703     def done(self):
704         """Obey the testtools result.done() interface."""
705
706
707 def RemoteError(description=""):
708     return (_StringException, _StringException(description), None)
709
710
711 class RemotedTestCase(unittest.TestCase):
712     """A class to represent test cases run in child processes.
713     
714     Instances of this class are used to provide the Python test API a TestCase
715     that can be printed to the screen, introspected for metadata and so on.
716     However, as they are a simply a memoisation of a test that was actually
717     run in the past by a separate process, they cannot perform any interactive
718     actions.
719     """
720
721     def __eq__ (self, other):
722         try:
723             return self.__description == other.__description
724         except AttributeError:
725             return False
726
727     def __init__(self, description):
728         """Create a psuedo test case with description description."""
729         self.__description = description
730
731     def error(self, label):
732         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
733             label)
734
735     def setUp(self):
736         self.error("setUp")
737
738     def tearDown(self):
739         self.error("tearDown")
740
741     def shortDescription(self):
742         return self.__description
743
744     def id(self):
745         return "%s" % (self.__description,)
746
747     def __str__(self):
748         return "%s (%s)" % (self.__description, self._strclass())
749
750     def __repr__(self):
751         return "<%s description='%s'>" % \
752                (self._strclass(), self.__description)
753
754     def run(self, result=None):
755         if result is None: result = self.defaultTestResult()
756         result.startTest(self)
757         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
758         result.stopTest(self)
759
760     def _strclass(self):
761         cls = self.__class__
762         return "%s.%s" % (cls.__module__, cls.__name__)
763
764
765 class ExecTestCase(unittest.TestCase):
766     """A test case which runs external scripts for test fixtures."""
767
768     def __init__(self, methodName='runTest'):
769         """Create an instance of the class that will use the named test
770            method when executed. Raises a ValueError if the instance does
771            not have a method with the specified name.
772         """
773         unittest.TestCase.__init__(self, methodName)
774         testMethod = getattr(self, methodName)
775         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
776                                testMethod.__doc__)
777
778     def countTestCases(self):
779         return 1
780
781     def run(self, result=None):
782         if result is None: result = self.defaultTestResult()
783         self._run(result)
784
785     def debug(self):
786         """Run the test without collecting errors in a TestResult"""
787         self._run(unittest.TestResult())
788
789     def _run(self, result):
790         protocol = TestProtocolServer(result)
791         output = subprocess.Popen(self.script, shell=True,
792             stdout=subprocess.PIPE).communicate()[0]
793         protocol.readFrom(StringIO(output))
794
795
796 class IsolatedTestCase(unittest.TestCase):
797     """A TestCase which executes in a forked process.
798     
799     Each test gets its own process, which has a performance overhead but will
800     provide excellent isolation from global state (such as django configs,
801     zope utilities and so on).
802     """
803
804     def run(self, result=None):
805         if result is None: result = self.defaultTestResult()
806         run_isolated(unittest.TestCase, self, result)
807
808
809 class IsolatedTestSuite(unittest.TestSuite):
810     """A TestSuite which runs its tests in a forked process.
811     
812     This decorator that will fork() before running the tests and report the
813     results from the child process using a Subunit stream.  This is useful for
814     handling tests that mutate global state, or are testing C extensions that
815     could crash the VM.
816     """
817
818     def run(self, result=None):
819         if result is None: result = unittest.TestResult()
820         run_isolated(unittest.TestSuite, self, result)
821
822
823 def run_isolated(klass, self, result):
824     """Run a test suite or case in a subprocess, using the run method on klass.
825     """
826     c2pread, c2pwrite = os.pipe()
827     # fixme - error -> result
828     # now fork
829     pid = os.fork()
830     if pid == 0:
831         # Child
832         # Close parent's pipe ends
833         os.close(c2pread)
834         # Dup fds for child
835         os.dup2(c2pwrite, 1)
836         # Close pipe fds.
837         os.close(c2pwrite)
838
839         # at this point, sys.stdin is redirected, now we want
840         # to filter it to escape ]'s.
841         ### XXX: test and write that bit.
842
843         result = TestProtocolClient(sys.stdout)
844         klass.run(self, result)
845         sys.stdout.flush()
846         sys.stderr.flush()
847         # exit HARD, exit NOW.
848         os._exit(0)
849     else:
850         # Parent
851         # Close child pipe ends
852         os.close(c2pwrite)
853         # hookup a protocol engine
854         protocol = TestProtocolServer(result)
855         protocol.readFrom(os.fdopen(c2pread, 'rU'))
856         os.waitpid(pid, 0)
857         # TODO return code evaluation.
858     return result
859
860
861 def TAP2SubUnit(tap, subunit):
862     """Filter a TAP pipe into a subunit pipe.
863     
864     :param tap: A tap pipe/stream/file object.
865     :param subunit: A pipe/stream/file object to write subunit results to.
866     :return: The exit code to exit with.
867     """
868     BEFORE_PLAN = 0
869     AFTER_PLAN = 1
870     SKIP_STREAM = 2
871     client = TestProtocolClient(subunit)
872     state = BEFORE_PLAN
873     plan_start = 1
874     plan_stop = 0
875     def _skipped_test(subunit, plan_start):
876         # Some tests were skipped.
877         subunit.write('test test %d\n' % plan_start)
878         subunit.write('error test %d [\n' % plan_start)
879         subunit.write('test missing from TAP output\n')
880         subunit.write(']\n')
881         return plan_start + 1
882     # Test data for the next test to emit
883     test_name = None
884     log = []
885     result = None
886     def _emit_test():
887         "write out a test"
888         if test_name is None:
889             return
890         subunit.write("test %s\n" % test_name)
891         if not log:
892             subunit.write("%s %s\n" % (result, test_name))
893         else:
894             subunit.write("%s %s [\n" % (result, test_name))
895         if log:
896             for line in log:
897                 subunit.write("%s\n" % line)
898             subunit.write("]\n")
899         del log[:]
900     for line in tap:
901         if state == BEFORE_PLAN:
902             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
903             if match:
904                 state = AFTER_PLAN
905                 _, plan_stop, comment = match.groups()
906                 plan_stop = int(plan_stop)
907                 if plan_start > plan_stop and plan_stop == 0:
908                     # skipped file
909                     state = SKIP_STREAM
910                     subunit.write("test file skip\n")
911                     subunit.write("skip file skip [\n")
912                     subunit.write("%s\n" % comment)
913                     subunit.write("]\n")
914                 continue
915         # not a plan line, or have seen one before
916         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
917         if match:
918             # new test, emit current one.
919             _emit_test()
920             status, number, description, directive, directive_comment = match.groups()
921             if status == 'ok':
922                 result = 'success'
923             else:
924                 result = "failure"
925             if description is None:
926                 description = ''
927             else:
928                 description = ' ' + description
929             if directive is not None:
930                 if directive == 'TODO':
931                     result = 'xfail'
932                 elif directive == 'SKIP':
933                     result = 'skip'
934                 if directive_comment is not None:
935                     log.append(directive_comment)
936             if number is not None:
937                 number = int(number)
938                 while plan_start < number:
939                     plan_start = _skipped_test(subunit, plan_start)
940             test_name = "test %d%s" % (plan_start, description)
941             plan_start += 1
942             continue
943         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
944         if match:
945             reason, = match.groups()
946             if reason is None:
947                 extra = ''
948             else:
949                 extra = ' %s' % reason
950             _emit_test()
951             test_name = "Bail out!%s" % extra
952             result = "error"
953             state = SKIP_STREAM
954             continue
955         match = re.match("\#.*\n", line)
956         if match:
957             log.append(line[:-1])
958             continue
959         subunit.write(line)
960     _emit_test()
961     while plan_start <= plan_stop:
962         # record missed tests
963         plan_start = _skipped_test(subunit, plan_start)
964     return 0
965
966
967 def tag_stream(original, filtered, tags):
968     """Alter tags on a stream.
969
970     :param original: The input stream.
971     :param filtered: The output stream.
972     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
973         '-TAG' commands.
974
975         A 'TAG' command will add the tag to the output stream,
976         and override any existing '-TAG' command in that stream.
977         Specifically:
978          * A global 'tags: TAG' will be added to the start of the stream.
979          * Any tags commands with -TAG will have the -TAG removed.
980
981         A '-TAG' command will remove the TAG command from the stream.
982         Specifically:
983          * A 'tags: -TAG' command will be added to the start of the stream.
984          * Any 'tags: TAG' command will have 'TAG' removed from it.
985         Additionally, any redundant tagging commands (adding a tag globally
986         present, or removing a tag globally removed) are stripped as a
987         by-product of the filtering.
988     :return: 0
989     """
990     new_tags, gone_tags = tags_to_new_gone(tags)
991     def write_tags(new_tags, gone_tags):
992         if new_tags or gone_tags:
993             filtered.write("tags: " + ' '.join(new_tags))
994             if gone_tags:
995                 for tag in gone_tags:
996                     filtered.write("-" + tag)
997             filtered.write("\n")
998     write_tags(new_tags, gone_tags)
999     # TODO: use the protocol parser and thus don't mangle test comments.
1000     for line in original:
1001         if line.startswith("tags:"):
1002             line_tags = line[5:].split()
1003             line_new, line_gone = tags_to_new_gone(line_tags)
1004             line_new = line_new - gone_tags
1005             line_gone = line_gone - new_tags
1006             write_tags(line_new, line_gone)
1007         else:
1008             filtered.write(line)
1009     return 0
1010
1011
1012 class ProtocolTestCase(object):
1013     """Subunit wire protocol to unittest.TestCase adapter.
1014
1015     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1016     calling a ProtocolTestCase or invoking the run() method will make a 'test
1017     run' happen. The 'test run' will simply be a replay of the test activity
1018     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1019     and ``countTestCases`` methods are not supported because there isn't a
1020     sensible mapping for those methods.
1021     
1022     # Get a stream (any object with a readline() method), in this case the
1023     # stream output by the example from ``subunit.TestProtocolClient``.
1024     stream = file('tests.log', 'rb')
1025     # Create a parser which will read from the stream and emit 
1026     # activity to a unittest.TestResult when run() is called.
1027     suite = subunit.ProtocolTestCase(stream)
1028     # Create a result object to accept the contents of that stream.
1029     result = unittest._TextTestResult(sys.stdout)
1030     # 'run' the tests - process the stream and feed its contents to result.
1031     suite.run(result)
1032     stream.close()
1033
1034     :seealso: TestProtocolServer (the subunit wire protocol parser).
1035     """
1036
1037     def __init__(self, stream, passthrough=None, forward=False):
1038         """Create a ProtocolTestCase reading from stream.
1039
1040         :param stream: A filelike object which a subunit stream can be read
1041             from.
1042         :param passthrough: A stream pass non subunit input on to. If not
1043             supplied, the TestProtocolServer default is used.
1044         :param forward: A stream to pass subunit input on to. If not supplied
1045             subunit input is not forwarded.
1046         """
1047         self._stream = stream
1048         self._passthrough = passthrough
1049         self._forward = forward
1050
1051     def __call__(self, result=None):
1052         return self.run(result)
1053
1054     def run(self, result=None):
1055         if result is None:
1056             result = self.defaultTestResult()
1057         protocol = TestProtocolServer(result, self._passthrough, self._forward)
1058         line = self._stream.readline()
1059         while line:
1060             protocol.lineReceived(line)
1061             line = self._stream.readline()
1062         protocol.lostConnection()
1063
1064
1065 class TestResultStats(unittest.TestResult):
1066     """A pyunit TestResult interface implementation for making statistics.
1067     
1068     :ivar total_tests: The total tests seen.
1069     :ivar passed_tests: The tests that passed.
1070     :ivar failed_tests: The tests that failed.
1071     :ivar seen_tags: The tags seen across all tests.
1072     """
1073
1074     def __init__(self, stream):
1075         """Create a TestResultStats which outputs to stream."""
1076         unittest.TestResult.__init__(self)
1077         self._stream = stream
1078         self.failed_tests = 0
1079         self.skipped_tests = 0
1080         self.seen_tags = set()
1081
1082     @property
1083     def total_tests(self):
1084         return self.testsRun
1085
1086     def addError(self, test, err, details=None):
1087         self.failed_tests += 1
1088
1089     def addFailure(self, test, err, details=None):
1090         self.failed_tests += 1
1091
1092     def addSkip(self, test, reason, details=None):
1093         self.skipped_tests += 1
1094
1095     def formatStats(self):
1096         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1097         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1098         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1099         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1100         tags = sorted(self.seen_tags)
1101         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1102
1103     @property
1104     def passed_tests(self):
1105         return self.total_tests - self.failed_tests - self.skipped_tests
1106
1107     def tags(self, new_tags, gone_tags):
1108         """Accumulate the seen tags."""
1109         self.seen_tags.update(new_tags)
1110
1111     def wasSuccessful(self):
1112         """Tells whether or not this result was a success"""
1113         return self.failed_tests == 0
1114
1115
1116 def get_default_formatter():
1117     """Obtain the default formatter to write to.
1118     
1119     :return: A file-like object.
1120     """
1121     formatter = os.getenv("SUBUNIT_FORMATTER")
1122     if formatter is not None:
1123         return os.popen(formatter, "w")
1124     else:
1125         return sys.stdout
1126