2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
"""TestResult helper classes used by subunit."""
23 from testtools.compat import all
24 from testtools.content import (
29 from subunit import iso8601
# NOT a TestResult, because we are implementing the interface, not inheriting
# it.
class TestResultDecorator(object):
    """Pass-through wrapper around another TestResult.

    Subclasses inherit plain forwarding for the whole result interface and
    override only the events they care about. The wrapped result is itself
    wrapped in ``testtools.ExtendedToOriginalDecorator`` so that targets
    lacking newer methods or features are degraded gracefully.
    """

    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
    # we should gut this and just use that.

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated."""
        # Wrapping here means every decorator degrades gracefully.
        self.decorated = testtools.ExtendedToOriginalDecorator(decorated)

    # -- test lifecycle ----------------------------------------------------

    def startTest(self, test):
        """Forward ``startTest`` to the decorated result."""
        target = self.decorated
        return target.startTest(test)

    def startTestRun(self):
        """Forward ``startTestRun`` to the decorated result."""
        target = self.decorated
        return target.startTestRun()

    def stopTest(self, test):
        """Forward ``stopTest`` to the decorated result."""
        target = self.decorated
        return target.stopTest(test)

    def stopTestRun(self):
        """Forward ``stopTestRun`` to the decorated result."""
        target = self.decorated
        return target.stopTestRun()

    # -- outcomes ----------------------------------------------------------
    # ``err``/``reason`` travel positionally, ``details`` by keyword.

    def addError(self, test, err=None, details=None):
        """Forward an error outcome to the decorated result."""
        target = self.decorated
        return target.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward a failure outcome to the decorated result."""
        target = self.decorated
        return target.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Forward a success outcome to the decorated result."""
        target = self.decorated
        return target.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward a skip outcome to the decorated result."""
        target = self.decorated
        return target.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward an expected-failure outcome to the decorated result."""
        target = self.decorated
        return target.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward an unexpected-success outcome to the decorated result."""
        target = self.decorated
        return target.addUnexpectedSuccess(test, details=details)

    # -- run control and metadata -----------------------------------------

    def progress(self, offset, whence):
        """Forward a progress event to the decorated result."""
        target = self.decorated
        return target.progress(offset, whence)

    def wasSuccessful(self):
        """Return the decorated result's ``wasSuccessful()``."""
        target = self.decorated
        return target.wasSuccessful()

    @property
    def shouldStop(self):
        """The decorated result's ``shouldStop`` attribute."""
        target = self.decorated
        return target.shouldStop

    def stop(self):
        """Forward ``stop`` to the decorated result."""
        target = self.decorated
        return target.stop()

    @property
    def testsRun(self):
        """The decorated result's ``testsRun`` attribute."""
        target = self.decorated
        return target.testsRun

    def tags(self, new_tags, gone_tags):
        """Forward a tags event to the decorated result."""
        target = self.decorated
        return target.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward a time event to the decorated result."""
        target = self.decorated
        return target.time(a_datetime)
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Every event method first invokes ``self._before_event()`` and then
    delegates to the plain forwarding implementation in
    ``TestResultDecorator``. ``_before_event`` is not defined here;
    subclasses supply it.
    """

    def __init__(self, decorated):
        # Keep a proxy anchored at this class so the forwarding calls below
        # always reach TestResultDecorator, even from subclasses.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def _hook_then_forward(self, method_name, *args, **kwargs):
        """Run the hook, then call ``method_name`` on the base decorator."""
        self._before_event()
        forward = getattr(self.super, method_name)
        return forward(*args, **kwargs)

    def startTest(self, test):
        return self._hook_then_forward('startTest', test)

    def startTestRun(self):
        return self._hook_then_forward('startTestRun')

    def stopTest(self, test):
        return self._hook_then_forward('stopTest', test)

    def stopTestRun(self):
        return self._hook_then_forward('stopTestRun')

    def addError(self, test, err=None, details=None):
        return self._hook_then_forward('addError', test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        return self._hook_then_forward(
            'addFailure', test, err, details=details)

    def addSuccess(self, test, details=None):
        return self._hook_then_forward('addSuccess', test, details=details)

    def addSkip(self, test, reason=None, details=None):
        return self._hook_then_forward(
            'addSkip', test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        return self._hook_then_forward(
            'addExpectedFailure', test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        return self._hook_then_forward(
            'addUnexpectedSuccess', test, details=details)

    def progress(self, offset, whence):
        return self._hook_then_forward('progress', offset, whence)

    def wasSuccessful(self):
        return self._hook_then_forward('wasSuccessful')

    @property
    def shouldStop(self):
        # Reading the flag counts as an event too.
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        return self._hook_then_forward('stop')

    def time(self, a_datetime):
        return self._hook_then_forward('time', a_datetime)
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    Unless told otherwise, a time event is emitted ahead of every test
    event. Once the test run supplies explicit time data through
    ``time()``, automatic timestamps are switched off so the two sources
    do not get mixed up.
    """

    def __init__(self, decorated):
        # None means "no explicit time seen": keep generating timestamps.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Emit a current UTC timestamp unless explicit times are in use."""
        if self._time is not None:
            return
        now = datetime.datetime.utcnow()
        stamped = now.replace(tzinfo=iso8601.Utc())
        self.decorated.time(stamped)

    def progress(self, offset, whence):
        # Goes straight to the decorated result, bypassing the hook.
        target = self.decorated
        return target.progress(offset, whence)

    @property
    def shouldStop(self):
        # Goes straight to the decorated result, bypassing the hook.
        target = self.decorated
        return target.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: If None, automatically add timestamps before every
            event (this is the default behaviour if time() is not called at
            all). If not None, pass the provided time onto the decorated
            result object and disable automatic timestamps.
        """
        self._time = a_datetime
        target = self.decorated
        return target.time(a_datetime)
class TagsMixin(object):
    """Track tag changes at run-global scope and at per-test scope.

    Each scope is a ``(new_tags, gone_tags)`` pair of sets. The per-test
    pair exists only between ``startTest`` and ``stopTest``; outside a test
    it is None and tag changes apply to the global pair.
    """

    def __init__(self):
        self._clear_tags()

    def _clear_tags(self):
        """Reset to empty global tags and no per-test scope."""
        self._global_tags = (set(), set())
        self._test_tags = None

    def _get_active_tags(self):
        """Return the set of tags in effect right now."""
        run_added, _run_removed = self._global_tags
        test_scope = self._test_tags
        if test_scope is None:
            # Hand back a copy so callers cannot alter our state.
            return set(run_added)
        test_added, test_removed = test_scope
        return (run_added - test_removed) | test_added

    def _get_current_scope(self):
        """Return the per-test pair while a test runs, else the global one."""
        return self._test_tags or self._global_tags

    def _flush_current_scope(self, tag_receiver):
        """Send the current scope's pending tags on, then empty that scope."""
        added, removed = self._get_current_scope()
        if added or removed:
            tag_receiver.tags(added, removed)
        emptied = (set(), set())
        if self._test_tags:
            self._test_tags = emptied
        else:
            self._global_tags = emptied

    def startTestRun(self):
        self._clear_tags()

    def startTest(self, test):
        # Open a fresh per-test scope.
        self._test_tags = (set(), set())

    def stopTest(self, test):
        # Close the per-test scope; the global scope is current again.
        self._test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        added, removed = self._get_current_scope()
        # A tag named in both arguments ends up in neither set.
        added.update(new_tags)
        added.difference_update(gone_tags)
        removed.update(gone_tags)
        removed.difference_update(new_tags)
class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
    """Collapses many 'tags' calls into one where possible.

    Tag changes are accumulated through ``TagsMixin`` rather than forwarded
    immediately; the accumulated change for the current scope is flushed to
    the decorated result just before the next hooked event.
    """

    def __init__(self, result):
        super(TagCollapsingDecorator, self).__init__(result)
        self._clear_tags()

    def _before_event(self):
        # Emit whatever has been collected before the event goes out.
        target = self.decorated
        self._flush_current_scope(target)

    def tags(self, new_tags, gone_tags):
        # Only record the change; nothing is forwarded at this point.
        TagsMixin.tags(self, new_tags, gone_tags)
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times."""

    def __init__(self, decorated):
        super(TimeCollapsingDecorator, self).__init__(decorated)
        # Most recent time received since the last non-time event, if any.
        self._last_received_time = None
        # Most recent time actually forwarded to the decorated result.
        self._last_sent_time = None

    def _before_event(self):
        """Forward the last time of the finished run if not already sent."""
        pending = self._last_received_time
        if pending is None:
            return
        if pending != self._last_sent_time:
            self.decorated.time(pending)
            self._last_sent_time = pending
        self._last_received_time = None

    def time(self, a_time):
        # Don't upcall, because we don't want to call _before_event, it's only
        # for non-time events.
        starts_new_run = self._last_received_time is None
        if starts_new_run:
            # The first time of a run is always forwarded straight away.
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
def and_predicates(predicates):
    """Return a predicate that is true iff all predicates are true."""
    # XXX: Should probably be in testtools to be better used by matchers. jml

    def combined(*args, **kwargs):
        # The generator keeps evaluation lazy: stop at the first false one.
        outcomes = (check(*args, **kwargs) for check in predicates)
        return all(outcomes)

    return combined
def make_tag_filter(with_tags, without_tags):
    """Make a callback that checks tests against tags.

    :param with_tags: Tags a test must all carry, or a false value for no
        such requirement.
    :param without_tags: Tags a test must not carry, or a false value for
        no such restriction.
    :return: A callable taking (test, outcome, err, details, tags) that
        returns True when ``tags`` satisfies both constraints.
    """
    required = set(with_tags) if with_tags else None
    forbidden = set(without_tags) if without_tags else None

    def check_tags(test, outcome, err, details, tags):
        if required and not required <= tags:
            return False
        # Any overlap with the forbidden tags rejects the test.
        return not (forbidden and forbidden & tags)

    return check_tags
class _PredicateFilter(TestResultDecorator, TagsMixin):
    """Buffer each test's events and replay them only if the test is kept.

    Calls made between ``startTest`` and ``stopTest`` are queued. When an
    outcome is rejected by the predicate the test is marked as filtered, and
    at ``stopTest`` the queue is either replayed onto the decorated result
    or discarded.
    """

    def __init__(self, result, predicate):
        super(_PredicateFilter, self).__init__(result)
        self._clear_tags()
        tag_collapser = TagCollapsingDecorator(self.decorated)
        self.decorated = TimeCollapsingDecorator(tag_collapser)
        self._predicate = predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []

    def filter_predicate(self, test, outcome, error, details):
        """Evaluate the predicate, supplying the currently active tags."""
        active_tags = self._get_active_tags()
        return self._predicate(test, outcome, error, details, active_tags)

    def _buffer_or_filter(self, accepted, method_name, args, details):
        """Queue the outcome call if accepted, else mark the test filtered."""
        if accepted:
            call = (method_name, args, {'details': details})
            self._buffered_calls.append(call)
        else:
            self._filter()

    def addError(self, test, err=None, details=None):
        accepted = self.filter_predicate(test, 'error', err, details)
        self._buffer_or_filter(accepted, 'addError', [test, err], details)

    def addFailure(self, test, err=None, details=None):
        accepted = self.filter_predicate(test, 'failure', err, details)
        self._buffer_or_filter(accepted, 'addFailure', [test, err], details)

    def addSkip(self, test, reason=None, details=None):
        accepted = self.filter_predicate(test, 'skip', reason, details)
        self._buffer_or_filter(accepted, 'addSkip', [test, reason], details)

    def addExpectedFailure(self, test, err=None, details=None):
        accepted = self.filter_predicate(
            test, 'expectedfailure', err, details)
        self._buffer_or_filter(
            accepted, 'addExpectedFailure', [test, err], details)

    def addUnexpectedSuccess(self, test, details=None):
        # Queued unconditionally: the predicate is not consulted here.
        call = ('addUnexpectedSuccess', [test], {'details': details})
        self._buffered_calls.append(call)

    def addSuccess(self, test, details=None):
        accepted = self.filter_predicate(test, 'success', None, details)
        self._buffer_or_filter(accepted, 'addSuccess', [test], details)

    def _filter(self):
        self._current_test_filtered = True

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        TagsMixin.startTest(self, test)
        self._current_test = test
        self._current_test_filtered = False
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        if not self._current_test_filtered:
            target = self.decorated
            for method_name, args, kwargs in self._buffered_calls:
                replay = getattr(target, method_name)
                replay(*args, **kwargs)
            target.stopTest(test)
        # Reset per-test state whether or not the test was forwarded.
        self._current_test = None
        self._current_test_filtered = None
        self._buffered_calls = []
        TagsMixin.stopTest(self, test)

    def tags(self, new_tags, gone_tags):
        TagsMixin.tags(self, new_tags, gone_tags)
        if self._current_test is None:
            # Outside a test there is nothing to wait for: forward now.
            return super(_PredicateFilter, self).tags(new_tags, gone_tags)
        self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))

    def time(self, a_time):
        target = self.decorated
        return target.time(a_time)

    def id_to_orig_id(self, id):
        """Strip a leading ``subunit.RemotedTestCase.`` from ``id`` if present."""
        prefix = "subunit.RemotedTestCase."
        if not id.startswith(prefix):
            return id
        return id[len(prefix):]
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a FilterResult object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details, tags) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
            are still supported but should be updated to accept the tags
            parameter for efficiency.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        def reject_outcome(unwanted):
            # Factory so each predicate closes over its own outcome name.
            return lambda t, outcome, e, d, tags: outcome != unwanted

        switches = (
            (filter_error, 'error'),
            (filter_failure, 'failure'),
            (filter_success, 'success'),
            (filter_skip, 'skip'),
            (filter_xfail, 'expectedfailure'),
            )
        predicates = [
            reject_outcome(name) for enabled, name in switches if enabled]
        if filter_predicate is not None:
            def compat(test, outcome, error, details, tags):
                # 0.0.7 and earlier did not support the 'tags' parameter.
                try:
                    return filter_predicate(
                        test, outcome, error, details, tags)
                except TypeError:
                    return filter_predicate(test, outcome, error, details)
            predicates.append(compat)
        combined = and_predicates(predicates)
        super(TestResultFilter, self).__init__(
            _PredicateFilter(result, combined))
        self._fixup_expected_failures = (
            frozenset() if fixup_expected_failures is None
            else fixup_expected_failures)

    def addError(self, test, err=None, details=None):
        # A test listed as known-failing is reported as an expected failure.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addError(
            test, err=err, details=details)

    def addFailure(self, test, err=None, details=None):
        # A test listed as known-failing is reported as an expected failure.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addFailure(
            test, err=err, details=details)

    def addSuccess(self, test, details=None):
        # A known-failing test that passes is an unexpected success.
        if self._failure_expected(test):
            self.addUnexpectedSuccess(test, details=details)
            return
        super(TestResultFilter, self).addSuccess(test, details=details)

    def _failure_expected(self, test):
        """Return whether ``test.id()`` is in the fixup set."""
        known_failing = self._fixup_expected_failures
        return test.id() in known_failing
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of each finished test to a stream, one per line.

    With ``show_times`` set, each id is followed by the test's duration in
    seconds, computed from the values passed to ``time()``. The number of
    errors, failures and unexpected successes is kept in ``failed_tests``.
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult object outputting to stream.

        :param stream: Object with a ``write`` method that receives the
            output lines.
        :param show_times: If true, append each test's duration to its line.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        self.__time = None
        self.show_times = show_times
        self._test = None
        self._test_duration = 0

    # The outcome methods accept ``err``/``details`` like the other result
    # classes in this module, so callers that pass ``details=`` do not get a
    # TypeError. The extra arguments are not used: only the test is recorded.

    def addError(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for ``test``.

        :param test: The test whose ``id()`` is written.
        :param duration: A timedelta-like object with ``days``, ``seconds``
            and ``microseconds``; only read when ``show_times`` is set.
        """
        if self.show_times:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()

    def stopTest(self, test):
        test_duration = self._time() - self._start_time
        # Report the test recorded by the outcome method, not the argument.
        self.reportTest(self._test, test_duration)

    def time(self, time):
        self.__time = time

    def _time(self):
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0
class TestByTestResult(testtools.TestResult):
    """Call something every time a test completes."""

    # XXX: In testtools since lp:testtools r249.  Once that's released, just
    # use that.

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that take a test case, a status (one of
            "success", "failure", "error", "skip", or "xfail"), a start time
            (a ``datetime`` with timezone), a stop time, an iterable of tags,
            and a details dict. Is called at the end of each test (i.e. on
            ``stopTest``) with the accumulated values for that test.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._status = None
        self._details = None
        self._stop_time = None

    def stopTest(self, test):
        self._stop_time = self._now()
        super(TestByTestResult, self).stopTest(test)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            # current_tags is new in testtools 0.9.13.
            tags=getattr(self, 'current_tags', None),
            details=self._details)

    def _err_to_details(self, test, err, details):
        """Return ``details`` if non-empty, else a traceback built from err."""
        if details:
            return details
        return {'traceback': TracebackContent(err, test)}

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._status = 'success'
        self._details = details

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        """Record a skip, making sure the reason appears in the details.

        The caller's ``details`` dict is never modified: when a reason has
        to be added to supplied details, it is added to a copy.
        """
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            details = {'reason': text_content(reason)}
        elif reason:
            # Copy first so the caller's dict is left untouched.
            # XXX: What if details already has 'reason' key?
            details = dict(details)
            details['reason'] = text_content(reason)
        self._details = details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Reported to on_test with the same status as a plain success.
        self._status = 'success'
        self._details = details
class CsvResult(TestByTestResult):
    """Write one CSV row per completed test to a stream.

    A header row is written on ``startTestRun``; each test then contributes
    its id, status, start time and stop time.
    """

    def __init__(self, stream):
        super(CsvResult, self).__init__(self._on_test)
        writer = csv.writer(stream)
        self._write_row = writer.writerow

    def _on_test(self, test, status, start_time, stop_time, tags, details):
        # tags and details are accepted but not written out.
        row = [test.id(), status, start_time, stop_time]
        self._write_row(row)

    def startTestRun(self):
        super(CsvResult, self).startTestRun()
        header = ['test', 'status', 'start_time', 'stop_time']
        self._write_row(header)