Move expected failures to the details API.
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ========
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to 
53 ``subunit.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc.
57
58 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
59 remove tags in the test run that is currently executing. If called when no
60 test is in progress (that is, if called outside of the ``startTest``, 
61 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
62 when a test is in progress, then the tags only apply to that test.
63
64 The ``time(a_datetime)`` method is called (if present) when a ``time:``
65 directive is encountered in a Subunit stream. This is used to tell a TestResult
66 about the time that events in the stream occured at, to allow reconstructing
67 test timing from a stream.
68
69 The ``progress(offset, whence)`` method controls progress data for a stream.
70 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
71 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
72 ignore the offset parameter.
73
74
75 Python test support
76 -------------------
77
78 ``subunit.run`` is a convenience wrapper to run a Python test suite via
79 the command line, reporting via Subunit::
80
81   $ python -m subunit.run mylib.tests.test_suite
82
83 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
84 tests, allowing isolation between the test runner and some tests.
85
86 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
87 tests that will fork() before that individual test is run.
88
89 `ExecTestCase`` is a convenience wrapper for running an external 
90 program to get a Subunit stream and then report that back to an arbitrary
91 result object::
92
93  class AggregateTests(subunit.ExecTestCase):
94
95      def test_script_one(self):
96          './bin/script_one'
97
98      def test_script_two(self):
99          './bin/script_two'
100  
101  # Normally your normal test loading would take of this automatically,
102  # It is only spelt out in detail here for clarity.
103  suite = unittest.TestSuite([AggregateTests("test_script_one"),
104      AggregateTests("test_script_two")])
105  # Create any TestResult class you like.
106  result = unittest._TextTestResult(sys.stdout)
107  # And run your suite as normal, Subunit will exec each external script as
108  # needed and report to your result object.
109  suite.run(result)
110
111 Utility modules
112 ---------------
113
114 * subunit.chunked contains HTTP chunked encoding/decoding logic.
115 * subunit.content contains a minimal assumptions MIME content representation.
116 * subunit.content_type contains a MIME Content-Type representation.
117 * subunit.test_results contains TestResult helper classes.
118 """
119
120 import datetime
121 import os
122 import re
123 from StringIO import StringIO
124 import subprocess
125 import sys
126 import unittest
127
128 import iso8601
129
130 import chunked, content, content_type, details, test_results
131
132
133 PROGRESS_SET = 0
134 PROGRESS_CUR = 1
135 PROGRESS_PUSH = 2
136 PROGRESS_POP = 3
137
138
139 def test_suite():
140     import subunit.tests
141     return subunit.tests.test_suite()
142
143
144 def join_dir(base_path, path):
145     """
146     Returns an absolute path to C{path}, calculated relative to the parent
147     of C{base_path}.
148
149     @param base_path: A path to a file or directory.
150     @param path: An absolute path, or a path relative to the containing
151     directory of C{base_path}.
152
153     @return: An absolute path to C{path}.
154     """
155     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
156
157
158 def tags_to_new_gone(tags):
159     """Split a list of tags into a new_set and a gone_set."""
160     new_tags = set()
161     gone_tags = set()
162     for tag in tags:
163         if tag[0] == '-':
164             gone_tags.add(tag[1:])
165         else:
166             new_tags.add(tag)
167     return new_tags, gone_tags
168
169
170 class DiscardStream(object):
171     """A filelike object which discards what is written to it."""
172
173     def write(self, bytes):
174         pass
175
176
177 class _ParserState(object):
178     """State for the subunit parser."""
179
180     def __init__(self, parser):
181         self.parser = parser
182
183     def addError(self, offset, line):
184         """An 'error:' directive has been read."""
185         self.parser.stdOutLineReceived(line)
186
187     def addExpectedFail(self, offset, line):
188         """An 'xfail:' directive has been read."""
189         self.parser.stdOutLineReceived(line)
190
191     def addFailure(self, offset, line):
192         """A 'failure:' directive has been read."""
193         self.parser.stdOutLineReceived(line)
194
195     def addSkip(self, offset, line):
196         """A 'skip:' directive has been read."""
197         self.parser.stdOutLineReceived(line)
198
199     def addSuccess(self, offset, line):
200         """A 'success:' directive has been read."""
201         self.parser.stdOutLineReceived(line)
202
203     def lineReceived(self, line):
204         """a line has been received."""
205         parts = line.split(None, 1)
206         if len(parts) == 2:
207             cmd, rest = parts
208             offset = len(cmd) + 1
209             cmd = cmd.strip(':')
210             if cmd in ('test', 'testing'):
211                 self.startTest(offset, line)
212             elif cmd == 'error':
213                 self.addError(offset, line)
214             elif cmd == 'failure':
215                 self.addFailure(offset, line)
216             elif cmd == 'progress':
217                 self.parser._handleProgress(offset, line)
218             elif cmd == 'skip':
219                 self.addSkip(offset, line)
220             elif cmd in ('success', 'successful'):
221                 self.addSuccess(offset, line)
222             elif cmd in ('tags',):
223                 self.parser._handleTags(offset, line)
224             elif cmd in ('time',):
225                 self.parser._handleTime(offset, line)
226             elif cmd == 'xfail':
227                 self.addExpectedFail(offset, line)
228             else:
229                 self.parser.stdOutLineReceived(line)
230         else:
231             self.parser.stdOutLineReceived(line)
232
233     def lostConnection(self):
234         """Connection lost."""
235         self.parser._lostConnectionInTest('unknown state of ')
236
237     def startTest(self, offset, line):
238         """A test start command received."""
239         self.parser.stdOutLineReceived(line)
240
241
242 class _InTest(_ParserState):
243     """State for the subunit parser after reading a test: directive."""
244
245     def _outcome(self, offset, line, no_details, details_state):
246         """An outcome directive has been read.
247         
248         :param no_details: Callable to call when no details are presented.
249         :param details_state: The state to switch to for details
250             processing of this outcome.
251         """
252         if self.parser.current_test_description == line[offset:-1]:
253             self.parser._state = self.parser._outside_test
254             self.parser.current_test_description = None
255             no_details()
256             self.parser.client.stopTest(self.parser._current_test)
257             self.parser._current_test = None
258         elif self.parser.current_test_description + " [" == line[offset:-1]:
259             self.parser._state = details_state
260             details_state.set_simple()
261         elif self.parser.current_test_description + " [ multipart" == \
262             line[offset:-1]:
263             self.parser._state = details_state
264             details_state.set_multipart()
265         else:
266             self.parser.stdOutLineReceived(line)
267
268     def _error(self):
269         self.parser.client.addError(self.parser._current_test,
270             details={})
271
272     def addError(self, offset, line):
273         """An 'error:' directive has been read."""
274         self._outcome(offset, line, self._error,
275             self.parser._reading_error_details)
276
277     def _xfail(self):
278         self.parser.client.addExpectedFailure(self.parser._current_test,
279             details={})
280
281     def addExpectedFail(self, offset, line):
282         """An 'xfail:' directive has been read."""
283         self._outcome(offset, line, self._xfail,
284             self.parser._reading_xfail_details)
285
286     def _failure(self):
287         self.parser.client.addFailure(self.parser._current_test,
288             details={})
289
290     def addFailure(self, offset, line):
291         """A 'failure:' directive has been read."""
292         self._outcome(offset, line, self._failure,
293             self.parser._reading_failure_details)
294
295     def _skip(self):
296         self.parser._skip_or_error()
297
298     def addSkip(self, offset, line):
299         """A 'skip:' directive has been read."""
300         self._outcome(offset, line, self._skip,
301             self.parser._reading_skip_details)
302
303     def _succeed(self):
304         self.parser.client.addSuccess(self.parser._current_test)
305
306     def addSuccess(self, offset, line):
307         """A 'success:' directive has been read."""
308         self._outcome(offset, line, self._succeed,
309             self.parser._reading_success_details)
310
311     def lostConnection(self):
312         """Connection lost."""
313         self.parser._lostConnectionInTest('')
314
315
316 class _OutSideTest(_ParserState):
317     """State for the subunit parser outside of a test context."""
318
319     def lostConnection(self):
320         """Connection lost."""
321
322     def startTest(self, offset, line):
323         """A test start command received."""
324         self.parser._state = self.parser._in_test
325         self.parser._current_test = RemotedTestCase(line[offset:-1])
326         self.parser.current_test_description = line[offset:-1]
327         self.parser.client.startTest(self.parser._current_test)
328
329
330 class _ReadingDetails(_ParserState):
331     """Common logic for readin state details."""
332
333     def endDetails(self):
334         """The end of a details section has been reached."""
335         self.parser._state = self.parser._outside_test
336         self.parser.current_test_description = None
337         self._report_outcome()
338         self.parser.client.stopTest(self.parser._current_test)
339
340     def lineReceived(self, line):
341         """a line has been received."""
342         self.details_parser.lineReceived(line)
343
344     def lostConnection(self):
345         """Connection lost."""
346         self.parser._lostConnectionInTest('%s report of ' %
347             self._outcome_label())
348
349     def _outcome_label(self):
350         """The label to describe this outcome."""
351         raise NotImplementedError(self._outcome_label)
352
353     def set_simple(self):
354         """Start a simple details parser."""
355         self.details_parser = details.SimpleDetailsParser(self)
356
357     def set_multipart(self):
358         """Start a multipart details parser."""
359         self.details_parser = details.MultipartDetailsParser(self)
360
361
362 class _ReadingFailureDetails(_ReadingDetails):
363     """State for the subunit parser when reading failure details."""
364
365     def _report_outcome(self):
366         self.parser.client.addFailure(self.parser._current_test,
367             details=self.details_parser.get_details())
368
369     def _outcome_label(self):
370         return "failure"
371  
372
373 class _ReadingErrorDetails(_ReadingDetails):
374     """State for the subunit parser when reading error details."""
375
376     def _report_outcome(self):
377         self.parser.client.addError(self.parser._current_test,
378             details=self.details_parser.get_details())
379
380     def _outcome_label(self):
381         return "error"
382
383
384 class _ReadingExpectedFailureDetails(_ReadingDetails):
385     """State for the subunit parser when reading xfail details."""
386
387     def _report_outcome(self):
388         self.parser.client.addExpectedFailure(self.parser._current_test,
389             details=self.details_parser.get_details())
390
391     def _outcome_label(self):
392         return "xfail"
393
394
395 class _ReadingSkipDetails(_ReadingDetails):
396     """State for the subunit parser when reading skip details."""
397
398     def _report_outcome(self):
399         self.parser._skip_or_error(self.details_parser.get_message())
400
401     def _outcome_label(self):
402         return "skip"
403
404
405 class _ReadingSuccessDetails(_ReadingDetails):
406     """State for the subunit parser when reading success details."""
407
408     def _report_outcome(self):
409         self.parser.client.addSuccess(self.parser._current_test)
410
411     def _outcome_label(self):
412         return "success"
413
414
415 class TestProtocolServer(object):
416     """A parser for subunit.
417     
418     :ivar tags: The current tags associated with the protocol stream.
419     """
420
421     def __init__(self, client, stream=None):
422         """Create a TestProtocolServer instance.
423
424         :param client: An object meeting the unittest.TestResult protocol.
425         :param stream: The stream that lines received which are not part of the
426             subunit protocol should be written to. This allows custom handling
427             of mixed protocols. By default, sys.stdout will be used for
428             convenience.
429         """
430         self.client = test_results.ExtendedToOriginalDecorator(client)
431         if stream is None:
432             stream = sys.stdout
433         self._stream = stream
434         # state objects we can switch too
435         self._in_test = _InTest(self)
436         self._outside_test = _OutSideTest(self)
437         self._reading_error_details = _ReadingErrorDetails(self)
438         self._reading_failure_details = _ReadingFailureDetails(self)
439         self._reading_skip_details = _ReadingSkipDetails(self)
440         self._reading_success_details = _ReadingSuccessDetails(self)
441         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
442         # start with outside test.
443         self._state = self._outside_test
444
445     def _skip_or_error(self, message=None):
446         """Report the current test as a skip if possible, or else an error."""
447         if not message:
448             message = "No reason given"
449         self.client.addSkip(self._current_test, message)
450
451     def _handleProgress(self, offset, line):
452         """Process a progress directive."""
453         line = line[offset:].strip()
454         if line[0] in '+-':
455             whence = PROGRESS_CUR
456             delta = int(line)
457         elif line == "push":
458             whence = PROGRESS_PUSH
459             delta = None
460         elif line == "pop":
461             whence = PROGRESS_POP
462             delta = None
463         else:
464             whence = PROGRESS_SET
465             delta = int(line)
466         self.client.progress(delta, whence)
467
468     def _handleTags(self, offset, line):
469         """Process a tags command."""
470         tags = line[offset:].split()
471         new_tags, gone_tags = tags_to_new_gone(tags)
472         self.client.tags(new_tags, gone_tags)
473
474     def _handleTime(self, offset, line):
475         # Accept it, but do not do anything with it yet.
476         try:
477             event_time = iso8601.parse_date(line[offset:-1])
478         except TypeError, e:
479             raise TypeError("Failed to parse %r, got %r" % (line, e))
480         self.client.time(event_time)
481
482     def lineReceived(self, line):
483         """Call the appropriate local method for the received line."""
484         self._state.lineReceived(line)
485
486     def _lostConnectionInTest(self, state_string):
487         error_string = "lost connection during %stest '%s'" % (
488             state_string, self.current_test_description)
489         self.client.addError(self._current_test, RemoteError(error_string))
490         self.client.stopTest(self._current_test)
491
492     def lostConnection(self):
493         """The input connection has finished."""
494         self._state.lostConnection()
495
496     def readFrom(self, pipe):
497         """Blocking convenience API to parse an entire stream.
498         
499         :param pipe: A file-like object supporting readlines().
500         :return: None.
501         """
502         for line in pipe.readlines():
503             self.lineReceived(line)
504         self.lostConnection()
505
506     def _startTest(self, offset, line):
507         """Internal call to change state machine. Override startTest()."""
508         self._state.startTest(offset, line)
509
510     def stdOutLineReceived(self, line):
511         self._stream.write(line)
512
513
514 class RemoteException(Exception):
515     """An exception that occured remotely to Python."""
516
517     def __eq__(self, other):
518         try:
519             return self.args == other.args
520         except AttributeError:
521             return False
522
523
524 class TestProtocolClient(unittest.TestResult):
525     """A TestResult which generates a subunit stream for a test run.
526     
527     # Get a TestSuite or TestCase to run
528     suite = make_suite()
529     # Create a stream (any object with a 'write' method)
530     stream = file('tests.log', 'wb')
531     # Create a subunit result object which will output to the stream
532     result = subunit.TestProtocolClient(stream)
533     # Optionally, to get timing data for performance analysis, wrap the
534     # serialiser with a timing decorator
535     result = subunit.test_results.AutoTimingTestResultDecorator(result)
536     # Run the test suite reporting to the subunit result object
537     suite.run(result)
538     # Close the stream.
539     stream.close()
540     """
541
542     def __init__(self, stream):
543         unittest.TestResult.__init__(self)
544         self._stream = stream
545
546     def addError(self, test, error=None, details=None):
547         """Report an error in test test.
548         
549         Only one of error and details should be provided: conceptually there
550         are two separate methods:
551             addError(self, test, error)
552             addError(self, test, details)
553
554         :param error: Standard unittest positional argument form - an
555             exc_info tuple.
556         :param details: New Testing-in-python drafted API; a dict from string
557             to subunit.Content objects.
558         """
559         self._addOutcome("error", test, error=error, details=details)
560
561     def addExpectedFailure(self, test, error=None, details=None):
562         """Report an expected failure in test test.
563         
564         Only one of error and details should be provided: conceptually there
565         are two separate methods:
566             addError(self, test, error)
567             addError(self, test, details)
568
569         :param error: Standard unittest positional argument form - an
570             exc_info tuple.
571         :param details: New Testing-in-python drafted API; a dict from string
572             to subunit.Content objects.
573         """
574         self._addOutcome("xfail", test, error=error, details=details)
575
576     def addFailure(self, test, error=None, details=None):
577         """Report a failure in test test.
578         
579         Only one of error and details should be provided: conceptually there
580         are two separate methods:
581             addFailure(self, test, error)
582             addFailure(self, test, details)
583
584         :param error: Standard unittest positional argument form - an
585             exc_info tuple.
586         :param details: New Testing-in-python drafted API; a dict from string
587             to subunit.Content objects.
588         """
589         self._addOutcome("failure", test, error=error, details=details)
590
591     def _addOutcome(self, outcome, test, error=None, details=None):
592         """Report a failure in test test.
593         
594         Only one of error and details should be provided: conceptually there
595         are two separate methods:
596             addOutcome(self, test, error)
597             addOutcome(self, test, details)
598
599         :param outcome: A string describing the outcome - used as the
600             event name in the subunit stream.
601         :param error: Standard unittest positional argument form - an
602             exc_info tuple.
603         :param details: New Testing-in-python drafted API; a dict from string
604             to subunit.Content objects.
605         """
606         self._stream.write("%s: %s" % (outcome, test.id()))
607         if error is None and details is None:
608             raise ValueError
609         if error is not None:
610             self._stream.write(" [\n")
611             for line in self._exc_info_to_string(error, test).splitlines():
612                 self._stream.write("%s\n" % line)
613         else:
614             self._write_details(details)
615         self._stream.write("]\n")
616
617     def addSkip(self, test, reason=None, details=None):
618         """Report a skipped test."""
619         if reason is None:
620             self._addOutcome("skip", test, error=None, details=details)
621         else:
622             self._stream.write("skip: %s [\n" % test.id())
623             self._stream.write("%s\n" % reason)
624             self._stream.write("]\n")
625
626     def addSuccess(self, test, details=None):
627         """Report a success in a test."""
628         self._stream.write("successful: %s" % test.id())
629         if not details:
630             self._stream.write("\n")
631         else:
632             self._write_details(details)
633             self._stream.write("]\n")
634     addUnexpectedSuccess = addSuccess
635
636     def startTest(self, test):
637         """Mark a test as starting its test run."""
638         self._stream.write("test: %s\n" % test.id())
639
640     def progress(self, offset, whence):
641         """Provide indication about the progress/length of the test run.
642
643         :param offset: Information about the number of tests remaining. If
644             whence is PROGRESS_CUR, then offset increases/decreases the
645             remaining test count. If whence is PROGRESS_SET, then offset
646             specifies exactly the remaining test count.
647         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
648             PROGRESS_POP.
649         """
650         if whence == PROGRESS_CUR and offset > -1:
651             prefix = "+"
652         elif whence == PROGRESS_PUSH:
653             prefix = ""
654             offset = "push"
655         elif whence == PROGRESS_POP:
656             prefix = ""
657             offset = "pop"
658         else:
659             prefix = ""
660         self._stream.write("progress: %s%s\n" % (prefix, offset))
661
662     def time(self, a_datetime):
663         """Inform the client of the time.
664
665         ":param datetime: A datetime.datetime object.
666         """
667         time = a_datetime.astimezone(iso8601.Utc())
668         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
669             time.year, time.month, time.day, time.hour, time.minute,
670             time.second, time.microsecond))
671
672     def _write_details(self, details):
673         """Output details to the stream.
674
675         :param details: An extended details dict for a test outcome.
676         """
677         self._stream.write(" [ multipart\n")
678         for name, content in sorted(details.iteritems()):
679             self._stream.write("Content-Type: %s/%s" %
680                 (content.content_type.type, content.content_type.subtype))
681             parameters = content.content_type.parameters
682             if parameters:
683                 self._stream.write(";")
684                 param_strs = []
685                 for param, value in parameters.iteritems():
686                     param_strs.append("%s=%s" % (param, value))
687                 self._stream.write(",".join(param_strs))
688             self._stream.write("\n%s\n" % name)
689             encoder = chunked.Encoder(self._stream)
690             map(encoder.write, content.iter_bytes())
691             encoder.close()
692
693     def done(self):
694         """Obey the testtools result.done() interface."""
695
696
697 def RemoteError(description=""):
698     if description == "":
699         description = "\n"
700     return (RemoteException, RemoteException(description), None)
701
702
703 class RemotedTestCase(unittest.TestCase):
704     """A class to represent test cases run in child processes.
705     
706     Instances of this class are used to provide the Python test API a TestCase
707     that can be printed to the screen, introspected for metadata and so on.
708     However, as they are a simply a memoisation of a test that was actually
709     run in the past by a separate process, they cannot perform any interactive
710     actions.
711     """
712
713     def __eq__ (self, other):
714         try:
715             return self.__description == other.__description
716         except AttributeError:
717             return False
718
719     def __init__(self, description):
720         """Create a psuedo test case with description description."""
721         self.__description = description
722
723     def error(self, label):
724         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
725             label)
726
727     def setUp(self):
728         self.error("setUp")
729
730     def tearDown(self):
731         self.error("tearDown")
732
733     def shortDescription(self):
734         return self.__description
735
736     def id(self):
737         return "%s" % (self.__description,)
738
739     def __str__(self):
740         return "%s (%s)" % (self.__description, self._strclass())
741
742     def __repr__(self):
743         return "<%s description='%s'>" % \
744                (self._strclass(), self.__description)
745
746     def run(self, result=None):
747         if result is None: result = self.defaultTestResult()
748         result.startTest(self)
749         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
750         result.stopTest(self)
751
752     def _strclass(self):
753         cls = self.__class__
754         return "%s.%s" % (cls.__module__, cls.__name__)
755
756
757 class ExecTestCase(unittest.TestCase):
758     """A test case which runs external scripts for test fixtures."""
759
760     def __init__(self, methodName='runTest'):
761         """Create an instance of the class that will use the named test
762            method when executed. Raises a ValueError if the instance does
763            not have a method with the specified name.
764         """
765         unittest.TestCase.__init__(self, methodName)
766         testMethod = getattr(self, methodName)
767         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
768                                testMethod.__doc__)
769
770     def countTestCases(self):
771         return 1
772
773     def run(self, result=None):
774         if result is None: result = self.defaultTestResult()
775         self._run(result)
776
777     def debug(self):
778         """Run the test without collecting errors in a TestResult"""
779         self._run(unittest.TestResult())
780
781     def _run(self, result):
782         protocol = TestProtocolServer(result)
783         output = subprocess.Popen(self.script, shell=True,
784             stdout=subprocess.PIPE).communicate()[0]
785         protocol.readFrom(StringIO(output))
786
787
788 class IsolatedTestCase(unittest.TestCase):
789     """A TestCase which executes in a forked process.
790     
791     Each test gets its own process, which has a performance overhead but will
792     provide excellent isolation from global state (such as django configs,
793     zope utilities and so on).
794     """
795
796     def run(self, result=None):
797         if result is None: result = self.defaultTestResult()
798         run_isolated(unittest.TestCase, self, result)
799
800
801 class IsolatedTestSuite(unittest.TestSuite):
802     """A TestSuite which runs its tests in a forked process.
803     
804     This decorator that will fork() before running the tests and report the
805     results from the child process using a Subunit stream.  This is useful for
806     handling tests that mutate global state, or are testing C extensions that
807     could crash the VM.
808     """
809
810     def run(self, result=None):
811         if result is None: result = unittest.TestResult()
812         run_isolated(unittest.TestSuite, self, result)
813
814
815 def run_isolated(klass, self, result):
816     """Run a test suite or case in a subprocess, using the run method on klass.
817     """
818     c2pread, c2pwrite = os.pipe()
819     # fixme - error -> result
820     # now fork
821     pid = os.fork()
822     if pid == 0:
823         # Child
824         # Close parent's pipe ends
825         os.close(c2pread)
826         # Dup fds for child
827         os.dup2(c2pwrite, 1)
828         # Close pipe fds.
829         os.close(c2pwrite)
830
831         # at this point, sys.stdin is redirected, now we want
832         # to filter it to escape ]'s.
833         ### XXX: test and write that bit.
834
835         result = TestProtocolClient(sys.stdout)
836         klass.run(self, result)
837         sys.stdout.flush()
838         sys.stderr.flush()
839         # exit HARD, exit NOW.
840         os._exit(0)
841     else:
842         # Parent
843         # Close child pipe ends
844         os.close(c2pwrite)
845         # hookup a protocol engine
846         protocol = TestProtocolServer(result)
847         protocol.readFrom(os.fdopen(c2pread, 'rU'))
848         os.waitpid(pid, 0)
849         # TODO return code evaluation.
850     return result
851
852
853 def TAP2SubUnit(tap, subunit):
854     """Filter a TAP pipe into a subunit pipe.
855     
856     :param tap: A tap pipe/stream/file object.
857     :param subunit: A pipe/stream/file object to write subunit results to.
858     :return: The exit code to exit with.
859     """
860     BEFORE_PLAN = 0
861     AFTER_PLAN = 1
862     SKIP_STREAM = 2
863     client = TestProtocolClient(subunit)
864     state = BEFORE_PLAN
865     plan_start = 1
866     plan_stop = 0
867     def _skipped_test(subunit, plan_start):
868         # Some tests were skipped.
869         subunit.write('test test %d\n' % plan_start)
870         subunit.write('error test %d [\n' % plan_start)
871         subunit.write('test missing from TAP output\n')
872         subunit.write(']\n')
873         return plan_start + 1
874     # Test data for the next test to emit
875     test_name = None
876     log = []
877     result = None
878     def _emit_test():
879         "write out a test"
880         if test_name is None:
881             return
882         subunit.write("test %s\n" % test_name)
883         if not log:
884             subunit.write("%s %s\n" % (result, test_name))
885         else:
886             subunit.write("%s %s [\n" % (result, test_name))
887         if log:
888             for line in log:
889                 subunit.write("%s\n" % line)
890             subunit.write("]\n")
891         del log[:]
892     for line in tap:
893         if state == BEFORE_PLAN:
894             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
895             if match:
896                 state = AFTER_PLAN
897                 _, plan_stop, comment = match.groups()
898                 plan_stop = int(plan_stop)
899                 if plan_start > plan_stop and plan_stop == 0:
900                     # skipped file
901                     state = SKIP_STREAM
902                     subunit.write("test file skip\n")
903                     subunit.write("skip file skip [\n")
904                     subunit.write("%s\n" % comment)
905                     subunit.write("]\n")
906                 continue
907         # not a plan line, or have seen one before
908         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
909         if match:
910             # new test, emit current one.
911             _emit_test()
912             status, number, description, directive, directive_comment = match.groups()
913             if status == 'ok':
914                 result = 'success'
915             else:
916                 result = "failure"
917             if description is None:
918                 description = ''
919             else:
920                 description = ' ' + description
921             if directive is not None:
922                 if directive == 'TODO':
923                     result = 'xfail'
924                 elif directive == 'SKIP':
925                     result = 'skip'
926                 if directive_comment is not None:
927                     log.append(directive_comment)
928             if number is not None:
929                 number = int(number)
930                 while plan_start < number:
931                     plan_start = _skipped_test(subunit, plan_start)
932             test_name = "test %d%s" % (plan_start, description)
933             plan_start += 1
934             continue
935         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
936         if match:
937             reason, = match.groups()
938             if reason is None:
939                 extra = ''
940             else:
941                 extra = ' %s' % reason
942             _emit_test()
943             test_name = "Bail out!%s" % extra
944             result = "error"
945             state = SKIP_STREAM
946             continue
947         match = re.match("\#.*\n", line)
948         if match:
949             log.append(line[:-1])
950             continue
951         subunit.write(line)
952     _emit_test()
953     while plan_start <= plan_stop:
954         # record missed tests
955         plan_start = _skipped_test(subunit, plan_start)
956     return 0
957
958
959 def tag_stream(original, filtered, tags):
960     """Alter tags on a stream.
961
962     :param original: The input stream.
963     :param filtered: The output stream.
964     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
965         '-TAG' commands.
966
967         A 'TAG' command will add the tag to the output stream,
968         and override any existing '-TAG' command in that stream.
969         Specifically:
970          * A global 'tags: TAG' will be added to the start of the stream.
971          * Any tags commands with -TAG will have the -TAG removed.
972
973         A '-TAG' command will remove the TAG command from the stream.
974         Specifically:
975          * A 'tags: -TAG' command will be added to the start of the stream.
976          * Any 'tags: TAG' command will have 'TAG' removed from it.
977         Additionally, any redundant tagging commands (adding a tag globally
978         present, or removing a tag globally removed) are stripped as a
979         by-product of the filtering.
980     :return: 0
981     """
982     new_tags, gone_tags = tags_to_new_gone(tags)
983     def write_tags(new_tags, gone_tags):
984         if new_tags or gone_tags:
985             filtered.write("tags: " + ' '.join(new_tags))
986             if gone_tags:
987                 for tag in gone_tags:
988                     filtered.write("-" + tag)
989             filtered.write("\n")
990     write_tags(new_tags, gone_tags)
991     # TODO: use the protocol parser and thus don't mangle test comments.
992     for line in original:
993         if line.startswith("tags:"):
994             line_tags = line[5:].split()
995             line_new, line_gone = tags_to_new_gone(line_tags)
996             line_new = line_new - gone_tags
997             line_gone = line_gone - new_tags
998             write_tags(line_new, line_gone)
999         else:
1000             filtered.write(line)
1001     return 0
1002
1003
1004 class ProtocolTestCase(object):
1005     """Subunit wire protocol to unittest.TestCase adapter.
1006
1007     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1008     calling a ProtocolTestCase or invoking the run() method will make a 'test
1009     run' happen. The 'test run' will simply be a replay of the test activity
1010     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1011     and ``countTestCases`` methods are not supported because there isn't a
1012     sensible mapping for those methods.
1013     
1014     # Get a stream (any object with a readline() method), in this case the
1015     # stream output by the example from ``subunit.TestProtocolClient``.
1016     stream = file('tests.log', 'rb')
1017     # Create a parser which will read from the stream and emit 
1018     # activity to a unittest.TestResult when run() is called.
1019     suite = subunit.ProtocolTestCase(stream)
1020     # Create a result object to accept the contents of that stream.
1021     result = unittest._TextTestResult(sys.stdout)
1022     # 'run' the tests - process the stream and feed its contents to result.
1023     suite.run(result)
1024     stream.close()
1025
1026     :seealso: TestProtocolServer (the subunit wire protocol parser).
1027     """
1028
1029     def __init__(self, stream, passthrough=None):
1030         """Create a ProtocolTestCase reading from stream.
1031
1032         :param stream: A filelike object which a subunit stream can be read
1033             from.
1034         :param passthrough: A stream pass non subunit input on to. If not
1035             supplied, the TestProtocolServer default is used.
1036         """
1037         self._stream = stream
1038         self._passthrough = passthrough
1039
1040     def __call__(self, result=None):
1041         return self.run(result)
1042
1043     def run(self, result=None):
1044         if result is None:
1045             result = self.defaultTestResult()
1046         protocol = TestProtocolServer(result, self._passthrough)
1047         line = self._stream.readline()
1048         while line:
1049             protocol.lineReceived(line)
1050             line = self._stream.readline()
1051         protocol.lostConnection()
1052
1053
1054 class TestResultStats(unittest.TestResult):
1055     """A pyunit TestResult interface implementation for making statistics.
1056     
1057     :ivar total_tests: The total tests seen.
1058     :ivar passed_tests: The tests that passed.
1059     :ivar failed_tests: The tests that failed.
1060     :ivar seen_tags: The tags seen across all tests.
1061     """
1062
1063     def __init__(self, stream):
1064         """Create a TestResultStats which outputs to stream."""
1065         unittest.TestResult.__init__(self)
1066         self._stream = stream
1067         self.failed_tests = 0
1068         self.skipped_tests = 0
1069         self.seen_tags = set()
1070
1071     @property
1072     def total_tests(self):
1073         return self.testsRun
1074
1075     def addError(self, test, err):
1076         self.failed_tests += 1
1077
1078     def addFailure(self, test, err):
1079         self.failed_tests += 1
1080
1081     def addSkip(self, test, reason):
1082         self.skipped_tests += 1
1083
1084     def formatStats(self):
1085         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1086         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1087         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1088         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1089         tags = sorted(self.seen_tags)
1090         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1091
1092     @property
1093     def passed_tests(self):
1094         return self.total_tests - self.failed_tests - self.skipped_tests
1095
1096     def tags(self, new_tags, gone_tags):
1097         """Accumulate the seen tags."""
1098         self.seen_tags.update(new_tags)
1099
1100     def wasSuccessful(self):
1101         """Tells whether or not this result was a success"""
1102         return self.failed_tests == 0
1103
1104
1105 class TestResultFilter(unittest.TestResult):
1106     """A pyunit TestResult interface implementation which filters tests.
1107
1108     Tests that pass the filter are handed on to another TestResult instance
1109     for further processing/reporting. To obtain the filtered results, 
1110     the other instance must be interrogated.
1111
1112     :ivar result: The result that tests are passed to after filtering.
1113     :ivar filter_predicate: The callback run to decide whether to pass 
1114         a result.
1115     """
1116
1117     def __init__(self, result, filter_error=False, filter_failure=False,
1118         filter_success=True, filter_skip=False,
1119         filter_predicate=None):
1120         """Create a FilterResult object filtering to result.
1121         
1122         :param filter_error: Filter out errors.
1123         :param filter_failure: Filter out failures.
1124         :param filter_success: Filter out successful tests.
1125         :param filter_skip: Filter out skipped tests.
1126         :param filter_predicate: A callable taking (test, err) and 
1127             returning True if the result should be passed through.
1128             err is None for success.
1129         """
1130         unittest.TestResult.__init__(self)
1131         self.result = result
1132         self._filter_error = filter_error
1133         self._filter_failure = filter_failure
1134         self._filter_success = filter_success
1135         self._filter_skip = filter_skip
1136         if filter_predicate is None:
1137             filter_predicate = lambda test, err: True
1138         self.filter_predicate = filter_predicate
1139         # The current test (for filtering tags)
1140         self._current_test = None
1141         # Has the current test been filtered (for outputting test tags)
1142         self._current_test_filtered = None
1143         # The (new, gone) tags for the current test.
1144         self._current_test_tags = None
1145         
1146     def addError(self, test, err):
1147         if not self._filter_error and self.filter_predicate(test, err):
1148             self.result.startTest(test)
1149             self.result.addError(test, err)
1150
1151     def addFailure(self, test, err):
1152         if not self._filter_failure and self.filter_predicate(test, err):
1153             self.result.startTest(test)
1154             self.result.addFailure(test, err)
1155
1156     def addSkip(self, test, reason):
1157         if not self._filter_skip and self.filter_predicate(test, reason):
1158             self.result.startTest(test)
1159             # This is duplicated, it would be nice to have on a 'calls
1160             # TestResults' mixin perhaps.
1161             addSkip = getattr(self.result, 'addSkip', None)
1162             if not callable(addSkip):
1163                 self.result.addError(test, RemoteError(reason))
1164             else:
1165                 self.result.addSkip(test, reason)
1166
1167     def addSuccess(self, test):
1168         if not self._filter_success and self.filter_predicate(test, None):
1169             self.result.startTest(test)
1170             self.result.addSuccess(test)
1171
1172     def startTest(self, test):
1173         """Start a test.
1174         
1175         Not directly passed to the client, but used for handling of tags
1176         correctly.
1177         """
1178         self._current_test = test
1179         self._current_test_filtered = False
1180         self._current_test_tags = set(), set()
1181     
1182     def stopTest(self, test):
1183         """Stop a test.
1184         
1185         Not directly passed to the client, but used for handling of tags
1186         correctly.
1187         """
1188         if not self._current_test_filtered:
1189             # Tags to output for this test.
1190             if self._current_test_tags[0] or self._current_test_tags[1]:
1191                 tags_method = getattr(self.result, 'tags', None)
1192                 if callable(tags_method):
1193                     self.result.tags(*self._current_test_tags)
1194             self.result.stopTest(test)
1195         self._current_test = None
1196         self._current_test_filtered = None
1197         self._current_test_tags = None
1198
1199     def tags(self, new_tags, gone_tags):
1200         """Handle tag instructions.
1201
1202         Adds and removes tags as appropriate. If a test is currently running,
1203         tags are not affected for subsequent tests.
1204         
1205         :param new_tags: Tags to add,
1206         :param gone_tags: Tags to remove.
1207         """
1208         if self._current_test is not None:
1209             # gather the tags until the test stops.
1210             self._current_test_tags[0].update(new_tags)
1211             self._current_test_tags[0].difference_update(gone_tags)
1212             self._current_test_tags[1].update(gone_tags)
1213             self._current_test_tags[1].difference_update(new_tags)
1214         tags_method = getattr(self.result, 'tags', None)
1215         if tags_method is None:
1216             return
1217         return tags_method(new_tags, gone_tags)
1218
1219     def id_to_orig_id(self, id):
1220         if id.startswith("subunit.RemotedTestCase."):
1221             return id[len("subunit.RemotedTestCase."):]
1222         return id