2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """TestResult helper classes used to by subunit."""
23 from subunit import iso8601
26 # NOT a TestResult, because we are implementing the interface, not inheriting
28 class TestResultDecorator(object):
29 """General pass-through decorator.
31 This provides a base that other TestResults can inherit from to
32 gain basic forwarding functionality. It also takes care of
33 handling the case where the target doesn't support newer methods
34 or features by degrading them.
37 def __init__(self, decorated):
38 """Create a TestResultDecorator forwarding to decorated."""
39 # Make every decorator degrade gracefully.
40 self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
42 def startTest(self, test):
43 return self.decorated.startTest(test)
45 def startTestRun(self):
46 return self.decorated.startTestRun()
48 def stopTest(self, test):
49 return self.decorated.stopTest(test)
51 def stopTestRun(self):
52 return self.decorated.stopTestRun()
54 def addError(self, test, err=None, details=None):
55 return self.decorated.addError(test, err, details=details)
57 def addFailure(self, test, err=None, details=None):
58 return self.decorated.addFailure(test, err, details=details)
60 def addSuccess(self, test, details=None):
61 return self.decorated.addSuccess(test, details=details)
63 def addSkip(self, test, reason=None, details=None):
64 return self.decorated.addSkip(test, reason, details=details)
66 def addExpectedFailure(self, test, err=None, details=None):
67 return self.decorated.addExpectedFailure(test, err, details=details)
69 def addUnexpectedSuccess(self, test, details=None):
70 return self.decorated.addUnexpectedSuccess(test, details=details)
72 def progress(self, offset, whence):
73 return self.decorated.progress(offset, whence)
75 def wasSuccessful(self):
76 return self.decorated.wasSuccessful()
80 return self.decorated.shouldStop
83 return self.decorated.stop()
87 return self.decorated.testsRun
89 def tags(self, new_tags, gone_tags):
90 return self.decorated.tags(new_tags, gone_tags)
92 def time(self, a_datetime):
93 return self.decorated.time(a_datetime)
96 class HookedTestResultDecorator(TestResultDecorator):
97 """A TestResult which calls a hook on every event."""
99 def __init__(self, decorated):
100 self.super = super(HookedTestResultDecorator, self)
101 self.super.__init__(decorated)
103 def startTest(self, test):
105 return self.super.startTest(test)
107 def startTestRun(self):
109 return self.super.startTestRun()
111 def stopTest(self, test):
113 return self.super.stopTest(test)
115 def stopTestRun(self):
117 return self.super.stopTestRun()
119 def addError(self, test, err=None, details=None):
121 return self.super.addError(test, err, details=details)
123 def addFailure(self, test, err=None, details=None):
125 return self.super.addFailure(test, err, details=details)
127 def addSuccess(self, test, details=None):
129 return self.super.addSuccess(test, details=details)
131 def addSkip(self, test, reason=None, details=None):
133 return self.super.addSkip(test, reason, details=details)
135 def addExpectedFailure(self, test, err=None, details=None):
137 return self.super.addExpectedFailure(test, err, details=details)
139 def addUnexpectedSuccess(self, test, details=None):
141 return self.super.addUnexpectedSuccess(test, details=details)
143 def progress(self, offset, whence):
145 return self.super.progress(offset, whence)
147 def wasSuccessful(self):
149 return self.super.wasSuccessful()
152 def shouldStop(self):
154 return self.super.shouldStop
158 return self.super.stop()
160 def time(self, a_datetime):
162 return self.super.time(a_datetime)
165 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
166 """Decorate a TestResult to add time events to a test run.
168 By default this will cause a time event before every test event,
169 but if explicit time data is being provided by the test run, then
170 this decorator will turn itself off to prevent causing confusion.
173 def __init__(self, decorated):
175 super(AutoTimingTestResultDecorator, self).__init__(decorated)
177 def _before_event(self):
181 time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
182 self.decorated.time(time)
184 def progress(self, offset, whence):
185 return self.decorated.progress(offset, whence)
188 def shouldStop(self):
189 return self.decorated.shouldStop
191 def time(self, a_datetime):
192 """Provide a timestamp for the current test activity.
194 :param a_datetime: If None, automatically add timestamps before every
195 event (this is the default behaviour if time() is not called at
196 all). If not None, pass the provided time onto the decorated
197 result object and disable automatic timestamps.
199 self._time = a_datetime
200 return self.decorated.time(a_datetime)
203 class TagCollapsingDecorator(TestResultDecorator):
204 """Collapses many 'tags' calls into one where possible."""
206 def __init__(self, result):
207 super(TagCollapsingDecorator, self).__init__(result)
208 # The (new, gone) tags for the current test.
209 self._current_test_tags = None
211 def startTest(self, test):
214 Not directly passed to the client, but used for handling of tags
217 self.decorated.startTest(test)
218 self._current_test_tags = set(), set()
220 def stopTest(self, test):
223 Not directly passed to the client, but used for handling of tags
226 # Tags to output for this test.
227 if self._current_test_tags[0] or self._current_test_tags[1]:
228 self.decorated.tags(*self._current_test_tags)
229 self.decorated.stopTest(test)
230 self._current_test_tags = None
232 def tags(self, new_tags, gone_tags):
233 """Handle tag instructions.
235 Adds and removes tags as appropriate. If a test is currently running,
236 tags are not affected for subsequent tests.
238 :param new_tags: Tags to add,
239 :param gone_tags: Tags to remove.
241 if self._current_test_tags is not None:
242 # gather the tags until the test stops.
243 self._current_test_tags[0].update(new_tags)
244 self._current_test_tags[0].difference_update(gone_tags)
245 self._current_test_tags[1].update(gone_tags)
246 self._current_test_tags[1].difference_update(new_tags)
248 return self.decorated.tags(new_tags, gone_tags)
251 class TimeCollapsingDecorator(HookedTestResultDecorator):
252 """Only pass on the first and last of a consecutive sequence of times."""
254 def __init__(self, decorated):
255 super(TimeCollapsingDecorator, self).__init__(decorated)
256 self._last_received_time = None
257 self._last_sent_time = None
259 def _before_event(self):
260 if self._last_received_time is None:
262 if self._last_received_time != self._last_sent_time:
263 self.decorated.time(self._last_received_time)
264 self._last_sent_time = self._last_received_time
265 self._last_received_time = None
267 def time(self, a_time):
268 # Don't upcall, because we don't want to call _before_event, it's only
269 # for non-time events.
270 if self._last_received_time is None:
271 self.decorated.time(a_time)
272 self._last_sent_time = a_time
273 self._last_received_time = a_time
277 """Return True if all of 'bools' are True. False otherwise."""
284 class TestResultFilter(TestResultDecorator):
285 """A pyunit TestResult interface implementation which filters tests.
287 Tests that pass the filter are handed on to another TestResult instance
288 for further processing/reporting. To obtain the filtered results,
289 the other instance must be interrogated.
291 :ivar result: The result that tests are passed to after filtering.
292 :ivar filter_predicate: The callback run to decide whether to pass
296 def __init__(self, result, filter_error=False, filter_failure=False,
297 filter_success=True, filter_skip=False,
298 filter_predicate=None, fixup_expected_failures=None):
299 """Create a FilterResult object filtering to result.
301 :param filter_error: Filter out errors.
302 :param filter_failure: Filter out failures.
303 :param filter_success: Filter out successful tests.
304 :param filter_skip: Filter out skipped tests.
305 :param filter_predicate: A callable taking (test, outcome, err,
306 details) and returning True if the result should be passed
307 through. err and details may be none if no error or extra
308 metadata is available. outcome is the name of the outcome such
309 as 'success' or 'failure'.
310 :param fixup_expected_failures: Set of test ids to consider known
313 super(TestResultFilter, self).__init__(result)
314 self.decorated = TimeCollapsingDecorator(
315 TagCollapsingDecorator(self.decorated))
318 predicates.append(lambda t, outcome, e, d: outcome != 'error')
320 predicates.append(lambda t, outcome, e, d: outcome != 'failure')
322 predicates.append(lambda t, outcome, e, d: outcome != 'success')
324 predicates.append(lambda t, outcome, e, d: outcome != 'skip')
325 if filter_predicate is not None:
326 predicates.append(filter_predicate)
327 self.filter_predicate = (
328 lambda test, outcome, err, details:
329 all_true(p(test, outcome, err, details) for p in predicates))
330 # The current test (for filtering tags)
331 self._current_test = None
332 # Has the current test been filtered (for outputting test tags)
333 self._current_test_filtered = None
334 # Calls to this result that we don't know whether to forward on yet.
335 self._buffered_calls = []
336 if fixup_expected_failures is None:
337 self._fixup_expected_failures = frozenset()
339 self._fixup_expected_failures = fixup_expected_failures
341 def addError(self, test, err=None, details=None):
342 if (self.filter_predicate(test, 'error', err, details)):
343 if self._failure_expected(test):
344 self._buffered_calls.append(
345 ('addExpectedFailure', [test, err], {'details': details}))
347 self._buffered_calls.append(
348 ('addError', [test, err], {'details': details}))
352 def addFailure(self, test, err=None, details=None):
353 if (self.filter_predicate(test, 'failure', err, details)):
354 if self._failure_expected(test):
355 self._buffered_calls.append(
356 ('addExpectedFailure', [test, err], {'details': details}))
358 self._buffered_calls.append(
359 ('addFailure', [test, err], {'details': details}))
363 def addSkip(self, test, reason=None, details=None):
364 if (self.filter_predicate(test, 'skip', reason, details)):
365 self._buffered_calls.append(
366 ('addSkip', [test, reason], {'details': details}))
370 def addSuccess(self, test, details=None):
371 if (self.filter_predicate(test, 'success', None, details)):
372 if self._failure_expected(test):
373 self._buffered_calls.append(
374 ('addUnexpectedSuccess', [test], {'details': details}))
376 self._buffered_calls.append(
377 ('addSuccess', [test], {'details': details}))
381 def addExpectedFailure(self, test, err=None, details=None):
382 if self.filter_predicate(test, 'expectedfailure', err, details):
383 self._buffered_calls.append(
384 ('addExpectedFailure', [test, err], {'details': details}))
388 def addUnexpectedSuccess(self, test, details=None):
389 self._buffered_calls.append(
390 ('addUnexpectedSuccess', [test], {'details': details}))
393 self._current_test_filtered = True
395 def _failure_expected(self, test):
396 return (test.id() in self._fixup_expected_failures)
398 def startTest(self, test):
401 Not directly passed to the client, but used for handling of tags
404 self._current_test = test
405 self._current_test_filtered = False
406 self._buffered_calls.append(('startTest', [test], {}))
408 def stopTest(self, test):
411 Not directly passed to the client, but used for handling of tags
414 if not self._current_test_filtered:
415 # Tags to output for this test.
416 for method, args, kwargs in self._buffered_calls:
417 getattr(self.decorated, method)(*args, **kwargs)
418 self.decorated.stopTest(test)
419 self._current_test = None
420 self._current_test_filtered = None
421 self._buffered_calls = []
423 def time(self, a_time):
424 if self._current_test is not None:
425 self._buffered_calls.append(('time', [a_time], {}))
427 return self.decorated.time(a_time)
429 def id_to_orig_id(self, id):
430 if id.startswith("subunit.RemotedTestCase."):
431 return id[len("subunit.RemotedTestCase."):]
435 class TestIdPrintingResult(testtools.TestResult):
437 def __init__(self, stream, show_times=False):
438 """Create a FilterResult object outputting to stream."""
439 super(TestIdPrintingResult, self).__init__()
440 self._stream = stream
441 self.failed_tests = 0
443 self.show_times = show_times
445 self._test_duration = 0
447 def addError(self, test, err):
448 self.failed_tests += 1
451 def addFailure(self, test, err):
452 self.failed_tests += 1
455 def addSuccess(self, test):
458 def addSkip(self, test, reason=None, details=None):
461 def addUnexpectedSuccess(self, test, details=None):
462 self.failed_tests += 1
465 def addExpectedFailure(self, test, err=None, details=None):
468 def reportTest(self, test, duration):
470 seconds = duration.seconds
471 seconds += duration.days * 3600 * 24
472 seconds += duration.microseconds / 1000000.0
473 self._stream.write(test.id() + ' %0.3f\n' % seconds)
475 self._stream.write(test.id() + '\n')
477 def startTest(self, test):
478 self._start_time = self._time()
480 def stopTest(self, test):
481 test_duration = self._time() - self._start_time
482 self.reportTest(self._test, test_duration)
484 def time(self, time):
490 def wasSuccessful(self):
491 "Tells whether or not this result was a success"
492 return self.failed_tests == 0