b5f622da0d9f9b9ac5974f56e0a995c00914c98a
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ========
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to 
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc.
57
58 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
59 remove tags in the test run that is currently executing. If called when no
60 test is in progress (that is, if called outside of the ``startTest``, 
61 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
62 when a test is in progress, then the tags only apply to that test.
63
64 The ``time(a_datetime)`` method is called (if present) when a ``time:``
65 directive is encountered in a Subunit stream. This is used to tell a TestResult
66 about the time that events in the stream occured at, to allow reconstructing
67 test timing from a stream.
68
69 The ``progress(offset, whence)`` method controls progress data for a stream.
70 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
71 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
72 ignore the offset parameter.
73
74
75 Python test support
76 -------------------
77
78 ``subunit.run`` is a convenience wrapper to run a Python test suite via
79 the command line, reporting via Subunit::
80
81   $ python -m subunit.run mylib.tests.test_suite
82
83 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
84 tests, allowing isolation between the test runner and some tests.
85
86 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
87 tests that will fork() before that individual test is run.
88
89 `ExecTestCase`` is a convenience wrapper for running an external 
90 program to get a Subunit stream and then report that back to an arbitrary
91 result object::
92
93  class AggregateTests(subunit.ExecTestCase):
94
95      def test_script_one(self):
96          './bin/script_one'
97
98      def test_script_two(self):
99          './bin/script_two'
100  
101  # Normally your normal test loading would take of this automatically,
102  # It is only spelt out in detail here for clarity.
103  suite = unittest.TestSuite([AggregateTests("test_script_one"),
104      AggregateTests("test_script_two")])
105  # Create any TestResult class you like.
106  result = unittest._TextTestResult(sys.stdout)
107  # And run your suite as normal, Subunit will exec each external script as
108  # needed and report to your result object.
109  suite.run(result)
110
111 Utility modules
112 ---------------
113
114 * subunit.chunked contains HTTP chunked encoding/decoding logic.
115 * testtools.content contains a minimal assumptions MIME content representation.
116 * testtools.content_type contains a MIME Content-Type representation.
117 * subunit.test_results contains TestResult helper classes.
118 """
119
120 import datetime
121 import os
122 import re
123 from StringIO import StringIO
124 import subprocess
125 import sys
126 import unittest
127
128 import iso8601
129 from testtools import content, content_type, ExtendedToOriginalDecorator
130 from testtools.testresult import _StringException
131
132 import chunked, details, test_results
133
134
135 PROGRESS_SET = 0
136 PROGRESS_CUR = 1
137 PROGRESS_PUSH = 2
138 PROGRESS_POP = 3
139
140
141 def test_suite():
142     import subunit.tests
143     return subunit.tests.test_suite()
144
145
146 def join_dir(base_path, path):
147     """
148     Returns an absolute path to C{path}, calculated relative to the parent
149     of C{base_path}.
150
151     @param base_path: A path to a file or directory.
152     @param path: An absolute path, or a path relative to the containing
153     directory of C{base_path}.
154
155     @return: An absolute path to C{path}.
156     """
157     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
158
159
160 def tags_to_new_gone(tags):
161     """Split a list of tags into a new_set and a gone_set."""
162     new_tags = set()
163     gone_tags = set()
164     for tag in tags:
165         if tag[0] == '-':
166             gone_tags.add(tag[1:])
167         else:
168             new_tags.add(tag)
169     return new_tags, gone_tags
170
171
172 class DiscardStream(object):
173     """A filelike object which discards what is written to it."""
174
175     def write(self, bytes):
176         pass
177
178
179 class _ParserState(object):
180     """State for the subunit parser."""
181
182     def __init__(self, parser):
183         self.parser = parser
184
185     def addError(self, offset, line):
186         """An 'error:' directive has been read."""
187         self.parser.stdOutLineReceived(line)
188
189     def addExpectedFail(self, offset, line):
190         """An 'xfail:' directive has been read."""
191         self.parser.stdOutLineReceived(line)
192
193     def addFailure(self, offset, line):
194         """A 'failure:' directive has been read."""
195         self.parser.stdOutLineReceived(line)
196
197     def addSkip(self, offset, line):
198         """A 'skip:' directive has been read."""
199         self.parser.stdOutLineReceived(line)
200
201     def addSuccess(self, offset, line):
202         """A 'success:' directive has been read."""
203         self.parser.stdOutLineReceived(line)
204
205     def lineReceived(self, line):
206         """a line has been received."""
207         parts = line.split(None, 1)
208         if len(parts) == 2:
209             cmd, rest = parts
210             offset = len(cmd) + 1
211             cmd = cmd.strip(':')
212             if cmd in ('test', 'testing'):
213                 self.startTest(offset, line)
214             elif cmd == 'error':
215                 self.addError(offset, line)
216             elif cmd == 'failure':
217                 self.addFailure(offset, line)
218             elif cmd == 'progress':
219                 self.parser._handleProgress(offset, line)
220             elif cmd == 'skip':
221                 self.addSkip(offset, line)
222             elif cmd in ('success', 'successful'):
223                 self.addSuccess(offset, line)
224             elif cmd in ('tags',):
225                 self.parser._handleTags(offset, line)
226             elif cmd in ('time',):
227                 self.parser._handleTime(offset, line)
228             elif cmd == 'xfail':
229                 self.addExpectedFail(offset, line)
230             else:
231                 self.parser.stdOutLineReceived(line)
232         else:
233             self.parser.stdOutLineReceived(line)
234
235     def lostConnection(self):
236         """Connection lost."""
237         self.parser._lostConnectionInTest('unknown state of ')
238
239     def startTest(self, offset, line):
240         """A test start command received."""
241         self.parser.stdOutLineReceived(line)
242
243
244 class _InTest(_ParserState):
245     """State for the subunit parser after reading a test: directive."""
246
247     def _outcome(self, offset, line, no_details, details_state):
248         """An outcome directive has been read.
249         
250         :param no_details: Callable to call when no details are presented.
251         :param details_state: The state to switch to for details
252             processing of this outcome.
253         """
254         if self.parser.current_test_description == line[offset:-1]:
255             self.parser._state = self.parser._outside_test
256             self.parser.current_test_description = None
257             no_details()
258             self.parser.client.stopTest(self.parser._current_test)
259             self.parser._current_test = None
260         elif self.parser.current_test_description + " [" == line[offset:-1]:
261             self.parser._state = details_state
262             details_state.set_simple()
263         elif self.parser.current_test_description + " [ multipart" == \
264             line[offset:-1]:
265             self.parser._state = details_state
266             details_state.set_multipart()
267         else:
268             self.parser.stdOutLineReceived(line)
269
270     def _error(self):
271         self.parser.client.addError(self.parser._current_test,
272             details={})
273
274     def addError(self, offset, line):
275         """An 'error:' directive has been read."""
276         self._outcome(offset, line, self._error,
277             self.parser._reading_error_details)
278
279     def _xfail(self):
280         self.parser.client.addExpectedFailure(self.parser._current_test,
281             details={})
282
283     def addExpectedFail(self, offset, line):
284         """An 'xfail:' directive has been read."""
285         self._outcome(offset, line, self._xfail,
286             self.parser._reading_xfail_details)
287
288     def _failure(self):
289         self.parser.client.addFailure(self.parser._current_test, details={})
290
291     def addFailure(self, offset, line):
292         """A 'failure:' directive has been read."""
293         self._outcome(offset, line, self._failure,
294             self.parser._reading_failure_details)
295
296     def _skip(self):
297         self.parser.client.addSkip(self.parser._current_test, details={})
298
299     def addSkip(self, offset, line):
300         """A 'skip:' directive has been read."""
301         self._outcome(offset, line, self._skip,
302             self.parser._reading_skip_details)
303
304     def _succeed(self):
305         self.parser.client.addSuccess(self.parser._current_test, details={})
306
307     def addSuccess(self, offset, line):
308         """A 'success:' directive has been read."""
309         self._outcome(offset, line, self._succeed,
310             self.parser._reading_success_details)
311
312     def lostConnection(self):
313         """Connection lost."""
314         self.parser._lostConnectionInTest('')
315
316
317 class _OutSideTest(_ParserState):
318     """State for the subunit parser outside of a test context."""
319
320     def lostConnection(self):
321         """Connection lost."""
322
323     def startTest(self, offset, line):
324         """A test start command received."""
325         self.parser._state = self.parser._in_test
326         self.parser._current_test = RemotedTestCase(line[offset:-1])
327         self.parser.current_test_description = line[offset:-1]
328         self.parser.client.startTest(self.parser._current_test)
329
330
331 class _ReadingDetails(_ParserState):
332     """Common logic for readin state details."""
333
334     def endDetails(self):
335         """The end of a details section has been reached."""
336         self.parser._state = self.parser._outside_test
337         self.parser.current_test_description = None
338         self._report_outcome()
339         self.parser.client.stopTest(self.parser._current_test)
340
341     def lineReceived(self, line):
342         """a line has been received."""
343         self.details_parser.lineReceived(line)
344
345     def lostConnection(self):
346         """Connection lost."""
347         self.parser._lostConnectionInTest('%s report of ' %
348             self._outcome_label())
349
350     def _outcome_label(self):
351         """The label to describe this outcome."""
352         raise NotImplementedError(self._outcome_label)
353
354     def set_simple(self):
355         """Start a simple details parser."""
356         self.details_parser = details.SimpleDetailsParser(self)
357
358     def set_multipart(self):
359         """Start a multipart details parser."""
360         self.details_parser = details.MultipartDetailsParser(self)
361
362
363 class _ReadingFailureDetails(_ReadingDetails):
364     """State for the subunit parser when reading failure details."""
365
366     def _report_outcome(self):
367         self.parser.client.addFailure(self.parser._current_test,
368             details=self.details_parser.get_details())
369
370     def _outcome_label(self):
371         return "failure"
372  
373
374 class _ReadingErrorDetails(_ReadingDetails):
375     """State for the subunit parser when reading error details."""
376
377     def _report_outcome(self):
378         self.parser.client.addError(self.parser._current_test,
379             details=self.details_parser.get_details())
380
381     def _outcome_label(self):
382         return "error"
383
384
385 class _ReadingExpectedFailureDetails(_ReadingDetails):
386     """State for the subunit parser when reading xfail details."""
387
388     def _report_outcome(self):
389         self.parser.client.addExpectedFailure(self.parser._current_test,
390             details=self.details_parser.get_details())
391
392     def _outcome_label(self):
393         return "xfail"
394
395
396 class _ReadingSkipDetails(_ReadingDetails):
397     """State for the subunit parser when reading skip details."""
398
399     def _report_outcome(self):
400         self.parser.client.addSkip(self.parser._current_test,
401             details=self.details_parser.get_details("skip"))
402
403     def _outcome_label(self):
404         return "skip"
405
406
407 class _ReadingSuccessDetails(_ReadingDetails):
408     """State for the subunit parser when reading success details."""
409
410     def _report_outcome(self):
411         self.parser.client.addSuccess(self.parser._current_test,
412             details=self.details_parser.get_details("success"))
413
414     def _outcome_label(self):
415         return "success"
416
417
418 class TestProtocolServer(object):
419     """A parser for subunit.
420     
421     :ivar tags: The current tags associated with the protocol stream.
422     """
423
424     def __init__(self, client, stream=None):
425         """Create a TestProtocolServer instance.
426
427         :param client: An object meeting the unittest.TestResult protocol.
428         :param stream: The stream that lines received which are not part of the
429             subunit protocol should be written to. This allows custom handling
430             of mixed protocols. By default, sys.stdout will be used for
431             convenience.
432         """
433         self.client = ExtendedToOriginalDecorator(client)
434         if stream is None:
435             stream = sys.stdout
436         self._stream = stream
437         # state objects we can switch too
438         self._in_test = _InTest(self)
439         self._outside_test = _OutSideTest(self)
440         self._reading_error_details = _ReadingErrorDetails(self)
441         self._reading_failure_details = _ReadingFailureDetails(self)
442         self._reading_skip_details = _ReadingSkipDetails(self)
443         self._reading_success_details = _ReadingSuccessDetails(self)
444         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
445         # start with outside test.
446         self._state = self._outside_test
447
448     def _handleProgress(self, offset, line):
449         """Process a progress directive."""
450         line = line[offset:].strip()
451         if line[0] in '+-':
452             whence = PROGRESS_CUR
453             delta = int(line)
454         elif line == "push":
455             whence = PROGRESS_PUSH
456             delta = None
457         elif line == "pop":
458             whence = PROGRESS_POP
459             delta = None
460         else:
461             whence = PROGRESS_SET
462             delta = int(line)
463         self.client.progress(delta, whence)
464
465     def _handleTags(self, offset, line):
466         """Process a tags command."""
467         tags = line[offset:].split()
468         new_tags, gone_tags = tags_to_new_gone(tags)
469         self.client.tags(new_tags, gone_tags)
470
471     def _handleTime(self, offset, line):
472         # Accept it, but do not do anything with it yet.
473         try:
474             event_time = iso8601.parse_date(line[offset:-1])
475         except TypeError, e:
476             raise TypeError("Failed to parse %r, got %r" % (line, e))
477         self.client.time(event_time)
478
479     def lineReceived(self, line):
480         """Call the appropriate local method for the received line."""
481         self._state.lineReceived(line)
482
483     def _lostConnectionInTest(self, state_string):
484         error_string = "lost connection during %stest '%s'" % (
485             state_string, self.current_test_description)
486         self.client.addError(self._current_test, RemoteError(error_string))
487         self.client.stopTest(self._current_test)
488
489     def lostConnection(self):
490         """The input connection has finished."""
491         self._state.lostConnection()
492
493     def readFrom(self, pipe):
494         """Blocking convenience API to parse an entire stream.
495         
496         :param pipe: A file-like object supporting readlines().
497         :return: None.
498         """
499         for line in pipe.readlines():
500             self.lineReceived(line)
501         self.lostConnection()
502
503     def _startTest(self, offset, line):
504         """Internal call to change state machine. Override startTest()."""
505         self._state.startTest(offset, line)
506
507     def stdOutLineReceived(self, line):
508         self._stream.write(line)
509
510
511 try:
512     from testtools.testresult import _StringException as RemoteException
513     _remote_exception_str = '_StringException' # For testing.
514 except ImportError:
515     raise ImportError ("testtools.testresult does not contain _StringException, check your version.")
516
517
518 class TestProtocolClient(unittest.TestResult):
519     """A TestResult which generates a subunit stream for a test run.
520     
521     # Get a TestSuite or TestCase to run
522     suite = make_suite()
523     # Create a stream (any object with a 'write' method)
524     stream = file('tests.log', 'wb')
525     # Create a subunit result object which will output to the stream
526     result = subunit.TestProtocolClient(stream)
527     # Optionally, to get timing data for performance analysis, wrap the
528     # serialiser with a timing decorator
529     result = subunit.test_results.AutoTimingTestResultDecorator(result)
530     # Run the test suite reporting to the subunit result object
531     suite.run(result)
532     # Close the stream.
533     stream.close()
534     """
535
536     def __init__(self, stream):
537         unittest.TestResult.__init__(self)
538         self._stream = stream
539
540     def addError(self, test, error=None, details=None):
541         """Report an error in test test.
542         
543         Only one of error and details should be provided: conceptually there
544         are two separate methods:
545             addError(self, test, error)
546             addError(self, test, details)
547
548         :param error: Standard unittest positional argument form - an
549             exc_info tuple.
550         :param details: New Testing-in-python drafted API; a dict from string
551             to subunit.Content objects.
552         """
553         self._addOutcome("error", test, error=error, details=details)
554
555     def addExpectedFailure(self, test, error=None, details=None):
556         """Report an expected failure in test test.
557         
558         Only one of error and details should be provided: conceptually there
559         are two separate methods:
560             addError(self, test, error)
561             addError(self, test, details)
562
563         :param error: Standard unittest positional argument form - an
564             exc_info tuple.
565         :param details: New Testing-in-python drafted API; a dict from string
566             to subunit.Content objects.
567         """
568         self._addOutcome("xfail", test, error=error, details=details)
569
570     def addFailure(self, test, error=None, details=None):
571         """Report a failure in test test.
572         
573         Only one of error and details should be provided: conceptually there
574         are two separate methods:
575             addFailure(self, test, error)
576             addFailure(self, test, details)
577
578         :param error: Standard unittest positional argument form - an
579             exc_info tuple.
580         :param details: New Testing-in-python drafted API; a dict from string
581             to subunit.Content objects.
582         """
583         self._addOutcome("failure", test, error=error, details=details)
584
585     def _addOutcome(self, outcome, test, error=None, details=None):
586         """Report a failure in test test.
587         
588         Only one of error and details should be provided: conceptually there
589         are two separate methods:
590             addOutcome(self, test, error)
591             addOutcome(self, test, details)
592
593         :param outcome: A string describing the outcome - used as the
594             event name in the subunit stream.
595         :param error: Standard unittest positional argument form - an
596             exc_info tuple.
597         :param details: New Testing-in-python drafted API; a dict from string
598             to subunit.Content objects.
599         """
600         self._stream.write("%s: %s" % (outcome, test.id()))
601         if error is None and details is None:
602             raise ValueError
603         if error is not None:
604             self._stream.write(" [\n")
605             for line in self._exc_info_to_string(error, test).splitlines():
606                 self._stream.write("%s\n" % line)
607         else:
608             self._write_details(details)
609         self._stream.write("]\n")
610
611     def addSkip(self, test, reason=None, details=None):
612         """Report a skipped test."""
613         if reason is None:
614             self._addOutcome("skip", test, error=None, details=details)
615         else:
616             self._stream.write("skip: %s [\n" % test.id())
617             self._stream.write("%s\n" % reason)
618             self._stream.write("]\n")
619
620     def addSuccess(self, test, details=None):
621         """Report a success in a test."""
622         self._stream.write("successful: %s" % test.id())
623         if not details:
624             self._stream.write("\n")
625         else:
626             self._write_details(details)
627             self._stream.write("]\n")
628     addUnexpectedSuccess = addSuccess
629
630     def startTest(self, test):
631         """Mark a test as starting its test run."""
632         self._stream.write("test: %s\n" % test.id())
633
634     def progress(self, offset, whence):
635         """Provide indication about the progress/length of the test run.
636
637         :param offset: Information about the number of tests remaining. If
638             whence is PROGRESS_CUR, then offset increases/decreases the
639             remaining test count. If whence is PROGRESS_SET, then offset
640             specifies exactly the remaining test count.
641         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
642             PROGRESS_POP.
643         """
644         if whence == PROGRESS_CUR and offset > -1:
645             prefix = "+"
646         elif whence == PROGRESS_PUSH:
647             prefix = ""
648             offset = "push"
649         elif whence == PROGRESS_POP:
650             prefix = ""
651             offset = "pop"
652         else:
653             prefix = ""
654         self._stream.write("progress: %s%s\n" % (prefix, offset))
655
656     def time(self, a_datetime):
657         """Inform the client of the time.
658
659         ":param datetime: A datetime.datetime object.
660         """
661         time = a_datetime.astimezone(iso8601.Utc())
662         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
663             time.year, time.month, time.day, time.hour, time.minute,
664             time.second, time.microsecond))
665
666     def _write_details(self, details):
667         """Output details to the stream.
668
669         :param details: An extended details dict for a test outcome.
670         """
671         self._stream.write(" [ multipart\n")
672         for name, content in sorted(details.iteritems()):
673             self._stream.write("Content-Type: %s/%s" %
674                 (content.content_type.type, content.content_type.subtype))
675             parameters = content.content_type.parameters
676             if parameters:
677                 self._stream.write(";")
678                 param_strs = []
679                 for param, value in parameters.iteritems():
680                     param_strs.append("%s=%s" % (param, value))
681                 self._stream.write(",".join(param_strs))
682             self._stream.write("\n%s\n" % name)
683             encoder = chunked.Encoder(self._stream)
684             map(encoder.write, content.iter_bytes())
685             encoder.close()
686
687     def done(self):
688         """Obey the testtools result.done() interface."""
689
690
691 def RemoteError(description=""):
692     return (_StringException, _StringException(description), None)
693
694
695 class RemotedTestCase(unittest.TestCase):
696     """A class to represent test cases run in child processes.
697     
698     Instances of this class are used to provide the Python test API a TestCase
699     that can be printed to the screen, introspected for metadata and so on.
700     However, as they are a simply a memoisation of a test that was actually
701     run in the past by a separate process, they cannot perform any interactive
702     actions.
703     """
704
705     def __eq__ (self, other):
706         try:
707             return self.__description == other.__description
708         except AttributeError:
709             return False
710
711     def __init__(self, description):
712         """Create a psuedo test case with description description."""
713         self.__description = description
714
715     def error(self, label):
716         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
717             label)
718
719     def setUp(self):
720         self.error("setUp")
721
722     def tearDown(self):
723         self.error("tearDown")
724
725     def shortDescription(self):
726         return self.__description
727
728     def id(self):
729         return "%s" % (self.__description,)
730
731     def __str__(self):
732         return "%s (%s)" % (self.__description, self._strclass())
733
734     def __repr__(self):
735         return "<%s description='%s'>" % \
736                (self._strclass(), self.__description)
737
738     def run(self, result=None):
739         if result is None: result = self.defaultTestResult()
740         result.startTest(self)
741         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
742         result.stopTest(self)
743
744     def _strclass(self):
745         cls = self.__class__
746         return "%s.%s" % (cls.__module__, cls.__name__)
747
748
749 class ExecTestCase(unittest.TestCase):
750     """A test case which runs external scripts for test fixtures."""
751
752     def __init__(self, methodName='runTest'):
753         """Create an instance of the class that will use the named test
754            method when executed. Raises a ValueError if the instance does
755            not have a method with the specified name.
756         """
757         unittest.TestCase.__init__(self, methodName)
758         testMethod = getattr(self, methodName)
759         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
760                                testMethod.__doc__)
761
762     def countTestCases(self):
763         return 1
764
765     def run(self, result=None):
766         if result is None: result = self.defaultTestResult()
767         self._run(result)
768
769     def debug(self):
770         """Run the test without collecting errors in a TestResult"""
771         self._run(unittest.TestResult())
772
773     def _run(self, result):
774         protocol = TestProtocolServer(result)
775         output = subprocess.Popen(self.script, shell=True,
776             stdout=subprocess.PIPE).communicate()[0]
777         protocol.readFrom(StringIO(output))
778
779
780 class IsolatedTestCase(unittest.TestCase):
781     """A TestCase which executes in a forked process.
782     
783     Each test gets its own process, which has a performance overhead but will
784     provide excellent isolation from global state (such as django configs,
785     zope utilities and so on).
786     """
787
788     def run(self, result=None):
789         if result is None: result = self.defaultTestResult()
790         run_isolated(unittest.TestCase, self, result)
791
792
793 class IsolatedTestSuite(unittest.TestSuite):
794     """A TestSuite which runs its tests in a forked process.
795     
796     This decorator that will fork() before running the tests and report the
797     results from the child process using a Subunit stream.  This is useful for
798     handling tests that mutate global state, or are testing C extensions that
799     could crash the VM.
800     """
801
802     def run(self, result=None):
803         if result is None: result = unittest.TestResult()
804         run_isolated(unittest.TestSuite, self, result)
805
806
807 def run_isolated(klass, self, result):
808     """Run a test suite or case in a subprocess, using the run method on klass.
809     """
810     c2pread, c2pwrite = os.pipe()
811     # fixme - error -> result
812     # now fork
813     pid = os.fork()
814     if pid == 0:
815         # Child
816         # Close parent's pipe ends
817         os.close(c2pread)
818         # Dup fds for child
819         os.dup2(c2pwrite, 1)
820         # Close pipe fds.
821         os.close(c2pwrite)
822
823         # at this point, sys.stdin is redirected, now we want
824         # to filter it to escape ]'s.
825         ### XXX: test and write that bit.
826
827         result = TestProtocolClient(sys.stdout)
828         klass.run(self, result)
829         sys.stdout.flush()
830         sys.stderr.flush()
831         # exit HARD, exit NOW.
832         os._exit(0)
833     else:
834         # Parent
835         # Close child pipe ends
836         os.close(c2pwrite)
837         # hookup a protocol engine
838         protocol = TestProtocolServer(result)
839         protocol.readFrom(os.fdopen(c2pread, 'rU'))
840         os.waitpid(pid, 0)
841         # TODO return code evaluation.
842     return result
843
844
845 def TAP2SubUnit(tap, subunit):
846     """Filter a TAP pipe into a subunit pipe.
847     
848     :param tap: A tap pipe/stream/file object.
849     :param subunit: A pipe/stream/file object to write subunit results to.
850     :return: The exit code to exit with.
851     """
852     BEFORE_PLAN = 0
853     AFTER_PLAN = 1
854     SKIP_STREAM = 2
855     client = TestProtocolClient(subunit)
856     state = BEFORE_PLAN
857     plan_start = 1
858     plan_stop = 0
859     def _skipped_test(subunit, plan_start):
860         # Some tests were skipped.
861         subunit.write('test test %d\n' % plan_start)
862         subunit.write('error test %d [\n' % plan_start)
863         subunit.write('test missing from TAP output\n')
864         subunit.write(']\n')
865         return plan_start + 1
866     # Test data for the next test to emit
867     test_name = None
868     log = []
869     result = None
870     def _emit_test():
871         "write out a test"
872         if test_name is None:
873             return
874         subunit.write("test %s\n" % test_name)
875         if not log:
876             subunit.write("%s %s\n" % (result, test_name))
877         else:
878             subunit.write("%s %s [\n" % (result, test_name))
879         if log:
880             for line in log:
881                 subunit.write("%s\n" % line)
882             subunit.write("]\n")
883         del log[:]
884     for line in tap:
885         if state == BEFORE_PLAN:
886             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
887             if match:
888                 state = AFTER_PLAN
889                 _, plan_stop, comment = match.groups()
890                 plan_stop = int(plan_stop)
891                 if plan_start > plan_stop and plan_stop == 0:
892                     # skipped file
893                     state = SKIP_STREAM
894                     subunit.write("test file skip\n")
895                     subunit.write("skip file skip [\n")
896                     subunit.write("%s\n" % comment)
897                     subunit.write("]\n")
898                 continue
899         # not a plan line, or have seen one before
900         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
901         if match:
902             # new test, emit current one.
903             _emit_test()
904             status, number, description, directive, directive_comment = match.groups()
905             if status == 'ok':
906                 result = 'success'
907             else:
908                 result = "failure"
909             if description is None:
910                 description = ''
911             else:
912                 description = ' ' + description
913             if directive is not None:
914                 if directive == 'TODO':
915                     result = 'xfail'
916                 elif directive == 'SKIP':
917                     result = 'skip'
918                 if directive_comment is not None:
919                     log.append(directive_comment)
920             if number is not None:
921                 number = int(number)
922                 while plan_start < number:
923                     plan_start = _skipped_test(subunit, plan_start)
924             test_name = "test %d%s" % (plan_start, description)
925             plan_start += 1
926             continue
927         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
928         if match:
929             reason, = match.groups()
930             if reason is None:
931                 extra = ''
932             else:
933                 extra = ' %s' % reason
934             _emit_test()
935             test_name = "Bail out!%s" % extra
936             result = "error"
937             state = SKIP_STREAM
938             continue
939         match = re.match("\#.*\n", line)
940         if match:
941             log.append(line[:-1])
942             continue
943         subunit.write(line)
944     _emit_test()
945     while plan_start <= plan_stop:
946         # record missed tests
947         plan_start = _skipped_test(subunit, plan_start)
948     return 0
949
950
951 def tag_stream(original, filtered, tags):
952     """Alter tags on a stream.
953
954     :param original: The input stream.
955     :param filtered: The output stream.
956     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
957         '-TAG' commands.
958
959         A 'TAG' command will add the tag to the output stream,
960         and override any existing '-TAG' command in that stream.
961         Specifically:
962          * A global 'tags: TAG' will be added to the start of the stream.
963          * Any tags commands with -TAG will have the -TAG removed.
964
965         A '-TAG' command will remove the TAG command from the stream.
966         Specifically:
967          * A 'tags: -TAG' command will be added to the start of the stream.
968          * Any 'tags: TAG' command will have 'TAG' removed from it.
969         Additionally, any redundant tagging commands (adding a tag globally
970         present, or removing a tag globally removed) are stripped as a
971         by-product of the filtering.
972     :return: 0
973     """
974     new_tags, gone_tags = tags_to_new_gone(tags)
975     def write_tags(new_tags, gone_tags):
976         if new_tags or gone_tags:
977             filtered.write("tags: " + ' '.join(new_tags))
978             if gone_tags:
979                 for tag in gone_tags:
980                     filtered.write("-" + tag)
981             filtered.write("\n")
982     write_tags(new_tags, gone_tags)
983     # TODO: use the protocol parser and thus don't mangle test comments.
984     for line in original:
985         if line.startswith("tags:"):
986             line_tags = line[5:].split()
987             line_new, line_gone = tags_to_new_gone(line_tags)
988             line_new = line_new - gone_tags
989             line_gone = line_gone - new_tags
990             write_tags(line_new, line_gone)
991         else:
992             filtered.write(line)
993     return 0
994
995
996 class ProtocolTestCase(object):
997     """Subunit wire protocol to unittest.TestCase adapter.
998
999     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1000     calling a ProtocolTestCase or invoking the run() method will make a 'test
1001     run' happen. The 'test run' will simply be a replay of the test activity
1002     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1003     and ``countTestCases`` methods are not supported because there isn't a
1004     sensible mapping for those methods.
1005     
1006     # Get a stream (any object with a readline() method), in this case the
1007     # stream output by the example from ``subunit.TestProtocolClient``.
1008     stream = file('tests.log', 'rb')
1009     # Create a parser which will read from the stream and emit 
1010     # activity to a unittest.TestResult when run() is called.
1011     suite = subunit.ProtocolTestCase(stream)
1012     # Create a result object to accept the contents of that stream.
1013     result = unittest._TextTestResult(sys.stdout)
1014     # 'run' the tests - process the stream and feed its contents to result.
1015     suite.run(result)
1016     stream.close()
1017
1018     :seealso: TestProtocolServer (the subunit wire protocol parser).
1019     """
1020
1021     def __init__(self, stream, passthrough=None):
1022         """Create a ProtocolTestCase reading from stream.
1023
1024         :param stream: A filelike object which a subunit stream can be read
1025             from.
1026         :param passthrough: A stream pass non subunit input on to. If not
1027             supplied, the TestProtocolServer default is used.
1028         """
1029         self._stream = stream
1030         self._passthrough = passthrough
1031
1032     def __call__(self, result=None):
1033         return self.run(result)
1034
1035     def run(self, result=None):
1036         if result is None:
1037             result = self.defaultTestResult()
1038         protocol = TestProtocolServer(result, self._passthrough)
1039         line = self._stream.readline()
1040         while line:
1041             protocol.lineReceived(line)
1042             line = self._stream.readline()
1043         protocol.lostConnection()
1044
1045
1046 class TestResultStats(unittest.TestResult):
1047     """A pyunit TestResult interface implementation for making statistics.
1048     
1049     :ivar total_tests: The total tests seen.
1050     :ivar passed_tests: The tests that passed.
1051     :ivar failed_tests: The tests that failed.
1052     :ivar seen_tags: The tags seen across all tests.
1053     """
1054
1055     def __init__(self, stream):
1056         """Create a TestResultStats which outputs to stream."""
1057         unittest.TestResult.__init__(self)
1058         self._stream = stream
1059         self.failed_tests = 0
1060         self.skipped_tests = 0
1061         self.seen_tags = set()
1062
1063     @property
1064     def total_tests(self):
1065         return self.testsRun
1066
1067     def addError(self, test, err, details=None):
1068         self.failed_tests += 1
1069
1070     def addFailure(self, test, err, details=None):
1071         self.failed_tests += 1
1072
1073     def addSkip(self, test, reason, details=None):
1074         self.skipped_tests += 1
1075
1076     def formatStats(self):
1077         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1078         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1079         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1080         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1081         tags = sorted(self.seen_tags)
1082         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1083
1084     @property
1085     def passed_tests(self):
1086         return self.total_tests - self.failed_tests - self.skipped_tests
1087
1088     def tags(self, new_tags, gone_tags):
1089         """Accumulate the seen tags."""
1090         self.seen_tags.update(new_tags)
1091
1092     def wasSuccessful(self):
1093         """Tells whether or not this result was a success"""
1094         return self.failed_tests == 0