2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
"""TestResult helper classes used by subunit."""
# NOT a TestResult, because we are implementing the interface, not inheriting
# from it.
class TestResultDecorator(object):
    """General pass-through decorator.

    This provides a base that other TestResults can inherit from to
    gain basic forwarding functionality. It also takes care of
    handling the case where the target doesn't support newer methods
    or features by degrading them.

    Every method below hands its arguments unchanged to ``self.decorated``
    and returns whatever that object returns.
    """

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated.

        :param decorated: The result object that receives every call.
        """
        # Make every decorator degrade gracefully.
        self.decorated = testtools.ExtendedToOriginalDecorator(decorated)

    def startTest(self, test):
        return self.decorated.startTest(test)

    def startTestRun(self):
        return self.decorated.startTestRun()

    def stopTest(self, test):
        return self.decorated.stopTest(test)

    def stopTestRun(self):
        return self.decorated.stopTestRun()

    def addError(self, test, err=None, details=None):
        return self.decorated.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        return self.decorated.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        return self.decorated.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        return self.decorated.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        return self.decorated.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        return self.decorated.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        return self.decorated.progress(offset, whence)

    def wasSuccessful(self):
        return self.decorated.wasSuccessful()

    @property
    def shouldStop(self):
        # Read-only view: the flag itself lives on the decorated result.
        return self.decorated.shouldStop

    def stop(self):
        return self.decorated.stop()

    def tags(self, new_tags, gone_tags):
        return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        return self.decorated.time(a_datetime)
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    ``self._before_event()`` is invoked immediately before each call is
    forwarded to the parent implementation. This class does not define the
    hook itself; subclasses are expected to provide it.

    ``tags`` is inherited unchanged and therefore does not trigger the hook.
    """

    def __init__(self, decorated):
        """Create a hooked decorator forwarding to decorated."""
        # Keep a bound proxy to the parent implementation so every method
        # below can forward through it.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def startTest(self, test):
        self._before_event()
        return self.super.startTest(test)

    def startTestRun(self):
        self._before_event()
        return self.super.startTestRun()

    def stopTest(self, test):
        self._before_event()
        return self.super.stopTest(test)

    def stopTestRun(self):
        self._before_event()
        return self.super.stopTestRun()

    def addError(self, test, err=None, details=None):
        self._before_event()
        return self.super.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        self._before_event()
        return self.super.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        self._before_event()
        return self.super.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        self._before_event()
        return self.super.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        self._before_event()
        return self.super.progress(offset, whence)

    def wasSuccessful(self):
        self._before_event()
        return self.super.wasSuccessful()

    @property
    def shouldStop(self):
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        self._before_event()
        return self.super.stop()

    def time(self, a_datetime):
        self._before_event()
        return self.super.time(a_datetime)
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    By default this will cause a time event before every test event,
    but if explicit time data is being provided by the test run, then
    this decorator will turn itself off to prevent causing confusion.
    """

    def __init__(self, decorated):
        """Create an auto-timing decorator forwarding to decorated."""
        # None means no explicit timestamp has been supplied, so automatic
        # timestamps stay enabled. Set before the parent constructor runs so
        # the attribute exists for any hook invocation.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Send the current UTC time to the decorated result.

        Does nothing once an explicit time has been provided via time().
        """
        if self._time is not None:
            return
        now = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
        self.decorated.time(now)

    def progress(self, offset, whence):
        # Forwarded straight to the decorated result: no timestamp is
        # emitted for progress events.
        return self.decorated.progress(offset, whence)

    @property
    def shouldStop(self):
        # Reading the flag is not an event, so no timestamp is emitted.
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: If None, automatically add timestamps before every
            event (this is the default behaviour if time() is not called at
            all). If not None, pass the provided time onto the decorated
            result object and disable automatic timestamps.
        """
        self._time = a_datetime
        return self.decorated.time(a_datetime)
class TagCollapsingDecorator(TestResultDecorator):
    """Collapses many 'tags' calls into one where possible."""

    def __init__(self, result):
        """Create a tag-collapsing decorator forwarding to result."""
        TestResultDecorator.__init__(self, result)
        # The current test (for filtering tags)
        self._current_test = None
        # The (new, gone) tags for the current test.
        self._current_test_tags = None

    def startTest(self, test):
        """Start a test.

        Forwards the call and begins collecting tags for this test.
        """
        self.decorated.startTest(test)
        self._current_test = test
        self._current_test_tags = set(), set()

    def stopTest(self, test):
        """Stop a test.

        Emits the tags gathered during the test as a single tags() call
        (only if there are any), then forwards stopTest.
        """
        # Tags to output for this test.
        if self._current_test_tags[0] or self._current_test_tags[1]:
            self.decorated.tags(*self._current_test_tags)
        self.decorated.stopTest(test)
        self._current_test = None
        self._current_test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        if self._current_test is None:
            # Outside of a test there is nothing to collapse into.
            return self.decorated.tags(new_tags, gone_tags)
        # gather the tags until the test stops.
        added, removed = self._current_test_tags
        added.update(new_tags)
        added.difference_update(gone_tags)
        removed.update(gone_tags)
        removed.difference_update(new_tags)
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    Calls made between startTest and stopTest are buffered and only
    replayed to the decorated result at stopTest, and only if the test
    was not filtered out.

    :ivar decorated: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result on.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
                 filter_success=True, filter_skip=False,
                 filter_predicate=None):
        """Create a TestResultFilter object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'.
        """
        TestResultDecorator.__init__(self, result)
        self._filter_error = filter_error
        self._filter_failure = filter_failure
        self._filter_success = filter_success
        self._filter_skip = filter_skip
        if filter_predicate is None:
            filter_predicate = lambda test, outcome, err, details: True
        self.filter_predicate = filter_predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # The (new, gone) tags for the current test.
        self._current_test_tags = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []

    def addError(self, test, err=None, details=None):
        if (not self._filter_error and
                self.filter_predicate(test, 'error', err, details)):
            self._buffered_calls.append(
                ('addError', [test, err], {'details': details}))
        else:
            self._filtered()

    def addFailure(self, test, err=None, details=None):
        if (not self._filter_failure and
                self.filter_predicate(test, 'failure', err, details)):
            self._buffered_calls.append(
                ('addFailure', [test, err], {'details': details}))
        else:
            self._filtered()

    def addSkip(self, test, reason=None, details=None):
        if (not self._filter_skip and
                self.filter_predicate(test, 'skip', reason, details)):
            # The test must be part of the replayed call: the decorated
            # result's addSkip takes (test, reason, details=...).
            self._buffered_calls.append(
                ('addSkip', [test, reason], {'details': details}))
        else:
            self._filtered()

    def addSuccess(self, test, details=None):
        if (not self._filter_success and
                self.filter_predicate(test, 'success', None, details)):
            self._buffered_calls.append(
                ('addSuccess', [test], {'details': details}))
        else:
            self._filtered()

    def addExpectedFailure(self, test, err=None, details=None):
        if self.filter_predicate(test, 'expectedfailure', err, details):
            self._buffered_calls.append(
                ('addExpectedFailure', [test, err], {'details': details}))
        else:
            self._filtered()

    def addUnexpectedSuccess(self, test, details=None):
        # Unexpected successes are never filtered.
        self._buffered_calls.append(
            ('addUnexpectedSuccess', [test], {'details': details}))

    def _filtered(self):
        """Mark the current test as filtered out."""
        self._current_test_filtered = True

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        self._current_test = test
        self._current_test_filtered = False
        self._current_test_tags = set(), set()
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly. If the test was not filtered, the buffered calls are
        replayed, followed by the collected tags and stopTest itself.
        """
        if not self._current_test_filtered:
            for method, args, kwargs in self._buffered_calls:
                getattr(self.decorated, method)(*args, **kwargs)
            # Tags to output for this test.
            if self._current_test_tags[0] or self._current_test_tags[1]:
                self.decorated.tags(*self._current_test_tags)
            self.decorated.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._current_test_tags = None
        self._buffered_calls = []

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        if self._current_test is None:
            return self.decorated.tags(new_tags, gone_tags)
        # gather the tags until the test stops. They are emitted once from
        # stopTest, and only if the test survives filtering; forwarding them
        # here as well would leak them outside the (buffered) test.
        added, removed = self._current_test_tags
        added.update(new_tags)
        added.difference_update(gone_tags)
        removed.update(gone_tags)
        removed.difference_update(new_tags)

    def time(self, a_time):
        if self._current_test is not None:
            # Keep the timestamp in order with the other buffered calls.
            self._buffered_calls.append(('time', [a_time], {}))
        else:
            return self.decorated.time(a_time)

    def id_to_orig_id(self, id):
        """Return id with a leading 'subunit.RemotedTestCase.' removed."""
        prefix = "subunit.RemotedTestCase."
        if id.startswith(prefix):
            return id[len(prefix):]
        return id
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of each test to a stream, one per line.

    The id is written when the test stops. With show_times set, the
    elapsed time between startTest and stopTest (as reported through
    time()) is appended in seconds.
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult object outputting to stream.

        :param stream: Object with a write() method that receives the lines.
        :param show_times: If true, append each test's duration to its id.
        """
        testtools.TestResult.__init__(self)
        self._stream = stream
        self.failed_tests = 0
        # Most recent timestamp received through time().
        self.__time = 0
        self.show_times = show_times
        # The test whose outcome was most recently recorded.
        self._test = None
        self._test_duration = 0

    def addError(self, test, err):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test):
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for test, optionally followed by its duration.

        :param test: The test to report; only its id() is used.
        :param duration: Elapsed time; read only when show_times is set.
        """
        if self.show_times:
            # NOTE(review): assumes duration is a timedelta, i.e. that
            # time() was fed datetime objects -- confirm against callers.
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()

    def stopTest(self, test):
        test_duration = self._time() - self._start_time
        # Report the test whose outcome was recorded, not the argument.
        self.reportTest(self._test, test_duration)

    def time(self, time):
        """Record the timestamp of the current test activity."""
        self.__time = time

    def _time(self):
        """Return the most recently recorded timestamp."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0