2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
"""TestResult helper classes used by subunit."""
23 from subunit import iso8601
# NOT a TestResult, because we are implementing the interface, not inheriting
# from it.
class TestResultDecorator(object):
    """General pass-through decorator.

    This provides a base that other TestResults can inherit from to
    gain basic forwarding functionality. It also takes care of
    handling the case where the target doesn't support newer methods
    or features by degrading them.

    :ivar decorated: The wrapped result object every call is forwarded to.
    """

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated."""
        # Make every decorator degrade gracefully.
        self.decorated = testtools.ExtendedToOriginalDecorator(decorated)

    def startTest(self, test):
        """Forward startTest to the decorated result."""
        return self.decorated.startTest(test)

    def startTestRun(self):
        """Forward startTestRun to the decorated result."""
        return self.decorated.startTestRun()

    def stopTest(self, test):
        """Forward stopTest to the decorated result."""
        return self.decorated.stopTest(test)

    def stopTestRun(self):
        """Forward stopTestRun to the decorated result."""
        return self.decorated.stopTestRun()

    def addError(self, test, err=None, details=None):
        """Forward addError; details is always passed by keyword."""
        return self.decorated.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward addFailure; details is always passed by keyword."""
        return self.decorated.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Forward addSuccess; details is always passed by keyword."""
        return self.decorated.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward addSkip; details is always passed by keyword."""
        return self.decorated.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward addExpectedFailure; details is always passed by keyword."""
        return self.decorated.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward addUnexpectedSuccess; details is passed by keyword."""
        return self.decorated.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        """Forward a progress event to the decorated result."""
        return self.decorated.progress(offset, whence)

    def wasSuccessful(self):
        """Return whatever the decorated result's wasSuccessful() returns."""
        return self.decorated.wasSuccessful()

    # shouldStop and testsRun are read as plain attributes on the decorated
    # result, so they are exposed as read-only properties here.
    @property
    def shouldStop(self):
        """The decorated result's shouldStop attribute."""
        return self.decorated.shouldStop

    def stop(self):
        """Forward stop to the decorated result."""
        return self.decorated.stop()

    @property
    def testsRun(self):
        """The decorated result's testsRun attribute."""
        return self.decorated.testsRun

    def tags(self, new_tags, gone_tags):
        """Forward a tags event to the decorated result."""
        return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward a time event to the decorated result."""
        return self.decorated.time(a_datetime)
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Every method defined here first calls ``self._before_event()`` and then
    delegates to the TestResultDecorator implementation. This class does not
    define ``_before_event`` itself; subclasses supply it.
    """

    def __init__(self, decorated):
        """Create a hooked decorator forwarding to decorated."""
        # Kept as a public attribute: the methods below all delegate
        # through it.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def startTest(self, test):
        """Run the hook, then forward startTest."""
        self._before_event()
        return self.super.startTest(test)

    def startTestRun(self):
        """Run the hook, then forward startTestRun."""
        self._before_event()
        return self.super.startTestRun()

    def stopTest(self, test):
        """Run the hook, then forward stopTest."""
        self._before_event()
        return self.super.stopTest(test)

    def stopTestRun(self):
        """Run the hook, then forward stopTestRun."""
        self._before_event()
        return self.super.stopTestRun()

    def addError(self, test, err=None, details=None):
        """Run the hook, then forward addError."""
        self._before_event()
        return self.super.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Run the hook, then forward addFailure."""
        self._before_event()
        return self.super.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Run the hook, then forward addSuccess."""
        self._before_event()
        return self.super.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Run the hook, then forward addSkip."""
        self._before_event()
        return self.super.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Run the hook, then forward addExpectedFailure."""
        self._before_event()
        return self.super.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Run the hook, then forward addUnexpectedSuccess."""
        self._before_event()
        return self.super.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        """Run the hook, then forward the progress event."""
        self._before_event()
        return self.super.progress(offset, whence)

    def wasSuccessful(self):
        """Run the hook, then forward wasSuccessful."""
        self._before_event()
        return self.super.wasSuccessful()

    @property
    def shouldStop(self):
        """Run the hook, then read shouldStop from the base decorator."""
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        """Run the hook, then forward stop."""
        self._before_event()
        return self.super.stop()

    def time(self, a_datetime):
        """Run the hook, then forward the time event."""
        self._before_event()
        return self.super.time(a_datetime)
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    By default this will cause a time event before every test event,
    but if explicit time data is being provided by the test run, then
    this decorator will turn itself off to prevent causing confusion.
    """

    def __init__(self, decorated):
        """Create an auto-timing decorator forwarding to decorated."""
        # None means no explicit time has been supplied, so automatic
        # timestamps are enabled.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Send a current-UTC time event unless explicit times are in use."""
        if self._time is not None:
            # An explicit time was provided through time(); stay quiet.
            return
        now = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
        self.decorated.time(now)

    def progress(self, offset, whence):
        """Forward progress directly, without emitting a time event."""
        return self.decorated.progress(offset, whence)

    @property
    def shouldStop(self):
        """Read shouldStop directly, without emitting a time event."""
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: If None, automatically add timestamps before every
            event (this is the default behaviour if time() is not called at
            all).  If not None, pass the provided time onto the decorated
            result object and disable automatic timestamps.
        """
        self._time = a_datetime
        return self.decorated.time(a_datetime)
class TagCollapsingDecorator(TestResultDecorator):
    """Collapses many 'tags' calls into one where possible."""

    def __init__(self, result):
        """Create a tag-collapsing decorator forwarding to result."""
        super(TagCollapsingDecorator, self).__init__(result)
        # The (new, gone) tags for the current test; None when no test is
        # running.
        self._current_test_tags = None

    def startTest(self, test):
        """Start a test.

        Forwards startTest and begins gathering tags for this test.
        """
        self.decorated.startTest(test)
        self._current_test_tags = set(), set()

    def stopTest(self, test):
        """Stop a test.

        Emits the tags gathered during the test as a single tags call (if
        there are any), then forwards stopTest.
        """
        gathered = self._current_test_tags
        # gathered is None when stopTest arrives without a matching
        # startTest; there is nothing to flush in that case.
        if gathered is not None and (gathered[0] or gathered[1]):
            self.decorated.tags(*gathered)
        self.decorated.stopTest(test)
        self._current_test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        if self._current_test_tags is not None:
            # Gather the tags until the test stops. A later instruction
            # overrides an earlier one for the same tag.
            added, removed = self._current_test_tags
            added.update(new_tags)
            added.difference_update(gone_tags)
            removed.update(gone_tags)
            removed.difference_update(new_tags)
        else:
            # No test is running: pass the instruction straight through.
            return self.decorated.tags(new_tags, gone_tags)
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times."""

    def __init__(self, decorated):
        """Create a time-collapsing decorator forwarding to decorated."""
        super(TimeCollapsingDecorator, self).__init__(decorated)
        # Most recent time received since the last non-time event, if any.
        self._last_received_time = None
        # Most recent time actually forwarded to the decorated result.
        self._last_sent_time = None

    def _before_event(self):
        """Flush the last pending time before a non-time event goes out."""
        pending = self._last_received_time
        if pending is None:
            # No time event arrived since the previous non-time event.
            return
        if pending != self._last_sent_time:
            self.decorated.time(pending)
            self._last_sent_time = pending
        self._last_received_time = None

    def time(self, a_time):
        """Forward the first time of a run; remember later ones."""
        # Don't upcall, because we don't want to call _before_event, it's only
        # for non-time events.
        if self._last_received_time is None:
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
def all_true(bools):
    """Return True if all of 'bools' are True. False otherwise.

    :param bools: Any iterable; elements are tested for truthiness and
        iteration stops at the first false one. An empty iterable gives True.
    """
    return all(bools)
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar decorated: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a FilterResult object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        super(TestResultFilter, self).__init__(result)
        self.decorated = TimeCollapsingDecorator(
            TagCollapsingDecorator(self.decorated))
        # Each enabled filter contributes one predicate; a result is passed
        # on only when every predicate accepts it.
        predicates = []
        if filter_error:
            predicates.append(lambda t, outcome, e, d: outcome != 'error')
        if filter_failure:
            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
        if filter_success:
            predicates.append(lambda t, outcome, e, d: outcome != 'success')
        if filter_skip:
            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
        if filter_xfail:
            predicates.append(
                lambda t, outcome, e, d: outcome != 'expectedfailure')
        if filter_predicate is not None:
            predicates.append(filter_predicate)
        self.filter_predicate = (
            lambda test, outcome, err, details:
                all(p(test, outcome, err, details) for p in predicates))
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []
        if fixup_expected_failures is None:
            self._fixup_expected_failures = frozenset()
        else:
            self._fixup_expected_failures = fixup_expected_failures

    def addError(self, test, err=None, details=None):
        """Buffer an error, or mark the test filtered if it is rejected.

        An error from a test listed in fixup_expected_failures is buffered
        as an expected failure instead.
        """
        if self.filter_predicate(test, 'error', err, details):
            if self._failure_expected(test):
                method = 'addExpectedFailure'
            else:
                method = 'addError'
            self._buffered_calls.append(
                (method, [test, err], {'details': details}))
        else:
            self._filtered()

    def addFailure(self, test, err=None, details=None):
        """Buffer a failure, or mark the test filtered if it is rejected.

        A failure from a test listed in fixup_expected_failures is buffered
        as an expected failure instead.
        """
        if self.filter_predicate(test, 'failure', err, details):
            if self._failure_expected(test):
                method = 'addExpectedFailure'
            else:
                method = 'addFailure'
            self._buffered_calls.append(
                (method, [test, err], {'details': details}))
        else:
            self._filtered()

    def addSkip(self, test, reason=None, details=None):
        """Buffer a skip, or mark the test filtered if it is rejected."""
        if self.filter_predicate(test, 'skip', reason, details):
            self._buffered_calls.append(
                ('addSkip', [test, reason], {'details': details}))
        else:
            self._filtered()

    def addSuccess(self, test, details=None):
        """Buffer a success, or mark the test filtered if it is rejected.

        A success from a test listed in fixup_expected_failures is buffered
        as an unexpected success instead.
        """
        if self.filter_predicate(test, 'success', None, details):
            if self._failure_expected(test):
                method = 'addUnexpectedSuccess'
            else:
                method = 'addSuccess'
            self._buffered_calls.append(
                (method, [test], {'details': details}))
        else:
            self._filtered()

    def addExpectedFailure(self, test, err=None, details=None):
        """Buffer an expected failure, or mark the test filtered."""
        if self.filter_predicate(test, 'expectedfailure', err, details):
            self._buffered_calls.append(
                ('addExpectedFailure', [test, err], {'details': details}))
        else:
            self._filtered()

    def addUnexpectedSuccess(self, test, details=None):
        """Buffer an unexpected success; the predicate is not consulted."""
        self._buffered_calls.append(
            ('addUnexpectedSuccess', [test], {'details': details}))

    def _filtered(self):
        """Mark the current test so stopTest discards its buffered calls."""
        self._current_test_filtered = True

    def _failure_expected(self, test):
        """Return True if test.id() is in the fixup_expected_failures set."""
        return (test.id() in self._fixup_expected_failures)

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client; the call is buffered until
        stopTest decides whether the test is forwarded.
        """
        self._current_test = test
        self._current_test_filtered = False
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        If the test was not filtered, replays every buffered call on the
        decorated result and then forwards stopTest. Either way the
        per-test state is reset.
        """
        if not self._current_test_filtered:
            # Replay everything gathered for this test, in arrival order.
            for method, args, kwargs in self._buffered_calls:
                getattr(self.decorated, method)(*args, **kwargs)
            self.decorated.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._buffered_calls = []

    def time(self, a_time):
        """Buffer a time event during a test, otherwise forward it."""
        if self._current_test is not None:
            self._buffered_calls.append(('time', [a_time], {}))
        else:
            return self.decorated.time(a_time)

    def id_to_orig_id(self, id):
        """Return id with a leading "subunit.RemotedTestCase." removed."""
        if id.startswith("subunit.RemotedTestCase."):
            return id[len("subunit.RemotedTestCase."):]
        return id
