Move the TEST_STARTED parser state to a state object.
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ========
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to 
53 ``subunit.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc.
57
58 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
59 remove tags in the test run that is currently executing. If called when no
60 test is in progress (that is, if called outside of the ``startTest``, 
61 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called
62 when a test is in progress, then the tags only apply to that test.
63
64 The ``time(a_datetime)`` method is called (if present) when a ``time:``
65 directive is encountered in a Subunit stream. This is used to tell a TestResult
66 about the time that events in the stream occured at, to allow reconstructing
67 test timing from a stream.
68
69 The ``progress(offset, whence)`` method controls progress data for a stream.
70 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
71 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
72 ignore the offset parameter.
73
74
75 Python test support
76 -------------------
77
78 ``subunit.run`` is a convenience wrapper to run a Python test suite via
79 the command line, reporting via Subunit::
80
81   $ python -m subunit.run mylib.tests.test_suite
82
83 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
84 tests, allowing isolation between the test runner and some tests.
85
86 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
87 tests that will fork() before that individual test is run.
88
89 `ExecTestCase`` is a convenience wrapper for running an external 
90 program to get a Subunit stream and then report that back to an arbitrary
91 result object::
92
93  class AggregateTests(subunit.ExecTestCase):
94
95      def test_script_one(self):
96          './bin/script_one'
97
98      def test_script_two(self):
99          './bin/script_two'
100  
101  # Normally your normal test loading would take of this automatically,
102  # It is only spelt out in detail here for clarity.
103  suite = unittest.TestSuite([AggregateTests("test_script_one"),
104      AggregateTests("test_script_two")])
105  # Create any TestResult class you like.
106  result = unittest._TextTestResult(sys.stdout)
107  # And run your suite as normal, Subunit will exec each external script as
108  # needed and report to your result object.
109  suite.run(result)
110
111 Utility modules
112 ---------------
113
114 * subunit.chunked contains HTTP chunked encoding/decoding logic.
115 * subunit.content contains a minimal assumptions MIME content representation.
116 * subunit.content_type contains a MIME Content-Type representation.
117 """
118
119 import datetime
120 import os
121 import re
122 from StringIO import StringIO
123 import subprocess
124 import sys
125 import unittest
126
127 import iso8601
128
129 import chunked, content, content_type
130
131
132 PROGRESS_SET = 0
133 PROGRESS_CUR = 1
134 PROGRESS_PUSH = 2
135 PROGRESS_POP = 3
136
137
138 def test_suite():
139     import subunit.tests
140     return subunit.tests.test_suite()
141
142
143 def join_dir(base_path, path):
144     """
145     Returns an absolute path to C{path}, calculated relative to the parent
146     of C{base_path}.
147
148     @param base_path: A path to a file or directory.
149     @param path: An absolute path, or a path relative to the containing
150     directory of C{base_path}.
151
152     @return: An absolute path to C{path}.
153     """
154     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
155
156
157 def tags_to_new_gone(tags):
158     """Split a list of tags into a new_set and a gone_set."""
159     new_tags = set()
160     gone_tags = set()
161     for tag in tags:
162         if tag[0] == '-':
163             gone_tags.add(tag[1:])
164         else:
165             new_tags.add(tag)
166     return new_tags, gone_tags
167
168
169 class DiscardStream(object):
170     """A filelike object which discards what is written to it."""
171
172     def write(self, bytes):
173         pass
174
175
176 class _ParserState(object):
177     """State for the subunit parser."""
178
179     def __init__(self, parser):
180         self.parser = parser
181
182     def addError(self, offset, line):
183         """An 'error:' directive has been read."""
184         self.parser.stdOutLineReceived(line)
185
186     def addExpectedFail(self, offset, line):
187         """An 'xfail:' directive has been read."""
188         self.parser.stdOutLineReceived(line)
189
190     def addFailure(self, offset, line):
191         """A 'failure:' directive has been read."""
192         self.parser.stdOutLineReceived(line)
193
194     def addSkip(self, offset, line):
195         """A 'skip:' directive has been read."""
196         self.parser.stdOutLineReceived(line)
197
198     def addSuccess(self, offset, line):
199         """A 'success:' directive has been read."""
200         self.parser.stdOutLineReceived(line)
201
202     def startTest(self, offset, line):
203         """A test start command received."""
204         self.parser.stdOutLineReceived(line)
205
206
207 class _InTest(_ParserState):
208     """State for the subunit parser after reading a test: directive."""
209
210     def addError(self, offset, line):
211         """An 'error:' directive has been read."""
212         if self.parser.current_test_description == line[offset:-1]:
213             self.parser._state = self.parser._outside_test
214             self.parser.state = TestProtocolServer.STATE_OBJECT
215             self.parser.current_test_description = None
216             self.parser.client.addError(self.parser._current_test, RemoteError(""))
217             self.parser.client.stopTest(self.parser._current_test)
218             self.parser._current_test = None
219         elif self.parser.current_test_description + " [" == line[offset:-1]:
220             self.parser.state = TestProtocolServer.READING_ERROR
221             self.parser._message = ""
222         else:
223             self.parser.stdOutLineReceived(line)
224
225     def addExpectedFail(self, offset, line):
226         """An 'xfail:' directive has been read."""
227         if self.parser.current_test_description == line[offset:-1]:
228             self.parser._state = self.parser._outside_test
229             self.parser.state = TestProtocolServer.STATE_OBJECT
230             self.parser.current_test_description = None
231             xfail = getattr(self.parser.client, 'addExpectedFailure', None)
232             if callable(xfail):
233                 xfail(self.parser._current_test, RemoteError())
234             else:
235                 self.parser.client.addSuccess(self.parser._current_test)
236             self.parser.client.stopTest(self.parser._current_test)
237         elif self.parser.current_test_description + " [" == line[offset:-1]:
238             self.parser.state = TestProtocolServer.READING_XFAIL
239             self.parser._message = ""
240         else:
241             self.parser.stdOutLineReceived(line)
242
243     def addFailure(self, offset, line):
244         """A 'failure:' directive has been read."""
245         if self.parser.current_test_description == line[offset:-1]:
246             self.parser._state = self.parser._outside_test
247             self.parser.state = TestProtocolServer.STATE_OBJECT
248             self.parser.current_test_description = None
249             self.parser.client.addFailure(self.parser._current_test, RemoteError())
250             self.parser.client.stopTest(self.parser._current_test)
251         elif self.parser.current_test_description + " [" == line[offset:-1]:
252             self.parser.state = TestProtocolServer.READING_FAILURE
253             self.parser._message = ""
254         else:
255             self.parser.stdOutLineReceived(line)
256
257     def addSkip(self, offset, line):
258         """A 'skip:' directive has been read."""
259         if self.parser.current_test_description == line[offset:-1]:
260             self.parser._state = self.parser._outside_test
261             self.parser.state = TestProtocolServer.STATE_OBJECT
262             self.parser.current_test_description = None
263             self.parser._skip_or_error()
264             self.parser.client.stopTest(self.parser._current_test)
265         elif self.parser.current_test_description + " [" == line[offset:-1]:
266             self.parser.state = TestProtocolServer.READING_SKIP
267             self.parser._message = ""
268         else:
269             self.parser.stdOutLineReceived(line)
270
271     def addSuccess(self, offset, line):
272         """A 'success:' directive has been read."""
273         if self.parser.current_test_description == line[offset:-1]:
274             self.parser._succeedTest()
275         elif self.parser.current_test_description + " [" == line[offset:-1]:
276             self.parser.state = TestProtocolServer.READING_SUCCESS
277             self.parser._message = ""
278         else:
279             self.parser.stdOutLineReceived(line)
280
281     def lostConnection(self):
282         """Connection lost."""
283         self.parser._lostConnectionInTest('')
284
285
286 class _OutSideTest(_ParserState):
287     """State for the subunit parser outside of a test context."""
288
289     def lostConnection(self):
290         """Connection lost."""
291
292     def startTest(self, offset, line):
293         """A test start command received."""
294         self.parser._state = self.parser._in_test
295         self.parser.state = TestProtocolServer.STATE_OBJECT
296         self.parser._current_test = RemotedTestCase(line[offset:-1])
297         self.parser.current_test_description = line[offset:-1]
298         self.parser.client.startTest(self.parser._current_test)
299
300
301 class TestProtocolServer(object):
302     """A parser for subunit.
303     
304     :ivar tags: The current tags associated with the protocol stream.
305     """
306
307     STATE_OBJECT = 0
308     STATE_OBJECTS = [0]
309     READING_FAILURE = 2
310     READING_ERROR = 3
311     READING_SKIP = 4
312     READING_XFAIL = 5
313     READING_SUCCESS = 6
314
315     def __init__(self, client, stream=None):
316         """Create a TestProtocolServer instance.
317
318         :param client: An object meeting the unittest.TestResult protocol.
319         :param stream: The stream that lines received which are not part of the
320             subunit protocol should be written to. This allows custom handling
321             of mixed protocols. By default, sys.stdout will be used for
322             convenience.
323         """
324         self.client = client
325         if stream is None:
326             stream = sys.stdout
327         self._stream = stream
328         # state objects we can switch too
329         self._in_test = _InTest(self)
330         self._outside_test = _OutSideTest(self)
331         # start with outside test.
332         self._state = self._outside_test
333         self.state = TestProtocolServer.STATE_OBJECT
334
335     def _addError(self, offset, line):
336         if self.state in TestProtocolServer.STATE_OBJECTS:
337             self._state.addError(offset, line)
338         else:
339             self.stdOutLineReceived(line)
340
341     def _addExpectedFail(self, offset, line):
342         if self.state in TestProtocolServer.STATE_OBJECTS:
343             self._state.addExpectedFail(offset, line)
344         else:
345             self.stdOutLineReceived(line)
346
347     def _addFailure(self, offset, line):
348         if self.state in TestProtocolServer.STATE_OBJECTS:
349             self._state.addFailure(offset, line)
350         else:
351             self.stdOutLineReceived(line)
352
353     def _addSkip(self, offset, line):
354         if self.state in TestProtocolServer.STATE_OBJECTS:
355             self._state.addSkip(offset, line)
356         else:
357             self.stdOutLineReceived(line)
358
359     def _skip_or_error(self, message=None):
360         """Report the current test as a skip if possible, or else an error."""
361         addSkip = getattr(self.client, 'addSkip', None)
362         if not callable(addSkip):
363             self.client.addError(self._current_test, RemoteError(message))
364         else:
365             if not message:
366                 message = "No reason given"
367             addSkip(self._current_test, message)
368
369     def _addSuccess(self, offset, line):
370         if self.state in TestProtocolServer.STATE_OBJECTS:
371             self._state.addSuccess(offset, line)
372         else:
373             self.stdOutLineReceived(line)
374
375     def _appendMessage(self, line):
376         if line[0:2] == " ]":
377             # quoted ] start
378             self._message += line[1:]
379         else:
380             self._message += line
381
382     def endQuote(self, line):
383         if self.state == TestProtocolServer.READING_FAILURE:
384             self._state = self._outside_test
385             self.state = TestProtocolServer.STATE_OBJECT
386             self.current_test_description = None
387             self.client.addFailure(self._current_test,
388                                    RemoteError(self._message))
389             self.client.stopTest(self._current_test)
390         elif self.state == TestProtocolServer.READING_ERROR:
391             self._state = self._outside_test
392             self.state = TestProtocolServer.STATE_OBJECT
393             self.current_test_description = None
394             self.client.addError(self._current_test,
395                                  RemoteError(self._message))
396             self.client.stopTest(self._current_test)
397         elif self.state == TestProtocolServer.READING_SKIP:
398             self._state = self._outside_test
399             self.state = TestProtocolServer.STATE_OBJECT
400             self.current_test_description = None
401             self._skip_or_error(self._message)
402             self.client.stopTest(self._current_test)
403         elif self.state == TestProtocolServer.READING_XFAIL:
404             self._state = self._outside_test
405             self.state = TestProtocolServer.STATE_OBJECT
406             self.current_test_description = None
407             xfail = getattr(self.client, 'addExpectedFailure', None)
408             if callable(xfail):
409                 xfail(self._current_test, RemoteError(self._message))
410             else:
411                 self.client.addSuccess(self._current_test)
412             self.client.stopTest(self._current_test)
413         elif self.state == TestProtocolServer.READING_SUCCESS:
414             self._succeedTest()
415         else:
416             self.stdOutLineReceived(line)
417
418     def _handleProgress(self, offset, line):
419         """Process a progress directive."""
420         line = line[offset:].strip()
421         if line[0] in '+-':
422             whence = PROGRESS_CUR
423             delta = int(line)
424         elif line == "push":
425             whence = PROGRESS_PUSH
426             delta = None
427         elif line == "pop":
428             whence = PROGRESS_POP
429             delta = None
430         else:
431             whence = PROGRESS_SET
432             delta = int(line)
433         progress_method = getattr(self.client, 'progress', None)
434         if callable(progress_method):
435             progress_method(delta, whence)
436
437     def _handleTags(self, offset, line):
438         """Process a tags command."""
439         tags = line[offset:].split()
440         new_tags, gone_tags = tags_to_new_gone(tags)
441         tags_method = getattr(self.client, 'tags', None)
442         if tags_method is not None:
443             tags_method(new_tags, gone_tags)
444
445     def _handleTime(self, offset, line):
446         # Accept it, but do not do anything with it yet.
447         try:
448             event_time = iso8601.parse_date(line[offset:-1])
449         except TypeError, e:
450             raise TypeError("Failed to parse %r, got %r" % (line, e))
451         time_method = getattr(self.client, 'time', None)
452         if callable(time_method):
453             time_method(event_time)
454
455     def lineReceived(self, line):
456         """Call the appropriate local method for the received line."""
457         if line == "]\n":
458             self.endQuote(line)
459         elif self.state in (TestProtocolServer.READING_FAILURE,
460             TestProtocolServer.READING_ERROR, TestProtocolServer.READING_SKIP,
461             TestProtocolServer.READING_SUCCESS,
462             TestProtocolServer.READING_XFAIL
463             ):
464             self._appendMessage(line)
465         else:
466             parts = line.split(None, 1)
467             if len(parts) == 2:
468                 cmd, rest = parts
469                 offset = len(cmd) + 1
470                 cmd = cmd.strip(':')
471                 if cmd in ('test', 'testing'):
472                     self._startTest(offset, line)
473                 elif cmd == 'error':
474                     self._addError(offset, line)
475                 elif cmd == 'failure':
476                     self._addFailure(offset, line)
477                 elif cmd == 'progress':
478                     self._handleProgress(offset, line)
479                 elif cmd == 'skip':
480                     self._addSkip(offset, line)
481                 elif cmd in ('success', 'successful'):
482                     self._addSuccess(offset, line)
483                 elif cmd in ('tags',):
484                     self._handleTags(offset, line)
485                 elif cmd in ('time',):
486                     self._handleTime(offset, line)
487                 elif cmd == 'xfail':
488                     self._addExpectedFail(offset, line)
489                 else:
490                     self.stdOutLineReceived(line)
491             else:
492                 self.stdOutLineReceived(line)
493
494     def _lostConnectionInTest(self, state_string):
495         error_string = "lost connection during %stest '%s'" % (
496             state_string, self.current_test_description)
497         self.client.addError(self._current_test, RemoteError(error_string))
498         self.client.stopTest(self._current_test)
499
500     def lostConnection(self):
501         """The input connection has finished."""
502         if self.state in TestProtocolServer.STATE_OBJECTS:
503             self._state.lostConnection()
504             return
505         if self.state == TestProtocolServer.READING_ERROR:
506             self._lostConnectionInTest('error report of ')
507         elif self.state == TestProtocolServer.READING_FAILURE:
508             self._lostConnectionInTest('failure report of ')
509         elif self.state == TestProtocolServer.READING_SUCCESS:
510             self._lostConnectionInTest('success report of ')
511         elif self.state == TestProtocolServer.READING_SKIP:
512             self._lostConnectionInTest('skip report of ')
513         elif self.state == TestProtocolServer.READING_XFAIL:
514             self._lostConnectionInTest('xfail report of ')
515         else:
516             self._lostConnectionInTest('unknown state of ')
517
518     def readFrom(self, pipe):
519         for line in pipe.readlines():
520             self.lineReceived(line)
521         self.lostConnection()
522
523     def _startTest(self, offset, line):
524         """Internal call to change state machine. Override startTest()."""
525         if self.state in TestProtocolServer.STATE_OBJECTS:
526             self._state.startTest(offset, line)
527         else:
528             self.stdOutLineReceived(line)
529
530     def stdOutLineReceived(self, line):
531         self._stream.write(line)
532
533     def _succeedTest(self):
534         self.client.addSuccess(self._current_test)
535         self.client.stopTest(self._current_test)
536         self.current_test_description = None
537         self._current_test = None
538         self._state = self._outside_test
539         self.state = TestProtocolServer.STATE_OBJECT
540
541
542 class RemoteException(Exception):
543     """An exception that occured remotely to Python."""
544
545     def __eq__(self, other):
546         try:
547             return self.args == other.args
548         except AttributeError:
549             return False
550
551
552 class TestProtocolClient(unittest.TestResult):
553     """A TestResult which generates a subunit stream for a test run.
554     
555     # Get a TestSuite or TestCase to run
556     suite = make_suite()
557     # Create a stream (any object with a 'write' method)
558     stream = file('tests.log', 'wb')
559     # Create a subunit result object which will output to the stream
560     result = subunit.TestProtocolClient(stream)
561     # Optionally, to get timing data for performance analysis, wrap the
562     # serialiser with a timing decorator
563     result = subunit.test_results.AutoTimingTestResultDecorator(result)
564     # Run the test suite reporting to the subunit result object
565     suite.run(result)
566     # Close the stream.
567     stream.close()
568     """
569
570     def __init__(self, stream):
571         unittest.TestResult.__init__(self)
572         self._stream = stream
573
574     def addError(self, test, error=None, details=None):
575         """Report an error in test test.
576         
577         Only one of error and details should be provided: conceptually there
578         are two separate methods:
579             addError(self, test, error)
580             addError(self, test, details)
581
582         :param error: Standard unittest positional argument form - an
583             exc_info tuple.
584         :param details: New Testing-in-python drafted API; a dict from string
585             to subunit.Content objects.
586         """
587         self._addOutcome("error", test, error=error, details=details)
588
589     def addExpectedFailure(self, test, error=None, details=None):
590         """Report an expected failure in test test.
591         
592         Only one of error and details should be provided: conceptually there
593         are two separate methods:
594             addError(self, test, error)
595             addError(self, test, details)
596
597         :param error: Standard unittest positional argument form - an
598             exc_info tuple.
599         :param details: New Testing-in-python drafted API; a dict from string
600             to subunit.Content objects.
601         """
602         self._addOutcome("xfail", test, error=error, details=details)
603
604     def addFailure(self, test, error=None, details=None):
605         """Report a failure in test test.
606         
607         Only one of error and details should be provided: conceptually there
608         are two separate methods:
609             addFailure(self, test, error)
610             addFailure(self, test, details)
611
612         :param error: Standard unittest positional argument form - an
613             exc_info tuple.
614         :param details: New Testing-in-python drafted API; a dict from string
615             to subunit.Content objects.
616         """
617         self._addOutcome("failure", test, error=error, details=details)
618
619     def _addOutcome(self, outcome, test, error=None, details=None):
620         """Report a failure in test test.
621         
622         Only one of error and details should be provided: conceptually there
623         are two separate methods:
624             addOutcome(self, test, error)
625             addOutcome(self, test, details)
626
627         :param outcome: A string describing the outcome - used as the
628             event name in the subunit stream.
629         :param error: Standard unittest positional argument form - an
630             exc_info tuple.
631         :param details: New Testing-in-python drafted API; a dict from string
632             to subunit.Content objects.
633         """
634         self._stream.write("%s: %s" % (outcome, test.id()))
635         if error is None and details is None:
636             raise ValueError
637         if error is not None:
638             self._stream.write(" [\n")
639             for line in self._exc_info_to_string(error, test).splitlines():
640                 self._stream.write("%s\n" % line)
641         else:
642             self._write_details(details)
643         self._stream.write("]\n")
644
645     def addSkip(self, test, reason=None, details=None):
646         """Report a skipped test."""
647         if reason is None:
648             self._addOutcome("skip", test, error=None, details=details)
649         else:
650             self._stream.write("skip: %s [\n" % test.id())
651             self._stream.write("%s\n" % reason)
652             self._stream.write("]\n")
653
654     def addSuccess(self, test, details=None):
655         """Report a success in a test."""
656         self._stream.write("successful: %s" % test.id())
657         if not details:
658             self._stream.write("\n")
659         else:
660             self._write_details(details)
661             self._stream.write("]\n")
662     addUnexpectedSuccess = addSuccess
663
664     def startTest(self, test):
665         """Mark a test as starting its test run."""
666         self._stream.write("test: %s\n" % test.id())
667
668     def progress(self, offset, whence):
669         """Provide indication about the progress/length of the test run.
670
671         :param offset: Information about the number of tests remaining. If
672             whence is PROGRESS_CUR, then offset increases/decreases the
673             remaining test count. If whence is PROGRESS_SET, then offset
674             specifies exactly the remaining test count.
675         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
676             PROGRESS_POP.
677         """
678         if whence == PROGRESS_CUR and offset > -1:
679             prefix = "+"
680         elif whence == PROGRESS_PUSH:
681             prefix = ""
682             offset = "push"
683         elif whence == PROGRESS_POP:
684             prefix = ""
685             offset = "pop"
686         else:
687             prefix = ""
688         self._stream.write("progress: %s%s\n" % (prefix, offset))
689
690     def time(self, a_datetime):
691         """Inform the client of the time.
692
693         ":param datetime: A datetime.datetime object.
694         """
695         time = a_datetime.astimezone(iso8601.Utc())
696         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
697             time.year, time.month, time.day, time.hour, time.minute,
698             time.second, time.microsecond))
699
700     def _write_details(self, details):
701         """Output details to the stream.
702
703         :param details: An extended details dict for a test outcome.
704         """
705         self._stream.write(" [ multipart\n")
706         for name, content in sorted(details.iteritems()):
707             self._stream.write("Content-Type: %s/%s" %
708                 (content.content_type.type, content.content_type.subtype))
709             parameters = content.content_type.parameters
710             if parameters:
711                 self._stream.write(";")
712                 param_strs = []
713                 for param, value in parameters.iteritems():
714                     param_strs.append("%s=%s" % (param, value))
715                 self._stream.write(",".join(param_strs))
716             self._stream.write("\n%s\n" % name)
717             encoder = chunked.Encoder(self._stream)
718             map(encoder.write, content.iter_bytes())
719             encoder.close()
720
721     def done(self):
722         """Obey the testtools result.done() interface."""
723
724
725 def RemoteError(description=""):
726     if description == "":
727         description = "\n"
728     return (RemoteException, RemoteException(description), None)
729
730
731 class RemotedTestCase(unittest.TestCase):
732     """A class to represent test cases run in child processes.
733     
734     Instances of this class are used to provide the Python test API a TestCase
735     that can be printed to the screen, introspected for metadata and so on.
736     However, as they are a simply a memoisation of a test that was actually
737     run in the past by a separate process, they cannot perform any interactive
738     actions.
739     """
740
741     def __eq__ (self, other):
742         try:
743             return self.__description == other.__description
744         except AttributeError:
745             return False
746
747     def __init__(self, description):
748         """Create a psuedo test case with description description."""
749         self.__description = description
750
751     def error(self, label):
752         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
753             label)
754
755     def setUp(self):
756         self.error("setUp")
757
758     def tearDown(self):
759         self.error("tearDown")
760
761     def shortDescription(self):
762         return self.__description
763
764     def id(self):
765         return "%s" % (self.__description,)
766
767     def __str__(self):
768         return "%s (%s)" % (self.__description, self._strclass())
769
770     def __repr__(self):
771         return "<%s description='%s'>" % \
772                (self._strclass(), self.__description)
773
774     def run(self, result=None):
775         if result is None: result = self.defaultTestResult()
776         result.startTest(self)
777         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
778         result.stopTest(self)
779
780     def _strclass(self):
781         cls = self.__class__
782         return "%s.%s" % (cls.__module__, cls.__name__)
783
784
785 class ExecTestCase(unittest.TestCase):
786     """A test case which runs external scripts for test fixtures."""
787
788     def __init__(self, methodName='runTest'):
789         """Create an instance of the class that will use the named test
790            method when executed. Raises a ValueError if the instance does
791            not have a method with the specified name.
792         """
793         unittest.TestCase.__init__(self, methodName)
794         testMethod = getattr(self, methodName)
795         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
796                                testMethod.__doc__)
797
798     def countTestCases(self):
799         return 1
800
801     def run(self, result=None):
802         if result is None: result = self.defaultTestResult()
803         self._run(result)
804
805     def debug(self):
806         """Run the test without collecting errors in a TestResult"""
807         self._run(unittest.TestResult())
808
809     def _run(self, result):
810         protocol = TestProtocolServer(result)
811         output = subprocess.Popen(self.script, shell=True,
812             stdout=subprocess.PIPE).communicate()[0]
813         protocol.readFrom(StringIO(output))
814
815
816 class IsolatedTestCase(unittest.TestCase):
817     """A TestCase which executes in a forked process.
818     
819     Each test gets its own process, which has a performance overhead but will
820     provide excellent isolation from global state (such as django configs,
821     zope utilities and so on).
822     """
823
824     def run(self, result=None):
825         if result is None: result = self.defaultTestResult()
826         run_isolated(unittest.TestCase, self, result)
827
828
829 class IsolatedTestSuite(unittest.TestSuite):
830     """A TestSuite which runs its tests in a forked process.
831     
832     This decorator that will fork() before running the tests and report the
833     results from the child process using a Subunit stream.  This is useful for
834     handling tests that mutate global state, or are testing C extensions that
835     could crash the VM.
836     """
837
838     def run(self, result=None):
839         if result is None: result = unittest.TestResult()
840         run_isolated(unittest.TestSuite, self, result)
841
842
843 def run_isolated(klass, self, result):
844     """Run a test suite or case in a subprocess, using the run method on klass.
845     """
846     c2pread, c2pwrite = os.pipe()
847     # fixme - error -> result
848     # now fork
849     pid = os.fork()
850     if pid == 0:
851         # Child
852         # Close parent's pipe ends
853         os.close(c2pread)
854         # Dup fds for child
855         os.dup2(c2pwrite, 1)
856         # Close pipe fds.
857         os.close(c2pwrite)
858
859         # at this point, sys.stdin is redirected, now we want
860         # to filter it to escape ]'s.
861         ### XXX: test and write that bit.
862
863         result = TestProtocolClient(sys.stdout)
864         klass.run(self, result)
865         sys.stdout.flush()
866         sys.stderr.flush()
867         # exit HARD, exit NOW.
868         os._exit(0)
869     else:
870         # Parent
871         # Close child pipe ends
872         os.close(c2pwrite)
873         # hookup a protocol engine
874         protocol = TestProtocolServer(result)
875         protocol.readFrom(os.fdopen(c2pread, 'rU'))
876         os.waitpid(pid, 0)
877         # TODO return code evaluation.
878     return result
879
880
881 def TAP2SubUnit(tap, subunit):
882     """Filter a TAP pipe into a subunit pipe.
883     
884     :param tap: A tap pipe/stream/file object.
885     :param subunit: A pipe/stream/file object to write subunit results to.
886     :return: The exit code to exit with.
887     """
888     BEFORE_PLAN = 0
889     AFTER_PLAN = 1
890     SKIP_STREAM = 2
891     client = TestProtocolClient(subunit)
892     state = BEFORE_PLAN
893     plan_start = 1
894     plan_stop = 0
895     def _skipped_test(subunit, plan_start):
896         # Some tests were skipped.
897         subunit.write('test test %d\n' % plan_start)
898         subunit.write('error test %d [\n' % plan_start)
899         subunit.write('test missing from TAP output\n')
900         subunit.write(']\n')
901         return plan_start + 1
902     # Test data for the next test to emit
903     test_name = None
904     log = []
905     result = None
906     def _emit_test():
907         "write out a test"
908         if test_name is None:
909             return
910         subunit.write("test %s\n" % test_name)
911         if not log:
912             subunit.write("%s %s\n" % (result, test_name))
913         else:
914             subunit.write("%s %s [\n" % (result, test_name))
915         if log:
916             for line in log:
917                 subunit.write("%s\n" % line)
918             subunit.write("]\n")
919         del log[:]
920     for line in tap:
921         if state == BEFORE_PLAN:
922             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
923             if match:
924                 state = AFTER_PLAN
925                 _, plan_stop, comment = match.groups()
926                 plan_stop = int(plan_stop)
927                 if plan_start > plan_stop and plan_stop == 0:
928                     # skipped file
929                     state = SKIP_STREAM
930                     subunit.write("test file skip\n")
931                     subunit.write("skip file skip [\n")
932                     subunit.write("%s\n" % comment)
933                     subunit.write("]\n")
934                 continue
935         # not a plan line, or have seen one before
936         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
937         if match:
938             # new test, emit current one.
939             _emit_test()
940             status, number, description, directive, directive_comment = match.groups()
941             if status == 'ok':
942                 result = 'success'
943             else:
944                 result = "failure"
945             if description is None:
946                 description = ''
947             else:
948                 description = ' ' + description
949             if directive is not None:
950                 if directive == 'TODO':
951                     result = 'xfail'
952                 elif directive == 'SKIP':
953                     result = 'skip'
954                 if directive_comment is not None:
955                     log.append(directive_comment)
956             if number is not None:
957                 number = int(number)
958                 while plan_start < number:
959                     plan_start = _skipped_test(subunit, plan_start)
960             test_name = "test %d%s" % (plan_start, description)
961             plan_start += 1
962             continue
963         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
964         if match:
965             reason, = match.groups()
966             if reason is None:
967                 extra = ''
968             else:
969                 extra = ' %s' % reason
970             _emit_test()
971             test_name = "Bail out!%s" % extra
972             result = "error"
973             state = SKIP_STREAM
974             continue
975         match = re.match("\#.*\n", line)
976         if match:
977             log.append(line[:-1])
978             continue
979         subunit.write(line)
980     _emit_test()
981     while plan_start <= plan_stop:
982         # record missed tests
983         plan_start = _skipped_test(subunit, plan_start)
984     return 0
985
986
987 def tag_stream(original, filtered, tags):
988     """Alter tags on a stream.
989
990     :param original: The input stream.
991     :param filtered: The output stream.
992     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
993         '-TAG' commands.
994
995         A 'TAG' command will add the tag to the output stream,
996         and override any existing '-TAG' command in that stream.
997         Specifically:
998          * A global 'tags: TAG' will be added to the start of the stream.
999          * Any tags commands with -TAG will have the -TAG removed.
1000
1001         A '-TAG' command will remove the TAG command from the stream.
1002         Specifically:
1003          * A 'tags: -TAG' command will be added to the start of the stream.
1004          * Any 'tags: TAG' command will have 'TAG' removed from it.
1005         Additionally, any redundant tagging commands (adding a tag globally
1006         present, or removing a tag globally removed) are stripped as a
1007         by-product of the filtering.
1008     :return: 0
1009     """
1010     new_tags, gone_tags = tags_to_new_gone(tags)
1011     def write_tags(new_tags, gone_tags):
1012         if new_tags or gone_tags:
1013             filtered.write("tags: " + ' '.join(new_tags))
1014             if gone_tags:
1015                 for tag in gone_tags:
1016                     filtered.write("-" + tag)
1017             filtered.write("\n")
1018     write_tags(new_tags, gone_tags)
1019     # TODO: use the protocol parser and thus don't mangle test comments.
1020     for line in original:
1021         if line.startswith("tags:"):
1022             line_tags = line[5:].split()
1023             line_new, line_gone = tags_to_new_gone(line_tags)
1024             line_new = line_new - gone_tags
1025             line_gone = line_gone - new_tags
1026             write_tags(line_new, line_gone)
1027         else:
1028             filtered.write(line)
1029     return 0
1030
1031
1032 class ProtocolTestCase(object):
1033     """Subunit wire protocol to unittest.TestCase adapter.
1034
1035     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1036     calling a ProtocolTestCase or invoking the run() method will make a 'test
1037     run' happen. The 'test run' will simply be a replay of the test activity
1038     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1039     and ``countTestCases`` methods are not supported because there isn't a
1040     sensible mapping for those methods.
1041     
1042     # Get a stream (any object with a readline() method), in this case the
1043     # stream output by the example from ``subunit.TestProtocolClient``.
1044     stream = file('tests.log', 'rb')
1045     # Create a parser which will read from the stream and emit 
1046     # activity to a unittest.TestResult when run() is called.
1047     suite = subunit.ProtocolTestCase(stream)
1048     # Create a result object to accept the contents of that stream.
1049     result = unittest._TextTestResult(sys.stdout)
1050     # 'run' the tests - process the stream and feed its contents to result.
1051     suite.run(result)
1052     stream.close()
1053
1054     :seealso: TestProtocolServer (the subunit wire protocol parser).
1055     """
1056
1057     def __init__(self, stream, passthrough=None):
1058         """Create a ProtocolTestCase reading from stream.
1059
1060         :param stream: A filelike object which a subunit stream can be read
1061             from.
1062         :param passthrough: A stream pass non subunit input on to. If not
1063             supplied, the TestProtocolServer default is used.
1064         """
1065         self._stream = stream
1066         self._passthrough = passthrough
1067
1068     def __call__(self, result=None):
1069         return self.run(result)
1070
1071     def run(self, result=None):
1072         if result is None:
1073             result = self.defaultTestResult()
1074         protocol = TestProtocolServer(result, self._passthrough)
1075         line = self._stream.readline()
1076         while line:
1077             protocol.lineReceived(line)
1078             line = self._stream.readline()
1079         protocol.lostConnection()
1080
1081
1082 class TestResultStats(unittest.TestResult):
1083     """A pyunit TestResult interface implementation for making statistics.
1084     
1085     :ivar total_tests: The total tests seen.
1086     :ivar passed_tests: The tests that passed.
1087     :ivar failed_tests: The tests that failed.
1088     :ivar seen_tags: The tags seen across all tests.
1089     """
1090
1091     def __init__(self, stream):
1092         """Create a TestResultStats which outputs to stream."""
1093         unittest.TestResult.__init__(self)
1094         self._stream = stream
1095         self.failed_tests = 0
1096         self.skipped_tests = 0
1097         self.seen_tags = set()
1098
1099     @property
1100     def total_tests(self):
1101         return self.testsRun
1102
1103     def addError(self, test, err):
1104         self.failed_tests += 1
1105
1106     def addFailure(self, test, err):
1107         self.failed_tests += 1
1108
1109     def addSkip(self, test, reason):
1110         self.skipped_tests += 1
1111
1112     def formatStats(self):
1113         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1114         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1115         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1116         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1117         tags = sorted(self.seen_tags)
1118         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1119
1120     @property
1121     def passed_tests(self):
1122         return self.total_tests - self.failed_tests - self.skipped_tests
1123
1124     def tags(self, new_tags, gone_tags):
1125         """Accumulate the seen tags."""
1126         self.seen_tags.update(new_tags)
1127
1128     def wasSuccessful(self):
1129         """Tells whether or not this result was a success"""
1130         return self.failed_tests == 0
1131
1132
1133 class TestResultFilter(unittest.TestResult):
1134     """A pyunit TestResult interface implementation which filters tests.
1135
1136     Tests that pass the filter are handed on to another TestResult instance
1137     for further processing/reporting. To obtain the filtered results, 
1138     the other instance must be interrogated.
1139
1140     :ivar result: The result that tests are passed to after filtering.
1141     :ivar filter_predicate: The callback run to decide whether to pass 
1142         a result.
1143     """
1144
1145     def __init__(self, result, filter_error=False, filter_failure=False,
1146         filter_success=True, filter_skip=False,
1147         filter_predicate=None):
1148         """Create a FilterResult object filtering to result.
1149         
1150         :param filter_error: Filter out errors.
1151         :param filter_failure: Filter out failures.
1152         :param filter_success: Filter out successful tests.
1153         :param filter_skip: Filter out skipped tests.
1154         :param filter_predicate: A callable taking (test, err) and 
1155             returning True if the result should be passed through.
1156             err is None for success.
1157         """
1158         unittest.TestResult.__init__(self)
1159         self.result = result
1160         self._filter_error = filter_error
1161         self._filter_failure = filter_failure
1162         self._filter_success = filter_success
1163         self._filter_skip = filter_skip
1164         if filter_predicate is None:
1165             filter_predicate = lambda test, err: True
1166         self.filter_predicate = filter_predicate
1167         # The current test (for filtering tags)
1168         self._current_test = None
1169         # Has the current test been filtered (for outputting test tags)
1170         self._current_test_filtered = None
1171         # The (new, gone) tags for the current test.
1172         self._current_test_tags = None
1173         
1174     def addError(self, test, err):
1175         if not self._filter_error and self.filter_predicate(test, err):
1176             self.result.startTest(test)
1177             self.result.addError(test, err)
1178
1179     def addFailure(self, test, err):
1180         if not self._filter_failure and self.filter_predicate(test, err):
1181             self.result.startTest(test)
1182             self.result.addFailure(test, err)
1183
1184     def addSkip(self, test, reason):
1185         if not self._filter_skip and self.filter_predicate(test, reason):
1186             self.result.startTest(test)
1187             # This is duplicated, it would be nice to have on a 'calls
1188             # TestResults' mixin perhaps.
1189             addSkip = getattr(self.result, 'addSkip', None)
1190             if not callable(addSkip):
1191                 self.result.addError(test, RemoteError(reason))
1192             else:
1193                 self.result.addSkip(test, reason)
1194
1195     def addSuccess(self, test):
1196         if not self._filter_success and self.filter_predicate(test, None):
1197             self.result.startTest(test)
1198             self.result.addSuccess(test)
1199
1200     def startTest(self, test):
1201         """Start a test.
1202         
1203         Not directly passed to the client, but used for handling of tags
1204         correctly.
1205         """
1206         self._current_test = test
1207         self._current_test_filtered = False
1208         self._current_test_tags = set(), set()
1209     
1210     def stopTest(self, test):
1211         """Stop a test.
1212         
1213         Not directly passed to the client, but used for handling of tags
1214         correctly.
1215         """
1216         if not self._current_test_filtered:
1217             # Tags to output for this test.
1218             if self._current_test_tags[0] or self._current_test_tags[1]:
1219                 tags_method = getattr(self.result, 'tags', None)
1220                 if callable(tags_method):
1221                     self.result.tags(*self._current_test_tags)
1222             self.result.stopTest(test)
1223         self._current_test = None
1224         self._current_test_filtered = None
1225         self._current_test_tags = None
1226
1227     def tags(self, new_tags, gone_tags):
1228         """Handle tag instructions.
1229
1230         Adds and removes tags as appropriate. If a test is currently running,
1231         tags are not affected for subsequent tests.
1232         
1233         :param new_tags: Tags to add,
1234         :param gone_tags: Tags to remove.
1235         """
1236         if self._current_test is not None:
1237             # gather the tags until the test stops.
1238             self._current_test_tags[0].update(new_tags)
1239             self._current_test_tags[0].difference_update(gone_tags)
1240             self._current_test_tags[1].update(gone_tags)
1241             self._current_test_tags[1].difference_update(new_tags)
1242         tags_method = getattr(self.result, 'tags', None)
1243         if tags_method is None:
1244             return
1245         return tags_method(new_tags, gone_tags)
1246
1247     def id_to_orig_id(self, id):
1248         if id.startswith("subunit.RemotedTestCase."):
1249             return id[len("subunit.RemotedTestCase."):]
1250         return id