Start reporting additional messages on successes via the details API.
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ========
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to 
53 ``subunit.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc.
57
58 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
59 remove tags in the test run that is currently executing. If called when no
60 test is in progress (that is, if called outside of the ``startTest``, 
61 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
62 when a test is in progress, then the tags only apply to that test.
63
64 The ``time(a_datetime)`` method is called (if present) when a ``time:``
65 directive is encountered in a Subunit stream. This is used to tell a TestResult
66 about the time that events in the stream occured at, to allow reconstructing
67 test timing from a stream.
68
69 The ``progress(offset, whence)`` method controls progress data for a stream.
70 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
71 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
72 ignore the offset parameter.
73
74
75 Python test support
76 -------------------
77
78 ``subunit.run`` is a convenience wrapper to run a Python test suite via
79 the command line, reporting via Subunit::
80
81   $ python -m subunit.run mylib.tests.test_suite
82
83 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
84 tests, allowing isolation between the test runner and some tests.
85
86 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
87 tests that will fork() before that individual test is run.
88
89 `ExecTestCase`` is a convenience wrapper for running an external 
90 program to get a Subunit stream and then report that back to an arbitrary
91 result object::
92
93  class AggregateTests(subunit.ExecTestCase):
94
95      def test_script_one(self):
96          './bin/script_one'
97
98      def test_script_two(self):
99          './bin/script_two'
100  
101  # Normally your normal test loading would take of this automatically,
102  # It is only spelt out in detail here for clarity.
103  suite = unittest.TestSuite([AggregateTests("test_script_one"),
104      AggregateTests("test_script_two")])
105  # Create any TestResult class you like.
106  result = unittest._TextTestResult(sys.stdout)
107  # And run your suite as normal, Subunit will exec each external script as
108  # needed and report to your result object.
109  suite.run(result)
110
111 Utility modules
112 ---------------
113
114 * subunit.chunked contains HTTP chunked encoding/decoding logic.
115 * subunit.content contains a minimal assumptions MIME content representation.
116 * subunit.content_type contains a MIME Content-Type representation.
117 * subunit.test_results contains TestResult helper classes.
118 """
119
120 import datetime
121 import os
122 import re
123 from StringIO import StringIO
124 import subprocess
125 import sys
126 import unittest
127
128 import iso8601
129
130 import chunked, content, content_type, details, test_results
131
132
133 PROGRESS_SET = 0
134 PROGRESS_CUR = 1
135 PROGRESS_PUSH = 2
136 PROGRESS_POP = 3
137
138
139 def test_suite():
140     import subunit.tests
141     return subunit.tests.test_suite()
142
143
144 def join_dir(base_path, path):
145     """
146     Returns an absolute path to C{path}, calculated relative to the parent
147     of C{base_path}.
148
149     @param base_path: A path to a file or directory.
150     @param path: An absolute path, or a path relative to the containing
151     directory of C{base_path}.
152
153     @return: An absolute path to C{path}.
154     """
155     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
156
157
158 def tags_to_new_gone(tags):
159     """Split a list of tags into a new_set and a gone_set."""
160     new_tags = set()
161     gone_tags = set()
162     for tag in tags:
163         if tag[0] == '-':
164             gone_tags.add(tag[1:])
165         else:
166             new_tags.add(tag)
167     return new_tags, gone_tags
168
169
170 class DiscardStream(object):
171     """A filelike object which discards what is written to it."""
172
173     def write(self, bytes):
174         pass
175
176
177 class _ParserState(object):
178     """State for the subunit parser."""
179
180     def __init__(self, parser):
181         self.parser = parser
182
183     def addError(self, offset, line):
184         """An 'error:' directive has been read."""
185         self.parser.stdOutLineReceived(line)
186
187     def addExpectedFail(self, offset, line):
188         """An 'xfail:' directive has been read."""
189         self.parser.stdOutLineReceived(line)
190
191     def addFailure(self, offset, line):
192         """A 'failure:' directive has been read."""
193         self.parser.stdOutLineReceived(line)
194
195     def addSkip(self, offset, line):
196         """A 'skip:' directive has been read."""
197         self.parser.stdOutLineReceived(line)
198
199     def addSuccess(self, offset, line):
200         """A 'success:' directive has been read."""
201         self.parser.stdOutLineReceived(line)
202
203     def lineReceived(self, line):
204         """a line has been received."""
205         parts = line.split(None, 1)
206         if len(parts) == 2:
207             cmd, rest = parts
208             offset = len(cmd) + 1
209             cmd = cmd.strip(':')
210             if cmd in ('test', 'testing'):
211                 self.startTest(offset, line)
212             elif cmd == 'error':
213                 self.addError(offset, line)
214             elif cmd == 'failure':
215                 self.addFailure(offset, line)
216             elif cmd == 'progress':
217                 self.parser._handleProgress(offset, line)
218             elif cmd == 'skip':
219                 self.addSkip(offset, line)
220             elif cmd in ('success', 'successful'):
221                 self.addSuccess(offset, line)
222             elif cmd in ('tags',):
223                 self.parser._handleTags(offset, line)
224             elif cmd in ('time',):
225                 self.parser._handleTime(offset, line)
226             elif cmd == 'xfail':
227                 self.addExpectedFail(offset, line)
228             else:
229                 self.parser.stdOutLineReceived(line)
230         else:
231             self.parser.stdOutLineReceived(line)
232
233     def lostConnection(self):
234         """Connection lost."""
235         self.parser._lostConnectionInTest('unknown state of ')
236
237     def startTest(self, offset, line):
238         """A test start command received."""
239         self.parser.stdOutLineReceived(line)
240
241
242 class _InTest(_ParserState):
243     """State for the subunit parser after reading a test: directive."""
244
245     def _outcome(self, offset, line, no_details, details_state):
246         """An outcome directive has been read.
247         
248         :param no_details: Callable to call when no details are presented.
249         :param details_state: The state to switch to for details
250             processing of this outcome.
251         """
252         if self.parser.current_test_description == line[offset:-1]:
253             self.parser._state = self.parser._outside_test
254             self.parser.current_test_description = None
255             no_details()
256             self.parser.client.stopTest(self.parser._current_test)
257             self.parser._current_test = None
258         elif self.parser.current_test_description + " [" == line[offset:-1]:
259             self.parser._state = details_state
260             details_state.set_simple()
261         elif self.parser.current_test_description + " [ multipart" == \
262             line[offset:-1]:
263             self.parser._state = details_state
264             details_state.set_multipart()
265         else:
266             self.parser.stdOutLineReceived(line)
267
268     def _error(self):
269         self.parser.client.addError(self.parser._current_test,
270             details={})
271
272     def addError(self, offset, line):
273         """An 'error:' directive has been read."""
274         self._outcome(offset, line, self._error,
275             self.parser._reading_error_details)
276
277     def _xfail(self):
278         self.parser.client.addExpectedFailure(self.parser._current_test,
279             details={})
280
281     def addExpectedFail(self, offset, line):
282         """An 'xfail:' directive has been read."""
283         self._outcome(offset, line, self._xfail,
284             self.parser._reading_xfail_details)
285
286     def _failure(self):
287         self.parser.client.addFailure(self.parser._current_test, details={})
288
289     def addFailure(self, offset, line):
290         """A 'failure:' directive has been read."""
291         self._outcome(offset, line, self._failure,
292             self.parser._reading_failure_details)
293
294     def _skip(self):
295         self.parser.client.addSkip(self.parser._current_test, details={})
296
297     def addSkip(self, offset, line):
298         """A 'skip:' directive has been read."""
299         self._outcome(offset, line, self._skip,
300             self.parser._reading_skip_details)
301
302     def _succeed(self):
303         self.parser.client.addSuccess(self.parser._current_test, details={})
304
305     def addSuccess(self, offset, line):
306         """A 'success:' directive has been read."""
307         self._outcome(offset, line, self._succeed,
308             self.parser._reading_success_details)
309
310     def lostConnection(self):
311         """Connection lost."""
312         self.parser._lostConnectionInTest('')
313
314
315 class _OutSideTest(_ParserState):
316     """State for the subunit parser outside of a test context."""
317
318     def lostConnection(self):
319         """Connection lost."""
320
321     def startTest(self, offset, line):
322         """A test start command received."""
323         self.parser._state = self.parser._in_test
324         self.parser._current_test = RemotedTestCase(line[offset:-1])
325         self.parser.current_test_description = line[offset:-1]
326         self.parser.client.startTest(self.parser._current_test)
327
328
329 class _ReadingDetails(_ParserState):
330     """Common logic for readin state details."""
331
332     def endDetails(self):
333         """The end of a details section has been reached."""
334         self.parser._state = self.parser._outside_test
335         self.parser.current_test_description = None
336         self._report_outcome()
337         self.parser.client.stopTest(self.parser._current_test)
338
339     def lineReceived(self, line):
340         """a line has been received."""
341         self.details_parser.lineReceived(line)
342
343     def lostConnection(self):
344         """Connection lost."""
345         self.parser._lostConnectionInTest('%s report of ' %
346             self._outcome_label())
347
348     def _outcome_label(self):
349         """The label to describe this outcome."""
350         raise NotImplementedError(self._outcome_label)
351
352     def set_simple(self):
353         """Start a simple details parser."""
354         self.details_parser = details.SimpleDetailsParser(self)
355
356     def set_multipart(self):
357         """Start a multipart details parser."""
358         self.details_parser = details.MultipartDetailsParser(self)
359
360
361 class _ReadingFailureDetails(_ReadingDetails):
362     """State for the subunit parser when reading failure details."""
363
364     def _report_outcome(self):
365         self.parser.client.addFailure(self.parser._current_test,
366             details=self.details_parser.get_details())
367
368     def _outcome_label(self):
369         return "failure"
370  
371
372 class _ReadingErrorDetails(_ReadingDetails):
373     """State for the subunit parser when reading error details."""
374
375     def _report_outcome(self):
376         self.parser.client.addError(self.parser._current_test,
377             details=self.details_parser.get_details())
378
379     def _outcome_label(self):
380         return "error"
381
382
383 class _ReadingExpectedFailureDetails(_ReadingDetails):
384     """State for the subunit parser when reading xfail details."""
385
386     def _report_outcome(self):
387         self.parser.client.addExpectedFailure(self.parser._current_test,
388             details=self.details_parser.get_details())
389
390     def _outcome_label(self):
391         return "xfail"
392
393
394 class _ReadingSkipDetails(_ReadingDetails):
395     """State for the subunit parser when reading skip details."""
396
397     def _report_outcome(self):
398         self.parser.client.addSkip(self.parser._current_test,
399             details=self.details_parser.get_details("skip"))
400
401     def _outcome_label(self):
402         return "skip"
403
404
405 class _ReadingSuccessDetails(_ReadingDetails):
406     """State for the subunit parser when reading success details."""
407
408     def _report_outcome(self):
409         self.parser.client.addSuccess(self.parser._current_test,
410             details=self.details_parser.get_details("success"))
411
412     def _outcome_label(self):
413         return "success"
414
415
416 class TestProtocolServer(object):
417     """A parser for subunit.
418     
419     :ivar tags: The current tags associated with the protocol stream.
420     """
421
422     def __init__(self, client, stream=None):
423         """Create a TestProtocolServer instance.
424
425         :param client: An object meeting the unittest.TestResult protocol.
426         :param stream: The stream that lines received which are not part of the
427             subunit protocol should be written to. This allows custom handling
428             of mixed protocols. By default, sys.stdout will be used for
429             convenience.
430         """
431         self.client = test_results.ExtendedToOriginalDecorator(client)
432         if stream is None:
433             stream = sys.stdout
434         self._stream = stream
435         # state objects we can switch too
436         self._in_test = _InTest(self)
437         self._outside_test = _OutSideTest(self)
438         self._reading_error_details = _ReadingErrorDetails(self)
439         self._reading_failure_details = _ReadingFailureDetails(self)
440         self._reading_skip_details = _ReadingSkipDetails(self)
441         self._reading_success_details = _ReadingSuccessDetails(self)
442         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
443         # start with outside test.
444         self._state = self._outside_test
445
446     def _handleProgress(self, offset, line):
447         """Process a progress directive."""
448         line = line[offset:].strip()
449         if line[0] in '+-':
450             whence = PROGRESS_CUR
451             delta = int(line)
452         elif line == "push":
453             whence = PROGRESS_PUSH
454             delta = None
455         elif line == "pop":
456             whence = PROGRESS_POP
457             delta = None
458         else:
459             whence = PROGRESS_SET
460             delta = int(line)
461         self.client.progress(delta, whence)
462
463     def _handleTags(self, offset, line):
464         """Process a tags command."""
465         tags = line[offset:].split()
466         new_tags, gone_tags = tags_to_new_gone(tags)
467         self.client.tags(new_tags, gone_tags)
468
469     def _handleTime(self, offset, line):
470         # Accept it, but do not do anything with it yet.
471         try:
472             event_time = iso8601.parse_date(line[offset:-1])
473         except TypeError, e:
474             raise TypeError("Failed to parse %r, got %r" % (line, e))
475         self.client.time(event_time)
476
477     def lineReceived(self, line):
478         """Call the appropriate local method for the received line."""
479         self._state.lineReceived(line)
480
481     def _lostConnectionInTest(self, state_string):
482         error_string = "lost connection during %stest '%s'" % (
483             state_string, self.current_test_description)
484         self.client.addError(self._current_test, RemoteError(error_string))
485         self.client.stopTest(self._current_test)
486
487     def lostConnection(self):
488         """The input connection has finished."""
489         self._state.lostConnection()
490
491     def readFrom(self, pipe):
492         """Blocking convenience API to parse an entire stream.
493         
494         :param pipe: A file-like object supporting readlines().
495         :return: None.
496         """
497         for line in pipe.readlines():
498             self.lineReceived(line)
499         self.lostConnection()
500
501     def _startTest(self, offset, line):
502         """Internal call to change state machine. Override startTest()."""
503         self._state.startTest(offset, line)
504
505     def stdOutLineReceived(self, line):
506         self._stream.write(line)
507
508
509 class RemoteException(Exception):
510     """An exception that occured remotely to Python."""
511
512     def __eq__(self, other):
513         try:
514             return self.args == other.args
515         except AttributeError:
516             return False
517
518
519 class TestProtocolClient(unittest.TestResult):
520     """A TestResult which generates a subunit stream for a test run.
521     
522     # Get a TestSuite or TestCase to run
523     suite = make_suite()
524     # Create a stream (any object with a 'write' method)
525     stream = file('tests.log', 'wb')
526     # Create a subunit result object which will output to the stream
527     result = subunit.TestProtocolClient(stream)
528     # Optionally, to get timing data for performance analysis, wrap the
529     # serialiser with a timing decorator
530     result = subunit.test_results.AutoTimingTestResultDecorator(result)
531     # Run the test suite reporting to the subunit result object
532     suite.run(result)
533     # Close the stream.
534     stream.close()
535     """
536
537     def __init__(self, stream):
538         unittest.TestResult.__init__(self)
539         self._stream = stream
540
541     def addError(self, test, error=None, details=None):
542         """Report an error in test test.
543         
544         Only one of error and details should be provided: conceptually there
545         are two separate methods:
546             addError(self, test, error)
547             addError(self, test, details)
548
549         :param error: Standard unittest positional argument form - an
550             exc_info tuple.
551         :param details: New Testing-in-python drafted API; a dict from string
552             to subunit.Content objects.
553         """
554         self._addOutcome("error", test, error=error, details=details)
555
556     def addExpectedFailure(self, test, error=None, details=None):
557         """Report an expected failure in test test.
558         
559         Only one of error and details should be provided: conceptually there
560         are two separate methods:
561             addError(self, test, error)
562             addError(self, test, details)
563
564         :param error: Standard unittest positional argument form - an
565             exc_info tuple.
566         :param details: New Testing-in-python drafted API; a dict from string
567             to subunit.Content objects.
568         """
569         self._addOutcome("xfail", test, error=error, details=details)
570
571     def addFailure(self, test, error=None, details=None):
572         """Report a failure in test test.
573         
574         Only one of error and details should be provided: conceptually there
575         are two separate methods:
576             addFailure(self, test, error)
577             addFailure(self, test, details)
578
579         :param error: Standard unittest positional argument form - an
580             exc_info tuple.
581         :param details: New Testing-in-python drafted API; a dict from string
582             to subunit.Content objects.
583         """
584         self._addOutcome("failure", test, error=error, details=details)
585
586     def _addOutcome(self, outcome, test, error=None, details=None):
587         """Report a failure in test test.
588         
589         Only one of error and details should be provided: conceptually there
590         are two separate methods:
591             addOutcome(self, test, error)
592             addOutcome(self, test, details)
593
594         :param outcome: A string describing the outcome - used as the
595             event name in the subunit stream.
596         :param error: Standard unittest positional argument form - an
597             exc_info tuple.
598         :param details: New Testing-in-python drafted API; a dict from string
599             to subunit.Content objects.
600         """
601         self._stream.write("%s: %s" % (outcome, test.id()))
602         if error is None and details is None:
603             raise ValueError
604         if error is not None:
605             self._stream.write(" [\n")
606             for line in self._exc_info_to_string(error, test).splitlines():
607                 self._stream.write("%s\n" % line)
608         else:
609             self._write_details(details)
610         self._stream.write("]\n")
611
612     def addSkip(self, test, reason=None, details=None):
613         """Report a skipped test."""
614         if reason is None:
615             self._addOutcome("skip", test, error=None, details=details)
616         else:
617             self._stream.write("skip: %s [\n" % test.id())
618             self._stream.write("%s\n" % reason)
619             self._stream.write("]\n")
620
621     def addSuccess(self, test, details=None):
622         """Report a success in a test."""
623         self._stream.write("successful: %s" % test.id())
624         if not details:
625             self._stream.write("\n")
626         else:
627             self._write_details(details)
628             self._stream.write("]\n")
629     addUnexpectedSuccess = addSuccess
630
631     def startTest(self, test):
632         """Mark a test as starting its test run."""
633         self._stream.write("test: %s\n" % test.id())
634
635     def progress(self, offset, whence):
636         """Provide indication about the progress/length of the test run.
637
638         :param offset: Information about the number of tests remaining. If
639             whence is PROGRESS_CUR, then offset increases/decreases the
640             remaining test count. If whence is PROGRESS_SET, then offset
641             specifies exactly the remaining test count.
642         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
643             PROGRESS_POP.
644         """
645         if whence == PROGRESS_CUR and offset > -1:
646             prefix = "+"
647         elif whence == PROGRESS_PUSH:
648             prefix = ""
649             offset = "push"
650         elif whence == PROGRESS_POP:
651             prefix = ""
652             offset = "pop"
653         else:
654             prefix = ""
655         self._stream.write("progress: %s%s\n" % (prefix, offset))
656
657     def time(self, a_datetime):
658         """Inform the client of the time.
659
660         ":param datetime: A datetime.datetime object.
661         """
662         time = a_datetime.astimezone(iso8601.Utc())
663         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
664             time.year, time.month, time.day, time.hour, time.minute,
665             time.second, time.microsecond))
666
667     def _write_details(self, details):
668         """Output details to the stream.
669
670         :param details: An extended details dict for a test outcome.
671         """
672         self._stream.write(" [ multipart\n")
673         for name, content in sorted(details.iteritems()):
674             self._stream.write("Content-Type: %s/%s" %
675                 (content.content_type.type, content.content_type.subtype))
676             parameters = content.content_type.parameters
677             if parameters:
678                 self._stream.write(";")
679                 param_strs = []
680                 for param, value in parameters.iteritems():
681                     param_strs.append("%s=%s" % (param, value))
682                 self._stream.write(",".join(param_strs))
683             self._stream.write("\n%s\n" % name)
684             encoder = chunked.Encoder(self._stream)
685             map(encoder.write, content.iter_bytes())
686             encoder.close()
687
688     def done(self):
689         """Obey the testtools result.done() interface."""
690
691
692 def RemoteError(description=""):
693     if description == "":
694         description = "\n"
695     return (RemoteException, RemoteException(description), None)
696
697
698 class RemotedTestCase(unittest.TestCase):
699     """A class to represent test cases run in child processes.
700     
701     Instances of this class are used to provide the Python test API a TestCase
702     that can be printed to the screen, introspected for metadata and so on.
703     However, as they are a simply a memoisation of a test that was actually
704     run in the past by a separate process, they cannot perform any interactive
705     actions.
706     """
707
708     def __eq__ (self, other):
709         try:
710             return self.__description == other.__description
711         except AttributeError:
712             return False
713
714     def __init__(self, description):
715         """Create a psuedo test case with description description."""
716         self.__description = description
717
718     def error(self, label):
719         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
720             label)
721
722     def setUp(self):
723         self.error("setUp")
724
725     def tearDown(self):
726         self.error("tearDown")
727
728     def shortDescription(self):
729         return self.__description
730
731     def id(self):
732         return "%s" % (self.__description,)
733
734     def __str__(self):
735         return "%s (%s)" % (self.__description, self._strclass())
736
737     def __repr__(self):
738         return "<%s description='%s'>" % \
739                (self._strclass(), self.__description)
740
741     def run(self, result=None):
742         if result is None: result = self.defaultTestResult()
743         result.startTest(self)
744         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
745         result.stopTest(self)
746
747     def _strclass(self):
748         cls = self.__class__
749         return "%s.%s" % (cls.__module__, cls.__name__)
750
751
752 class ExecTestCase(unittest.TestCase):
753     """A test case which runs external scripts for test fixtures."""
754
755     def __init__(self, methodName='runTest'):
756         """Create an instance of the class that will use the named test
757            method when executed. Raises a ValueError if the instance does
758            not have a method with the specified name.
759         """
760         unittest.TestCase.__init__(self, methodName)
761         testMethod = getattr(self, methodName)
762         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
763                                testMethod.__doc__)
764
765     def countTestCases(self):
766         return 1
767
768     def run(self, result=None):
769         if result is None: result = self.defaultTestResult()
770         self._run(result)
771
772     def debug(self):
773         """Run the test without collecting errors in a TestResult"""
774         self._run(unittest.TestResult())
775
776     def _run(self, result):
777         protocol = TestProtocolServer(result)
778         output = subprocess.Popen(self.script, shell=True,
779             stdout=subprocess.PIPE).communicate()[0]
780         protocol.readFrom(StringIO(output))
781
782
783 class IsolatedTestCase(unittest.TestCase):
784     """A TestCase which executes in a forked process.
785     
786     Each test gets its own process, which has a performance overhead but will
787     provide excellent isolation from global state (such as django configs,
788     zope utilities and so on).
789     """
790
791     def run(self, result=None):
792         if result is None: result = self.defaultTestResult()
793         run_isolated(unittest.TestCase, self, result)
794
795
796 class IsolatedTestSuite(unittest.TestSuite):
797     """A TestSuite which runs its tests in a forked process.
798     
799     This decorator that will fork() before running the tests and report the
800     results from the child process using a Subunit stream.  This is useful for
801     handling tests that mutate global state, or are testing C extensions that
802     could crash the VM.
803     """
804
805     def run(self, result=None):
806         if result is None: result = unittest.TestResult()
807         run_isolated(unittest.TestSuite, self, result)
808
809
810 def run_isolated(klass, self, result):
811     """Run a test suite or case in a subprocess, using the run method on klass.
812     """
813     c2pread, c2pwrite = os.pipe()
814     # fixme - error -> result
815     # now fork
816     pid = os.fork()
817     if pid == 0:
818         # Child
819         # Close parent's pipe ends
820         os.close(c2pread)
821         # Dup fds for child
822         os.dup2(c2pwrite, 1)
823         # Close pipe fds.
824         os.close(c2pwrite)
825
826         # at this point, sys.stdin is redirected, now we want
827         # to filter it to escape ]'s.
828         ### XXX: test and write that bit.
829
830         result = TestProtocolClient(sys.stdout)
831         klass.run(self, result)
832         sys.stdout.flush()
833         sys.stderr.flush()
834         # exit HARD, exit NOW.
835         os._exit(0)
836     else:
837         # Parent
838         # Close child pipe ends
839         os.close(c2pwrite)
840         # hookup a protocol engine
841         protocol = TestProtocolServer(result)
842         protocol.readFrom(os.fdopen(c2pread, 'rU'))
843         os.waitpid(pid, 0)
844         # TODO return code evaluation.
845     return result
846
847
848 def TAP2SubUnit(tap, subunit):
849     """Filter a TAP pipe into a subunit pipe.
850     
851     :param tap: A tap pipe/stream/file object.
852     :param subunit: A pipe/stream/file object to write subunit results to.
853     :return: The exit code to exit with.
854     """
855     BEFORE_PLAN = 0
856     AFTER_PLAN = 1
857     SKIP_STREAM = 2
858     client = TestProtocolClient(subunit)
859     state = BEFORE_PLAN
860     plan_start = 1
861     plan_stop = 0
862     def _skipped_test(subunit, plan_start):
863         # Some tests were skipped.
864         subunit.write('test test %d\n' % plan_start)
865         subunit.write('error test %d [\n' % plan_start)
866         subunit.write('test missing from TAP output\n')
867         subunit.write(']\n')
868         return plan_start + 1
869     # Test data for the next test to emit
870     test_name = None
871     log = []
872     result = None
873     def _emit_test():
874         "write out a test"
875         if test_name is None:
876             return
877         subunit.write("test %s\n" % test_name)
878         if not log:
879             subunit.write("%s %s\n" % (result, test_name))
880         else:
881             subunit.write("%s %s [\n" % (result, test_name))
882         if log:
883             for line in log:
884                 subunit.write("%s\n" % line)
885             subunit.write("]\n")
886         del log[:]
887     for line in tap:
888         if state == BEFORE_PLAN:
889             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
890             if match:
891                 state = AFTER_PLAN
892                 _, plan_stop, comment = match.groups()
893                 plan_stop = int(plan_stop)
894                 if plan_start > plan_stop and plan_stop == 0:
895                     # skipped file
896                     state = SKIP_STREAM
897                     subunit.write("test file skip\n")
898                     subunit.write("skip file skip [\n")
899                     subunit.write("%s\n" % comment)
900                     subunit.write("]\n")
901                 continue
902         # not a plan line, or have seen one before
903         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
904         if match:
905             # new test, emit current one.
906             _emit_test()
907             status, number, description, directive, directive_comment = match.groups()
908             if status == 'ok':
909                 result = 'success'
910             else:
911                 result = "failure"
912             if description is None:
913                 description = ''
914             else:
915                 description = ' ' + description
916             if directive is not None:
917                 if directive == 'TODO':
918                     result = 'xfail'
919                 elif directive == 'SKIP':
920                     result = 'skip'
921                 if directive_comment is not None:
922                     log.append(directive_comment)
923             if number is not None:
924                 number = int(number)
925                 while plan_start < number:
926                     plan_start = _skipped_test(subunit, plan_start)
927             test_name = "test %d%s" % (plan_start, description)
928             plan_start += 1
929             continue
930         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
931         if match:
932             reason, = match.groups()
933             if reason is None:
934                 extra = ''
935             else:
936                 extra = ' %s' % reason
937             _emit_test()
938             test_name = "Bail out!%s" % extra
939             result = "error"
940             state = SKIP_STREAM
941             continue
942         match = re.match("\#.*\n", line)
943         if match:
944             log.append(line[:-1])
945             continue
946         subunit.write(line)
947     _emit_test()
948     while plan_start <= plan_stop:
949         # record missed tests
950         plan_start = _skipped_test(subunit, plan_start)
951     return 0
952
953
954 def tag_stream(original, filtered, tags):
955     """Alter tags on a stream.
956
957     :param original: The input stream.
958     :param filtered: The output stream.
959     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
960         '-TAG' commands.
961
962         A 'TAG' command will add the tag to the output stream,
963         and override any existing '-TAG' command in that stream.
964         Specifically:
965          * A global 'tags: TAG' will be added to the start of the stream.
966          * Any tags commands with -TAG will have the -TAG removed.
967
968         A '-TAG' command will remove the TAG command from the stream.
969         Specifically:
970          * A 'tags: -TAG' command will be added to the start of the stream.
971          * Any 'tags: TAG' command will have 'TAG' removed from it.
972         Additionally, any redundant tagging commands (adding a tag globally
973         present, or removing a tag globally removed) are stripped as a
974         by-product of the filtering.
975     :return: 0
976     """
977     new_tags, gone_tags = tags_to_new_gone(tags)
978     def write_tags(new_tags, gone_tags):
979         if new_tags or gone_tags:
980             filtered.write("tags: " + ' '.join(new_tags))
981             if gone_tags:
982                 for tag in gone_tags:
983                     filtered.write("-" + tag)
984             filtered.write("\n")
985     write_tags(new_tags, gone_tags)
986     # TODO: use the protocol parser and thus don't mangle test comments.
987     for line in original:
988         if line.startswith("tags:"):
989             line_tags = line[5:].split()
990             line_new, line_gone = tags_to_new_gone(line_tags)
991             line_new = line_new - gone_tags
992             line_gone = line_gone - new_tags
993             write_tags(line_new, line_gone)
994         else:
995             filtered.write(line)
996     return 0
997
998
999 class ProtocolTestCase(object):
1000     """Subunit wire protocol to unittest.TestCase adapter.
1001
1002     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1003     calling a ProtocolTestCase or invoking the run() method will make a 'test
1004     run' happen. The 'test run' will simply be a replay of the test activity
1005     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1006     and ``countTestCases`` methods are not supported because there isn't a
1007     sensible mapping for those methods.
1008     
1009     # Get a stream (any object with a readline() method), in this case the
1010     # stream output by the example from ``subunit.TestProtocolClient``.
1011     stream = file('tests.log', 'rb')
1012     # Create a parser which will read from the stream and emit 
1013     # activity to a unittest.TestResult when run() is called.
1014     suite = subunit.ProtocolTestCase(stream)
1015     # Create a result object to accept the contents of that stream.
1016     result = unittest._TextTestResult(sys.stdout)
1017     # 'run' the tests - process the stream and feed its contents to result.
1018     suite.run(result)
1019     stream.close()
1020
1021     :seealso: TestProtocolServer (the subunit wire protocol parser).
1022     """
1023
1024     def __init__(self, stream, passthrough=None):
1025         """Create a ProtocolTestCase reading from stream.
1026
1027         :param stream: A filelike object which a subunit stream can be read
1028             from.
1029         :param passthrough: A stream pass non subunit input on to. If not
1030             supplied, the TestProtocolServer default is used.
1031         """
1032         self._stream = stream
1033         self._passthrough = passthrough
1034
1035     def __call__(self, result=None):
1036         return self.run(result)
1037
1038     def run(self, result=None):
1039         if result is None:
1040             result = self.defaultTestResult()
1041         protocol = TestProtocolServer(result, self._passthrough)
1042         line = self._stream.readline()
1043         while line:
1044             protocol.lineReceived(line)
1045             line = self._stream.readline()
1046         protocol.lostConnection()
1047
1048
1049 class TestResultStats(unittest.TestResult):
1050     """A pyunit TestResult interface implementation for making statistics.
1051     
1052     :ivar total_tests: The total tests seen.
1053     :ivar passed_tests: The tests that passed.
1054     :ivar failed_tests: The tests that failed.
1055     :ivar seen_tags: The tags seen across all tests.
1056     """
1057
1058     def __init__(self, stream):
1059         """Create a TestResultStats which outputs to stream."""
1060         unittest.TestResult.__init__(self)
1061         self._stream = stream
1062         self.failed_tests = 0
1063         self.skipped_tests = 0
1064         self.seen_tags = set()
1065
1066     @property
1067     def total_tests(self):
1068         return self.testsRun
1069
1070     def addError(self, test, err):
1071         self.failed_tests += 1
1072
1073     def addFailure(self, test, err):
1074         self.failed_tests += 1
1075
1076     def addSkip(self, test, reason):
1077         self.skipped_tests += 1
1078
1079     def formatStats(self):
1080         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1081         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1082         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1083         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1084         tags = sorted(self.seen_tags)
1085         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1086
1087     @property
1088     def passed_tests(self):
1089         return self.total_tests - self.failed_tests - self.skipped_tests
1090
1091     def tags(self, new_tags, gone_tags):
1092         """Accumulate the seen tags."""
1093         self.seen_tags.update(new_tags)
1094
1095     def wasSuccessful(self):
1096         """Tells whether or not this result was a success"""
1097         return self.failed_tests == 0
1098
1099
1100 class TestResultFilter(unittest.TestResult):
1101     """A pyunit TestResult interface implementation which filters tests.
1102
1103     Tests that pass the filter are handed on to another TestResult instance
1104     for further processing/reporting. To obtain the filtered results, 
1105     the other instance must be interrogated.
1106
1107     :ivar result: The result that tests are passed to after filtering.
1108     :ivar filter_predicate: The callback run to decide whether to pass 
1109         a result.
1110     """
1111
1112     def __init__(self, result, filter_error=False, filter_failure=False,
1113         filter_success=True, filter_skip=False,
1114         filter_predicate=None):
1115         """Create a FilterResult object filtering to result.
1116         
1117         :param filter_error: Filter out errors.
1118         :param filter_failure: Filter out failures.
1119         :param filter_success: Filter out successful tests.
1120         :param filter_skip: Filter out skipped tests.
1121         :param filter_predicate: A callable taking (test, err) and 
1122             returning True if the result should be passed through.
1123             err is None for success.
1124         """
1125         unittest.TestResult.__init__(self)
1126         self.result = result
1127         self._filter_error = filter_error
1128         self._filter_failure = filter_failure
1129         self._filter_success = filter_success
1130         self._filter_skip = filter_skip
1131         if filter_predicate is None:
1132             filter_predicate = lambda test, err: True
1133         self.filter_predicate = filter_predicate
1134         # The current test (for filtering tags)
1135         self._current_test = None
1136         # Has the current test been filtered (for outputting test tags)
1137         self._current_test_filtered = None
1138         # The (new, gone) tags for the current test.
1139         self._current_test_tags = None
1140         
1141     def addError(self, test, err):
1142         if not self._filter_error and self.filter_predicate(test, err):
1143             self.result.startTest(test)
1144             self.result.addError(test, err)
1145
1146     def addFailure(self, test, err):
1147         if not self._filter_failure and self.filter_predicate(test, err):
1148             self.result.startTest(test)
1149             self.result.addFailure(test, err)
1150
1151     def addSkip(self, test, reason):
1152         if not self._filter_skip and self.filter_predicate(test, reason):
1153             self.result.startTest(test)
1154             # This is duplicated, it would be nice to have on a 'calls
1155             # TestResults' mixin perhaps.
1156             addSkip = getattr(self.result, 'addSkip', None)
1157             if not callable(addSkip):
1158                 self.result.addError(test, RemoteError(reason))
1159             else:
1160                 self.result.addSkip(test, reason)
1161
1162     def addSuccess(self, test):
1163         if not self._filter_success and self.filter_predicate(test, None):
1164             self.result.startTest(test)
1165             self.result.addSuccess(test)
1166
1167     def startTest(self, test):
1168         """Start a test.
1169         
1170         Not directly passed to the client, but used for handling of tags
1171         correctly.
1172         """
1173         self._current_test = test
1174         self._current_test_filtered = False
1175         self._current_test_tags = set(), set()
1176     
1177     def stopTest(self, test):
1178         """Stop a test.
1179         
1180         Not directly passed to the client, but used for handling of tags
1181         correctly.
1182         """
1183         if not self._current_test_filtered:
1184             # Tags to output for this test.
1185             if self._current_test_tags[0] or self._current_test_tags[1]:
1186                 tags_method = getattr(self.result, 'tags', None)
1187                 if callable(tags_method):
1188                     self.result.tags(*self._current_test_tags)
1189             self.result.stopTest(test)
1190         self._current_test = None
1191         self._current_test_filtered = None
1192         self._current_test_tags = None
1193
1194     def tags(self, new_tags, gone_tags):
1195         """Handle tag instructions.
1196
1197         Adds and removes tags as appropriate. If a test is currently running,
1198         tags are not affected for subsequent tests.
1199         
1200         :param new_tags: Tags to add,
1201         :param gone_tags: Tags to remove.
1202         """
1203         if self._current_test is not None:
1204             # gather the tags until the test stops.
1205             self._current_test_tags[0].update(new_tags)
1206             self._current_test_tags[0].difference_update(gone_tags)
1207             self._current_test_tags[1].update(gone_tags)
1208             self._current_test_tags[1].difference_update(new_tags)
1209         tags_method = getattr(self.result, 'tags', None)
1210         if tags_method is None:
1211             return
1212         return tags_method(new_tags, gone_tags)
1213
1214     def id_to_orig_id(self, id):
1215         if id.startswith("subunit.RemotedTestCase."):
1216             return id[len("subunit.RemotedTestCase."):]
1217         return id