Release 0.0.13.
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ++++++++
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
57 and newer).
58
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), the the tags apply to all subsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
64
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occurred at, to allow reconstructing
68 test timing from a stream.
69
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
74
75
76 Python test support
77 -------------------
78
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
81
82   $ python -m subunit.run mylib.tests.test_suite
83
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
86
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
89
90 `ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
92 result object::
93
94  class AggregateTests(subunit.ExecTestCase):
95
96      def test_script_one(self):
97          './bin/script_one'
98
99      def test_script_two(self):
100          './bin/script_two'
101
102  # Normally your normal test loading would take of this automatically,
103  # It is only spelt out in detail here for clarity.
104  suite = unittest.TestSuite([AggregateTests("test_script_one"),
105      AggregateTests("test_script_two")])
106  # Create any TestResult class you like.
107  result = unittest._TextTestResult(sys.stdout)
108  # And run your suite as normal, Subunit will exec each external script as
109  # needed and report to your result object.
110  suite.run(result)
111
112 Utility modules
113 ---------------
114
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
117 """
118
119 import os
120 import re
121 import subprocess
122 import sys
123 import unittest
124 try:
125     from io import UnsupportedOperation as _UnsupportedOperation
126 except ImportError:
127     _UnsupportedOperation = AttributeError
128
129 from extras import safe_hasattr
130 from testtools import content, content_type, ExtendedToOriginalDecorator
131 from testtools.content import TracebackContent
132 from testtools.compat import _b, _u, BytesIO, StringIO
133 try:
134     from testtools.testresult.real import _StringException
135     RemoteException = _StringException
136     # For testing: different pythons have different str() implementations.
137     if sys.version_info > (3, 0):
138         _remote_exception_str = "testtools.testresult.real._StringException"
139         _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
140     else:
141         _remote_exception_str = "_StringException" 
142         _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
143 except ImportError:
144     raise ImportError ("testtools.testresult.real does not contain "
145         "_StringException, check your version.")
146 from testtools import testresult, CopyStreamResult
147
148 from subunit import chunked, details, iso8601, test_results
149 from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes
150
151 # same format as sys.version_info: "A tuple containing the five components of
152 # the version number: major, minor, micro, releaselevel, and serial. All
153 # values except releaselevel are integers; the release level is 'alpha',
154 # 'beta', 'candidate', or 'final'. The version_info value corresponding to the
155 # Python version 2.0 is (2, 0, 0, 'final', 0)."  Additionally we use a
156 # releaselevel of 'dev' for unreleased under-development code.
157 #
158 # If the releaselevel is 'alpha' then the major/minor/micro components are not
159 # established at this point, and setup.py will use a version of next-$(revno).
160 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
161 # Otherwise it is major.minor.micro~$(revno).
162
163 __version__ = (0, 0, 13, 'final', 0)
164
165 PROGRESS_SET = 0
166 PROGRESS_CUR = 1
167 PROGRESS_PUSH = 2
168 PROGRESS_POP = 3
169
170
171 def test_suite():
172     import subunit.tests
173     return subunit.tests.test_suite()
174
175
176 def join_dir(base_path, path):
177     """
178     Returns an absolute path to C{path}, calculated relative to the parent
179     of C{base_path}.
180
181     @param base_path: A path to a file or directory.
182     @param path: An absolute path, or a path relative to the containing
183     directory of C{base_path}.
184
185     @return: An absolute path to C{path}.
186     """
187     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
188
189
190 def tags_to_new_gone(tags):
191     """Split a list of tags into a new_set and a gone_set."""
192     new_tags = set()
193     gone_tags = set()
194     for tag in tags:
195         if tag[0] == '-':
196             gone_tags.add(tag[1:])
197         else:
198             new_tags.add(tag)
199     return new_tags, gone_tags
200
201
202 class DiscardStream(object):
203     """A filelike object which discards what is written to it."""
204
205     def fileno(self):
206         raise _UnsupportedOperation()
207
208     def write(self, bytes):
209         pass
210
211     def read(self, len=0):
212         return _b('')
213
214
215 class _ParserState(object):
216     """State for the subunit parser."""
217
218     def __init__(self, parser):
219         self.parser = parser
220         self._test_sym = (_b('test'), _b('testing'))
221         self._colon_sym = _b(':')
222         self._error_sym = (_b('error'),)
223         self._failure_sym = (_b('failure'),)
224         self._progress_sym = (_b('progress'),)
225         self._skip_sym = _b('skip')
226         self._success_sym = (_b('success'), _b('successful'))
227         self._tags_sym = (_b('tags'),)
228         self._time_sym = (_b('time'),)
229         self._xfail_sym = (_b('xfail'),)
230         self._uxsuccess_sym = (_b('uxsuccess'),)
231         self._start_simple = _u(" [")
232         self._start_multipart = _u(" [ multipart")
233
234     def addError(self, offset, line):
235         """An 'error:' directive has been read."""
236         self.parser.stdOutLineReceived(line)
237
238     def addExpectedFail(self, offset, line):
239         """An 'xfail:' directive has been read."""
240         self.parser.stdOutLineReceived(line)
241
242     def addFailure(self, offset, line):
243         """A 'failure:' directive has been read."""
244         self.parser.stdOutLineReceived(line)
245
246     def addSkip(self, offset, line):
247         """A 'skip:' directive has been read."""
248         self.parser.stdOutLineReceived(line)
249
250     def addSuccess(self, offset, line):
251         """A 'success:' directive has been read."""
252         self.parser.stdOutLineReceived(line)
253
254     def lineReceived(self, line):
255         """a line has been received."""
256         parts = line.split(None, 1)
257         if len(parts) == 2 and line.startswith(parts[0]):
258             cmd, rest = parts
259             offset = len(cmd) + 1
260             cmd = cmd.rstrip(self._colon_sym)
261             if cmd in self._test_sym:
262                 self.startTest(offset, line)
263             elif cmd in self._error_sym:
264                 self.addError(offset, line)
265             elif cmd in self._failure_sym:
266                 self.addFailure(offset, line)
267             elif cmd in self._progress_sym:
268                 self.parser._handleProgress(offset, line)
269             elif cmd in self._skip_sym:
270                 self.addSkip(offset, line)
271             elif cmd in self._success_sym:
272                 self.addSuccess(offset, line)
273             elif cmd in self._tags_sym:
274                 self.parser._handleTags(offset, line)
275                 self.parser.subunitLineReceived(line)
276             elif cmd in self._time_sym:
277                 self.parser._handleTime(offset, line)
278                 self.parser.subunitLineReceived(line)
279             elif cmd in self._xfail_sym:
280                 self.addExpectedFail(offset, line)
281             elif cmd in self._uxsuccess_sym:
282                 self.addUnexpectedSuccess(offset, line)
283             else:
284                 self.parser.stdOutLineReceived(line)
285         else:
286             self.parser.stdOutLineReceived(line)
287
288     def lostConnection(self):
289         """Connection lost."""
290         self.parser._lostConnectionInTest(_u('unknown state of '))
291
292     def startTest(self, offset, line):
293         """A test start command received."""
294         self.parser.stdOutLineReceived(line)
295
296
297 class _InTest(_ParserState):
298     """State for the subunit parser after reading a test: directive."""
299
300     def _outcome(self, offset, line, no_details, details_state):
301         """An outcome directive has been read.
302
303         :param no_details: Callable to call when no details are presented.
304         :param details_state: The state to switch to for details
305             processing of this outcome.
306         """
307         test_name = line[offset:-1].decode('utf8')
308         if self.parser.current_test_description == test_name:
309             self.parser._state = self.parser._outside_test
310             self.parser.current_test_description = None
311             no_details()
312             self.parser.client.stopTest(self.parser._current_test)
313             self.parser._current_test = None
314             self.parser.subunitLineReceived(line)
315         elif self.parser.current_test_description + self._start_simple == \
316             test_name:
317             self.parser._state = details_state
318             details_state.set_simple()
319             self.parser.subunitLineReceived(line)
320         elif self.parser.current_test_description + self._start_multipart == \
321             test_name:
322             self.parser._state = details_state
323             details_state.set_multipart()
324             self.parser.subunitLineReceived(line)
325         else:
326             self.parser.stdOutLineReceived(line)
327
328     def _error(self):
329         self.parser.client.addError(self.parser._current_test,
330             details={})
331
332     def addError(self, offset, line):
333         """An 'error:' directive has been read."""
334         self._outcome(offset, line, self._error,
335             self.parser._reading_error_details)
336
337     def _xfail(self):
338         self.parser.client.addExpectedFailure(self.parser._current_test,
339             details={})
340
341     def addExpectedFail(self, offset, line):
342         """An 'xfail:' directive has been read."""
343         self._outcome(offset, line, self._xfail,
344             self.parser._reading_xfail_details)
345
346     def _uxsuccess(self):
347         self.parser.client.addUnexpectedSuccess(self.parser._current_test)
348
349     def addUnexpectedSuccess(self, offset, line):
350         """A 'uxsuccess:' directive has been read."""
351         self._outcome(offset, line, self._uxsuccess,
352             self.parser._reading_uxsuccess_details)
353
354     def _failure(self):
355         self.parser.client.addFailure(self.parser._current_test, details={})
356
357     def addFailure(self, offset, line):
358         """A 'failure:' directive has been read."""
359         self._outcome(offset, line, self._failure,
360             self.parser._reading_failure_details)
361
362     def _skip(self):
363         self.parser.client.addSkip(self.parser._current_test, details={})
364
365     def addSkip(self, offset, line):
366         """A 'skip:' directive has been read."""
367         self._outcome(offset, line, self._skip,
368             self.parser._reading_skip_details)
369
370     def _succeed(self):
371         self.parser.client.addSuccess(self.parser._current_test, details={})
372
373     def addSuccess(self, offset, line):
374         """A 'success:' directive has been read."""
375         self._outcome(offset, line, self._succeed,
376             self.parser._reading_success_details)
377
378     def lostConnection(self):
379         """Connection lost."""
380         self.parser._lostConnectionInTest(_u(''))
381
382
383 class _OutSideTest(_ParserState):
384     """State for the subunit parser outside of a test context."""
385
386     def lostConnection(self):
387         """Connection lost."""
388
389     def startTest(self, offset, line):
390         """A test start command received."""
391         self.parser._state = self.parser._in_test
392         test_name = line[offset:-1].decode('utf8')
393         self.parser._current_test = RemotedTestCase(test_name)
394         self.parser.current_test_description = test_name
395         self.parser.client.startTest(self.parser._current_test)
396         self.parser.subunitLineReceived(line)
397
398
399 class _ReadingDetails(_ParserState):
400     """Common logic for readin state details."""
401
402     def endDetails(self):
403         """The end of a details section has been reached."""
404         self.parser._state = self.parser._outside_test
405         self.parser.current_test_description = None
406         self._report_outcome()
407         self.parser.client.stopTest(self.parser._current_test)
408
409     def lineReceived(self, line):
410         """a line has been received."""
411         self.details_parser.lineReceived(line)
412         self.parser.subunitLineReceived(line)
413
414     def lostConnection(self):
415         """Connection lost."""
416         self.parser._lostConnectionInTest(_u('%s report of ') %
417             self._outcome_label())
418
419     def _outcome_label(self):
420         """The label to describe this outcome."""
421         raise NotImplementedError(self._outcome_label)
422
423     def set_simple(self):
424         """Start a simple details parser."""
425         self.details_parser = details.SimpleDetailsParser(self)
426
427     def set_multipart(self):
428         """Start a multipart details parser."""
429         self.details_parser = details.MultipartDetailsParser(self)
430
431
432 class _ReadingFailureDetails(_ReadingDetails):
433     """State for the subunit parser when reading failure details."""
434
435     def _report_outcome(self):
436         self.parser.client.addFailure(self.parser._current_test,
437             details=self.details_parser.get_details())
438
439     def _outcome_label(self):
440         return "failure"
441
442
443 class _ReadingErrorDetails(_ReadingDetails):
444     """State for the subunit parser when reading error details."""
445
446     def _report_outcome(self):
447         self.parser.client.addError(self.parser._current_test,
448             details=self.details_parser.get_details())
449
450     def _outcome_label(self):
451         return "error"
452
453
454 class _ReadingExpectedFailureDetails(_ReadingDetails):
455     """State for the subunit parser when reading xfail details."""
456
457     def _report_outcome(self):
458         self.parser.client.addExpectedFailure(self.parser._current_test,
459             details=self.details_parser.get_details())
460
461     def _outcome_label(self):
462         return "xfail"
463
464
465 class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
466     """State for the subunit parser when reading uxsuccess details."""
467
468     def _report_outcome(self):
469         self.parser.client.addUnexpectedSuccess(self.parser._current_test,
470             details=self.details_parser.get_details())
471
472     def _outcome_label(self):
473         return "uxsuccess"
474
475
476 class _ReadingSkipDetails(_ReadingDetails):
477     """State for the subunit parser when reading skip details."""
478
479     def _report_outcome(self):
480         self.parser.client.addSkip(self.parser._current_test,
481             details=self.details_parser.get_details("skip"))
482
483     def _outcome_label(self):
484         return "skip"
485
486
487 class _ReadingSuccessDetails(_ReadingDetails):
488     """State for the subunit parser when reading success details."""
489
490     def _report_outcome(self):
491         self.parser.client.addSuccess(self.parser._current_test,
492             details=self.details_parser.get_details("success"))
493
494     def _outcome_label(self):
495         return "success"
496
497
498 class TestProtocolServer(object):
499     """A parser for subunit.
500
501     :ivar tags: The current tags associated with the protocol stream.
502     """
503
504     def __init__(self, client, stream=None, forward_stream=None):
505         """Create a TestProtocolServer instance.
506
507         :param client: An object meeting the unittest.TestResult protocol.
508         :param stream: The stream that lines received which are not part of the
509             subunit protocol should be written to. This allows custom handling
510             of mixed protocols. By default, sys.stdout will be used for
511             convenience. It should accept bytes to its write() method.
512         :param forward_stream: A stream to forward subunit lines to. This
513             allows a filter to forward the entire stream while still parsing
514             and acting on it. By default forward_stream is set to
515             DiscardStream() and no forwarding happens.
516         """
517         self.client = ExtendedToOriginalDecorator(client)
518         if stream is None:
519             stream = sys.stdout
520             if sys.version_info > (3, 0):
521                 stream = stream.buffer
522         self._stream = stream
523         self._forward_stream = forward_stream or DiscardStream()
524         # state objects we can switch too
525         self._in_test = _InTest(self)
526         self._outside_test = _OutSideTest(self)
527         self._reading_error_details = _ReadingErrorDetails(self)
528         self._reading_failure_details = _ReadingFailureDetails(self)
529         self._reading_skip_details = _ReadingSkipDetails(self)
530         self._reading_success_details = _ReadingSuccessDetails(self)
531         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
532         self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
533         # start with outside test.
534         self._state = self._outside_test
535         # Avoid casts on every call
536         self._plusminus = _b('+-')
537         self._push_sym = _b('push')
538         self._pop_sym = _b('pop')
539
540     def _handleProgress(self, offset, line):
541         """Process a progress directive."""
542         line = line[offset:].strip()
543         if line[0] in self._plusminus:
544             whence = PROGRESS_CUR
545             delta = int(line)
546         elif line == self._push_sym:
547             whence = PROGRESS_PUSH
548             delta = None
549         elif line == self._pop_sym:
550             whence = PROGRESS_POP
551             delta = None
552         else:
553             whence = PROGRESS_SET
554             delta = int(line)
555         self.client.progress(delta, whence)
556
557     def _handleTags(self, offset, line):
558         """Process a tags command."""
559         tags = line[offset:].decode('utf8').split()
560         new_tags, gone_tags = tags_to_new_gone(tags)
561         self.client.tags(new_tags, gone_tags)
562
563     def _handleTime(self, offset, line):
564         # Accept it, but do not do anything with it yet.
565         try:
566             event_time = iso8601.parse_date(line[offset:-1])
567         except TypeError:
568             raise TypeError(_u("Failed to parse %r, got %r")
569                 % (line, sys.exec_info[1]))
570         self.client.time(event_time)
571
572     def lineReceived(self, line):
573         """Call the appropriate local method for the received line."""
574         self._state.lineReceived(line)
575
576     def _lostConnectionInTest(self, state_string):
577         error_string = _u("lost connection during %stest '%s'") % (
578             state_string, self.current_test_description)
579         self.client.addError(self._current_test, RemoteError(error_string))
580         self.client.stopTest(self._current_test)
581
582     def lostConnection(self):
583         """The input connection has finished."""
584         self._state.lostConnection()
585
586     def readFrom(self, pipe):
587         """Blocking convenience API to parse an entire stream.
588
589         :param pipe: A file-like object supporting readlines().
590         :return: None.
591         """
592         for line in pipe.readlines():
593             self.lineReceived(line)
594         self.lostConnection()
595
596     def _startTest(self, offset, line):
597         """Internal call to change state machine. Override startTest()."""
598         self._state.startTest(offset, line)
599
600     def subunitLineReceived(self, line):
601         self._forward_stream.write(line)
602
603     def stdOutLineReceived(self, line):
604         self._stream.write(line)
605
606
607 class TestProtocolClient(testresult.TestResult):
608     """A TestResult which generates a subunit stream for a test run.
609
610     # Get a TestSuite or TestCase to run
611     suite = make_suite()
612     # Create a stream (any object with a 'write' method). This should accept
613     # bytes not strings: subunit is a byte orientated protocol.
614     stream = file('tests.log', 'wb')
615     # Create a subunit result object which will output to the stream
616     result = subunit.TestProtocolClient(stream)
617     # Optionally, to get timing data for performance analysis, wrap the
618     # serialiser with a timing decorator
619     result = subunit.test_results.AutoTimingTestResultDecorator(result)
620     # Run the test suite reporting to the subunit result object
621     suite.run(result)
622     # Close the stream.
623     stream.close()
624     """
625
626     def __init__(self, stream):
627         testresult.TestResult.__init__(self)
628         stream = make_stream_binary(stream)
629         self._stream = stream
630         self._progress_fmt = _b("progress: ")
631         self._bytes_eol = _b("\n")
632         self._progress_plus = _b("+")
633         self._progress_push = _b("push")
634         self._progress_pop = _b("pop")
635         self._empty_bytes = _b("")
636         self._start_simple = _b(" [\n")
637         self._end_simple = _b("]\n")
638
639     def addError(self, test, error=None, details=None):
640         """Report an error in test test.
641
642         Only one of error and details should be provided: conceptually there
643         are two separate methods:
644             addError(self, test, error)
645             addError(self, test, details)
646
647         :param error: Standard unittest positional argument form - an
648             exc_info tuple.
649         :param details: New Testing-in-python drafted API; a dict from string
650             to subunit.Content objects.
651         """
652         self._addOutcome("error", test, error=error, details=details)
653         if self.failfast:
654             self.stop()
655
656     def addExpectedFailure(self, test, error=None, details=None):
657         """Report an expected failure in test test.
658
659         Only one of error and details should be provided: conceptually there
660         are two separate methods:
661             addError(self, test, error)
662             addError(self, test, details)
663
664         :param error: Standard unittest positional argument form - an
665             exc_info tuple.
666         :param details: New Testing-in-python drafted API; a dict from string
667             to subunit.Content objects.
668         """
669         self._addOutcome("xfail", test, error=error, details=details)
670
671     def addFailure(self, test, error=None, details=None):
672         """Report a failure in test test.
673
674         Only one of error and details should be provided: conceptually there
675         are two separate methods:
676             addFailure(self, test, error)
677             addFailure(self, test, details)
678
679         :param error: Standard unittest positional argument form - an
680             exc_info tuple.
681         :param details: New Testing-in-python drafted API; a dict from string
682             to subunit.Content objects.
683         """
684         self._addOutcome("failure", test, error=error, details=details)
685         if self.failfast:
686             self.stop()
687
688     def _addOutcome(self, outcome, test, error=None, details=None,
689         error_permitted=True):
690         """Report a failure in test test.
691
692         Only one of error and details should be provided: conceptually there
693         are two separate methods:
694             addOutcome(self, test, error)
695             addOutcome(self, test, details)
696
697         :param outcome: A string describing the outcome - used as the
698             event name in the subunit stream.
699         :param error: Standard unittest positional argument form - an
700             exc_info tuple.
701         :param details: New Testing-in-python drafted API; a dict from string
702             to subunit.Content objects.
703         :param error_permitted: If True then one and only one of error or
704             details must be supplied. If False then error must not be supplied
705             and details is still optional.  """
706         self._stream.write(_b("%s: " % outcome) + self._test_id(test))
707         if error_permitted:
708             if error is None and details is None:
709                 raise ValueError
710         else:
711             if error is not None:
712                 raise ValueError
713         if error is not None:
714             self._stream.write(self._start_simple)
715             tb_content = TracebackContent(error, test)
716             for bytes in tb_content.iter_bytes():
717                 self._stream.write(bytes)
718         elif details is not None:
719             self._write_details(details)
720         else:
721             self._stream.write(_b("\n"))
722         if details is not None or error is not None:
723             self._stream.write(self._end_simple)
724
725     def addSkip(self, test, reason=None, details=None):
726         """Report a skipped test."""
727         if reason is None:
728             self._addOutcome("skip", test, error=None, details=details)
729         else:
730             self._stream.write(_b("skip: %s [\n" % test.id()))
731             self._stream.write(_b("%s\n" % reason))
732             self._stream.write(self._end_simple)
733
734     def addSuccess(self, test, details=None):
735         """Report a success in a test."""
736         self._addOutcome("successful", test, details=details, error_permitted=False)
737
738     def addUnexpectedSuccess(self, test, details=None):
739         """Report an unexpected success in test test.
740
741         Details can optionally be provided: conceptually there
742         are two separate methods:
743             addError(self, test)
744             addError(self, test, details)
745
746         :param details: New Testing-in-python drafted API; a dict from string
747             to subunit.Content objects.
748         """
749         self._addOutcome("uxsuccess", test, details=details,
750             error_permitted=False)
751         if self.failfast:
752             self.stop()
753
754     def _test_id(self, test):
755         result = test.id()
756         if type(result) is not bytes:
757             result = result.encode('utf8')
758         return result
759
760     def startTest(self, test):
761         """Mark a test as starting its test run."""
762         super(TestProtocolClient, self).startTest(test)
763         self._stream.write(_b("test: ") + self._test_id(test) + _b("\n"))
764         self._stream.flush()
765
766     def stopTest(self, test):
767         super(TestProtocolClient, self).stopTest(test)
768         self._stream.flush()
769
770     def progress(self, offset, whence):
771         """Provide indication about the progress/length of the test run.
772
773         :param offset: Information about the number of tests remaining. If
774             whence is PROGRESS_CUR, then offset increases/decreases the
775             remaining test count. If whence is PROGRESS_SET, then offset
776             specifies exactly the remaining test count.
777         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
778             PROGRESS_POP.
779         """
780         if whence == PROGRESS_CUR and offset > -1:
781             prefix = self._progress_plus
782             offset = _b(str(offset))
783         elif whence == PROGRESS_PUSH:
784             prefix = self._empty_bytes
785             offset = self._progress_push
786         elif whence == PROGRESS_POP:
787             prefix = self._empty_bytes
788             offset = self._progress_pop
789         else:
790             prefix = self._empty_bytes
791             offset = _b(str(offset))
792         self._stream.write(self._progress_fmt + prefix + offset +
793             self._bytes_eol)
794
795     def tags(self, new_tags, gone_tags):
796         """Inform the client about tags added/removed from the stream."""
797         if not new_tags and not gone_tags:
798             return
799         tags = set([tag.encode('utf8') for tag in new_tags])
800         tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
801         tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
802         self._stream.write(tag_line)
803
804     def time(self, a_datetime):
805         """Inform the client of the time.
806
807         ":param datetime: A datetime.datetime object.
808         """
809         time = a_datetime.astimezone(iso8601.Utc())
810         self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
811             time.year, time.month, time.day, time.hour, time.minute,
812             time.second, time.microsecond)))
813
814     def _write_details(self, details):
815         """Output details to the stream.
816
817         :param details: An extended details dict for a test outcome.
818         """
819         self._stream.write(_b(" [ multipart\n"))
820         for name, content in sorted(details.items()):
821             self._stream.write(_b("Content-Type: %s/%s" %
822                 (content.content_type.type, content.content_type.subtype)))
823             parameters = content.content_type.parameters
824             if parameters:
825                 self._stream.write(_b(";"))
826                 param_strs = []
827                 for param, value in parameters.items():
828                     param_strs.append("%s=%s" % (param, value))
829                 self._stream.write(_b(",".join(param_strs)))
830             self._stream.write(_b("\n%s\n" % name))
831             encoder = chunked.Encoder(self._stream)
832             list(map(encoder.write, content.iter_bytes()))
833             encoder.close()
834
835     def done(self):
836         """Obey the testtools result.done() interface."""
837
838
839 def RemoteError(description=_u("")):
840     return (_StringException, _StringException(description), None)
841
842
843 class RemotedTestCase(unittest.TestCase):
844     """A class to represent test cases run in child processes.
845
846     Instances of this class are used to provide the Python test API a TestCase
847     that can be printed to the screen, introspected for metadata and so on.
848     However, as they are a simply a memoisation of a test that was actually
849     run in the past by a separate process, they cannot perform any interactive
850     actions.
851     """
852
853     def __eq__ (self, other):
854         try:
855             return self.__description == other.__description
856         except AttributeError:
857             return False
858
859     def __init__(self, description):
860         """Create a psuedo test case with description description."""
861         self.__description = description
862
863     def error(self, label):
864         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
865             label)
866
867     def setUp(self):
868         self.error("setUp")
869
870     def tearDown(self):
871         self.error("tearDown")
872
873     def shortDescription(self):
874         return self.__description
875
876     def id(self):
877         return "%s" % (self.__description,)
878
879     def __str__(self):
880         return "%s (%s)" % (self.__description, self._strclass())
881
882     def __repr__(self):
883         return "<%s description='%s'>" % \
884                (self._strclass(), self.__description)
885
886     def run(self, result=None):
887         if result is None: result = self.defaultTestResult()
888         result.startTest(self)
889         result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
890         result.stopTest(self)
891
892     def _strclass(self):
893         cls = self.__class__
894         return "%s.%s" % (cls.__module__, cls.__name__)
895
896
897 class ExecTestCase(unittest.TestCase):
898     """A test case which runs external scripts for test fixtures."""
899
900     def __init__(self, methodName='runTest'):
901         """Create an instance of the class that will use the named test
902            method when executed. Raises a ValueError if the instance does
903            not have a method with the specified name.
904         """
905         unittest.TestCase.__init__(self, methodName)
906         testMethod = getattr(self, methodName)
907         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
908                                testMethod.__doc__)
909
910     def countTestCases(self):
911         return 1
912
913     def run(self, result=None):
914         if result is None: result = self.defaultTestResult()
915         self._run(result)
916
917     def debug(self):
918         """Run the test without collecting errors in a TestResult"""
919         self._run(testresult.TestResult())
920
921     def _run(self, result):
922         protocol = TestProtocolServer(result)
923         process = subprocess.Popen(self.script, shell=True,
924             stdout=subprocess.PIPE)
925         make_stream_binary(process.stdout)
926         output = process.communicate()[0]
927         protocol.readFrom(BytesIO(output))
928
929
930 class IsolatedTestCase(unittest.TestCase):
931     """A TestCase which executes in a forked process.
932
933     Each test gets its own process, which has a performance overhead but will
934     provide excellent isolation from global state (such as django configs,
935     zope utilities and so on).
936     """
937
938     def run(self, result=None):
939         if result is None: result = self.defaultTestResult()
940         run_isolated(unittest.TestCase, self, result)
941
942
943 class IsolatedTestSuite(unittest.TestSuite):
944     """A TestSuite which runs its tests in a forked process.
945
946     This decorator that will fork() before running the tests and report the
947     results from the child process using a Subunit stream.  This is useful for
948     handling tests that mutate global state, or are testing C extensions that
949     could crash the VM.
950     """
951
952     def run(self, result=None):
953         if result is None: result = testresult.TestResult()
954         run_isolated(unittest.TestSuite, self, result)
955
956
957 def run_isolated(klass, self, result):
958     """Run a test suite or case in a subprocess, using the run method on klass.
959     """
960     c2pread, c2pwrite = os.pipe()
961     # fixme - error -> result
962     # now fork
963     pid = os.fork()
964     if pid == 0:
965         # Child
966         # Close parent's pipe ends
967         os.close(c2pread)
968         # Dup fds for child
969         os.dup2(c2pwrite, 1)
970         # Close pipe fds.
971         os.close(c2pwrite)
972
973         # at this point, sys.stdin is redirected, now we want
974         # to filter it to escape ]'s.
975         ### XXX: test and write that bit.
976         stream = os.fdopen(1, 'wb')
977         result = TestProtocolClient(stream)
978         klass.run(self, result)
979         stream.flush()
980         sys.stderr.flush()
981         # exit HARD, exit NOW.
982         os._exit(0)
983     else:
984         # Parent
985         # Close child pipe ends
986         os.close(c2pwrite)
987         # hookup a protocol engine
988         protocol = TestProtocolServer(result)
989         fileobj = os.fdopen(c2pread, 'rb')
990         protocol.readFrom(fileobj)
991         os.waitpid(pid, 0)
992         # TODO return code evaluation.
993     return result
994
995
996 def TAP2SubUnit(tap, output_stream):
997     """Filter a TAP pipe into a subunit pipe.
998
999     This should be invoked once per TAP script, as TAP scripts get
1000     mapped to a single runnable case with multiple components.
1001
1002     :param tap: A tap pipe/stream/file object - should emit unicode strings.
1003     :param subunit: A pipe/stream/file object to write subunit results to.
1004     :return: The exit code to exit with.
1005     """
1006     output = StreamResultToBytes(output_stream)
1007     UTF8_TEXT = 'text/plain; charset=UTF8'
1008     BEFORE_PLAN = 0
1009     AFTER_PLAN = 1
1010     SKIP_STREAM = 2
1011     state = BEFORE_PLAN
1012     plan_start = 1
1013     plan_stop = 0
1014     # Test data for the next test to emit
1015     test_name = None
1016     log = []
1017     result = None
1018     def missing_test(plan_start):
1019         output.status(test_id='test %d' % plan_start,
1020             test_status='fail', runnable=False, 
1021             mime_type=UTF8_TEXT, eof=True, file_name="tap meta",
1022             file_bytes=b"test missing from TAP output")
1023     def _emit_test():
1024         "write out a test"
1025         if test_name is None:
1026             return
1027         if log:
1028             log_bytes = b'\n'.join(log_line.encode('utf8') for log_line in log)
1029             mime_type = UTF8_TEXT
1030             file_name = 'tap comment'
1031             eof = True
1032         else:
1033             log_bytes = None
1034             mime_type = None
1035             file_name = None
1036             eof = True
1037         del log[:]
1038         output.status(test_id=test_name, test_status=result,
1039             file_bytes=log_bytes, mime_type=mime_type, eof=eof,
1040             file_name=file_name, runnable=False)
1041     for line in tap:
1042         if state == BEFORE_PLAN:
1043             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
1044             if match:
1045                 state = AFTER_PLAN
1046                 _, plan_stop, comment = match.groups()
1047                 plan_stop = int(plan_stop)
1048                 if plan_start > plan_stop and plan_stop == 0:
1049                     # skipped file
1050                     state = SKIP_STREAM
1051                     output.status(test_id='file skip', test_status='skip',
1052                         file_bytes=comment.encode('utf8'), eof=True,
1053                         file_name='tap comment')
1054                 continue
1055         # not a plan line, or have seen one before
1056         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
1057         if match:
1058             # new test, emit current one.
1059             _emit_test()
1060             status, number, description, directive, directive_comment = match.groups()
1061             if status == 'ok':
1062                 result = 'success'
1063             else:
1064                 result = "fail"
1065             if description is None:
1066                 description = ''
1067             else:
1068                 description = ' ' + description
1069             if directive is not None:
1070                 if directive.upper() == 'TODO':
1071                     result = 'xfail'
1072                 elif directive.upper() == 'SKIP':
1073                     result = 'skip'
1074                 if directive_comment is not None:
1075                     log.append(directive_comment)
1076             if number is not None:
1077                 number = int(number)
1078                 while plan_start < number:
1079                     missing_test(plan_start)
1080                     plan_start += 1
1081             test_name = "test %d%s" % (plan_start, description)
1082             plan_start += 1
1083             continue
1084         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
1085         if match:
1086             reason, = match.groups()
1087             if reason is None:
1088                 extra = ''
1089             else:
1090                 extra = ' %s' % reason
1091             _emit_test()
1092             test_name = "Bail out!%s" % extra
1093             result = "fail"
1094             state = SKIP_STREAM
1095             continue
1096         match = re.match("\#.*\n", line)
1097         if match:
1098             log.append(line[:-1])
1099             continue
1100         # Should look at buffering status and binding this to the prior result.
1101         output.status(file_bytes=line.encode('utf8'), file_name='stdout',
1102             mime_type=UTF8_TEXT)
1103     _emit_test()
1104     while plan_start <= plan_stop:
1105         # record missed tests
1106         missing_test(plan_start)
1107         plan_start += 1
1108     return 0
1109
1110
1111 def tag_stream(original, filtered, tags):
1112     """Alter tags on a stream.
1113
1114     :param original: The input stream.
1115     :param filtered: The output stream.
1116     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
1117         '-TAG' commands.
1118
1119         A 'TAG' command will add the tag to the output stream,
1120         and override any existing '-TAG' command in that stream.
1121         Specifically:
1122          * A global 'tags: TAG' will be added to the start of the stream.
1123          * Any tags commands with -TAG will have the -TAG removed.
1124
1125         A '-TAG' command will remove the TAG command from the stream.
1126         Specifically:
1127          * A 'tags: -TAG' command will be added to the start of the stream.
1128          * Any 'tags: TAG' command will have 'TAG' removed from it.
1129         Additionally, any redundant tagging commands (adding a tag globally
1130         present, or removing a tag globally removed) are stripped as a
1131         by-product of the filtering.
1132     :return: 0
1133     """
1134     new_tags, gone_tags = tags_to_new_gone(tags)
1135     source = ByteStreamToStreamResult(original, non_subunit_name='stdout')
1136     class Tagger(CopyStreamResult):
1137         def status(self, **kwargs):
1138             tags = kwargs.get('test_tags')
1139             if not tags:
1140                 tags = set()
1141             tags.update(new_tags)
1142             tags.difference_update(gone_tags)
1143             if tags:
1144                 kwargs['test_tags'] = tags
1145             else:
1146                 kwargs['test_tags'] = None
1147             super(Tagger, self).status(**kwargs)
1148     output = Tagger([StreamResultToBytes(filtered)])
1149     source.run(output)
1150     return 0
1151
1152
1153 class ProtocolTestCase(object):
1154     """Subunit wire protocol to unittest.TestCase adapter.
1155
1156     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1157     calling a ProtocolTestCase or invoking the run() method will make a 'test
1158     run' happen. The 'test run' will simply be a replay of the test activity
1159     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1160     and ``countTestCases`` methods are not supported because there isn't a
1161     sensible mapping for those methods.
1162
1163     # Get a stream (any object with a readline() method), in this case the
1164     # stream output by the example from ``subunit.TestProtocolClient``.
1165     stream = file('tests.log', 'rb')
1166     # Create a parser which will read from the stream and emit
1167     # activity to a unittest.TestResult when run() is called.
1168     suite = subunit.ProtocolTestCase(stream)
1169     # Create a result object to accept the contents of that stream.
1170     result = unittest._TextTestResult(sys.stdout)
1171     # 'run' the tests - process the stream and feed its contents to result.
1172     suite.run(result)
1173     stream.close()
1174
1175     :seealso: TestProtocolServer (the subunit wire protocol parser).
1176     """
1177
1178     def __init__(self, stream, passthrough=None, forward=None):
1179         """Create a ProtocolTestCase reading from stream.
1180
1181         :param stream: A filelike object which a subunit stream can be read
1182             from.
1183         :param passthrough: A stream pass non subunit input on to. If not
1184             supplied, the TestProtocolServer default is used.
1185         :param forward: A stream to pass subunit input on to. If not supplied
1186             subunit input is not forwarded.
1187         """
1188         stream = make_stream_binary(stream)
1189         self._stream = stream
1190         self._passthrough = passthrough
1191         if forward is not None:
1192             forward = make_stream_binary(forward)
1193         self._forward = forward
1194
1195     def __call__(self, result=None):
1196         return self.run(result)
1197
1198     def run(self, result=None):
1199         if result is None:
1200             result = self.defaultTestResult()
1201         protocol = TestProtocolServer(result, self._passthrough, self._forward)
1202         line = self._stream.readline()
1203         while line:
1204             protocol.lineReceived(line)
1205             line = self._stream.readline()
1206         protocol.lostConnection()
1207
1208
1209 class TestResultStats(testresult.TestResult):
1210     """A pyunit TestResult interface implementation for making statistics.
1211
1212     :ivar total_tests: The total tests seen.
1213     :ivar passed_tests: The tests that passed.
1214     :ivar failed_tests: The tests that failed.
1215     :ivar seen_tags: The tags seen across all tests.
1216     """
1217
1218     def __init__(self, stream):
1219         """Create a TestResultStats which outputs to stream."""
1220         testresult.TestResult.__init__(self)
1221         self._stream = stream
1222         self.failed_tests = 0
1223         self.skipped_tests = 0
1224         self.seen_tags = set()
1225
1226     @property
1227     def total_tests(self):
1228         return self.testsRun
1229
1230     def addError(self, test, err, details=None):
1231         self.failed_tests += 1
1232
1233     def addFailure(self, test, err, details=None):
1234         self.failed_tests += 1
1235
1236     def addSkip(self, test, reason, details=None):
1237         self.skipped_tests += 1
1238
1239     def formatStats(self):
1240         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1241         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1242         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1243         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1244         tags = sorted(self.seen_tags)
1245         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1246
1247     @property
1248     def passed_tests(self):
1249         return self.total_tests - self.failed_tests - self.skipped_tests
1250
1251     def tags(self, new_tags, gone_tags):
1252         """Accumulate the seen tags."""
1253         self.seen_tags.update(new_tags)
1254
1255     def wasSuccessful(self):
1256         """Tells whether or not this result was a success"""
1257         return self.failed_tests == 0
1258
1259
1260 def get_default_formatter():
1261     """Obtain the default formatter to write to.
1262
1263     :return: A file-like object.
1264     """
1265     formatter = os.getenv("SUBUNIT_FORMATTER")
1266     if formatter:
1267         return os.popen(formatter, "w")
1268     else:
1269         stream = sys.stdout
1270         if sys.version_info > (3, 0):
1271             if safe_hasattr(stream, 'buffer'):
1272                 stream = stream.buffer
1273         return stream
1274
1275
1276 def read_test_list(path):
1277     """Read a list of test ids from a file on disk.
1278
1279     :param path: Path to the file
1280     :return: Sequence of test ids
1281     """
1282     f = open(path, 'rb')
1283     try:
1284         return [l.rstrip("\n") for l in f.readlines()]
1285     finally:
1286         f.close()
1287
1288
1289 def make_stream_binary(stream):
1290     """Ensure that a stream will be binary safe. See _make_binary_on_windows.
1291     
1292     :return: A binary version of the same stream (some streams cannot be
1293         'fixed' but can be unwrapped).
1294     """
1295     try:
1296         fileno = stream.fileno()
1297     except (_UnsupportedOperation, AttributeError):
1298         pass
1299     else:
1300         _make_binary_on_windows(fileno)
1301     return _unwrap_text(stream)
1302
1303
1304 def _make_binary_on_windows(fileno):
1305     """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
1306     if sys.platform == "win32":
1307         import msvcrt
1308         msvcrt.setmode(fileno, os.O_BINARY)
1309
1310
1311 def _unwrap_text(stream):
1312     """Unwrap stream if it is a text stream to get the original buffer."""
1313     if sys.version_info > (3, 0):
1314         unicode_type = str
1315     else:
1316         unicode_type = unicode
1317     try:
1318         # Read streams
1319         if type(stream.read(0)) is unicode_type:
1320             return stream.buffer
1321     except (_UnsupportedOperation, IOError):
1322         # Cannot read from the stream: try via writes
1323         try:
1324             stream.write(_b(''))
1325         except TypeError:
1326             return stream.buffer
1327     return stream