* Old style tracebacks with no encoding info are now treated as UTF8 rather
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ++++++++
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to 
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
57 and newer).
58
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``, 
62 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
64
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occured at, to allow reconstructing
68 test timing from a stream.
69
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
74
75
76 Python test support
77 -------------------
78
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
81
82   $ python -m subunit.run mylib.tests.test_suite
83
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
86
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
89
90 `ExecTestCase`` is a convenience wrapper for running an external 
91 program to get a Subunit stream and then report that back to an arbitrary
92 result object::
93
94  class AggregateTests(subunit.ExecTestCase):
95
96      def test_script_one(self):
97          './bin/script_one'
98
99      def test_script_two(self):
100          './bin/script_two'
101  
102  # Normally your normal test loading would take of this automatically,
103  # It is only spelt out in detail here for clarity.
104  suite = unittest.TestSuite([AggregateTests("test_script_one"),
105      AggregateTests("test_script_two")])
106  # Create any TestResult class you like.
107  result = unittest._TextTestResult(sys.stdout)
108  # And run your suite as normal, Subunit will exec each external script as
109  # needed and report to your result object.
110  suite.run(result)
111
112 Utility modules
113 ---------------
114
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
117 """
118
119 import datetime
120 import os
121 import re
122 from StringIO import StringIO
123 import subprocess
124 import sys
125 import unittest
126
127 import iso8601
128 from testtools import content, content_type, ExtendedToOriginalDecorator
129 try:
130     from testtools.testresult.real import _StringException
131     RemoteException = _StringException
132     _remote_exception_str = '_StringException' # For testing.
133 except ImportError:
134     raise ImportError ("testtools.testresult.real does not contain "
135         "_StringException, check your version.")
136
137
138 from testtools.testresult.real import _StringException
139
140 import chunked, details, test_results
141
142
143 PROGRESS_SET = 0
144 PROGRESS_CUR = 1
145 PROGRESS_PUSH = 2
146 PROGRESS_POP = 3
147
148
149 def test_suite():
150     import subunit.tests
151     return subunit.tests.test_suite()
152
153
154 def join_dir(base_path, path):
155     """
156     Returns an absolute path to C{path}, calculated relative to the parent
157     of C{base_path}.
158
159     @param base_path: A path to a file or directory.
160     @param path: An absolute path, or a path relative to the containing
161     directory of C{base_path}.
162
163     @return: An absolute path to C{path}.
164     """
165     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
166
167
168 def tags_to_new_gone(tags):
169     """Split a list of tags into a new_set and a gone_set."""
170     new_tags = set()
171     gone_tags = set()
172     for tag in tags:
173         if tag[0] == '-':
174             gone_tags.add(tag[1:])
175         else:
176             new_tags.add(tag)
177     return new_tags, gone_tags
178
179
180 class DiscardStream(object):
181     """A filelike object which discards what is written to it."""
182
183     def write(self, bytes):
184         pass
185
186
187 class _ParserState(object):
188     """State for the subunit parser."""
189
190     def __init__(self, parser):
191         self.parser = parser
192
193     def addError(self, offset, line):
194         """An 'error:' directive has been read."""
195         self.parser.stdOutLineReceived(line)
196
197     def addExpectedFail(self, offset, line):
198         """An 'xfail:' directive has been read."""
199         self.parser.stdOutLineReceived(line)
200
201     def addFailure(self, offset, line):
202         """A 'failure:' directive has been read."""
203         self.parser.stdOutLineReceived(line)
204
205     def addSkip(self, offset, line):
206         """A 'skip:' directive has been read."""
207         self.parser.stdOutLineReceived(line)
208
209     def addSuccess(self, offset, line):
210         """A 'success:' directive has been read."""
211         self.parser.stdOutLineReceived(line)
212
213     def lineReceived(self, line):
214         """a line has been received."""
215         parts = line.split(None, 1)
216         if len(parts) == 2 and line.startswith(parts[0]):
217             cmd, rest = parts
218             offset = len(cmd) + 1
219             cmd = cmd.rstrip(':')
220             if cmd in ('test', 'testing'):
221                 self.startTest(offset, line)
222             elif cmd == 'error':
223                 self.addError(offset, line)
224             elif cmd == 'failure':
225                 self.addFailure(offset, line)
226             elif cmd == 'progress':
227                 self.parser._handleProgress(offset, line)
228             elif cmd == 'skip':
229                 self.addSkip(offset, line)
230             elif cmd in ('success', 'successful'):
231                 self.addSuccess(offset, line)
232             elif cmd in ('tags',):
233                 self.parser._handleTags(offset, line)
234                 self.parser.subunitLineReceived(line)
235             elif cmd in ('time',):
236                 self.parser._handleTime(offset, line)
237                 self.parser.subunitLineReceived(line)
238             elif cmd == 'xfail':
239                 self.addExpectedFail(offset, line)
240             else:
241                 self.parser.stdOutLineReceived(line)
242         else:
243             self.parser.stdOutLineReceived(line)
244
245     def lostConnection(self):
246         """Connection lost."""
247         self.parser._lostConnectionInTest('unknown state of ')
248
249     def startTest(self, offset, line):
250         """A test start command received."""
251         self.parser.stdOutLineReceived(line)
252
253
254 class _InTest(_ParserState):
255     """State for the subunit parser after reading a test: directive."""
256
257     def _outcome(self, offset, line, no_details, details_state):
258         """An outcome directive has been read.
259         
260         :param no_details: Callable to call when no details are presented.
261         :param details_state: The state to switch to for details
262             processing of this outcome.
263         """
264         if self.parser.current_test_description == line[offset:-1]:
265             self.parser._state = self.parser._outside_test
266             self.parser.current_test_description = None
267             no_details()
268             self.parser.client.stopTest(self.parser._current_test)
269             self.parser._current_test = None
270             self.parser.subunitLineReceived(line)
271         elif self.parser.current_test_description + " [" == line[offset:-1]:
272             self.parser._state = details_state
273             details_state.set_simple()
274             self.parser.subunitLineReceived(line)
275         elif self.parser.current_test_description + " [ multipart" == \
276             line[offset:-1]:
277             self.parser._state = details_state
278             details_state.set_multipart()
279             self.parser.subunitLineReceived(line)
280         else:
281             self.parser.stdOutLineReceived(line)
282
283     def _error(self):
284         self.parser.client.addError(self.parser._current_test,
285             details={})
286
287     def addError(self, offset, line):
288         """An 'error:' directive has been read."""
289         self._outcome(offset, line, self._error,
290             self.parser._reading_error_details)
291
292     def _xfail(self):
293         self.parser.client.addExpectedFailure(self.parser._current_test,
294             details={})
295
296     def addExpectedFail(self, offset, line):
297         """An 'xfail:' directive has been read."""
298         self._outcome(offset, line, self._xfail,
299             self.parser._reading_xfail_details)
300
301     def _failure(self):
302         self.parser.client.addFailure(self.parser._current_test, details={})
303
304     def addFailure(self, offset, line):
305         """A 'failure:' directive has been read."""
306         self._outcome(offset, line, self._failure,
307             self.parser._reading_failure_details)
308
309     def _skip(self):
310         self.parser.client.addSkip(self.parser._current_test, details={})
311
312     def addSkip(self, offset, line):
313         """A 'skip:' directive has been read."""
314         self._outcome(offset, line, self._skip,
315             self.parser._reading_skip_details)
316
317     def _succeed(self):
318         self.parser.client.addSuccess(self.parser._current_test, details={})
319
320     def addSuccess(self, offset, line):
321         """A 'success:' directive has been read."""
322         self._outcome(offset, line, self._succeed,
323             self.parser._reading_success_details)
324
325     def lostConnection(self):
326         """Connection lost."""
327         self.parser._lostConnectionInTest('')
328
329
330 class _OutSideTest(_ParserState):
331     """State for the subunit parser outside of a test context."""
332
333     def lostConnection(self):
334         """Connection lost."""
335
336     def startTest(self, offset, line):
337         """A test start command received."""
338         self.parser._state = self.parser._in_test
339         self.parser._current_test = RemotedTestCase(line[offset:-1])
340         self.parser.current_test_description = line[offset:-1]
341         self.parser.client.startTest(self.parser._current_test)
342         self.parser.subunitLineReceived(line)
343
344
345 class _ReadingDetails(_ParserState):
346     """Common logic for readin state details."""
347
348     def endDetails(self):
349         """The end of a details section has been reached."""
350         self.parser._state = self.parser._outside_test
351         self.parser.current_test_description = None
352         self._report_outcome()
353         self.parser.client.stopTest(self.parser._current_test)
354
355     def lineReceived(self, line):
356         """a line has been received."""
357         self.details_parser.lineReceived(line)
358         self.parser.subunitLineReceived(line)
359
360     def lostConnection(self):
361         """Connection lost."""
362         self.parser._lostConnectionInTest('%s report of ' %
363             self._outcome_label())
364
365     def _outcome_label(self):
366         """The label to describe this outcome."""
367         raise NotImplementedError(self._outcome_label)
368
369     def set_simple(self):
370         """Start a simple details parser."""
371         self.details_parser = details.SimpleDetailsParser(self)
372
373     def set_multipart(self):
374         """Start a multipart details parser."""
375         self.details_parser = details.MultipartDetailsParser(self)
376
377
378 class _ReadingFailureDetails(_ReadingDetails):
379     """State for the subunit parser when reading failure details."""
380
381     def _report_outcome(self):
382         self.parser.client.addFailure(self.parser._current_test,
383             details=self.details_parser.get_details())
384
385     def _outcome_label(self):
386         return "failure"
387  
388
389 class _ReadingErrorDetails(_ReadingDetails):
390     """State for the subunit parser when reading error details."""
391
392     def _report_outcome(self):
393         self.parser.client.addError(self.parser._current_test,
394             details=self.details_parser.get_details())
395
396     def _outcome_label(self):
397         return "error"
398
399
400 class _ReadingExpectedFailureDetails(_ReadingDetails):
401     """State for the subunit parser when reading xfail details."""
402
403     def _report_outcome(self):
404         self.parser.client.addExpectedFailure(self.parser._current_test,
405             details=self.details_parser.get_details())
406
407     def _outcome_label(self):
408         return "xfail"
409
410
411 class _ReadingSkipDetails(_ReadingDetails):
412     """State for the subunit parser when reading skip details."""
413
414     def _report_outcome(self):
415         self.parser.client.addSkip(self.parser._current_test,
416             details=self.details_parser.get_details("skip"))
417
418     def _outcome_label(self):
419         return "skip"
420
421
422 class _ReadingSuccessDetails(_ReadingDetails):
423     """State for the subunit parser when reading success details."""
424
425     def _report_outcome(self):
426         self.parser.client.addSuccess(self.parser._current_test,
427             details=self.details_parser.get_details("success"))
428
429     def _outcome_label(self):
430         return "success"
431
432
433 class TestProtocolServer(object):
434     """A parser for subunit.
435     
436     :ivar tags: The current tags associated with the protocol stream.
437     """
438
439     def __init__(self, client, stream=None, forward_stream=None):
440         """Create a TestProtocolServer instance.
441
442         :param client: An object meeting the unittest.TestResult protocol.
443         :param stream: The stream that lines received which are not part of the
444             subunit protocol should be written to. This allows custom handling
445             of mixed protocols. By default, sys.stdout will be used for
446             convenience.
447         :param forward_stream: A stream to forward subunit lines to. This 
448             allows a filter to forward the entire stream while still parsing
449             and acting on it. By default forward_stream is set to
450             DiscardStream() and no forwarding happens.
451         """
452         self.client = ExtendedToOriginalDecorator(client)
453         if stream is None:
454             stream = sys.stdout
455         self._stream = stream
456         self._forward_stream = forward_stream or DiscardStream()
457         # state objects we can switch too
458         self._in_test = _InTest(self)
459         self._outside_test = _OutSideTest(self)
460         self._reading_error_details = _ReadingErrorDetails(self)
461         self._reading_failure_details = _ReadingFailureDetails(self)
462         self._reading_skip_details = _ReadingSkipDetails(self)
463         self._reading_success_details = _ReadingSuccessDetails(self)
464         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
465         # start with outside test.
466         self._state = self._outside_test
467
468     def _handleProgress(self, offset, line):
469         """Process a progress directive."""
470         line = line[offset:].strip()
471         if line[0] in '+-':
472             whence = PROGRESS_CUR
473             delta = int(line)
474         elif line == "push":
475             whence = PROGRESS_PUSH
476             delta = None
477         elif line == "pop":
478             whence = PROGRESS_POP
479             delta = None
480         else:
481             whence = PROGRESS_SET
482             delta = int(line)
483         self.client.progress(delta, whence)
484
485     def _handleTags(self, offset, line):
486         """Process a tags command."""
487         tags = line[offset:].split()
488         new_tags, gone_tags = tags_to_new_gone(tags)
489         self.client.tags(new_tags, gone_tags)
490
491     def _handleTime(self, offset, line):
492         # Accept it, but do not do anything with it yet.
493         try:
494             event_time = iso8601.parse_date(line[offset:-1])
495         except TypeError, e:
496             raise TypeError("Failed to parse %r, got %r" % (line, e))
497         self.client.time(event_time)
498
499     def lineReceived(self, line):
500         """Call the appropriate local method for the received line."""
501         self._state.lineReceived(line)
502
503     def _lostConnectionInTest(self, state_string):
504         error_string = "lost connection during %stest '%s'" % (
505             state_string, self.current_test_description)
506         self.client.addError(self._current_test, RemoteError(error_string))
507         self.client.stopTest(self._current_test)
508
509     def lostConnection(self):
510         """The input connection has finished."""
511         self._state.lostConnection()
512
513     def readFrom(self, pipe):
514         """Blocking convenience API to parse an entire stream.
515         
516         :param pipe: A file-like object supporting readlines().
517         :return: None.
518         """
519         for line in pipe.readlines():
520             self.lineReceived(line)
521         self.lostConnection()
522
523     def _startTest(self, offset, line):
524         """Internal call to change state machine. Override startTest()."""
525         self._state.startTest(offset, line)
526
527     def subunitLineReceived(self, line):
528         self._forward_stream.write(line)
529
530     def stdOutLineReceived(self, line):
531         self._stream.write(line)
532
533
534 class TestProtocolClient(unittest.TestResult):
535     """A TestResult which generates a subunit stream for a test run.
536     
537     # Get a TestSuite or TestCase to run
538     suite = make_suite()
539     # Create a stream (any object with a 'write' method)
540     stream = file('tests.log', 'wb')
541     # Create a subunit result object which will output to the stream
542     result = subunit.TestProtocolClient(stream)
543     # Optionally, to get timing data for performance analysis, wrap the
544     # serialiser with a timing decorator
545     result = subunit.test_results.AutoTimingTestResultDecorator(result)
546     # Run the test suite reporting to the subunit result object
547     suite.run(result)
548     # Close the stream.
549     stream.close()
550     """
551
552     def __init__(self, stream):
553         unittest.TestResult.__init__(self)
554         self._stream = stream
555         _make_stream_binary(stream)
556
557     def addError(self, test, error=None, details=None):
558         """Report an error in test test.
559         
560         Only one of error and details should be provided: conceptually there
561         are two separate methods:
562             addError(self, test, error)
563             addError(self, test, details)
564
565         :param error: Standard unittest positional argument form - an
566             exc_info tuple.
567         :param details: New Testing-in-python drafted API; a dict from string
568             to subunit.Content objects.
569         """
570         self._addOutcome("error", test, error=error, details=details)
571
572     def addExpectedFailure(self, test, error=None, details=None):
573         """Report an expected failure in test test.
574         
575         Only one of error and details should be provided: conceptually there
576         are two separate methods:
577             addError(self, test, error)
578             addError(self, test, details)
579
580         :param error: Standard unittest positional argument form - an
581             exc_info tuple.
582         :param details: New Testing-in-python drafted API; a dict from string
583             to subunit.Content objects.
584         """
585         self._addOutcome("xfail", test, error=error, details=details)
586
587     def addFailure(self, test, error=None, details=None):
588         """Report a failure in test test.
589         
590         Only one of error and details should be provided: conceptually there
591         are two separate methods:
592             addFailure(self, test, error)
593             addFailure(self, test, details)
594
595         :param error: Standard unittest positional argument form - an
596             exc_info tuple.
597         :param details: New Testing-in-python drafted API; a dict from string
598             to subunit.Content objects.
599         """
600         self._addOutcome("failure", test, error=error, details=details)
601
602     def _addOutcome(self, outcome, test, error=None, details=None):
603         """Report a failure in test test.
604         
605         Only one of error and details should be provided: conceptually there
606         are two separate methods:
607             addOutcome(self, test, error)
608             addOutcome(self, test, details)
609
610         :param outcome: A string describing the outcome - used as the
611             event name in the subunit stream.
612         :param error: Standard unittest positional argument form - an
613             exc_info tuple.
614         :param details: New Testing-in-python drafted API; a dict from string
615             to subunit.Content objects.
616         """
617         self._stream.write("%s: %s" % (outcome, test.id()))
618         if error is None and details is None:
619             raise ValueError
620         if error is not None:
621             self._stream.write(" [\n")
622             # XXX: this needs to be made much stricter, along the lines of
623             # Martin[gz]'s work in testtools. Perhaps subunit can use that?
624             for line in self._exc_info_to_string(error, test).splitlines():
625                 self._stream.write("%s\n" % line)
626         else:
627             self._write_details(details)
628         self._stream.write("]\n")
629
630     def addSkip(self, test, reason=None, details=None):
631         """Report a skipped test."""
632         if reason is None:
633             self._addOutcome("skip", test, error=None, details=details)
634         else:
635             self._stream.write("skip: %s [\n" % test.id())
636             self._stream.write("%s\n" % reason)
637             self._stream.write("]\n")
638
639     def addSuccess(self, test, details=None):
640         """Report a success in a test."""
641         self._stream.write("successful: %s" % test.id())
642         if not details:
643             self._stream.write("\n")
644         else:
645             self._write_details(details)
646             self._stream.write("]\n")
647     addUnexpectedSuccess = addSuccess
648
649     def startTest(self, test):
650         """Mark a test as starting its test run."""
651         self._stream.write("test: %s\n" % test.id())
652
653     def progress(self, offset, whence):
654         """Provide indication about the progress/length of the test run.
655
656         :param offset: Information about the number of tests remaining. If
657             whence is PROGRESS_CUR, then offset increases/decreases the
658             remaining test count. If whence is PROGRESS_SET, then offset
659             specifies exactly the remaining test count.
660         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
661             PROGRESS_POP.
662         """
663         if whence == PROGRESS_CUR and offset > -1:
664             prefix = "+"
665         elif whence == PROGRESS_PUSH:
666             prefix = ""
667             offset = "push"
668         elif whence == PROGRESS_POP:
669             prefix = ""
670             offset = "pop"
671         else:
672             prefix = ""
673         self._stream.write("progress: %s%s\n" % (prefix, offset))
674
675     def time(self, a_datetime):
676         """Inform the client of the time.
677
678         ":param datetime: A datetime.datetime object.
679         """
680         time = a_datetime.astimezone(iso8601.Utc())
681         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
682             time.year, time.month, time.day, time.hour, time.minute,
683             time.second, time.microsecond))
684
685     def _write_details(self, details):
686         """Output details to the stream.
687
688         :param details: An extended details dict for a test outcome.
689         """
690         self._stream.write(" [ multipart\n")
691         for name, content in sorted(details.iteritems()):
692             self._stream.write("Content-Type: %s/%s" %
693                 (content.content_type.type, content.content_type.subtype))
694             parameters = content.content_type.parameters
695             if parameters:
696                 self._stream.write(";")
697                 param_strs = []
698                 for param, value in parameters.iteritems():
699                     param_strs.append("%s=%s" % (param, value))
700                 self._stream.write(",".join(param_strs))
701             self._stream.write("\n%s\n" % name)
702             encoder = chunked.Encoder(self._stream)
703             map(encoder.write, content.iter_bytes())
704             encoder.close()
705
706     def done(self):
707         """Obey the testtools result.done() interface."""
708
709
710 def RemoteError(description=""):
711     return (_StringException, _StringException(description), None)
712
713
714 class RemotedTestCase(unittest.TestCase):
715     """A class to represent test cases run in child processes.
716     
717     Instances of this class are used to provide the Python test API a TestCase
718     that can be printed to the screen, introspected for metadata and so on.
719     However, as they are a simply a memoisation of a test that was actually
720     run in the past by a separate process, they cannot perform any interactive
721     actions.
722     """
723
724     def __eq__ (self, other):
725         try:
726             return self.__description == other.__description
727         except AttributeError:
728             return False
729
730     def __init__(self, description):
731         """Create a psuedo test case with description description."""
732         self.__description = description
733
734     def error(self, label):
735         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
736             label)
737
738     def setUp(self):
739         self.error("setUp")
740
741     def tearDown(self):
742         self.error("tearDown")
743
744     def shortDescription(self):
745         return self.__description
746
747     def id(self):
748         return "%s" % (self.__description,)
749
750     def __str__(self):
751         return "%s (%s)" % (self.__description, self._strclass())
752
753     def __repr__(self):
754         return "<%s description='%s'>" % \
755                (self._strclass(), self.__description)
756
757     def run(self, result=None):
758         if result is None: result = self.defaultTestResult()
759         result.startTest(self)
760         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
761         result.stopTest(self)
762
763     def _strclass(self):
764         cls = self.__class__
765         return "%s.%s" % (cls.__module__, cls.__name__)
766
767
768 class ExecTestCase(unittest.TestCase):
769     """A test case which runs external scripts for test fixtures."""
770
771     def __init__(self, methodName='runTest'):
772         """Create an instance of the class that will use the named test
773            method when executed. Raises a ValueError if the instance does
774            not have a method with the specified name.
775         """
776         unittest.TestCase.__init__(self, methodName)
777         testMethod = getattr(self, methodName)
778         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
779                                testMethod.__doc__)
780
781     def countTestCases(self):
782         return 1
783
784     def run(self, result=None):
785         if result is None: result = self.defaultTestResult()
786         self._run(result)
787
788     def debug(self):
789         """Run the test without collecting errors in a TestResult"""
790         self._run(unittest.TestResult())
791
792     def _run(self, result):
793         protocol = TestProtocolServer(result)
794         output = subprocess.Popen(self.script, shell=True,
795             stdout=subprocess.PIPE).communicate()[0]
796         protocol.readFrom(StringIO(output))
797
798
799 class IsolatedTestCase(unittest.TestCase):
800     """A TestCase which executes in a forked process.
801     
802     Each test gets its own process, which has a performance overhead but will
803     provide excellent isolation from global state (such as django configs,
804     zope utilities and so on).
805     """
806
807     def run(self, result=None):
808         if result is None: result = self.defaultTestResult()
809         run_isolated(unittest.TestCase, self, result)
810
811
812 class IsolatedTestSuite(unittest.TestSuite):
813     """A TestSuite which runs its tests in a forked process.
814     
815     This decorator that will fork() before running the tests and report the
816     results from the child process using a Subunit stream.  This is useful for
817     handling tests that mutate global state, or are testing C extensions that
818     could crash the VM.
819     """
820
821     def run(self, result=None):
822         if result is None: result = unittest.TestResult()
823         run_isolated(unittest.TestSuite, self, result)
824
825
826 def run_isolated(klass, self, result):
827     """Run a test suite or case in a subprocess, using the run method on klass.
828     """
829     c2pread, c2pwrite = os.pipe()
830     # fixme - error -> result
831     # now fork
832     pid = os.fork()
833     if pid == 0:
834         # Child
835         # Close parent's pipe ends
836         os.close(c2pread)
837         # Dup fds for child
838         os.dup2(c2pwrite, 1)
839         # Close pipe fds.
840         os.close(c2pwrite)
841
842         # at this point, sys.stdin is redirected, now we want
843         # to filter it to escape ]'s.
844         ### XXX: test and write that bit.
845
846         result = TestProtocolClient(sys.stdout)
847         klass.run(self, result)
848         sys.stdout.flush()
849         sys.stderr.flush()
850         # exit HARD, exit NOW.
851         os._exit(0)
852     else:
853         # Parent
854         # Close child pipe ends
855         os.close(c2pwrite)
856         # hookup a protocol engine
857         protocol = TestProtocolServer(result)
858         protocol.readFrom(os.fdopen(c2pread, 'rU'))
859         os.waitpid(pid, 0)
860         # TODO return code evaluation.
861     return result
862
863
864 def TAP2SubUnit(tap, subunit):
865     """Filter a TAP pipe into a subunit pipe.
866     
867     :param tap: A tap pipe/stream/file object.
868     :param subunit: A pipe/stream/file object to write subunit results to.
869     :return: The exit code to exit with.
870     """
871     BEFORE_PLAN = 0
872     AFTER_PLAN = 1
873     SKIP_STREAM = 2
874     client = TestProtocolClient(subunit)
875     state = BEFORE_PLAN
876     plan_start = 1
877     plan_stop = 0
878     def _skipped_test(subunit, plan_start):
879         # Some tests were skipped.
880         subunit.write('test test %d\n' % plan_start)
881         subunit.write('error test %d [\n' % plan_start)
882         subunit.write('test missing from TAP output\n')
883         subunit.write(']\n')
884         return plan_start + 1
885     # Test data for the next test to emit
886     test_name = None
887     log = []
888     result = None
889     def _emit_test():
890         "write out a test"
891         if test_name is None:
892             return
893         subunit.write("test %s\n" % test_name)
894         if not log:
895             subunit.write("%s %s\n" % (result, test_name))
896         else:
897             subunit.write("%s %s [\n" % (result, test_name))
898         if log:
899             for line in log:
900                 subunit.write("%s\n" % line)
901             subunit.write("]\n")
902         del log[:]
903     for line in tap:
904         if state == BEFORE_PLAN:
905             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
906             if match:
907                 state = AFTER_PLAN
908                 _, plan_stop, comment = match.groups()
909                 plan_stop = int(plan_stop)
910                 if plan_start > plan_stop and plan_stop == 0:
911                     # skipped file
912                     state = SKIP_STREAM
913                     subunit.write("test file skip\n")
914                     subunit.write("skip file skip [\n")
915                     subunit.write("%s\n" % comment)
916                     subunit.write("]\n")
917                 continue
918         # not a plan line, or have seen one before
919         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
920         if match:
921             # new test, emit current one.
922             _emit_test()
923             status, number, description, directive, directive_comment = match.groups()
924             if status == 'ok':
925                 result = 'success'
926             else:
927                 result = "failure"
928             if description is None:
929                 description = ''
930             else:
931                 description = ' ' + description
932             if directive is not None:
933                 if directive.upper() == 'TODO':
934                     result = 'xfail'
935                 elif directive.upper() == 'SKIP':
936                     result = 'skip'
937                 if directive_comment is not None:
938                     log.append(directive_comment)
939             if number is not None:
940                 number = int(number)
941                 while plan_start < number:
942                     plan_start = _skipped_test(subunit, plan_start)
943             test_name = "test %d%s" % (plan_start, description)
944             plan_start += 1
945             continue
946         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
947         if match:
948             reason, = match.groups()
949             if reason is None:
950                 extra = ''
951             else:
952                 extra = ' %s' % reason
953             _emit_test()
954             test_name = "Bail out!%s" % extra
955             result = "error"
956             state = SKIP_STREAM
957             continue
958         match = re.match("\#.*\n", line)
959         if match:
960             log.append(line[:-1])
961             continue
962         subunit.write(line)
963     _emit_test()
964     while plan_start <= plan_stop:
965         # record missed tests
966         plan_start = _skipped_test(subunit, plan_start)
967     return 0
968
969
970 def tag_stream(original, filtered, tags):
971     """Alter tags on a stream.
972
973     :param original: The input stream.
974     :param filtered: The output stream.
975     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
976         '-TAG' commands.
977
978         A 'TAG' command will add the tag to the output stream,
979         and override any existing '-TAG' command in that stream.
980         Specifically:
981          * A global 'tags: TAG' will be added to the start of the stream.
982          * Any tags commands with -TAG will have the -TAG removed.
983
984         A '-TAG' command will remove the TAG command from the stream.
985         Specifically:
986          * A 'tags: -TAG' command will be added to the start of the stream.
987          * Any 'tags: TAG' command will have 'TAG' removed from it.
988         Additionally, any redundant tagging commands (adding a tag globally
989         present, or removing a tag globally removed) are stripped as a
990         by-product of the filtering.
991     :return: 0
992     """
993     new_tags, gone_tags = tags_to_new_gone(tags)
994     def write_tags(new_tags, gone_tags):
995         if new_tags or gone_tags:
996             filtered.write("tags: " + ' '.join(new_tags))
997             if gone_tags:
998                 for tag in gone_tags:
999                     filtered.write("-" + tag)
1000             filtered.write("\n")
1001     write_tags(new_tags, gone_tags)
1002     # TODO: use the protocol parser and thus don't mangle test comments.
1003     for line in original:
1004         if line.startswith("tags:"):
1005             line_tags = line[5:].split()
1006             line_new, line_gone = tags_to_new_gone(line_tags)
1007             line_new = line_new - gone_tags
1008             line_gone = line_gone - new_tags
1009             write_tags(line_new, line_gone)
1010         else:
1011             filtered.write(line)
1012     return 0
1013
1014
1015 class ProtocolTestCase(object):
1016     """Subunit wire protocol to unittest.TestCase adapter.
1017
1018     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1019     calling a ProtocolTestCase or invoking the run() method will make a 'test
1020     run' happen. The 'test run' will simply be a replay of the test activity
1021     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1022     and ``countTestCases`` methods are not supported because there isn't a
1023     sensible mapping for those methods.
1024     
1025     # Get a stream (any object with a readline() method), in this case the
1026     # stream output by the example from ``subunit.TestProtocolClient``.
1027     stream = file('tests.log', 'rb')
1028     # Create a parser which will read from the stream and emit 
1029     # activity to a unittest.TestResult when run() is called.
1030     suite = subunit.ProtocolTestCase(stream)
1031     # Create a result object to accept the contents of that stream.
1032     result = unittest._TextTestResult(sys.stdout)
1033     # 'run' the tests - process the stream and feed its contents to result.
1034     suite.run(result)
1035     stream.close()
1036
1037     :seealso: TestProtocolServer (the subunit wire protocol parser).
1038     """
1039
1040     def __init__(self, stream, passthrough=None, forward=False):
1041         """Create a ProtocolTestCase reading from stream.
1042
1043         :param stream: A filelike object which a subunit stream can be read
1044             from.
1045         :param passthrough: A stream pass non subunit input on to. If not
1046             supplied, the TestProtocolServer default is used.
1047         :param forward: A stream to pass subunit input on to. If not supplied
1048             subunit input is not forwarded.
1049         """
1050         self._stream = stream
1051         _make_stream_binary(stream)
1052         self._passthrough = passthrough
1053         self._forward = forward
1054         _make_stream_binary(forward)
1055
1056     def __call__(self, result=None):
1057         return self.run(result)
1058
1059     def run(self, result=None):
1060         if result is None:
1061             result = self.defaultTestResult()
1062         protocol = TestProtocolServer(result, self._passthrough, self._forward)
1063         line = self._stream.readline()
1064         while line:
1065             protocol.lineReceived(line)
1066             line = self._stream.readline()
1067         protocol.lostConnection()
1068
1069
1070 class TestResultStats(unittest.TestResult):
1071     """A pyunit TestResult interface implementation for making statistics.
1072     
1073     :ivar total_tests: The total tests seen.
1074     :ivar passed_tests: The tests that passed.
1075     :ivar failed_tests: The tests that failed.
1076     :ivar seen_tags: The tags seen across all tests.
1077     """
1078
1079     def __init__(self, stream):
1080         """Create a TestResultStats which outputs to stream."""
1081         unittest.TestResult.__init__(self)
1082         self._stream = stream
1083         self.failed_tests = 0
1084         self.skipped_tests = 0
1085         self.seen_tags = set()
1086
1087     @property
1088     def total_tests(self):
1089         return self.testsRun
1090
1091     def addError(self, test, err, details=None):
1092         self.failed_tests += 1
1093
1094     def addFailure(self, test, err, details=None):
1095         self.failed_tests += 1
1096
1097     def addSkip(self, test, reason, details=None):
1098         self.skipped_tests += 1
1099
1100     def formatStats(self):
1101         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1102         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1103         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1104         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1105         tags = sorted(self.seen_tags)
1106         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1107
1108     @property
1109     def passed_tests(self):
1110         return self.total_tests - self.failed_tests - self.skipped_tests
1111
1112     def tags(self, new_tags, gone_tags):
1113         """Accumulate the seen tags."""
1114         self.seen_tags.update(new_tags)
1115
1116     def wasSuccessful(self):
1117         """Tells whether or not this result was a success"""
1118         return self.failed_tests == 0
1119
1120
1121 def get_default_formatter():
1122     """Obtain the default formatter to write to.
1123     
1124     :return: A file-like object.
1125     """
1126     formatter = os.getenv("SUBUNIT_FORMATTER")
1127     if formatter:
1128         return os.popen(formatter, "w")
1129     else:
1130         return sys.stdout
1131
1132
1133 def _make_stream_binary(stream):
1134     """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
1135     if getattr(stream, 'fileno', None) is not None:
1136         _make_binary_on_windows(stream.fileno())
1137
1138 def _make_binary_on_windows(fileno):
1139     """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
1140     if sys.platform == "win32":
1141         import msvcrt
1142         msvcrt.setmode(fileno, os.O_BINARY)