Remove unnecessary python path updates for bundled subunit/testtools.
[samba.git] / lib / subunit / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ++++++++
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
57 and newer).
58
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), the the tags apply to all subsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
64
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occurred at, to allow reconstructing
68 test timing from a stream.
69
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
74
75
76 Python test support
77 -------------------
78
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
81
82   $ python -m subunit.run mylib.tests.test_suite
83
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
86
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
89
90 `ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
92 result object::
93
94  class AggregateTests(subunit.ExecTestCase):
95
96      def test_script_one(self):
97          './bin/script_one'
98
99      def test_script_two(self):
100          './bin/script_two'
101
102  # Normally your normal test loading would take of this automatically,
103  # It is only spelt out in detail here for clarity.
104  suite = unittest.TestSuite([AggregateTests("test_script_one"),
105      AggregateTests("test_script_two")])
106  # Create any TestResult class you like.
107  result = unittest._TextTestResult(sys.stdout)
108  # And run your suite as normal, Subunit will exec each external script as
109  # needed and report to your result object.
110  suite.run(result)
111
112 Utility modules
113 ---------------
114
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
117 """
118
119 import os
120 import re
121 import subprocess
122 import sys
123 import unittest
124 if sys.version_info > (3, 0):
125     from io import UnsupportedOperation as _UnsupportedOperation
126 else:
127     _UnsupportedOperation = AttributeError
128
129
130 from testtools import content, content_type, ExtendedToOriginalDecorator
131 from testtools.content import TracebackContent
132 from testtools.compat import _b, _u, BytesIO, StringIO
133 try:
134     from testtools.testresult.real import _StringException
135     RemoteException = _StringException
136     # For testing: different pythons have different str() implementations.
137     if sys.version_info > (3, 0):
138         _remote_exception_str = "testtools.testresult.real._StringException"
139         _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
140     else:
141         _remote_exception_str = "_StringException" 
142         _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
143 except ImportError:
144     raise ImportError ("testtools.testresult.real does not contain "
145         "_StringException, check your version.")
146 from testtools import testresult
147
148 from subunit import chunked, details, iso8601, test_results
149
150 # same format as sys.version_info: "A tuple containing the five components of
151 # the version number: major, minor, micro, releaselevel, and serial. All
152 # values except releaselevel are integers; the release level is 'alpha',
153 # 'beta', 'candidate', or 'final'. The version_info value corresponding to the
154 # Python version 2.0 is (2, 0, 0, 'final', 0)."  Additionally we use a
155 # releaselevel of 'dev' for unreleased under-development code.
156 #
157 # If the releaselevel is 'alpha' then the major/minor/micro components are not
158 # established at this point, and setup.py will use a version of next-$(revno).
159 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
160 # Otherwise it is major.minor.micro~$(revno).
161
162 __version__ = (0, 0, 9, 'final', 0)
163
164 PROGRESS_SET = 0
165 PROGRESS_CUR = 1
166 PROGRESS_PUSH = 2
167 PROGRESS_POP = 3
168
169
170 def test_suite():
171     import subunit.tests
172     return subunit.tests.test_suite()
173
174
175 def join_dir(base_path, path):
176     """
177     Returns an absolute path to C{path}, calculated relative to the parent
178     of C{base_path}.
179
180     @param base_path: A path to a file or directory.
181     @param path: An absolute path, or a path relative to the containing
182     directory of C{base_path}.
183
184     @return: An absolute path to C{path}.
185     """
186     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
187
188
189 def tags_to_new_gone(tags):
190     """Split a list of tags into a new_set and a gone_set."""
191     new_tags = set()
192     gone_tags = set()
193     for tag in tags:
194         if tag[0] == '-':
195             gone_tags.add(tag[1:])
196         else:
197             new_tags.add(tag)
198     return new_tags, gone_tags
199
200
201 class DiscardStream(object):
202     """A filelike object which discards what is written to it."""
203
204     def fileno(self):
205         raise _UnsupportedOperation()
206
207     def write(self, bytes):
208         pass
209
210     def read(self, len=0):
211         return _b('')
212
213
214 class _ParserState(object):
215     """State for the subunit parser."""
216
217     def __init__(self, parser):
218         self.parser = parser
219         self._test_sym = (_b('test'), _b('testing'))
220         self._colon_sym = _b(':')
221         self._error_sym = (_b('error'),)
222         self._failure_sym = (_b('failure'),)
223         self._progress_sym = (_b('progress'),)
224         self._skip_sym = _b('skip')
225         self._success_sym = (_b('success'), _b('successful'))
226         self._tags_sym = (_b('tags'),)
227         self._time_sym = (_b('time'),)
228         self._xfail_sym = (_b('xfail'),)
229         self._uxsuccess_sym = (_b('uxsuccess'),)
230         self._start_simple = _u(" [")
231         self._start_multipart = _u(" [ multipart")
232
233     def addError(self, offset, line):
234         """An 'error:' directive has been read."""
235         self.parser.stdOutLineReceived(line)
236
237     def addExpectedFail(self, offset, line):
238         """An 'xfail:' directive has been read."""
239         self.parser.stdOutLineReceived(line)
240
241     def addFailure(self, offset, line):
242         """A 'failure:' directive has been read."""
243         self.parser.stdOutLineReceived(line)
244
245     def addSkip(self, offset, line):
246         """A 'skip:' directive has been read."""
247         self.parser.stdOutLineReceived(line)
248
249     def addSuccess(self, offset, line):
250         """A 'success:' directive has been read."""
251         self.parser.stdOutLineReceived(line)
252
253     def lineReceived(self, line):
254         """a line has been received."""
255         parts = line.split(None, 1)
256         if len(parts) == 2 and line.startswith(parts[0]):
257             cmd, rest = parts
258             offset = len(cmd) + 1
259             cmd = cmd.rstrip(self._colon_sym)
260             if cmd in self._test_sym:
261                 self.startTest(offset, line)
262             elif cmd in self._error_sym:
263                 self.addError(offset, line)
264             elif cmd in self._failure_sym:
265                 self.addFailure(offset, line)
266             elif cmd in self._progress_sym:
267                 self.parser._handleProgress(offset, line)
268             elif cmd in self._skip_sym:
269                 self.addSkip(offset, line)
270             elif cmd in self._success_sym:
271                 self.addSuccess(offset, line)
272             elif cmd in self._tags_sym:
273                 self.parser._handleTags(offset, line)
274                 self.parser.subunitLineReceived(line)
275             elif cmd in self._time_sym:
276                 self.parser._handleTime(offset, line)
277                 self.parser.subunitLineReceived(line)
278             elif cmd in self._xfail_sym:
279                 self.addExpectedFail(offset, line)
280             elif cmd in self._uxsuccess_sym:
281                 self.addUnexpectedSuccess(offset, line)
282             else:
283                 self.parser.stdOutLineReceived(line)
284         else:
285             self.parser.stdOutLineReceived(line)
286
287     def lostConnection(self):
288         """Connection lost."""
289         self.parser._lostConnectionInTest(_u('unknown state of '))
290
291     def startTest(self, offset, line):
292         """A test start command received."""
293         self.parser.stdOutLineReceived(line)
294
295
296 class _InTest(_ParserState):
297     """State for the subunit parser after reading a test: directive."""
298
299     def _outcome(self, offset, line, no_details, details_state):
300         """An outcome directive has been read.
301
302         :param no_details: Callable to call when no details are presented.
303         :param details_state: The state to switch to for details
304             processing of this outcome.
305         """
306         test_name = line[offset:-1].decode('utf8')
307         if self.parser.current_test_description == test_name:
308             self.parser._state = self.parser._outside_test
309             self.parser.current_test_description = None
310             no_details()
311             self.parser.client.stopTest(self.parser._current_test)
312             self.parser._current_test = None
313             self.parser.subunitLineReceived(line)
314         elif self.parser.current_test_description + self._start_simple == \
315             test_name:
316             self.parser._state = details_state
317             details_state.set_simple()
318             self.parser.subunitLineReceived(line)
319         elif self.parser.current_test_description + self._start_multipart == \
320             test_name:
321             self.parser._state = details_state
322             details_state.set_multipart()
323             self.parser.subunitLineReceived(line)
324         else:
325             self.parser.stdOutLineReceived(line)
326
327     def _error(self):
328         self.parser.client.addError(self.parser._current_test,
329             details={})
330
331     def addError(self, offset, line):
332         """An 'error:' directive has been read."""
333         self._outcome(offset, line, self._error,
334             self.parser._reading_error_details)
335
336     def _xfail(self):
337         self.parser.client.addExpectedFailure(self.parser._current_test,
338             details={})
339
340     def addExpectedFail(self, offset, line):
341         """An 'xfail:' directive has been read."""
342         self._outcome(offset, line, self._xfail,
343             self.parser._reading_xfail_details)
344
345     def _uxsuccess(self):
346         self.parser.client.addUnexpectedSuccess(self.parser._current_test)
347
348     def addUnexpectedSuccess(self, offset, line):
349         """A 'uxsuccess:' directive has been read."""
350         self._outcome(offset, line, self._uxsuccess,
351             self.parser._reading_uxsuccess_details)
352
353     def _failure(self):
354         self.parser.client.addFailure(self.parser._current_test, details={})
355
356     def addFailure(self, offset, line):
357         """A 'failure:' directive has been read."""
358         self._outcome(offset, line, self._failure,
359             self.parser._reading_failure_details)
360
361     def _skip(self):
362         self.parser.client.addSkip(self.parser._current_test, details={})
363
364     def addSkip(self, offset, line):
365         """A 'skip:' directive has been read."""
366         self._outcome(offset, line, self._skip,
367             self.parser._reading_skip_details)
368
369     def _succeed(self):
370         self.parser.client.addSuccess(self.parser._current_test, details={})
371
372     def addSuccess(self, offset, line):
373         """A 'success:' directive has been read."""
374         self._outcome(offset, line, self._succeed,
375             self.parser._reading_success_details)
376
377     def lostConnection(self):
378         """Connection lost."""
379         self.parser._lostConnectionInTest(_u(''))
380
381
382 class _OutSideTest(_ParserState):
383     """State for the subunit parser outside of a test context."""
384
385     def lostConnection(self):
386         """Connection lost."""
387
388     def startTest(self, offset, line):
389         """A test start command received."""
390         self.parser._state = self.parser._in_test
391         test_name = line[offset:-1].decode('utf8')
392         self.parser._current_test = RemotedTestCase(test_name)
393         self.parser.current_test_description = test_name
394         self.parser.client.startTest(self.parser._current_test)
395         self.parser.subunitLineReceived(line)
396
397
398 class _ReadingDetails(_ParserState):
399     """Common logic for readin state details."""
400
401     def endDetails(self):
402         """The end of a details section has been reached."""
403         self.parser._state = self.parser._outside_test
404         self.parser.current_test_description = None
405         self._report_outcome()
406         self.parser.client.stopTest(self.parser._current_test)
407
408     def lineReceived(self, line):
409         """a line has been received."""
410         self.details_parser.lineReceived(line)
411         self.parser.subunitLineReceived(line)
412
413     def lostConnection(self):
414         """Connection lost."""
415         self.parser._lostConnectionInTest(_u('%s report of ') %
416             self._outcome_label())
417
418     def _outcome_label(self):
419         """The label to describe this outcome."""
420         raise NotImplementedError(self._outcome_label)
421
422     def set_simple(self):
423         """Start a simple details parser."""
424         self.details_parser = details.SimpleDetailsParser(self)
425
426     def set_multipart(self):
427         """Start a multipart details parser."""
428         self.details_parser = details.MultipartDetailsParser(self)
429
430
431 class _ReadingFailureDetails(_ReadingDetails):
432     """State for the subunit parser when reading failure details."""
433
434     def _report_outcome(self):
435         self.parser.client.addFailure(self.parser._current_test,
436             details=self.details_parser.get_details())
437
438     def _outcome_label(self):
439         return "failure"
440
441
442 class _ReadingErrorDetails(_ReadingDetails):
443     """State for the subunit parser when reading error details."""
444
445     def _report_outcome(self):
446         self.parser.client.addError(self.parser._current_test,
447             details=self.details_parser.get_details())
448
449     def _outcome_label(self):
450         return "error"
451
452
453 class _ReadingExpectedFailureDetails(_ReadingDetails):
454     """State for the subunit parser when reading xfail details."""
455
456     def _report_outcome(self):
457         self.parser.client.addExpectedFailure(self.parser._current_test,
458             details=self.details_parser.get_details())
459
460     def _outcome_label(self):
461         return "xfail"
462
463
464 class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
465     """State for the subunit parser when reading uxsuccess details."""
466
467     def _report_outcome(self):
468         self.parser.client.addUnexpectedSuccess(self.parser._current_test,
469             details=self.details_parser.get_details())
470
471     def _outcome_label(self):
472         return "uxsuccess"
473
474
475 class _ReadingSkipDetails(_ReadingDetails):
476     """State for the subunit parser when reading skip details."""
477
478     def _report_outcome(self):
479         self.parser.client.addSkip(self.parser._current_test,
480             details=self.details_parser.get_details("skip"))
481
482     def _outcome_label(self):
483         return "skip"
484
485
486 class _ReadingSuccessDetails(_ReadingDetails):
487     """State for the subunit parser when reading success details."""
488
489     def _report_outcome(self):
490         self.parser.client.addSuccess(self.parser._current_test,
491             details=self.details_parser.get_details("success"))
492
493     def _outcome_label(self):
494         return "success"
495
496
497 class TestProtocolServer(object):
498     """A parser for subunit.
499
500     :ivar tags: The current tags associated with the protocol stream.
501     """
502
503     def __init__(self, client, stream=None, forward_stream=None):
504         """Create a TestProtocolServer instance.
505
506         :param client: An object meeting the unittest.TestResult protocol.
507         :param stream: The stream that lines received which are not part of the
508             subunit protocol should be written to. This allows custom handling
509             of mixed protocols. By default, sys.stdout will be used for
510             convenience. It should accept bytes to its write() method.
511         :param forward_stream: A stream to forward subunit lines to. This
512             allows a filter to forward the entire stream while still parsing
513             and acting on it. By default forward_stream is set to
514             DiscardStream() and no forwarding happens.
515         """
516         self.client = ExtendedToOriginalDecorator(client)
517         if stream is None:
518             stream = sys.stdout
519             if sys.version_info > (3, 0):
520                 stream = stream.buffer
521         self._stream = stream
522         self._forward_stream = forward_stream or DiscardStream()
523         # state objects we can switch too
524         self._in_test = _InTest(self)
525         self._outside_test = _OutSideTest(self)
526         self._reading_error_details = _ReadingErrorDetails(self)
527         self._reading_failure_details = _ReadingFailureDetails(self)
528         self._reading_skip_details = _ReadingSkipDetails(self)
529         self._reading_success_details = _ReadingSuccessDetails(self)
530         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
531         self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
532         # start with outside test.
533         self._state = self._outside_test
534         # Avoid casts on every call
535         self._plusminus = _b('+-')
536         self._push_sym = _b('push')
537         self._pop_sym = _b('pop')
538
539     def _handleProgress(self, offset, line):
540         """Process a progress directive."""
541         line = line[offset:].strip()
542         if line[0] in self._plusminus:
543             whence = PROGRESS_CUR
544             delta = int(line)
545         elif line == self._push_sym:
546             whence = PROGRESS_PUSH
547             delta = None
548         elif line == self._pop_sym:
549             whence = PROGRESS_POP
550             delta = None
551         else:
552             whence = PROGRESS_SET
553             delta = int(line)
554         self.client.progress(delta, whence)
555
556     def _handleTags(self, offset, line):
557         """Process a tags command."""
558         tags = line[offset:].decode('utf8').split()
559         new_tags, gone_tags = tags_to_new_gone(tags)
560         self.client.tags(new_tags, gone_tags)
561
562     def _handleTime(self, offset, line):
563         # Accept it, but do not do anything with it yet.
564         try:
565             event_time = iso8601.parse_date(line[offset:-1])
566         except TypeError:
567             raise TypeError(_u("Failed to parse %r, got %r")
568                 % (line, sys.exec_info[1]))
569         self.client.time(event_time)
570
571     def lineReceived(self, line):
572         """Call the appropriate local method for the received line."""
573         self._state.lineReceived(line)
574
575     def _lostConnectionInTest(self, state_string):
576         error_string = _u("lost connection during %stest '%s'") % (
577             state_string, self.current_test_description)
578         self.client.addError(self._current_test, RemoteError(error_string))
579         self.client.stopTest(self._current_test)
580
581     def lostConnection(self):
582         """The input connection has finished."""
583         self._state.lostConnection()
584
585     def readFrom(self, pipe):
586         """Blocking convenience API to parse an entire stream.
587
588         :param pipe: A file-like object supporting readlines().
589         :return: None.
590         """
591         for line in pipe.readlines():
592             self.lineReceived(line)
593         self.lostConnection()
594
595     def _startTest(self, offset, line):
596         """Internal call to change state machine. Override startTest()."""
597         self._state.startTest(offset, line)
598
599     def subunitLineReceived(self, line):
600         self._forward_stream.write(line)
601
602     def stdOutLineReceived(self, line):
603         self._stream.write(line)
604
605
606 class TestProtocolClient(testresult.TestResult):
607     """A TestResult which generates a subunit stream for a test run.
608
609     # Get a TestSuite or TestCase to run
610     suite = make_suite()
611     # Create a stream (any object with a 'write' method). This should accept
612     # bytes not strings: subunit is a byte orientated protocol.
613     stream = file('tests.log', 'wb')
614     # Create a subunit result object which will output to the stream
615     result = subunit.TestProtocolClient(stream)
616     # Optionally, to get timing data for performance analysis, wrap the
617     # serialiser with a timing decorator
618     result = subunit.test_results.AutoTimingTestResultDecorator(result)
619     # Run the test suite reporting to the subunit result object
620     suite.run(result)
621     # Close the stream.
622     stream.close()
623     """
624
625     def __init__(self, stream):
626         testresult.TestResult.__init__(self)
627         stream = _make_stream_binary(stream)
628         self._stream = stream
629         self._progress_fmt = _b("progress: ")
630         self._bytes_eol = _b("\n")
631         self._progress_plus = _b("+")
632         self._progress_push = _b("push")
633         self._progress_pop = _b("pop")
634         self._empty_bytes = _b("")
635         self._start_simple = _b(" [\n")
636         self._end_simple = _b("]\n")
637
638     def addError(self, test, error=None, details=None):
639         """Report an error in test test.
640
641         Only one of error and details should be provided: conceptually there
642         are two separate methods:
643             addError(self, test, error)
644             addError(self, test, details)
645
646         :param error: Standard unittest positional argument form - an
647             exc_info tuple.
648         :param details: New Testing-in-python drafted API; a dict from string
649             to subunit.Content objects.
650         """
651         self._addOutcome("error", test, error=error, details=details)
652         if self.failfast:
653             self.stop()
654
655     def addExpectedFailure(self, test, error=None, details=None):
656         """Report an expected failure in test test.
657
658         Only one of error and details should be provided: conceptually there
659         are two separate methods:
660             addError(self, test, error)
661             addError(self, test, details)
662
663         :param error: Standard unittest positional argument form - an
664             exc_info tuple.
665         :param details: New Testing-in-python drafted API; a dict from string
666             to subunit.Content objects.
667         """
668         self._addOutcome("xfail", test, error=error, details=details)
669
670     def addFailure(self, test, error=None, details=None):
671         """Report a failure in test test.
672
673         Only one of error and details should be provided: conceptually there
674         are two separate methods:
675             addFailure(self, test, error)
676             addFailure(self, test, details)
677
678         :param error: Standard unittest positional argument form - an
679             exc_info tuple.
680         :param details: New Testing-in-python drafted API; a dict from string
681             to subunit.Content objects.
682         """
683         self._addOutcome("failure", test, error=error, details=details)
684         if self.failfast:
685             self.stop()
686
687     def _addOutcome(self, outcome, test, error=None, details=None,
688         error_permitted=True):
689         """Report a failure in test test.
690
691         Only one of error and details should be provided: conceptually there
692         are two separate methods:
693             addOutcome(self, test, error)
694             addOutcome(self, test, details)
695
696         :param outcome: A string describing the outcome - used as the
697             event name in the subunit stream.
698         :param error: Standard unittest positional argument form - an
699             exc_info tuple.
700         :param details: New Testing-in-python drafted API; a dict from string
701             to subunit.Content objects.
702         :param error_permitted: If True then one and only one of error or
703             details must be supplied. If False then error must not be supplied
704             and details is still optional.  """
705         self._stream.write(_b("%s: " % outcome) + self._test_id(test))
706         if error_permitted:
707             if error is None and details is None:
708                 raise ValueError
709         else:
710             if error is not None:
711                 raise ValueError
712         if error is not None:
713             self._stream.write(self._start_simple)
714             tb_content = TracebackContent(error, test)
715             for bytes in tb_content.iter_bytes():
716                 self._stream.write(bytes)
717         elif details is not None:
718             self._write_details(details)
719         else:
720             self._stream.write(_b("\n"))
721         if details is not None or error is not None:
722             self._stream.write(self._end_simple)
723
724     def addSkip(self, test, reason=None, details=None):
725         """Report a skipped test."""
726         if reason is None:
727             self._addOutcome("skip", test, error=None, details=details)
728         else:
729             self._stream.write(_b("skip: %s [\n" % test.id()))
730             self._stream.write(_b("%s\n" % reason))
731             self._stream.write(self._end_simple)
732
733     def addSuccess(self, test, details=None):
734         """Report a success in a test."""
735         self._addOutcome("successful", test, details=details, error_permitted=False)
736
737     def addUnexpectedSuccess(self, test, details=None):
738         """Report an unexpected success in test test.
739
740         Details can optionally be provided: conceptually there
741         are two separate methods:
742             addError(self, test)
743             addError(self, test, details)
744
745         :param details: New Testing-in-python drafted API; a dict from string
746             to subunit.Content objects.
747         """
748         self._addOutcome("uxsuccess", test, details=details,
749             error_permitted=False)
750         if self.failfast:
751             self.stop()
752
753     def _test_id(self, test):
754         result = test.id()
755         if type(result) is not bytes:
756             result = result.encode('utf8')
757         return result
758
759     def startTest(self, test):
760         """Mark a test as starting its test run."""
761         super(TestProtocolClient, self).startTest(test)
762         self._stream.write(_b("test: ") + self._test_id(test) + _b("\n"))
763         self._stream.flush()
764
765     def stopTest(self, test):
766         super(TestProtocolClient, self).stopTest(test)
767         self._stream.flush()
768
769     def progress(self, offset, whence):
770         """Provide indication about the progress/length of the test run.
771
772         :param offset: Information about the number of tests remaining. If
773             whence is PROGRESS_CUR, then offset increases/decreases the
774             remaining test count. If whence is PROGRESS_SET, then offset
775             specifies exactly the remaining test count.
776         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
777             PROGRESS_POP.
778         """
779         if whence == PROGRESS_CUR and offset > -1:
780             prefix = self._progress_plus
781             offset = _b(str(offset))
782         elif whence == PROGRESS_PUSH:
783             prefix = self._empty_bytes
784             offset = self._progress_push
785         elif whence == PROGRESS_POP:
786             prefix = self._empty_bytes
787             offset = self._progress_pop
788         else:
789             prefix = self._empty_bytes
790             offset = _b(str(offset))
791         self._stream.write(self._progress_fmt + prefix + offset +
792             self._bytes_eol)
793
794     def tags(self, new_tags, gone_tags):
795         """Inform the client about tags added/removed from the stream."""
796         if not new_tags and not gone_tags:
797             return
798         tags = set([tag.encode('utf8') for tag in new_tags])
799         tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
800         tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
801         self._stream.write(tag_line)
802
803     def time(self, a_datetime):
804         """Inform the client of the time.
805
806         ":param datetime: A datetime.datetime object.
807         """
808         time = a_datetime.astimezone(iso8601.Utc())
809         self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
810             time.year, time.month, time.day, time.hour, time.minute,
811             time.second, time.microsecond)))
812
813     def _write_details(self, details):
814         """Output details to the stream.
815
816         :param details: An extended details dict for a test outcome.
817         """
818         self._stream.write(_b(" [ multipart\n"))
819         for name, content in sorted(details.items()):
820             self._stream.write(_b("Content-Type: %s/%s" %
821                 (content.content_type.type, content.content_type.subtype)))
822             parameters = content.content_type.parameters
823             if parameters:
824                 self._stream.write(_b(";"))
825                 param_strs = []
826                 for param, value in parameters.items():
827                     param_strs.append("%s=%s" % (param, value))
828                 self._stream.write(_b(",".join(param_strs)))
829             self._stream.write(_b("\n%s\n" % name))
830             encoder = chunked.Encoder(self._stream)
831             list(map(encoder.write, content.iter_bytes()))
832             encoder.close()
833
834     def done(self):
835         """Obey the testtools result.done() interface."""
836
837
838 def RemoteError(description=_u("")):
839     return (_StringException, _StringException(description), None)
840
841
842 class RemotedTestCase(unittest.TestCase):
843     """A class to represent test cases run in child processes.
844
845     Instances of this class are used to provide the Python test API a TestCase
846     that can be printed to the screen, introspected for metadata and so on.
847     However, as they are a simply a memoisation of a test that was actually
848     run in the past by a separate process, they cannot perform any interactive
849     actions.
850     """
851
852     def __eq__ (self, other):
853         try:
854             return self.__description == other.__description
855         except AttributeError:
856             return False
857
858     def __init__(self, description):
859         """Create a psuedo test case with description description."""
860         self.__description = description
861
862     def error(self, label):
863         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
864             label)
865
866     def setUp(self):
867         self.error("setUp")
868
869     def tearDown(self):
870         self.error("tearDown")
871
872     def shortDescription(self):
873         return self.__description
874
875     def id(self):
876         return "%s" % (self.__description,)
877
878     def __str__(self):
879         return "%s (%s)" % (self.__description, self._strclass())
880
881     def __repr__(self):
882         return "<%s description='%s'>" % \
883                (self._strclass(), self.__description)
884
885     def run(self, result=None):
886         if result is None: result = self.defaultTestResult()
887         result.startTest(self)
888         result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
889         result.stopTest(self)
890
891     def _strclass(self):
892         cls = self.__class__
893         return "%s.%s" % (cls.__module__, cls.__name__)
894
895
896 class ExecTestCase(unittest.TestCase):
897     """A test case which runs external scripts for test fixtures."""
898
899     def __init__(self, methodName='runTest'):
900         """Create an instance of the class that will use the named test
901            method when executed. Raises a ValueError if the instance does
902            not have a method with the specified name.
903         """
904         unittest.TestCase.__init__(self, methodName)
905         testMethod = getattr(self, methodName)
906         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
907                                testMethod.__doc__)
908
909     def countTestCases(self):
910         return 1
911
912     def run(self, result=None):
913         if result is None: result = self.defaultTestResult()
914         self._run(result)
915
916     def debug(self):
917         """Run the test without collecting errors in a TestResult"""
918         self._run(testresult.TestResult())
919
920     def _run(self, result):
921         protocol = TestProtocolServer(result)
922         process = subprocess.Popen(self.script, shell=True,
923             stdout=subprocess.PIPE)
924         _make_stream_binary(process.stdout)
925         output = process.communicate()[0]
926         protocol.readFrom(BytesIO(output))
927
928
929 class IsolatedTestCase(unittest.TestCase):
930     """A TestCase which executes in a forked process.
931
932     Each test gets its own process, which has a performance overhead but will
933     provide excellent isolation from global state (such as django configs,
934     zope utilities and so on).
935     """
936
937     def run(self, result=None):
938         if result is None: result = self.defaultTestResult()
939         run_isolated(unittest.TestCase, self, result)
940
941
942 class IsolatedTestSuite(unittest.TestSuite):
943     """A TestSuite which runs its tests in a forked process.
944
945     This decorator that will fork() before running the tests and report the
946     results from the child process using a Subunit stream.  This is useful for
947     handling tests that mutate global state, or are testing C extensions that
948     could crash the VM.
949     """
950
951     def run(self, result=None):
952         if result is None: result = testresult.TestResult()
953         run_isolated(unittest.TestSuite, self, result)
954
955
956 def run_isolated(klass, self, result):
957     """Run a test suite or case in a subprocess, using the run method on klass.
958     """
959     c2pread, c2pwrite = os.pipe()
960     # fixme - error -> result
961     # now fork
962     pid = os.fork()
963     if pid == 0:
964         # Child
965         # Close parent's pipe ends
966         os.close(c2pread)
967         # Dup fds for child
968         os.dup2(c2pwrite, 1)
969         # Close pipe fds.
970         os.close(c2pwrite)
971
972         # at this point, sys.stdin is redirected, now we want
973         # to filter it to escape ]'s.
974         ### XXX: test and write that bit.
975         stream = os.fdopen(1, 'wb')
976         result = TestProtocolClient(stream)
977         klass.run(self, result)
978         stream.flush()
979         sys.stderr.flush()
980         # exit HARD, exit NOW.
981         os._exit(0)
982     else:
983         # Parent
984         # Close child pipe ends
985         os.close(c2pwrite)
986         # hookup a protocol engine
987         protocol = TestProtocolServer(result)
988         fileobj = os.fdopen(c2pread, 'rb')
989         protocol.readFrom(fileobj)
990         os.waitpid(pid, 0)
991         # TODO return code evaluation.
992     return result
993
994
995 def TAP2SubUnit(tap, subunit):
996     """Filter a TAP pipe into a subunit pipe.
997
998     :param tap: A tap pipe/stream/file object.
999     :param subunit: A pipe/stream/file object to write subunit results to.
1000     :return: The exit code to exit with.
1001     """
1002     BEFORE_PLAN = 0
1003     AFTER_PLAN = 1
1004     SKIP_STREAM = 2
1005     state = BEFORE_PLAN
1006     plan_start = 1
1007     plan_stop = 0
1008     def _skipped_test(subunit, plan_start):
1009         # Some tests were skipped.
1010         subunit.write('test test %d\n' % plan_start)
1011         subunit.write('error test %d [\n' % plan_start)
1012         subunit.write('test missing from TAP output\n')
1013         subunit.write(']\n')
1014         return plan_start + 1
1015     # Test data for the next test to emit
1016     test_name = None
1017     log = []
1018     result = None
1019     def _emit_test():
1020         "write out a test"
1021         if test_name is None:
1022             return
1023         subunit.write("test %s\n" % test_name)
1024         if not log:
1025             subunit.write("%s %s\n" % (result, test_name))
1026         else:
1027             subunit.write("%s %s [\n" % (result, test_name))
1028         if log:
1029             for line in log:
1030                 subunit.write("%s\n" % line)
1031             subunit.write("]\n")
1032         del log[:]
1033     for line in tap:
1034         if state == BEFORE_PLAN:
1035             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
1036             if match:
1037                 state = AFTER_PLAN
1038                 _, plan_stop, comment = match.groups()
1039                 plan_stop = int(plan_stop)
1040                 if plan_start > plan_stop and plan_stop == 0:
1041                     # skipped file
1042                     state = SKIP_STREAM
1043                     subunit.write("test file skip\n")
1044                     subunit.write("skip file skip [\n")
1045                     subunit.write("%s\n" % comment)
1046                     subunit.write("]\n")
1047                 continue
1048         # not a plan line, or have seen one before
1049         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
1050         if match:
1051             # new test, emit current one.
1052             _emit_test()
1053             status, number, description, directive, directive_comment = match.groups()
1054             if status == 'ok':
1055                 result = 'success'
1056             else:
1057                 result = "failure"
1058             if description is None:
1059                 description = ''
1060             else:
1061                 description = ' ' + description
1062             if directive is not None:
1063                 if directive.upper() == 'TODO':
1064                     result = 'xfail'
1065                 elif directive.upper() == 'SKIP':
1066                     result = 'skip'
1067                 if directive_comment is not None:
1068                     log.append(directive_comment)
1069             if number is not None:
1070                 number = int(number)
1071                 while plan_start < number:
1072                     plan_start = _skipped_test(subunit, plan_start)
1073             test_name = "test %d%s" % (plan_start, description)
1074             plan_start += 1
1075             continue
1076         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
1077         if match:
1078             reason, = match.groups()
1079             if reason is None:
1080                 extra = ''
1081             else:
1082                 extra = ' %s' % reason
1083             _emit_test()
1084             test_name = "Bail out!%s" % extra
1085             result = "error"
1086             state = SKIP_STREAM
1087             continue
1088         match = re.match("\#.*\n", line)
1089         if match:
1090             log.append(line[:-1])
1091             continue
1092         subunit.write(line)
1093     _emit_test()
1094     while plan_start <= plan_stop:
1095         # record missed tests
1096         plan_start = _skipped_test(subunit, plan_start)
1097     return 0
1098
1099
1100 def tag_stream(original, filtered, tags):
1101     """Alter tags on a stream.
1102
1103     :param original: The input stream.
1104     :param filtered: The output stream.
1105     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
1106         '-TAG' commands.
1107
1108         A 'TAG' command will add the tag to the output stream,
1109         and override any existing '-TAG' command in that stream.
1110         Specifically:
1111          * A global 'tags: TAG' will be added to the start of the stream.
1112          * Any tags commands with -TAG will have the -TAG removed.
1113
1114         A '-TAG' command will remove the TAG command from the stream.
1115         Specifically:
1116          * A 'tags: -TAG' command will be added to the start of the stream.
1117          * Any 'tags: TAG' command will have 'TAG' removed from it.
1118         Additionally, any redundant tagging commands (adding a tag globally
1119         present, or removing a tag globally removed) are stripped as a
1120         by-product of the filtering.
1121     :return: 0
1122     """
1123     new_tags, gone_tags = tags_to_new_gone(tags)
1124     def write_tags(new_tags, gone_tags):
1125         if new_tags or gone_tags:
1126             filtered.write("tags: " + ' '.join(new_tags))
1127             if gone_tags:
1128                 for tag in gone_tags:
1129                     filtered.write("-" + tag)
1130             filtered.write("\n")
1131     write_tags(new_tags, gone_tags)
1132     # TODO: use the protocol parser and thus don't mangle test comments.
1133     for line in original:
1134         if line.startswith("tags:"):
1135             line_tags = line[5:].split()
1136             line_new, line_gone = tags_to_new_gone(line_tags)
1137             line_new = line_new - gone_tags
1138             line_gone = line_gone - new_tags
1139             write_tags(line_new, line_gone)
1140         else:
1141             filtered.write(line)
1142     return 0
1143
1144
1145 class ProtocolTestCase(object):
1146     """Subunit wire protocol to unittest.TestCase adapter.
1147
1148     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1149     calling a ProtocolTestCase or invoking the run() method will make a 'test
1150     run' happen. The 'test run' will simply be a replay of the test activity
1151     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1152     and ``countTestCases`` methods are not supported because there isn't a
1153     sensible mapping for those methods.
1154
1155     # Get a stream (any object with a readline() method), in this case the
1156     # stream output by the example from ``subunit.TestProtocolClient``.
1157     stream = file('tests.log', 'rb')
1158     # Create a parser which will read from the stream and emit
1159     # activity to a unittest.TestResult when run() is called.
1160     suite = subunit.ProtocolTestCase(stream)
1161     # Create a result object to accept the contents of that stream.
1162     result = unittest._TextTestResult(sys.stdout)
1163     # 'run' the tests - process the stream and feed its contents to result.
1164     suite.run(result)
1165     stream.close()
1166
1167     :seealso: TestProtocolServer (the subunit wire protocol parser).
1168     """
1169
1170     def __init__(self, stream, passthrough=None, forward=None):
1171         """Create a ProtocolTestCase reading from stream.
1172
1173         :param stream: A filelike object which a subunit stream can be read
1174             from.
1175         :param passthrough: A stream pass non subunit input on to. If not
1176             supplied, the TestProtocolServer default is used.
1177         :param forward: A stream to pass subunit input on to. If not supplied
1178             subunit input is not forwarded.
1179         """
1180         stream = _make_stream_binary(stream)
1181         self._stream = stream
1182         self._passthrough = passthrough
1183         if forward is not None:
1184             forward = _make_stream_binary(forward)
1185         self._forward = forward
1186
1187     def __call__(self, result=None):
1188         return self.run(result)
1189
1190     def run(self, result=None):
1191         if result is None:
1192             result = self.defaultTestResult()
1193         protocol = TestProtocolServer(result, self._passthrough, self._forward)
1194         line = self._stream.readline()
1195         while line:
1196             protocol.lineReceived(line)
1197             line = self._stream.readline()
1198         protocol.lostConnection()
1199
1200
1201 class TestResultStats(testresult.TestResult):
1202     """A pyunit TestResult interface implementation for making statistics.
1203
1204     :ivar total_tests: The total tests seen.
1205     :ivar passed_tests: The tests that passed.
1206     :ivar failed_tests: The tests that failed.
1207     :ivar seen_tags: The tags seen across all tests.
1208     """
1209
1210     def __init__(self, stream):
1211         """Create a TestResultStats which outputs to stream."""
1212         testresult.TestResult.__init__(self)
1213         self._stream = stream
1214         self.failed_tests = 0
1215         self.skipped_tests = 0
1216         self.seen_tags = set()
1217
1218     @property
1219     def total_tests(self):
1220         return self.testsRun
1221
1222     def addError(self, test, err, details=None):
1223         self.failed_tests += 1
1224
1225     def addFailure(self, test, err, details=None):
1226         self.failed_tests += 1
1227
1228     def addSkip(self, test, reason, details=None):
1229         self.skipped_tests += 1
1230
1231     def formatStats(self):
1232         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1233         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1234         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1235         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1236         tags = sorted(self.seen_tags)
1237         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1238
1239     @property
1240     def passed_tests(self):
1241         return self.total_tests - self.failed_tests - self.skipped_tests
1242
1243     def tags(self, new_tags, gone_tags):
1244         """Accumulate the seen tags."""
1245         self.seen_tags.update(new_tags)
1246
1247     def wasSuccessful(self):
1248         """Tells whether or not this result was a success"""
1249         return self.failed_tests == 0
1250
1251
1252 def get_default_formatter():
1253     """Obtain the default formatter to write to.
1254
1255     :return: A file-like object.
1256     """
1257     formatter = os.getenv("SUBUNIT_FORMATTER")
1258     if formatter:
1259         return os.popen(formatter, "w")
1260     else:
1261         stream = sys.stdout
1262         if sys.version_info > (3, 0):
1263             stream = stream.buffer
1264         return stream
1265
1266
1267 def read_test_list(path):
1268     """Read a list of test ids from a file on disk.
1269
1270     :param path: Path to the file
1271     :return: Sequence of test ids
1272     """
1273     f = open(path, 'rb')
1274     try:
1275         return [l.rstrip("\n") for l in f.readlines()]
1276     finally:
1277         f.close()
1278
1279
1280 def _make_stream_binary(stream):
1281     """Ensure that a stream will be binary safe. See _make_binary_on_windows.
1282     
1283     :return: A binary version of the same stream (some streams cannot be
1284         'fixed' but can be unwrapped).
1285     """
1286     try:
1287         fileno = stream.fileno()
1288     except _UnsupportedOperation:
1289         pass
1290     else:
1291         _make_binary_on_windows(fileno)
1292     return _unwrap_text(stream)
1293
1294 def _make_binary_on_windows(fileno):
1295     """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
1296     if sys.platform == "win32":
1297         import msvcrt
1298         msvcrt.setmode(fileno, os.O_BINARY)
1299
1300
1301 def _unwrap_text(stream):
1302     """Unwrap stream if it is a text stream to get the original buffer."""
1303     if sys.version_info > (3, 0):
1304         try:
1305             # Read streams
1306             if type(stream.read(0)) is str:
1307                 return stream.buffer
1308         except (_UnsupportedOperation, IOError):
1309             # Cannot read from the stream: try via writes
1310             try:
1311                 stream.write(_b(''))
1312             except TypeError:
1313                 return stream.buffer
1314     return stream