class TestIdPrintingResult(testtools.TestResult):
    """A TestResult that writes the id of each finished test to a stream.

    When show_times is set and time events were received, the elapsed time
    between startTest and stopTest is written after the id.
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult object outputting to stream.

        :param stream: Object with a write() method that receives one line
            per finished test.
        :param show_times: If True, append each test's duration in seconds.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        # Most recent timestamp supplied through time(); None until then.
        self.__time = None
        self.show_times = show_times
        # Test whose outcome was most recently recorded.
        self._test = None
        self._test_duration = 0
        # Timestamp captured by startTest; None before any test starts.
        self._start_time = None

    def addError(self, test, err=None, details=None):
        """Count an error as a failed test and remember the test."""
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        """Count a failure as a failed test and remember the test."""
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        """Remember the test; successes do not affect failed_tests."""
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        """Remember the test; skips do not affect failed_tests."""
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        """Count an unexpected success as a failed test."""
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        """Remember the test; expected failures do not affect failed_tests."""
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for test: its id, optionally followed by seconds.

        :param test: Object providing id().
        :param duration: A timedelta, or None when no timing is available,
            in which case only the id is written.
        """
        if self.show_times and duration is not None:
            seconds = (duration.days * 3600 * 24
                       + duration.seconds
                       + duration.microseconds / 1000000.0)
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        """Record the latest known timestamp as the start of this test."""
        self._start_time = self._time()

    def stopTest(self, test):
        """Report the test whose outcome was recorded, with its duration."""
        end_time = self._time()
        if end_time is None or self._start_time is None:
            # No time events were received, so there is nothing to subtract.
            test_duration = None
        else:
            test_duration = end_time - self._start_time
        self.reportTest(self._test, test_duration)

    def time(self, time):
        """Store the supplied timestamp as the current time."""
        self.__time = time

    def _time(self):
        """Return the timestamp most recently passed to time(), or None."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0