2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """TestResult helper classes used to by subunit."""
25 # NOT a TestResult, because we are implementing the interface, not inheriting
27 class TestResultDecorator(object):
28 """General pass-through decorator.
30 This provides a base that other TestResults can inherit from to
31 gain basic forwarding functionality. It also takes care of
32 handling the case where the target doesn't support newer methods
33 or features by degrading them.
36 def __init__(self, decorated):
37 """Create a TestResultDecorator forwarding to decorated."""
38 # Make every decorator degrade gracefully.
39 self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
41 def startTest(self, test):
42 return self.decorated.startTest(test)
44 def startTestRun(self):
45 return self.decorated.startTestRun()
47 def stopTest(self, test):
48 return self.decorated.stopTest(test)
50 def stopTestRun(self):
51 return self.decorated.stopTestRun()
53 def addError(self, test, err=None, details=None):
54 return self.decorated.addError(test, err, details=details)
56 def addFailure(self, test, err=None, details=None):
57 return self.decorated.addFailure(test, err, details=details)
59 def addSuccess(self, test, details=None):
60 return self.decorated.addSuccess(test, details=details)
62 def addSkip(self, test, reason=None, details=None):
63 return self.decorated.addSkip(test, reason, details=details)
65 def addExpectedFailure(self, test, err=None, details=None):
66 return self.decorated.addExpectedFailure(test, err, details=details)
68 def addUnexpectedSuccess(self, test, details=None):
69 return self.decorated.addUnexpectedSuccess(test, details=details)
71 def progress(self, offset, whence):
72 return self.decorated.progress(offset, whence)
74 def wasSuccessful(self):
75 return self.decorated.wasSuccessful()
79 return self.decorated.shouldStop
82 return self.decorated.stop()
84 def tags(self, new_tags, gone_tags):
85 return self.decorated.tags(new_tags, gone_tags)
87 def time(self, a_datetime):
88 return self.decorated.time(a_datetime)
91 class HookedTestResultDecorator(TestResultDecorator):
92 """A TestResult which calls a hook on every event."""
94 def __init__(self, decorated):
95 self.super = super(HookedTestResultDecorator, self)
96 self.super.__init__(decorated)
98 def startTest(self, test):
100 return self.super.startTest(test)
102 def startTestRun(self):
104 return self.super.startTestRun()
106 def stopTest(self, test):
108 return self.super.stopTest(test)
110 def stopTestRun(self):
112 return self.super.stopTestRun()
114 def addError(self, test, err=None, details=None):
116 return self.super.addError(test, err, details=details)
118 def addFailure(self, test, err=None, details=None):
120 return self.super.addFailure(test, err, details=details)
122 def addSuccess(self, test, details=None):
124 return self.super.addSuccess(test, details=details)
126 def addSkip(self, test, reason=None, details=None):
128 return self.super.addSkip(test, reason, details=details)
130 def addExpectedFailure(self, test, err=None, details=None):
132 return self.super.addExpectedFailure(test, err, details=details)
134 def addUnexpectedSuccess(self, test, details=None):
136 return self.super.addUnexpectedSuccess(test, details=details)
138 def progress(self, offset, whence):
140 return self.super.progress(offset, whence)
142 def wasSuccessful(self):
144 return self.super.wasSuccessful()
147 def shouldStop(self):
149 return self.super.shouldStop
153 return self.super.stop()
155 def time(self, a_datetime):
157 return self.super.time(a_datetime)
160 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
161 """Decorate a TestResult to add time events to a test run.
163 By default this will cause a time event before every test event,
164 but if explicit time data is being provided by the test run, then
165 this decorator will turn itself off to prevent causing confusion.
168 def __init__(self, decorated):
170 super(AutoTimingTestResultDecorator, self).__init__(decorated)
172 def _before_event(self):
176 time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
177 self.decorated.time(time)
179 def progress(self, offset, whence):
180 return self.decorated.progress(offset, whence)
183 def shouldStop(self):
184 return self.decorated.shouldStop
186 def time(self, a_datetime):
187 """Provide a timestamp for the current test activity.
189 :param a_datetime: If None, automatically add timestamps before every
190 event (this is the default behaviour if time() is not called at
191 all). If not None, pass the provided time onto the decorated
192 result object and disable automatic timestamps.
194 self._time = a_datetime
195 return self.decorated.time(a_datetime)
198 class TagCollapsingDecorator(TestResultDecorator):
199 """Collapses many 'tags' calls into one where possible."""
201 def __init__(self, result):
202 super(TagCollapsingDecorator, self).__init__(result)
203 # The current test (for filtering tags)
204 self._current_test = None
205 # The (new, gone) tags for the current test.
206 self._current_test_tags = None
208 def startTest(self, test):
211 Not directly passed to the client, but used for handling of tags
214 self.decorated.startTest(test)
215 self._current_test = test
216 self._current_test_tags = set(), set()
218 def stopTest(self, test):
221 Not directly passed to the client, but used for handling of tags
224 # Tags to output for this test.
225 if self._current_test_tags[0] or self._current_test_tags[1]:
226 self.decorated.tags(*self._current_test_tags)
227 self.decorated.stopTest(test)
228 self._current_test = None
229 self._current_test_tags = None
231 def tags(self, new_tags, gone_tags):
232 """Handle tag instructions.
234 Adds and removes tags as appropriate. If a test is currently running,
235 tags are not affected for subsequent tests.
237 :param new_tags: Tags to add,
238 :param gone_tags: Tags to remove.
240 if self._current_test is not None:
241 # gather the tags until the test stops.
242 self._current_test_tags[0].update(new_tags)
243 self._current_test_tags[0].difference_update(gone_tags)
244 self._current_test_tags[1].update(gone_tags)
245 self._current_test_tags[1].difference_update(new_tags)
247 return self.decorated.tags(new_tags, gone_tags)
250 class TimeCollapsingDecorator(HookedTestResultDecorator):
251 """Only pass on the first and last of a consecutive sequence of times."""
253 def __init__(self, decorated):
254 super(TimeCollapsingDecorator, self).__init__(decorated)
255 self._last_received_time = None
256 self._last_sent_time = None
258 def _before_event(self):
259 if self._last_received_time is None:
261 if self._last_received_time != self._last_sent_time:
262 self.decorated.time(self._last_received_time)
263 self._last_sent_time = self._last_received_time
264 self._last_received_time = None
266 def time(self, a_time):
267 # Don't upcall, because we don't want to call _before_event, it's only
268 # for non-time events.
269 if self._last_received_time is None:
270 self.decorated.time(a_time)
271 self._last_sent_time = a_time
272 self._last_received_time = a_time
276 """Return True if all of 'bools' are True. False otherwise."""
283 class TestResultFilter(TestResultDecorator):
284 """A pyunit TestResult interface implementation which filters tests.
286 Tests that pass the filter are handed on to another TestResult instance
287 for further processing/reporting. To obtain the filtered results,
288 the other instance must be interrogated.
290 :ivar result: The result that tests are passed to after filtering.
291 :ivar filter_predicate: The callback run to decide whether to pass
295 def __init__(self, result, filter_error=False, filter_failure=False,
296 filter_success=True, filter_skip=False,
297 filter_predicate=None, fixup_expected_failures=None):
298 """Create a FilterResult object filtering to result.
300 :param filter_error: Filter out errors.
301 :param filter_failure: Filter out failures.
302 :param filter_success: Filter out successful tests.
303 :param filter_skip: Filter out skipped tests.
304 :param filter_predicate: A callable taking (test, outcome, err,
305 details) and returning True if the result should be passed
306 through. err and details may be none if no error or extra
307 metadata is available. outcome is the name of the outcome such
308 as 'success' or 'failure'.
309 :param fixup_expected_failures: Set of test ids to consider known
312 super(TestResultFilter, self).__init__(result)
313 self.decorated = TimeCollapsingDecorator(
314 TagCollapsingDecorator(self.decorated))
317 predicates.append(lambda t, outcome, e, d: outcome != 'error')
319 predicates.append(lambda t, outcome, e, d: outcome != 'failure')
321 predicates.append(lambda t, outcome, e, d: outcome != 'success')
323 predicates.append(lambda t, outcome, e, d: outcome != 'skip')
324 if filter_predicate is not None:
325 predicates.append(filter_predicate)
326 self.filter_predicate = (
327 lambda test, outcome, err, details:
328 all_true(p(test, outcome, err, details) for p in predicates))
329 # The current test (for filtering tags)
330 self._current_test = None
331 # Has the current test been filtered (for outputting test tags)
332 self._current_test_filtered = None
333 # Calls to this result that we don't know whether to forward on yet.
334 self._buffered_calls = []
335 if fixup_expected_failures is None:
336 self._fixup_expected_failures = frozenset()
338 self._fixup_expected_failures = fixup_expected_failures
340 def addError(self, test, err=None, details=None):
341 if (self.filter_predicate(test, 'error', err, details)):
342 if test.id() in self._fixup_expected_failures:
343 self._buffered_calls.append(
344 ('addExpectedFailure', [test, err], {'details': details}))
346 self._buffered_calls.append(
347 ('addError', [test, err], {'details': details}))
351 def addFailure(self, test, err=None, details=None):
352 if (self.filter_predicate(test, 'failure', err, details)):
353 if test.id() in self._fixup_expected_failures:
354 self._buffered_calls.append(
355 ('addExpectedFailure', [test, err], {'details': details}))
357 self._buffered_calls.append(
358 ('addFailure', [test, err], {'details': details}))
362 def addSkip(self, test, reason=None, details=None):
363 if (self.filter_predicate(test, 'skip', reason, details)):
364 self._buffered_calls.append(
365 ('addSkip', [reason], {'details': details}))
369 def addSuccess(self, test, details=None):
370 if (self.filter_predicate(test, 'success', None, details)):
371 self._buffered_calls.append(
372 ('addSuccess', [test], {'details': details}))
376 def addExpectedFailure(self, test, err=None, details=None):
377 if self.filter_predicate(test, 'expectedfailure', err, details):
378 self._buffered_calls.append(
379 ('addExpectedFailure', [test, err], {'details': details}))
383 def addUnexpectedSuccess(self, test, details=None):
384 self._buffered_calls.append(
385 ('addUnexpectedSuccess', [test], {'details': details}))
388 self._current_test_filtered = True
390 def startTest(self, test):
393 Not directly passed to the client, but used for handling of tags
396 self._current_test = test
397 self._current_test_filtered = False
398 self._buffered_calls.append(('startTest', [test], {}))
400 def stopTest(self, test):
403 Not directly passed to the client, but used for handling of tags
406 if not self._current_test_filtered:
407 # Tags to output for this test.
408 for method, args, kwargs in self._buffered_calls:
409 getattr(self.decorated, method)(*args, **kwargs)
410 self.decorated.stopTest(test)
411 self._current_test = None
412 self._current_test_filtered = None
413 self._buffered_calls = []
415 def time(self, a_time):
416 if self._current_test is not None:
417 self._buffered_calls.append(('time', [a_time], {}))
419 return self.decorated.time(a_time)
421 def id_to_orig_id(self, id):
422 if id.startswith("subunit.RemotedTestCase."):
423 return id[len("subunit.RemotedTestCase."):]
427 class TestIdPrintingResult(testtools.TestResult):
429 def __init__(self, stream, show_times=False):
430 """Create a FilterResult object outputting to stream."""
431 super(TestIdPrintingResult, self).__init__()
432 self._stream = stream
433 self.failed_tests = 0
435 self.show_times = show_times
437 self._test_duration = 0
439 def addError(self, test, err):
440 self.failed_tests += 1
443 def addFailure(self, test, err):
444 self.failed_tests += 1
447 def addSuccess(self, test):
450 def reportTest(self, test, duration):
452 seconds = duration.seconds
453 seconds += duration.days * 3600 * 24
454 seconds += duration.microseconds / 1000000.0
455 self._stream.write(test.id() + ' %0.3f\n' % seconds)
457 self._stream.write(test.id() + '\n')
459 def startTest(self, test):
460 self._start_time = self._time()
462 def stopTest(self, test):
463 test_duration = self._time() - self._start_time
464 self.reportTest(self._test, test_duration)
466 def time(self, time):
472 def wasSuccessful(self):
473 "Tells whether or not this result was a success"
474 return self.failed_tests == 0