2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """TestResult helper classes used to by subunit."""
23 from testtools.content import (
28 from subunit import iso8601
31 # NOT a TestResult, because we are implementing the interface, not inheriting
33 class TestResultDecorator(object):
34 """General pass-through decorator.
36 This provides a base that other TestResults can inherit from to
37 gain basic forwarding functionality. It also takes care of
38 handling the case where the target doesn't support newer methods
39 or features by degrading them.
42 def __init__(self, decorated):
43 """Create a TestResultDecorator forwarding to decorated."""
44 # Make every decorator degrade gracefully.
45 self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
47 def startTest(self, test):
48 return self.decorated.startTest(test)
50 def startTestRun(self):
51 return self.decorated.startTestRun()
53 def stopTest(self, test):
54 return self.decorated.stopTest(test)
56 def stopTestRun(self):
57 return self.decorated.stopTestRun()
59 def addError(self, test, err=None, details=None):
60 return self.decorated.addError(test, err, details=details)
62 def addFailure(self, test, err=None, details=None):
63 return self.decorated.addFailure(test, err, details=details)
65 def addSuccess(self, test, details=None):
66 return self.decorated.addSuccess(test, details=details)
68 def addSkip(self, test, reason=None, details=None):
69 return self.decorated.addSkip(test, reason, details=details)
71 def addExpectedFailure(self, test, err=None, details=None):
72 return self.decorated.addExpectedFailure(test, err, details=details)
74 def addUnexpectedSuccess(self, test, details=None):
75 return self.decorated.addUnexpectedSuccess(test, details=details)
77 def progress(self, offset, whence):
78 return self.decorated.progress(offset, whence)
80 def wasSuccessful(self):
81 return self.decorated.wasSuccessful()
85 return self.decorated.shouldStop
88 return self.decorated.stop()
92 return self.decorated.testsRun
94 def tags(self, new_tags, gone_tags):
95 return self.decorated.tags(new_tags, gone_tags)
97 def time(self, a_datetime):
98 return self.decorated.time(a_datetime)
101 class HookedTestResultDecorator(TestResultDecorator):
102 """A TestResult which calls a hook on every event."""
104 def __init__(self, decorated):
105 self.super = super(HookedTestResultDecorator, self)
106 self.super.__init__(decorated)
108 def startTest(self, test):
110 return self.super.startTest(test)
112 def startTestRun(self):
114 return self.super.startTestRun()
116 def stopTest(self, test):
118 return self.super.stopTest(test)
120 def stopTestRun(self):
122 return self.super.stopTestRun()
124 def addError(self, test, err=None, details=None):
126 return self.super.addError(test, err, details=details)
128 def addFailure(self, test, err=None, details=None):
130 return self.super.addFailure(test, err, details=details)
132 def addSuccess(self, test, details=None):
134 return self.super.addSuccess(test, details=details)
136 def addSkip(self, test, reason=None, details=None):
138 return self.super.addSkip(test, reason, details=details)
140 def addExpectedFailure(self, test, err=None, details=None):
142 return self.super.addExpectedFailure(test, err, details=details)
144 def addUnexpectedSuccess(self, test, details=None):
146 return self.super.addUnexpectedSuccess(test, details=details)
148 def progress(self, offset, whence):
150 return self.super.progress(offset, whence)
152 def wasSuccessful(self):
154 return self.super.wasSuccessful()
157 def shouldStop(self):
159 return self.super.shouldStop
163 return self.super.stop()
165 def time(self, a_datetime):
167 return self.super.time(a_datetime)
170 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
171 """Decorate a TestResult to add time events to a test run.
173 By default this will cause a time event before every test event,
174 but if explicit time data is being provided by the test run, then
175 this decorator will turn itself off to prevent causing confusion.
178 def __init__(self, decorated):
180 super(AutoTimingTestResultDecorator, self).__init__(decorated)
182 def _before_event(self):
186 time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
187 self.decorated.time(time)
189 def progress(self, offset, whence):
190 return self.decorated.progress(offset, whence)
193 def shouldStop(self):
194 return self.decorated.shouldStop
196 def time(self, a_datetime):
197 """Provide a timestamp for the current test activity.
199 :param a_datetime: If None, automatically add timestamps before every
200 event (this is the default behaviour if time() is not called at
201 all). If not None, pass the provided time onto the decorated
202 result object and disable automatic timestamps.
204 self._time = a_datetime
205 return self.decorated.time(a_datetime)
208 class TagCollapsingDecorator(TestResultDecorator):
209 """Collapses many 'tags' calls into one where possible."""
211 def __init__(self, result):
212 super(TagCollapsingDecorator, self).__init__(result)
213 # The (new, gone) tags for the current test.
214 self._current_test_tags = None
216 def startTest(self, test):
219 Not directly passed to the client, but used for handling of tags
222 self.decorated.startTest(test)
223 self._current_test_tags = set(), set()
225 def stopTest(self, test):
228 Not directly passed to the client, but used for handling of tags
231 # Tags to output for this test.
232 if self._current_test_tags[0] or self._current_test_tags[1]:
233 self.decorated.tags(*self._current_test_tags)
234 self.decorated.stopTest(test)
235 self._current_test_tags = None
237 def tags(self, new_tags, gone_tags):
238 """Handle tag instructions.
240 Adds and removes tags as appropriate. If a test is currently running,
241 tags are not affected for subsequent tests.
243 :param new_tags: Tags to add,
244 :param gone_tags: Tags to remove.
246 if self._current_test_tags is not None:
247 # gather the tags until the test stops.
248 self._current_test_tags[0].update(new_tags)
249 self._current_test_tags[0].difference_update(gone_tags)
250 self._current_test_tags[1].update(gone_tags)
251 self._current_test_tags[1].difference_update(new_tags)
253 return self.decorated.tags(new_tags, gone_tags)
256 class TimeCollapsingDecorator(HookedTestResultDecorator):
257 """Only pass on the first and last of a consecutive sequence of times."""
259 def __init__(self, decorated):
260 super(TimeCollapsingDecorator, self).__init__(decorated)
261 self._last_received_time = None
262 self._last_sent_time = None
264 def _before_event(self):
265 if self._last_received_time is None:
267 if self._last_received_time != self._last_sent_time:
268 self.decorated.time(self._last_received_time)
269 self._last_sent_time = self._last_received_time
270 self._last_received_time = None
272 def time(self, a_time):
273 # Don't upcall, because we don't want to call _before_event, it's only
274 # for non-time events.
275 if self._last_received_time is None:
276 self.decorated.time(a_time)
277 self._last_sent_time = a_time
278 self._last_received_time = a_time
282 """Return True if all of 'bools' are True. False otherwise."""
289 class TestResultFilter(TestResultDecorator):
290 """A pyunit TestResult interface implementation which filters tests.
292 Tests that pass the filter are handed on to another TestResult instance
293 for further processing/reporting. To obtain the filtered results,
294 the other instance must be interrogated.
296 :ivar result: The result that tests are passed to after filtering.
297 :ivar filter_predicate: The callback run to decide whether to pass
301 def __init__(self, result, filter_error=False, filter_failure=False,
302 filter_success=True, filter_skip=False, filter_xfail=False,
303 filter_predicate=None, fixup_expected_failures=None):
304 """Create a FilterResult object filtering to result.
306 :param filter_error: Filter out errors.
307 :param filter_failure: Filter out failures.
308 :param filter_success: Filter out successful tests.
309 :param filter_skip: Filter out skipped tests.
310 :param filter_xfail: Filter out expected failure tests.
311 :param filter_predicate: A callable taking (test, outcome, err,
312 details) and returning True if the result should be passed
313 through. err and details may be none if no error or extra
314 metadata is available. outcome is the name of the outcome such
315 as 'success' or 'failure'.
316 :param fixup_expected_failures: Set of test ids to consider known
319 super(TestResultFilter, self).__init__(result)
320 self.decorated = TimeCollapsingDecorator(
321 TagCollapsingDecorator(self.decorated))
324 predicates.append(lambda t, outcome, e, d: outcome != 'error')
326 predicates.append(lambda t, outcome, e, d: outcome != 'failure')
328 predicates.append(lambda t, outcome, e, d: outcome != 'success')
330 predicates.append(lambda t, outcome, e, d: outcome != 'skip')
332 predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
333 if filter_predicate is not None:
334 predicates.append(filter_predicate)
335 self.filter_predicate = (
336 lambda test, outcome, err, details:
337 all_true(p(test, outcome, err, details) for p in predicates))
338 # The current test (for filtering tags)
339 self._current_test = None
340 # Has the current test been filtered (for outputting test tags)
341 self._current_test_filtered = None
342 # Calls to this result that we don't know whether to forward on yet.
343 self._buffered_calls = []
344 if fixup_expected_failures is None:
345 self._fixup_expected_failures = frozenset()
347 self._fixup_expected_failures = fixup_expected_failures
349 def addError(self, test, err=None, details=None):
350 if (self.filter_predicate(test, 'error', err, details)):
351 if self._failure_expected(test):
352 self._buffered_calls.append(
353 ('addExpectedFailure', [test, err], {'details': details}))
355 self._buffered_calls.append(
356 ('addError', [test, err], {'details': details}))
360 def addFailure(self, test, err=None, details=None):
361 if (self.filter_predicate(test, 'failure', err, details)):
362 if self._failure_expected(test):
363 self._buffered_calls.append(
364 ('addExpectedFailure', [test, err], {'details': details}))
366 self._buffered_calls.append(
367 ('addFailure', [test, err], {'details': details}))
371 def addSkip(self, test, reason=None, details=None):
372 if (self.filter_predicate(test, 'skip', reason, details)):
373 self._buffered_calls.append(
374 ('addSkip', [test, reason], {'details': details}))
378 def addSuccess(self, test, details=None):
379 if (self.filter_predicate(test, 'success', None, details)):
380 if self._failure_expected(test):
381 self._buffered_calls.append(
382 ('addUnexpectedSuccess', [test], {'details': details}))
384 self._buffered_calls.append(
385 ('addSuccess', [test], {'details': details}))
389 def addExpectedFailure(self, test, err=None, details=None):
390 if self.filter_predicate(test, 'expectedfailure', err, details):
391 self._buffered_calls.append(
392 ('addExpectedFailure', [test, err], {'details': details}))
396 def addUnexpectedSuccess(self, test, details=None):
397 self._buffered_calls.append(
398 ('addUnexpectedSuccess', [test], {'details': details}))
401 self._current_test_filtered = True
403 def _failure_expected(self, test):
404 return (test.id() in self._fixup_expected_failures)
406 def startTest(self, test):
409 Not directly passed to the client, but used for handling of tags
412 self._current_test = test
413 self._current_test_filtered = False
414 self._buffered_calls.append(('startTest', [test], {}))
416 def stopTest(self, test):
419 Not directly passed to the client, but used for handling of tags
422 if not self._current_test_filtered:
423 # Tags to output for this test.
424 for method, args, kwargs in self._buffered_calls:
425 getattr(self.decorated, method)(*args, **kwargs)
426 self.decorated.stopTest(test)
427 self._current_test = None
428 self._current_test_filtered = None
429 self._buffered_calls = []
431 def time(self, a_time):
432 if self._current_test is not None:
433 self._buffered_calls.append(('time', [a_time], {}))
435 return self.decorated.time(a_time)
437 def id_to_orig_id(self, id):
438 if id.startswith("subunit.RemotedTestCase."):
439 return id[len("subunit.RemotedTestCase."):]
443 class TestIdPrintingResult(testtools.TestResult):
445 def __init__(self, stream, show_times=False):
446 """Create a FilterResult object outputting to stream."""
447 super(TestIdPrintingResult, self).__init__()
448 self._stream = stream
449 self.failed_tests = 0
451 self.show_times = show_times
453 self._test_duration = 0
455 def addError(self, test, err):
456 self.failed_tests += 1
459 def addFailure(self, test, err):
460 self.failed_tests += 1
463 def addSuccess(self, test):
466 def addSkip(self, test, reason=None, details=None):
469 def addUnexpectedSuccess(self, test, details=None):
470 self.failed_tests += 1
473 def addExpectedFailure(self, test, err=None, details=None):
476 def reportTest(self, test, duration):
478 seconds = duration.seconds
479 seconds += duration.days * 3600 * 24
480 seconds += duration.microseconds / 1000000.0
481 self._stream.write(test.id() + ' %0.3f\n' % seconds)
483 self._stream.write(test.id() + '\n')
485 def startTest(self, test):
486 self._start_time = self._time()
488 def stopTest(self, test):
489 test_duration = self._time() - self._start_time
490 self.reportTest(self._test, test_duration)
492 def time(self, time):
498 def wasSuccessful(self):
499 "Tells whether or not this result was a success"
500 return self.failed_tests == 0
503 class TestByTestResult(testtools.TestResult):
504 """Call something every time a test completes."""
506 # XXX: Arguably belongs in testtools.
508 def __init__(self, on_test):
509 """Construct a ``TestByTestResult``.
511 :param on_test: A callable that take a test case, a status (one of
512 "success", "failure", "error", "skip", or "xfail"), a start time
513 (a ``datetime`` with timezone), a stop time, an iterable of tags,
514 and a details dict. Is called at the end of each test (i.e. on
515 ``stopTest``) with the accumulated values for that test.
517 super(TestByTestResult, self).__init__()
518 self._on_test = on_test
520 def startTest(self, test):
521 super(TestByTestResult, self).startTest(test)
522 self._start_time = self._now()
523 # There's no supported (i.e. tested) behaviour that relies on these
524 # being set, but it makes me more comfortable all the same. -- jml
527 self._stop_time = None
529 def stopTest(self, test):
530 self._stop_time = self._now()
531 super(TestByTestResult, self).stopTest(test)
535 start_time=self._start_time,
536 stop_time=self._stop_time,
537 # current_tags is new in testtools 0.9.13.
538 tags=getattr(self, 'current_tags', None),
539 details=self._details)
541 def _err_to_details(self, test, err, details):
544 return {'traceback': TracebackContent(err, test)}
546 def addSuccess(self, test, details=None):
547 super(TestByTestResult, self).addSuccess(test)
548 self._status = 'success'
549 self._details = details
551 def addFailure(self, test, err=None, details=None):
552 super(TestByTestResult, self).addFailure(test, err, details)
553 self._status = 'failure'
554 self._details = self._err_to_details(test, err, details)
556 def addError(self, test, err=None, details=None):
557 super(TestByTestResult, self).addError(test, err, details)
558 self._status = 'error'
559 self._details = self._err_to_details(test, err, details)
561 def addSkip(self, test, reason=None, details=None):
562 super(TestByTestResult, self).addSkip(test, reason, details)
563 self._status = 'skip'
565 details = {'reason': text_content(reason)}
567 # XXX: What if details already has 'reason' key?
568 details['reason'] = text_content(reason)
569 self._details = details
571 def addExpectedFailure(self, test, err=None, details=None):
572 super(TestByTestResult, self).addExpectedFailure(test, err, details)
573 self._status = 'xfail'
574 self._details = self._err_to_details(test, err, details)
576 def addUnexpectedSuccess(self, test, details=None):
577 super(TestByTestResult, self).addUnexpectedSuccess(test, details)
578 self._status = 'success'
579 self._details = details
582 class CsvResult(TestByTestResult):
584 def __init__(self, stream):
585 super(CsvResult, self).__init__(self._on_test)
586 self._write_row = csv.writer(stream).writerow
588 def _on_test(self, test, status, start_time, stop_time, tags, details):
589 self._write_row([test.id(), status, start_time, stop_time])
591 def startTestRun(self):
592 super(CsvResult, self).startTestRun()
593 self._write_row(['test', 'status', 'start_time', 'stop_time'])