2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """TestResult helper classes used to by subunit."""
23 from testtools.content import (
28 from subunit import iso8601
31 # NOT a TestResult, because we are implementing the interface, not inheriting
33 class TestResultDecorator(object):
34 """General pass-through decorator.
36 This provides a base that other TestResults can inherit from to
37 gain basic forwarding functionality. It also takes care of
38 handling the case where the target doesn't support newer methods
39 or features by degrading them.
42 def __init__(self, decorated):
43 """Create a TestResultDecorator forwarding to decorated."""
44 # Make every decorator degrade gracefully.
45 self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
47 def startTest(self, test):
48 return self.decorated.startTest(test)
50 def startTestRun(self):
51 return self.decorated.startTestRun()
53 def stopTest(self, test):
54 return self.decorated.stopTest(test)
56 def stopTestRun(self):
57 return self.decorated.stopTestRun()
59 def addError(self, test, err=None, details=None):
60 return self.decorated.addError(test, err, details=details)
62 def addFailure(self, test, err=None, details=None):
63 return self.decorated.addFailure(test, err, details=details)
65 def addSuccess(self, test, details=None):
66 return self.decorated.addSuccess(test, details=details)
68 def addSkip(self, test, reason=None, details=None):
69 return self.decorated.addSkip(test, reason, details=details)
71 def addExpectedFailure(self, test, err=None, details=None):
72 return self.decorated.addExpectedFailure(test, err, details=details)
74 def addUnexpectedSuccess(self, test, details=None):
75 return self.decorated.addUnexpectedSuccess(test, details=details)
77 def progress(self, offset, whence):
78 return self.decorated.progress(offset, whence)
80 def wasSuccessful(self):
81 return self.decorated.wasSuccessful()
85 return self.decorated.shouldStop
88 return self.decorated.stop()
92 return self.decorated.testsRun
94 def tags(self, new_tags, gone_tags):
95 return self.decorated.tags(new_tags, gone_tags)
97 def time(self, a_datetime):
98 return self.decorated.time(a_datetime)
101 class HookedTestResultDecorator(TestResultDecorator):
102 """A TestResult which calls a hook on every event."""
104 def __init__(self, decorated):
105 self.super = super(HookedTestResultDecorator, self)
106 self.super.__init__(decorated)
108 def startTest(self, test):
110 return self.super.startTest(test)
112 def startTestRun(self):
114 return self.super.startTestRun()
116 def stopTest(self, test):
118 return self.super.stopTest(test)
120 def stopTestRun(self):
122 return self.super.stopTestRun()
124 def addError(self, test, err=None, details=None):
126 return self.super.addError(test, err, details=details)
128 def addFailure(self, test, err=None, details=None):
130 return self.super.addFailure(test, err, details=details)
132 def addSuccess(self, test, details=None):
134 return self.super.addSuccess(test, details=details)
136 def addSkip(self, test, reason=None, details=None):
138 return self.super.addSkip(test, reason, details=details)
140 def addExpectedFailure(self, test, err=None, details=None):
142 return self.super.addExpectedFailure(test, err, details=details)
144 def addUnexpectedSuccess(self, test, details=None):
146 return self.super.addUnexpectedSuccess(test, details=details)
148 def progress(self, offset, whence):
150 return self.super.progress(offset, whence)
152 def wasSuccessful(self):
154 return self.super.wasSuccessful()
157 def shouldStop(self):
159 return self.super.shouldStop
163 return self.super.stop()
165 def time(self, a_datetime):
167 return self.super.time(a_datetime)
170 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
171 """Decorate a TestResult to add time events to a test run.
173 By default this will cause a time event before every test event,
174 but if explicit time data is being provided by the test run, then
175 this decorator will turn itself off to prevent causing confusion.
178 def __init__(self, decorated):
180 super(AutoTimingTestResultDecorator, self).__init__(decorated)
182 def _before_event(self):
186 time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
187 self.decorated.time(time)
189 def progress(self, offset, whence):
190 return self.decorated.progress(offset, whence)
193 def shouldStop(self):
194 return self.decorated.shouldStop
196 def time(self, a_datetime):
197 """Provide a timestamp for the current test activity.
199 :param a_datetime: If None, automatically add timestamps before every
200 event (this is the default behaviour if time() is not called at
201 all). If not None, pass the provided time onto the decorated
202 result object and disable automatic timestamps.
204 self._time = a_datetime
205 return self.decorated.time(a_datetime)
208 class TagCollapsingDecorator(HookedTestResultDecorator):
209 """Collapses many 'tags' calls into one where possible."""
211 def __init__(self, result):
212 super(TagCollapsingDecorator, self).__init__(result)
213 # The (new, gone) tags for the current test.
214 self._current_test_tags = None
216 def startTest(self, test):
219 Not directly passed to the client, but used for handling of tags
222 self.decorated.startTest(test)
223 self._current_test_tags = set(), set()
225 def stopTest(self, test):
226 super(TagCollapsingDecorator, self).stopTest(test)
227 self._current_test_tags = None
229 def _before_event(self):
230 if not self._current_test_tags:
232 if self._current_test_tags[0] or self._current_test_tags[1]:
233 self.decorated.tags(*self._current_test_tags)
234 self._current_test_tags = set(), set()
236 def tags(self, new_tags, gone_tags):
237 """Handle tag instructions.
239 Adds and removes tags as appropriate. If a test is currently running,
240 tags are not affected for subsequent tests.
242 :param new_tags: Tags to add,
243 :param gone_tags: Tags to remove.
245 if self._current_test_tags is not None:
246 # gather the tags until the test stops.
247 self._current_test_tags[0].update(new_tags)
248 self._current_test_tags[0].difference_update(gone_tags)
249 self._current_test_tags[1].update(gone_tags)
250 self._current_test_tags[1].difference_update(new_tags)
252 return self.decorated.tags(new_tags, gone_tags)
255 class TimeCollapsingDecorator(HookedTestResultDecorator):
256 """Only pass on the first and last of a consecutive sequence of times."""
258 def __init__(self, decorated):
259 super(TimeCollapsingDecorator, self).__init__(decorated)
260 self._last_received_time = None
261 self._last_sent_time = None
263 def _before_event(self):
264 if self._last_received_time is None:
266 if self._last_received_time != self._last_sent_time:
267 self.decorated.time(self._last_received_time)
268 self._last_sent_time = self._last_received_time
269 self._last_received_time = None
271 def time(self, a_time):
272 # Don't upcall, because we don't want to call _before_event, it's only
273 # for non-time events.
274 if self._last_received_time is None:
275 self.decorated.time(a_time)
276 self._last_sent_time = a_time
277 self._last_received_time = a_time
281 """Return True if all of 'bools' are True. False otherwise."""
288 class TestResultFilter(TestResultDecorator):
289 """A pyunit TestResult interface implementation which filters tests.
291 Tests that pass the filter are handed on to another TestResult instance
292 for further processing/reporting. To obtain the filtered results,
293 the other instance must be interrogated.
295 :ivar result: The result that tests are passed to after filtering.
296 :ivar filter_predicate: The callback run to decide whether to pass
300 def __init__(self, result, filter_error=False, filter_failure=False,
301 filter_success=True, filter_skip=False, filter_xfail=False,
302 filter_predicate=None, fixup_expected_failures=None):
303 """Create a FilterResult object filtering to result.
305 :param filter_error: Filter out errors.
306 :param filter_failure: Filter out failures.
307 :param filter_success: Filter out successful tests.
308 :param filter_skip: Filter out skipped tests.
309 :param filter_xfail: Filter out expected failure tests.
310 :param filter_predicate: A callable taking (test, outcome, err,
311 details) and returning True if the result should be passed
312 through. err and details may be none if no error or extra
313 metadata is available. outcome is the name of the outcome such
314 as 'success' or 'failure'.
315 :param fixup_expected_failures: Set of test ids to consider known
318 super(TestResultFilter, self).__init__(result)
319 self.decorated = TimeCollapsingDecorator(
320 TagCollapsingDecorator(self.decorated))
323 predicates.append(lambda t, outcome, e, d: outcome != 'error')
325 predicates.append(lambda t, outcome, e, d: outcome != 'failure')
327 predicates.append(lambda t, outcome, e, d: outcome != 'success')
329 predicates.append(lambda t, outcome, e, d: outcome != 'skip')
331 predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
332 if filter_predicate is not None:
333 predicates.append(filter_predicate)
334 self.filter_predicate = (
335 lambda test, outcome, err, details:
336 all_true(p(test, outcome, err, details) for p in predicates))
337 # The current test (for filtering tags)
338 self._current_test = None
339 # Has the current test been filtered (for outputting test tags)
340 self._current_test_filtered = None
341 # Calls to this result that we don't know whether to forward on yet.
342 self._buffered_calls = []
343 if fixup_expected_failures is None:
344 self._fixup_expected_failures = frozenset()
346 self._fixup_expected_failures = fixup_expected_failures
348 def addError(self, test, err=None, details=None):
349 if (self.filter_predicate(test, 'error', err, details)):
350 if self._failure_expected(test):
351 self._buffered_calls.append(
352 ('addExpectedFailure', [test, err], {'details': details}))
354 self._buffered_calls.append(
355 ('addError', [test, err], {'details': details}))
359 def addFailure(self, test, err=None, details=None):
360 if (self.filter_predicate(test, 'failure', err, details)):
361 if self._failure_expected(test):
362 self._buffered_calls.append(
363 ('addExpectedFailure', [test, err], {'details': details}))
365 self._buffered_calls.append(
366 ('addFailure', [test, err], {'details': details}))
370 def addSkip(self, test, reason=None, details=None):
371 if (self.filter_predicate(test, 'skip', reason, details)):
372 self._buffered_calls.append(
373 ('addSkip', [test, reason], {'details': details}))
377 def addSuccess(self, test, details=None):
378 if (self.filter_predicate(test, 'success', None, details)):
379 if self._failure_expected(test):
380 self._buffered_calls.append(
381 ('addUnexpectedSuccess', [test], {'details': details}))
383 self._buffered_calls.append(
384 ('addSuccess', [test], {'details': details}))
388 def addExpectedFailure(self, test, err=None, details=None):
389 if self.filter_predicate(test, 'expectedfailure', err, details):
390 self._buffered_calls.append(
391 ('addExpectedFailure', [test, err], {'details': details}))
395 def addUnexpectedSuccess(self, test, details=None):
396 self._buffered_calls.append(
397 ('addUnexpectedSuccess', [test], {'details': details}))
400 self._current_test_filtered = True
402 def _failure_expected(self, test):
403 return (test.id() in self._fixup_expected_failures)
405 def startTest(self, test):
408 Not directly passed to the client, but used for handling of tags
411 self._current_test = test
412 self._current_test_filtered = False
413 self._buffered_calls.append(('startTest', [test], {}))
415 def stopTest(self, test):
418 Not directly passed to the client, but used for handling of tags
421 if not self._current_test_filtered:
422 # Tags to output for this test.
423 for method, args, kwargs in self._buffered_calls:
424 getattr(self.decorated, method)(*args, **kwargs)
425 self.decorated.stopTest(test)
426 self._current_test = None
427 self._current_test_filtered = None
428 self._buffered_calls = []
430 def time(self, a_time):
431 if self._current_test is not None:
432 self._buffered_calls.append(('time', [a_time], {}))
434 return self.decorated.time(a_time)
436 def id_to_orig_id(self, id):
437 if id.startswith("subunit.RemotedTestCase."):
438 return id[len("subunit.RemotedTestCase."):]
442 class TestIdPrintingResult(testtools.TestResult):
444 def __init__(self, stream, show_times=False):
445 """Create a FilterResult object outputting to stream."""
446 super(TestIdPrintingResult, self).__init__()
447 self._stream = stream
448 self.failed_tests = 0
450 self.show_times = show_times
452 self._test_duration = 0
454 def addError(self, test, err):
455 self.failed_tests += 1
458 def addFailure(self, test, err):
459 self.failed_tests += 1
462 def addSuccess(self, test):
465 def addSkip(self, test, reason=None, details=None):
468 def addUnexpectedSuccess(self, test, details=None):
469 self.failed_tests += 1
472 def addExpectedFailure(self, test, err=None, details=None):
475 def reportTest(self, test, duration):
477 seconds = duration.seconds
478 seconds += duration.days * 3600 * 24
479 seconds += duration.microseconds / 1000000.0
480 self._stream.write(test.id() + ' %0.3f\n' % seconds)
482 self._stream.write(test.id() + '\n')
484 def startTest(self, test):
485 self._start_time = self._time()
487 def stopTest(self, test):
488 test_duration = self._time() - self._start_time
489 self.reportTest(self._test, test_duration)
491 def time(self, time):
497 def wasSuccessful(self):
498 "Tells whether or not this result was a success"
499 return self.failed_tests == 0
502 class TestByTestResult(testtools.TestResult):
503 """Call something every time a test completes."""
505 # XXX: Arguably belongs in testtools.
507 def __init__(self, on_test):
508 """Construct a ``TestByTestResult``.
510 :param on_test: A callable that take a test case, a status (one of
511 "success", "failure", "error", "skip", or "xfail"), a start time
512 (a ``datetime`` with timezone), a stop time, an iterable of tags,
513 and a details dict. Is called at the end of each test (i.e. on
514 ``stopTest``) with the accumulated values for that test.
516 super(TestByTestResult, self).__init__()
517 self._on_test = on_test
519 def startTest(self, test):
520 super(TestByTestResult, self).startTest(test)
521 self._start_time = self._now()
522 # There's no supported (i.e. tested) behaviour that relies on these
523 # being set, but it makes me more comfortable all the same. -- jml
526 self._stop_time = None
528 def stopTest(self, test):
529 self._stop_time = self._now()
530 super(TestByTestResult, self).stopTest(test)
534 start_time=self._start_time,
535 stop_time=self._stop_time,
536 # current_tags is new in testtools 0.9.13.
537 tags=getattr(self, 'current_tags', None),
538 details=self._details)
540 def _err_to_details(self, test, err, details):
543 return {'traceback': TracebackContent(err, test)}
545 def addSuccess(self, test, details=None):
546 super(TestByTestResult, self).addSuccess(test)
547 self._status = 'success'
548 self._details = details
550 def addFailure(self, test, err=None, details=None):
551 super(TestByTestResult, self).addFailure(test, err, details)
552 self._status = 'failure'
553 self._details = self._err_to_details(test, err, details)
555 def addError(self, test, err=None, details=None):
556 super(TestByTestResult, self).addError(test, err, details)
557 self._status = 'error'
558 self._details = self._err_to_details(test, err, details)
560 def addSkip(self, test, reason=None, details=None):
561 super(TestByTestResult, self).addSkip(test, reason, details)
562 self._status = 'skip'
564 details = {'reason': text_content(reason)}
566 # XXX: What if details already has 'reason' key?
567 details['reason'] = text_content(reason)
568 self._details = details
570 def addExpectedFailure(self, test, err=None, details=None):
571 super(TestByTestResult, self).addExpectedFailure(test, err, details)
572 self._status = 'xfail'
573 self._details = self._err_to_details(test, err, details)
575 def addUnexpectedSuccess(self, test, details=None):
576 super(TestByTestResult, self).addUnexpectedSuccess(test, details)
577 self._status = 'success'
578 self._details = details
581 class CsvResult(TestByTestResult):
583 def __init__(self, stream):
584 super(CsvResult, self).__init__(self._on_test)
585 self._write_row = csv.writer(stream).writerow
587 def _on_test(self, test, status, start_time, stop_time, tags, details):
588 self._write_row([test.id(), status, start_time, stop_time])
590 def startTestRun(self):
591 super(CsvResult, self).startTestRun()
592 self._write_row(['test', 'status', 'start_time', 'stop_time'])