Add --forward option to subunit2junitxml.
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ========
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extesion
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for tags,
47 timestamping and progress markers).
48
49 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
50 remove tags in the test run that is currently executing. If called when no
51 test is in progress (that is, if called outside of the ``startTest``, 
52 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
53 when a test is in progress, then the tags only apply to that test.
54
55 The ``time(a_datetime)`` method is called (if present) when a ``time:``
56 directive is encountered in a Subunit stream. This is used to tell a TestResult
57 about the time that events in the stream occured at, to allow reconstructing
58 test timing from a stream.
59
60 The ``progress(offset, whence)`` method controls progress data for a stream.
61 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
62 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
63 ignore the offset parameter.
64
65
66 Python test support
67 -------------------
68
69 ``subunit.run`` is a convenience wrapper to run a Python test suite via
70 the command line, reporting via Subunit::
71
72   $ python -m subunit.run mylib.tests.test_suite
73
74 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
75 tests, allowing isolation between the test runner and some tests.
76
77 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
78 tests that will fork() before that individual test is run.
79
80 `ExecTestCase`` is a convenience wrapper for running an external 
81 program to get a Subunit stream and then report that back to an arbitrary
82 result object::
83
84  class AggregateTests(subunit.ExecTestCase):
85
86      def test_script_one(self):
87          './bin/script_one'
88
89      def test_script_two(self):
90          './bin/script_two'
91  
92  # Normally your normal test loading would take of this automatically,
93  # It is only spelt out in detail here for clarity.
94  suite = unittest.TestSuite([AggregateTests("test_script_one"),
95      AggregateTests("test_script_two")])
96  # Create any TestResult class you like.
97  result = unittest._TextTestResult(sys.stdout)
98  # And run your suite as normal, Subunit will exec each external script as
99  # needed and report to your result object.
100  suite.run(result)
101 """
102
103 import datetime
104 import os
105 import re
106 from StringIO import StringIO
107 import subprocess
108 import sys
109 import unittest
110
111 import iso8601
112
113
114 PROGRESS_SET = 0
115 PROGRESS_CUR = 1
116 PROGRESS_PUSH = 2
117 PROGRESS_POP = 3
118
119
120 def test_suite():
121     import subunit.tests
122     return subunit.tests.test_suite()
123
124
125 def join_dir(base_path, path):
126     """
127     Returns an absolute path to C{path}, calculated relative to the parent
128     of C{base_path}.
129
130     @param base_path: A path to a file or directory.
131     @param path: An absolute path, or a path relative to the containing
132     directory of C{base_path}.
133
134     @return: An absolute path to C{path}.
135     """
136     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
137
138
139 def tags_to_new_gone(tags):
140     """Split a list of tags into a new_set and a gone_set."""
141     new_tags = set()
142     gone_tags = set()
143     for tag in tags:
144         if tag[0] == '-':
145             gone_tags.add(tag[1:])
146         else:
147             new_tags.add(tag)
148     return new_tags, gone_tags
149
150
151 class DiscardStream(object):
152     """A filelike object which discards what is written to it."""
153
154     def write(self, bytes):
155         pass
156
157
158 class TestProtocolServer(object):
159     """A parser for subunit.
160     
161     :ivar tags: The current tags associated with the protocol stream.
162     """
163
164     OUTSIDE_TEST = 0
165     TEST_STARTED = 1
166     READING_FAILURE = 2
167     READING_ERROR = 3
168     READING_SKIP = 4
169     READING_XFAIL = 5
170     READING_SUCCESS = 6
171
172     def __init__(self, client, stream=None, forward_stream=None):
173         """Create a TestProtocolServer instance.
174
175         :param client: An object meeting the unittest.TestResult protocol.
176         :param stream: The stream that lines received which are not part of the
177             subunit protocol should be written to. This allows custom handling
178             of mixed protocols. By default, sys.stdout will be used for
179             convenience.
180         :param forward_stream: A stream to forward subunit lines to. This 
181             allows a filter to forward the entire stream while still parsing
182             and acting on it. By default forward_stream is set to
183             DiscardStream() and no forwarding happens.
184         """
185         self.state = TestProtocolServer.OUTSIDE_TEST
186         self.client = client
187         if stream is None:
188             stream = sys.stdout
189         self._stream = stream
190         self._forward_stream = forward_stream or DiscardStream()
191
192     def _addError(self, offset, line):
193         if (self.state == TestProtocolServer.TEST_STARTED and
194             self.current_test_description == line[offset:-1]):
195             self.state = TestProtocolServer.OUTSIDE_TEST
196             self.current_test_description = None
197             self.client.addError(self._current_test, RemoteError(""))
198             self.client.stopTest(self._current_test)
199             self._current_test = None
200             self.subunitLineReceived(line)
201         elif (self.state == TestProtocolServer.TEST_STARTED and
202             self.current_test_description + " [" == line[offset:-1]):
203             self.state = TestProtocolServer.READING_ERROR
204             self._message = ""
205             self.subunitLineReceived(line)
206         else:
207             self.stdOutLineReceived(line)
208
209     def _addExpectedFail(self, offset, line):
210         if (self.state == TestProtocolServer.TEST_STARTED and
211             self.current_test_description == line[offset:-1]):
212             self.state = TestProtocolServer.OUTSIDE_TEST
213             self.current_test_description = None
214             xfail = getattr(self.client, 'addExpectedFailure', None)
215             if callable(xfail):
216                 xfail(self._current_test, RemoteError())
217             else:
218                 self.client.addSuccess(self._current_test)
219             self.client.stopTest(self._current_test)
220             self.subunitLineReceived(line)
221         elif (self.state == TestProtocolServer.TEST_STARTED and
222             self.current_test_description + " [" == line[offset:-1]):
223             self.state = TestProtocolServer.READING_XFAIL
224             self._message = ""
225             self.subunitLineReceived(line)
226         else:
227             self.stdOutLineReceived(line)
228
229     def _addFailure(self, offset, line):
230         if (self.state == TestProtocolServer.TEST_STARTED and
231             self.current_test_description == line[offset:-1]):
232             self.state = TestProtocolServer.OUTSIDE_TEST
233             self.current_test_description = None
234             self.client.addFailure(self._current_test, RemoteError())
235             self.client.stopTest(self._current_test)
236             self.subunitLineReceived(line)
237         elif (self.state == TestProtocolServer.TEST_STARTED and
238             self.current_test_description + " [" == line[offset:-1]):
239             self.state = TestProtocolServer.READING_FAILURE
240             self._message = ""
241             self.subunitLineReceived(line)
242         else:
243             self.stdOutLineReceived(line)
244
245     def _addSkip(self, offset, line):
246         if (self.state == TestProtocolServer.TEST_STARTED and
247             self.current_test_description == line[offset:-1]):
248             self.state = TestProtocolServer.OUTSIDE_TEST
249             self.current_test_description = None
250             self._skip_or_error()
251             self.client.stopTest(self._current_test)
252             self.subunitLineReceived(line)
253         elif (self.state == TestProtocolServer.TEST_STARTED and
254             self.current_test_description + " [" == line[offset:-1]):
255             self.state = TestProtocolServer.READING_SKIP
256             self._message = ""
257             self.subunitLineReceived(line)
258         else:
259             self.stdOutLineReceived(line)
260
261     def _skip_or_error(self, message=None):
262         """Report the current test as a skip if possible, or else an error."""
263         addSkip = getattr(self.client, 'addSkip', None)
264         if not callable(addSkip):
265             self.client.addError(self._current_test, RemoteError(message))
266         else:
267             if not message:
268                 message = "No reason given"
269             addSkip(self._current_test, message)
270
271     def _addSuccess(self, offset, line):
272         if (self.state == TestProtocolServer.TEST_STARTED and
273             self.current_test_description == line[offset:-1]):
274             self._succeedTest()
275             self.subunitLineReceived(line)
276         elif (self.state == TestProtocolServer.TEST_STARTED and
277             self.current_test_description + " [" == line[offset:-1]):
278             self.state = TestProtocolServer.READING_SUCCESS
279             self._message = ""
280             self.subunitLineReceived(line)
281         else:
282             self.stdOutLineReceived(line)
283
284     def _appendMessage(self, line):
285         if line[0:2] == " ]":
286             # quoted ] start
287             self._message += line[1:]
288         else:
289             self._message += line
290
291     def endQuote(self, line):
292         stdout = False
293         if self.state == TestProtocolServer.READING_FAILURE:
294             self.state = TestProtocolServer.OUTSIDE_TEST
295             self.current_test_description = None
296             self.client.addFailure(self._current_test,
297                                    RemoteError(self._message))
298             self.client.stopTest(self._current_test)
299         elif self.state == TestProtocolServer.READING_ERROR:
300             self.state = TestProtocolServer.OUTSIDE_TEST
301             self.current_test_description = None
302             self.client.addError(self._current_test,
303                                  RemoteError(self._message))
304             self.client.stopTest(self._current_test)
305         elif self.state == TestProtocolServer.READING_SKIP:
306             self.state = TestProtocolServer.OUTSIDE_TEST
307             self.current_test_description = None
308             self._skip_or_error(self._message)
309             self.client.stopTest(self._current_test)
310         elif self.state == TestProtocolServer.READING_XFAIL:
311             self.state = TestProtocolServer.OUTSIDE_TEST
312             self.current_test_description = None
313             xfail = getattr(self.client, 'addExpectedFailure', None)
314             if callable(xfail):
315                 xfail(self._current_test, RemoteError(self._message))
316             else:
317                 self.client.addSuccess(self._current_test)
318             self.client.stopTest(self._current_test)
319         elif self.state == TestProtocolServer.READING_SUCCESS:
320             self._succeedTest()
321         else:
322             self.stdOutLineReceived(line)
323             stdout = True
324         if not stdout:
325             self.subunitLineReceived(line)
326
327     def _handleProgress(self, offset, line):
328         """Process a progress directive."""
329         line = line[offset:].strip()
330         if line[0] in '+-':
331             whence = PROGRESS_CUR
332             delta = int(line)
333         elif line == "push":
334             whence = PROGRESS_PUSH
335             delta = None
336         elif line == "pop":
337             whence = PROGRESS_POP
338             delta = None
339         else:
340             whence = PROGRESS_SET
341             delta = int(line)
342         progress_method = getattr(self.client, 'progress', None)
343         if callable(progress_method):
344             progress_method(delta, whence)
345
346     def _handleTags(self, offset, line):
347         """Process a tags command."""
348         tags = line[offset:].split()
349         new_tags, gone_tags = tags_to_new_gone(tags)
350         tags_method = getattr(self.client, 'tags', None)
351         if tags_method is not None:
352             tags_method(new_tags, gone_tags)
353
354     def _handleTime(self, offset, line):
355         # Accept it, but do not do anything with it yet.
356         try:
357             event_time = iso8601.parse_date(line[offset:-1])
358         except TypeError, e:
359             raise TypeError("Failed to parse %r, got %r" % (line, e))
360         time_method = getattr(self.client, 'time', None)
361         if callable(time_method):
362             time_method(event_time)
363
364     def lineReceived(self, line):
365         """Call the appropriate local method for the received line."""
366         if line == "]\n":
367             self.endQuote(line)
368         elif self.state in (TestProtocolServer.READING_FAILURE,
369             TestProtocolServer.READING_ERROR, TestProtocolServer.READING_SKIP,
370             TestProtocolServer.READING_SUCCESS,
371             TestProtocolServer.READING_XFAIL
372             ):
373             self._appendMessage(line)
374             self.subunitLineReceived(line)
375         else:
376             parts = line.split(None, 1)
377             stdout = False
378             if len(parts) == 2:
379                 cmd, rest = parts
380                 offset = len(cmd) + 1
381                 cmd = cmd.strip(':')
382                 if cmd in ('test', 'testing'):
383                     self._startTest(offset, line)
384                 elif cmd == 'error':
385                     self._addError(offset, line)
386                 elif cmd == 'failure':
387                     self._addFailure(offset, line)
388                 elif cmd == 'progress':
389                     self._handleProgress(offset, line)
390                     self.subunitLineReceived(line)
391                 elif cmd == 'skip':
392                     self._addSkip(offset, line)
393                 elif cmd in ('success', 'successful'):
394                     self._addSuccess(offset, line)
395                 elif cmd in ('tags',):
396                     self._handleTags(offset, line)
397                     self.subunitLineReceived(line)
398                 elif cmd in ('time',):
399                     self._handleTime(offset, line)
400                     self.subunitLineReceived(line)
401                 elif cmd == 'xfail':
402                     self._addExpectedFail(offset, line)
403                 else:
404                     self.stdOutLineReceived(line)
405             else:
406                 self.stdOutLineReceived(line)
407
408     def _lostConnectionInTest(self, state_string):
409         error_string = "lost connection during %stest '%s'" % (
410             state_string, self.current_test_description)
411         self.client.addError(self._current_test, RemoteError(error_string))
412         self.client.stopTest(self._current_test)
413
414     def lostConnection(self):
415         """The input connection has finished."""
416         if self.state == TestProtocolServer.OUTSIDE_TEST:
417             return
418         if self.state == TestProtocolServer.TEST_STARTED:
419             self._lostConnectionInTest('')
420         elif self.state == TestProtocolServer.READING_ERROR:
421             self._lostConnectionInTest('error report of ')
422         elif self.state == TestProtocolServer.READING_FAILURE:
423             self._lostConnectionInTest('failure report of ')
424         elif self.state == TestProtocolServer.READING_SUCCESS:
425             self._lostConnectionInTest('success report of ')
426         elif self.state == TestProtocolServer.READING_SKIP:
427             self._lostConnectionInTest('skip report of ')
428         elif self.state == TestProtocolServer.READING_XFAIL:
429             self._lostConnectionInTest('xfail report of ')
430         else:
431             self._lostConnectionInTest('unknown state of ')
432
433     def readFrom(self, pipe):
434         for line in pipe.readlines():
435             self.lineReceived(line)
436         self.lostConnection()
437
438     def _startTest(self, offset, line):
439         """Internal call to change state machine. Override startTest()."""
440         if self.state == TestProtocolServer.OUTSIDE_TEST:
441             self.state = TestProtocolServer.TEST_STARTED
442             self._current_test = RemotedTestCase(line[offset:-1])
443             self.current_test_description = line[offset:-1]
444             self.client.startTest(self._current_test)
445             self.subunitLineReceived(line)
446         else:
447             self.stdOutLineReceived(line)
448
449     def subunitLineReceived(self, line):
450         self._forward_stream.write(line)
451
452     def stdOutLineReceived(self, line):
453         self._stream.write(line)
454
455     def _succeedTest(self):
456         self.client.addSuccess(self._current_test)
457         self.client.stopTest(self._current_test)
458         self.current_test_description = None
459         self._current_test = None
460         self.state = TestProtocolServer.OUTSIDE_TEST
461
462
463 class RemoteException(Exception):
464     """An exception that occured remotely to Python."""
465
466     def __eq__(self, other):
467         try:
468             return self.args == other.args
469         except AttributeError:
470             return False
471
472
473 class TestProtocolClient(unittest.TestResult):
474     """A TestResult which generates a subunit stream for a test run.
475     
476     # Get a TestSuite or TestCase to run
477     suite = make_suite()
478     # Create a stream (any object with a 'write' method)
479     stream = file('tests.log', 'wb')
480     # Create a subunit result object which will output to the stream
481     result = subunit.TestProtocolClient(stream)
482     # Optionally, to get timing data for performance analysis, wrap the
483     # serialiser with a timing decorator
484     result = subunit.test_results.AutoTimingTestResultDecorator(result)
485     # Run the test suite reporting to the subunit result object
486     suite.run(result)
487     # Close the stream.
488     stream.close()
489     """
490
491     def __init__(self, stream):
492         unittest.TestResult.__init__(self)
493         self._stream = stream
494
495     def addError(self, test, error):
496         """Report an error in test test."""
497         self._stream.write("error: %s [\n" % test.id())
498         for line in self._exc_info_to_string(error, test).splitlines():
499             self._stream.write("%s\n" % line)
500         self._stream.write("]\n")
501
502     def addFailure(self, test, error):
503         """Report a failure in test test."""
504         self._stream.write("failure: %s [\n" % test.id())
505         for line in self._exc_info_to_string(error, test).splitlines():
506             self._stream.write("%s\n" % line)
507         self._stream.write("]\n")
508
509     def addSkip(self, test, reason):
510         """Report a skipped test."""
511         self._stream.write("skip: %s [\n" % test.id())
512         self._stream.write("%s\n" % reason)
513         self._stream.write("]\n")
514
515     def addSuccess(self, test):
516         """Report a success in a test."""
517         self._stream.write("successful: %s\n" % test.id())
518
519     def startTest(self, test):
520         """Mark a test as starting its test run."""
521         self._stream.write("test: %s\n" % test.id())
522
523     def progress(self, offset, whence):
524         """Provide indication about the progress/length of the test run.
525
526         :param offset: Information about the number of tests remaining. If
527             whence is PROGRESS_CUR, then offset increases/decreases the
528             remaining test count. If whence is PROGRESS_SET, then offset
529             specifies exactly the remaining test count.
530         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
531             PROGRESS_POP.
532         """
533         if whence == PROGRESS_CUR and offset > -1:
534             prefix = "+"
535         elif whence == PROGRESS_PUSH:
536             prefix = ""
537             offset = "push"
538         elif whence == PROGRESS_POP:
539             prefix = ""
540             offset = "pop"
541         else:
542             prefix = ""
543         self._stream.write("progress: %s%s\n" % (prefix, offset))
544
545     def time(self, a_datetime):
546         """Inform the client of the time.
547
548         ":param datetime: A datetime.datetime object.
549         """
550         time = a_datetime.astimezone(iso8601.Utc())
551         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
552             time.year, time.month, time.day, time.hour, time.minute,
553             time.second, time.microsecond))
554
555     def done(self):
556         """Obey the testtools result.done() interface."""
557
558
559 def RemoteError(description=""):
560     if description == "":
561         description = "\n"
562     return (RemoteException, RemoteException(description), None)
563
564
565 class RemotedTestCase(unittest.TestCase):
566     """A class to represent test cases run in child processes.
567     
568     Instances of this class are used to provide the Python test API a TestCase
569     that can be printed to the screen, introspected for metadata and so on.
570     However, as they are a simply a memoisation of a test that was actually
571     run in the past by a separate process, they cannot perform any interactive
572     actions.
573     """
574
575     def __eq__ (self, other):
576         try:
577             return self.__description == other.__description
578         except AttributeError:
579             return False
580
581     def __init__(self, description):
582         """Create a psuedo test case with description description."""
583         self.__description = description
584
585     def error(self, label):
586         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
587             label)
588
589     def setUp(self):
590         self.error("setUp")
591
592     def tearDown(self):
593         self.error("tearDown")
594
595     def shortDescription(self):
596         return self.__description
597
598     def id(self):
599         return "%s" % (self.__description,)
600
601     def __str__(self):
602         return "%s (%s)" % (self.__description, self._strclass())
603
604     def __repr__(self):
605         return "<%s description='%s'>" % \
606                (self._strclass(), self.__description)
607
608     def run(self, result=None):
609         if result is None: result = self.defaultTestResult()
610         result.startTest(self)
611         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
612         result.stopTest(self)
613
614     def _strclass(self):
615         cls = self.__class__
616         return "%s.%s" % (cls.__module__, cls.__name__)
617
618
619 class ExecTestCase(unittest.TestCase):
620     """A test case which runs external scripts for test fixtures."""
621
622     def __init__(self, methodName='runTest'):
623         """Create an instance of the class that will use the named test
624            method when executed. Raises a ValueError if the instance does
625            not have a method with the specified name.
626         """
627         unittest.TestCase.__init__(self, methodName)
628         testMethod = getattr(self, methodName)
629         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
630                                testMethod.__doc__)
631
632     def countTestCases(self):
633         return 1
634
635     def run(self, result=None):
636         if result is None: result = self.defaultTestResult()
637         self._run(result)
638
639     def debug(self):
640         """Run the test without collecting errors in a TestResult"""
641         self._run(unittest.TestResult())
642
643     def _run(self, result):
644         protocol = TestProtocolServer(result)
645         output = subprocess.Popen(self.script, shell=True,
646             stdout=subprocess.PIPE).communicate()[0]
647         protocol.readFrom(StringIO(output))
648
649
650 class IsolatedTestCase(unittest.TestCase):
651     """A TestCase which executes in a forked process.
652     
653     Each test gets its own process, which has a performance overhead but will
654     provide excellent isolation from global state (such as django configs,
655     zope utilities and so on).
656     """
657
658     def run(self, result=None):
659         if result is None: result = self.defaultTestResult()
660         run_isolated(unittest.TestCase, self, result)
661
662
663 class IsolatedTestSuite(unittest.TestSuite):
664     """A TestSuite which runs its tests in a forked process.
665     
666     This decorator that will fork() before running the tests and report the
667     results from the child process using a Subunit stream.  This is useful for
668     handling tests that mutate global state, or are testing C extensions that
669     could crash the VM.
670     """
671
672     def run(self, result=None):
673         if result is None: result = unittest.TestResult()
674         run_isolated(unittest.TestSuite, self, result)
675
676
677 def run_isolated(klass, self, result):
678     """Run a test suite or case in a subprocess, using the run method on klass.
679     """
680     c2pread, c2pwrite = os.pipe()
681     # fixme - error -> result
682     # now fork
683     pid = os.fork()
684     if pid == 0:
685         # Child
686         # Close parent's pipe ends
687         os.close(c2pread)
688         # Dup fds for child
689         os.dup2(c2pwrite, 1)
690         # Close pipe fds.
691         os.close(c2pwrite)
692
693         # at this point, sys.stdin is redirected, now we want
694         # to filter it to escape ]'s.
695         ### XXX: test and write that bit.
696
697         result = TestProtocolClient(sys.stdout)
698         klass.run(self, result)
699         sys.stdout.flush()
700         sys.stderr.flush()
701         # exit HARD, exit NOW.
702         os._exit(0)
703     else:
704         # Parent
705         # Close child pipe ends
706         os.close(c2pwrite)
707         # hookup a protocol engine
708         protocol = TestProtocolServer(result)
709         protocol.readFrom(os.fdopen(c2pread, 'rU'))
710         os.waitpid(pid, 0)
711         # TODO return code evaluation.
712     return result
713
714
715 def TAP2SubUnit(tap, subunit):
716     """Filter a TAP pipe into a subunit pipe.
717     
718     :param tap: A tap pipe/stream/file object.
719     :param subunit: A pipe/stream/file object to write subunit results to.
720     :return: The exit code to exit with.
721     """
722     BEFORE_PLAN = 0
723     AFTER_PLAN = 1
724     SKIP_STREAM = 2
725     client = TestProtocolClient(subunit)
726     state = BEFORE_PLAN
727     plan_start = 1
728     plan_stop = 0
729     def _skipped_test(subunit, plan_start):
730         # Some tests were skipped.
731         subunit.write('test test %d\n' % plan_start)
732         subunit.write('error test %d [\n' % plan_start)
733         subunit.write('test missing from TAP output\n')
734         subunit.write(']\n')
735         return plan_start + 1
736     # Test data for the next test to emit
737     test_name = None
738     log = []
739     result = None
740     def _emit_test():
741         "write out a test"
742         if test_name is None:
743             return
744         subunit.write("test %s\n" % test_name)
745         if not log:
746             subunit.write("%s %s\n" % (result, test_name))
747         else:
748             subunit.write("%s %s [\n" % (result, test_name))
749         if log:
750             for line in log:
751                 subunit.write("%s\n" % line)
752             subunit.write("]\n")
753         del log[:]
754     for line in tap:
755         if state == BEFORE_PLAN:
756             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
757             if match:
758                 state = AFTER_PLAN
759                 _, plan_stop, comment = match.groups()
760                 plan_stop = int(plan_stop)
761                 if plan_start > plan_stop and plan_stop == 0:
762                     # skipped file
763                     state = SKIP_STREAM
764                     subunit.write("test file skip\n")
765                     subunit.write("skip file skip [\n")
766                     subunit.write("%s\n" % comment)
767                     subunit.write("]\n")
768                 continue
769         # not a plan line, or have seen one before
770         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
771         if match:
772             # new test, emit current one.
773             _emit_test()
774             status, number, description, directive, directive_comment = match.groups()
775             if status == 'ok':
776                 result = 'success'
777             else:
778                 result = "failure"
779             if description is None:
780                 description = ''
781             else:
782                 description = ' ' + description
783             if directive is not None:
784                 if directive == 'TODO':
785                     result = 'xfail'
786                 elif directive == 'SKIP':
787                     result = 'skip'
788                 if directive_comment is not None:
789                     log.append(directive_comment)
790             if number is not None:
791                 number = int(number)
792                 while plan_start < number:
793                     plan_start = _skipped_test(subunit, plan_start)
794             test_name = "test %d%s" % (plan_start, description)
795             plan_start += 1
796             continue
797         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
798         if match:
799             reason, = match.groups()
800             if reason is None:
801                 extra = ''
802             else:
803                 extra = ' %s' % reason
804             _emit_test()
805             test_name = "Bail out!%s" % extra
806             result = "error"
807             state = SKIP_STREAM
808             continue
809         match = re.match("\#.*\n", line)
810         if match:
811             log.append(line[:-1])
812             continue
813         subunit.write(line)
814     _emit_test()
815     while plan_start <= plan_stop:
816         # record missed tests
817         plan_start = _skipped_test(subunit, plan_start)
818     return 0
819
820
821 def tag_stream(original, filtered, tags):
822     """Alter tags on a stream.
823
824     :param original: The input stream.
825     :param filtered: The output stream.
826     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
827         '-TAG' commands.
828
829         A 'TAG' command will add the tag to the output stream,
830         and override any existing '-TAG' command in that stream.
831         Specifically:
832          * A global 'tags: TAG' will be added to the start of the stream.
833          * Any tags commands with -TAG will have the -TAG removed.
834
835         A '-TAG' command will remove the TAG command from the stream.
836         Specifically:
837          * A 'tags: -TAG' command will be added to the start of the stream.
838          * Any 'tags: TAG' command will have 'TAG' removed from it.
839         Additionally, any redundant tagging commands (adding a tag globally
840         present, or removing a tag globally removed) are stripped as a
841         by-product of the filtering.
842     :return: 0
843     """
844     new_tags, gone_tags = tags_to_new_gone(tags)
845     def write_tags(new_tags, gone_tags):
846         if new_tags or gone_tags:
847             filtered.write("tags: " + ' '.join(new_tags))
848             if gone_tags:
849                 for tag in gone_tags:
850                     filtered.write("-" + tag)
851             filtered.write("\n")
852     write_tags(new_tags, gone_tags)
853     # TODO: use the protocol parser and thus don't mangle test comments.
854     for line in original:
855         if line.startswith("tags:"):
856             line_tags = line[5:].split()
857             line_new, line_gone = tags_to_new_gone(line_tags)
858             line_new = line_new - gone_tags
859             line_gone = line_gone - new_tags
860             write_tags(line_new, line_gone)
861         else:
862             filtered.write(line)
863     return 0
864
865
866 class ProtocolTestCase(object):
867     """Subunit wire protocol to unittest.TestCase adapter.
868
869     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
870     calling a ProtocolTestCase or invoking the run() method will make a 'test
871     run' happen. The 'test run' will simply be a replay of the test activity
872     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
873     and ``countTestCases`` methods are not supported because there isn't a
874     sensible mapping for those methods.
875     
876     # Get a stream (any object with a readline() method), in this case the
877     # stream output by the example from ``subunit.TestProtocolClient``.
878     stream = file('tests.log', 'rb')
879     # Create a parser which will read from the stream and emit 
880     # activity to a unittest.TestResult when run() is called.
881     suite = subunit.ProtocolTestCase(stream)
882     # Create a result object to accept the contents of that stream.
883     result = unittest._TextTestResult(sys.stdout)
884     # 'run' the tests - process the stream and feed its contents to result.
885     suite.run(result)
886     stream.close()
887
888     :seealso: TestProtocolServer (the subunit wire protocol parser).
889     """
890
891     def __init__(self, stream, passthrough=None, forward=False):
892         """Create a ProtocolTestCase reading from stream.
893
894         :param stream: A filelike object which a subunit stream can be read
895             from.
896         :param passthrough: A stream pass non subunit input on to. If not
897             supplied, the TestProtocolServer default is used.
898         :param forward: A stream to pass subunit input on to. If not supplied
899             subunit input is not forwarded.
900         """
901         self._stream = stream
902         self._passthrough = passthrough
903         self._forward = forward
904
905     def __call__(self, result=None):
906         return self.run(result)
907
908     def run(self, result=None):
909         if result is None:
910             result = self.defaultTestResult()
911         protocol = TestProtocolServer(result, self._passthrough, self._forward)
912         line = self._stream.readline()
913         while line:
914             protocol.lineReceived(line)
915             line = self._stream.readline()
916         protocol.lostConnection()
917
918
919 class TestResultStats(unittest.TestResult):
920     """A pyunit TestResult interface implementation for making statistics.
921     
922     :ivar total_tests: The total tests seen.
923     :ivar passed_tests: The tests that passed.
924     :ivar failed_tests: The tests that failed.
925     :ivar seen_tags: The tags seen across all tests.
926     """
927
928     def __init__(self, stream):
929         """Create a TestResultStats which outputs to stream."""
930         unittest.TestResult.__init__(self)
931         self._stream = stream
932         self.failed_tests = 0
933         self.skipped_tests = 0
934         self.seen_tags = set()
935
936     @property
937     def total_tests(self):
938         return self.testsRun
939
940     def addError(self, test, err):
941         self.failed_tests += 1
942
943     def addFailure(self, test, err):
944         self.failed_tests += 1
945
946     def addSkip(self, test, reason):
947         self.skipped_tests += 1
948
949     def formatStats(self):
950         self._stream.write("Total tests:   %5d\n" % self.total_tests)
951         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
952         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
953         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
954         tags = sorted(self.seen_tags)
955         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
956
957     @property
958     def passed_tests(self):
959         return self.total_tests - self.failed_tests - self.skipped_tests
960
961     def tags(self, new_tags, gone_tags):
962         """Accumulate the seen tags."""
963         self.seen_tags.update(new_tags)
964
965     def wasSuccessful(self):
966         """Tells whether or not this result was a success"""
967         return self.failed_tests == 0
968
969
970 class TestResultFilter(unittest.TestResult):
971     """A pyunit TestResult interface implementation which filters tests.
972
973     Tests that pass the filter are handed on to another TestResult instance
974     for further processing/reporting. To obtain the filtered results, 
975     the other instance must be interrogated.
976
977     :ivar result: The result that tests are passed to after filtering.
978     :ivar filter_predicate: The callback run to decide whether to pass 
979         a result.
980     """
981
982     def __init__(self, result, filter_error=False, filter_failure=False,
983         filter_success=True, filter_skip=False,
984         filter_predicate=None):
985         """Create a FilterResult object filtering to result.
986         
987         :param filter_error: Filter out errors.
988         :param filter_failure: Filter out failures.
989         :param filter_success: Filter out successful tests.
990         :param filter_skip: Filter out skipped tests.
991         :param filter_predicate: A callable taking (test, err) and 
992             returning True if the result should be passed through.
993             err is None for success.
994         """
995         unittest.TestResult.__init__(self)
996         self.result = result
997         self._filter_error = filter_error
998         self._filter_failure = filter_failure
999         self._filter_success = filter_success
1000         self._filter_skip = filter_skip
1001         if filter_predicate is None:
1002             filter_predicate = lambda test, err: True
1003         self.filter_predicate = filter_predicate
1004         # The current test (for filtering tags)
1005         self._current_test = None
1006         # Has the current test been filtered (for outputting test tags)
1007         self._current_test_filtered = None
1008         # The (new, gone) tags for the current test.
1009         self._current_test_tags = None
1010         
1011     def addError(self, test, err):
1012         if not self._filter_error and self.filter_predicate(test, err):
1013             self.result.startTest(test)
1014             self.result.addError(test, err)
1015
1016     def addFailure(self, test, err):
1017         if not self._filter_failure and self.filter_predicate(test, err):
1018             self.result.startTest(test)
1019             self.result.addFailure(test, err)
1020
1021     def addSkip(self, test, reason):
1022         if not self._filter_skip and self.filter_predicate(test, reason):
1023             self.result.startTest(test)
1024             # This is duplicated, it would be nice to have on a 'calls
1025             # TestResults' mixin perhaps.
1026             addSkip = getattr(self.result, 'addSkip', None)
1027             if not callable(addSkip):
1028                 self.result.addError(test, RemoteError(reason))
1029             else:
1030                 self.result.addSkip(test, reason)
1031
1032     def addSuccess(self, test):
1033         if not self._filter_success and self.filter_predicate(test, None):
1034             self.result.startTest(test)
1035             self.result.addSuccess(test)
1036
1037     def startTest(self, test):
1038         """Start a test.
1039         
1040         Not directly passed to the client, but used for handling of tags
1041         correctly.
1042         """
1043         self._current_test = test
1044         self._current_test_filtered = False
1045         self._current_test_tags = set(), set()
1046     
1047     def stopTest(self, test):
1048         """Stop a test.
1049         
1050         Not directly passed to the client, but used for handling of tags
1051         correctly.
1052         """
1053         if not self._current_test_filtered:
1054             # Tags to output for this test.
1055             if self._current_test_tags[0] or self._current_test_tags[1]:
1056                 tags_method = getattr(self.result, 'tags', None)
1057                 if callable(tags_method):
1058                     self.result.tags(*self._current_test_tags)
1059             self.result.stopTest(test)
1060         self._current_test = None
1061         self._current_test_filtered = None
1062         self._current_test_tags = None
1063
1064     def tags(self, new_tags, gone_tags):
1065         """Handle tag instructions.
1066
1067         Adds and removes tags as appropriate. If a test is currently running,
1068         tags are not affected for subsequent tests.
1069         
1070         :param new_tags: Tags to add,
1071         :param gone_tags: Tags to remove.
1072         """
1073         if self._current_test is not None:
1074             # gather the tags until the test stops.
1075             self._current_test_tags[0].update(new_tags)
1076             self._current_test_tags[0].difference_update(gone_tags)
1077             self._current_test_tags[1].update(gone_tags)
1078             self._current_test_tags[1].difference_update(new_tags)
1079         tags_method = getattr(self.result, 'tags', None)
1080         if tags_method is None:
1081             return
1082         return tags_method(new_tags, gone_tags)
1083
1084     def id_to_orig_id(self, id):
1085         if id.startswith("subunit.RemotedTestCase."):
1086             return id[len("subunit.RemotedTestCase."):]
1087         return id