Merge Stewart Smith's branch to add timestamps to shell functions.
[third_party/subunit] / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """TestResult helper classes used by subunit."""
18
19 import datetime
20
21 import testtools
22
23 from subunit import iso8601
24
25
26 # NOT a TestResult, because we are implementing the interface, not inheriting
27 # it.
class TestResultDecorator(object):
    """Pass-through wrapper around another result object.

    Every method and property of the result interface defined here simply
    forwards to ``self.decorated``.  Subclasses inherit that forwarding and
    override only the events they want to intercept.  The target is wrapped
    in ``testtools.ExtendedToOriginalDecorator`` so that calls using newer
    methods or features are degraded when the target does not support them.
    """

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated.

        :param decorated: The result object that receives every event.
        """
        # Wrapping here means every decorator degrades gracefully, whatever
        # kind of result object it was handed.
        self.decorated = testtools.ExtendedToOriginalDecorator(decorated)

    # -- test run and test lifecycle ------------------------------------

    def startTestRun(self):
        """Forward startTestRun to the decorated result."""
        return self.decorated.startTestRun()

    def stopTestRun(self):
        """Forward stopTestRun to the decorated result."""
        return self.decorated.stopTestRun()

    def startTest(self, test):
        """Forward startTest for 'test' to the decorated result."""
        return self.decorated.startTest(test)

    def stopTest(self, test):
        """Forward stopTest for 'test' to the decorated result."""
        return self.decorated.stopTest(test)

    # -- outcomes -------------------------------------------------------

    def addError(self, test, err=None, details=None):
        """Forward an error outcome to the decorated result."""
        return self.decorated.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward a failure outcome to the decorated result."""
        return self.decorated.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Forward a success outcome to the decorated result."""
        return self.decorated.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward a skip outcome to the decorated result."""
        return self.decorated.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward an expected-failure outcome to the decorated result."""
        return self.decorated.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward an unexpected-success outcome to the decorated result."""
        return self.decorated.addUnexpectedSuccess(test, details=details)

    # -- run state ------------------------------------------------------

    def wasSuccessful(self):
        """Return whatever the decorated result reports for wasSuccessful."""
        return self.decorated.wasSuccessful()

    @property
    def shouldStop(self):
        """The decorated result's shouldStop value."""
        return self.decorated.shouldStop

    def stop(self):
        """Forward stop to the decorated result."""
        return self.decorated.stop()

    @property
    def testsRun(self):
        """The decorated result's testsRun value."""
        return self.decorated.testsRun

    # -- metadata events ------------------------------------------------

    def progress(self, offset, whence):
        """Forward a progress event to the decorated result."""
        return self.decorated.progress(offset, whence)

    def tags(self, new_tags, gone_tags):
        """Forward a tags event to the decorated result."""
        return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward a time event to the decorated result."""
        return self.decorated.time(a_datetime)
94
95
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Each event method calls ``self._before_event()`` and then forwards to
    the plain pass-through implementation in TestResultDecorator.  This
    class does not define ``_before_event`` itself; subclasses must provide
    it.  The ``testsRun`` property is inherited unhooked.
    """

    def __init__(self, decorated):
        """Create a HookedTestResultDecorator forwarding to decorated."""
        # Keep a bound super proxy so that every hooked method below (and
        # any subclass) reaches the un-hooked forwarding implementation.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def startTest(self, test):
        self._before_event()
        return self.super.startTest(test)

    def startTestRun(self):
        self._before_event()
        return self.super.startTestRun()

    def stopTest(self, test):
        self._before_event()
        return self.super.stopTest(test)

    def stopTestRun(self):
        self._before_event()
        return self.super.stopTestRun()

    def addError(self, test, err=None, details=None):
        self._before_event()
        return self.super.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        self._before_event()
        return self.super.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        self._before_event()
        return self.super.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        self._before_event()
        return self.super.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        self._before_event()
        return self.super.progress(offset, whence)

    def wasSuccessful(self):
        self._before_event()
        return self.super.wasSuccessful()

    @property
    def shouldStop(self):
        # Reading the property counts as an event too.
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        self._before_event()
        return self.super.stop()

    def tags(self, new_tags, gone_tags):
        # tags is an event like any other: without this override it would
        # bypass the hook, contradicting the "every event" contract.
        self._before_event()
        return self.super.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        self._before_event()
        return self.super.time(a_datetime)
163
164
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    Unless told otherwise, a time event carrying the current UTC time is
    sent to the decorated result before each hooked event.  Once the test
    run supplies its own time via time(), automatic timestamps are switched
    off so the two sources are not mixed.
    """

    def __init__(self, decorated):
        """Create an AutoTimingTestResultDecorator forwarding to decorated."""
        # The last explicit time given to time(); None means "auto mode".
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Send the current UTC time onwards unless explicit times are in use."""
        if self._time is not None:
            # The test run is supplying its own timestamps.
            return
        now = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
        self.decorated.time(now)

    def progress(self, offset, whence):
        """Forward a progress event without emitting a timestamp first."""
        return self.decorated.progress(offset, whence)

    @property
    def shouldStop(self):
        """The decorated result's shouldStop, read without a timestamp."""
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: When not None, the time is forwarded to the
            decorated result and automatic timestamps are disabled.  When
            None, automatic timestamps before every event are (re-)enabled,
            which is also the behaviour if time() is never called; the None
            is still forwarded to the decorated result.
        """
        self._time = a_datetime
        return self.decorated.time(a_datetime)
201
202
class TagCollapsingDecorator(TestResultDecorator):
    """Collapses many 'tags' calls into one where possible."""

    def __init__(self, result):
        """Create a TagCollapsingDecorator forwarding to result."""
        super(TagCollapsingDecorator, self).__init__(result)
        # A (new, gone) pair of sets while a test is running, else None.
        self._current_test_tags = None

    def startTest(self, test):
        """Start a test.

        The event is forwarded, then tag collection for this test begins
        with empty 'new' and 'gone' sets.
        """
        self.decorated.startTest(test)
        self._current_test_tags = set(), set()

    def stopTest(self, test):
        """Stop a test.

        Any tags gathered while the test ran are sent on as a single tags
        call just before the stopTest event is forwarded.
        """
        added, removed = self._current_test_tags
        if added or removed:
            self.decorated.tags(added, removed)
        self.decorated.stopTest(test)
        # Outside of a test, tags calls pass straight through again.
        self._current_test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        While a test is running the instructions are merged into that
        test's pending tags and nothing is forwarded yet.  Outside of a test
        they are forwarded to the decorated result immediately.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        pending = self._current_test_tags
        if pending is None:
            return self.decorated.tags(new_tags, gone_tags)
        added, removed = pending
        # A later instruction overrides an earlier one for the same tag, so
        # each tag ends up in at most one of the two sets.
        added.update(new_tags)
        added.difference_update(gone_tags)
        removed.update(gone_tags)
        removed.difference_update(new_tags)
249
250
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times.

    The first time of a run of time() calls is forwarded at once.  Later
    times in the same run are held back; the most recent one is forwarded
    by _before_event(), i.e. just before the next hooked event, provided it
    differs from the time already sent.
    """

    def __init__(self, decorated):
        """Create a TimeCollapsingDecorator forwarding to decorated."""
        super(TimeCollapsingDecorator, self).__init__(decorated)
        # Most recent time forwarded to the decorated result.
        self._last_sent_time = None
        # Most recent time seen in the current run of time() calls, or None
        # when no run is in progress.
        self._last_received_time = None

    def _before_event(self):
        """Flush the held-back time, if any, and end the current run."""
        pending = self._last_received_time
        if pending is not None:
            if pending != self._last_sent_time:
                self.decorated.time(pending)
                self._last_sent_time = pending
            self._last_received_time = None

    def time(self, a_time):
        """Record a time, forwarding it only if it starts a new run."""
        # No upcall here: the hooked time() would call _before_event, which
        # is meant for non-time events only.
        run_in_progress = self._last_received_time is not None
        if not run_in_progress:
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
274
275
def all_true(bools):
    """Return True if all of 'bools' are true. False otherwise.

    :param bools: Any iterable; items are tested for truthiness in order
        and iteration stops at the first false item.
    :return: True when no item is false (including when 'bools' is empty),
        otherwise False.
    """
    return all(bools)
282
283
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    Events belonging to a test are buffered between startTest and stopTest
    and only replayed to the target at stopTest if the test was not
    filtered out.

    :ivar decorated: The result that tests are passed to after filtering
        (wrapped so that consecutive times and per-test tags are collapsed).
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a FilterResult object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        super(TestResultFilter, self).__init__(result)
        self.decorated = TimeCollapsingDecorator(
            TagCollapsingDecorator(self.decorated))
        predicates = []
        if filter_error:
            predicates.append(lambda t, outcome, e, d: outcome != 'error')
        if filter_failure:
            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
        if filter_success:
            predicates.append(lambda t, outcome, e, d: outcome != 'success')
        if filter_skip:
            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
        if filter_xfail:
            predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
        if filter_predicate is not None:
            predicates.append(filter_predicate)
        # A result is passed through only if every predicate accepts it.
        self.filter_predicate = (
            lambda test, outcome, err, details:
                all_true(p(test, outcome, err, details) for p in predicates))
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []
        if fixup_expected_failures is None:
            self._fixup_expected_failures = frozenset()
        else:
            self._fixup_expected_failures = fixup_expected_failures

    def _buffer_call(self, method, *args, **kwargs):
        """Queue a call to replay on the target if the test is kept."""
        self._buffered_calls.append((method, list(args), kwargs))

    def addError(self, test, err=None, details=None):
        if not self.filter_predicate(test, 'error', err, details):
            self._filtered()
            return
        # Known-failing tests are reported as expected failures instead.
        if self._failure_expected(test):
            self._buffer_call('addExpectedFailure', test, err, details=details)
        else:
            self._buffer_call('addError', test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        if not self.filter_predicate(test, 'failure', err, details):
            self._filtered()
            return
        # Known-failing tests are reported as expected failures instead.
        if self._failure_expected(test):
            self._buffer_call('addExpectedFailure', test, err, details=details)
        else:
            self._buffer_call('addFailure', test, err, details=details)

    def addSkip(self, test, reason=None, details=None):
        if self.filter_predicate(test, 'skip', reason, details):
            self._buffer_call('addSkip', test, reason, details=details)
        else:
            self._filtered()

    def addSuccess(self, test, details=None):
        if not self.filter_predicate(test, 'success', None, details):
            self._filtered()
            return
        # A known-failing test that passes is an unexpected success.
        if self._failure_expected(test):
            self._buffer_call('addUnexpectedSuccess', test, details=details)
        else:
            self._buffer_call('addSuccess', test, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        if self.filter_predicate(test, 'expectedfailure', err, details):
            self._buffer_call('addExpectedFailure', test, err, details=details)
        else:
            self._filtered()

    def addUnexpectedSuccess(self, test, details=None):
        # Unexpected successes are never filtered.
        self._buffer_call('addUnexpectedSuccess', test, details=details)

    def _filtered(self):
        """Mark the current test as filtered out."""
        self._current_test_filtered = True

    def _failure_expected(self, test):
        """Return True if 'test' is listed as a known failure."""
        return (test.id() in self._fixup_expected_failures)

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        self._current_test = test
        self._current_test_filtered = False
        self._buffer_call('startTest', test)

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.  The buffered calls for the test are replayed on the
        target, followed by stopTest, unless the test was filtered out.
        """
        if not self._current_test_filtered:
            for method, args, kwargs in self._buffered_calls:
                getattr(self.decorated, method)(*args, **kwargs)
            self.decorated.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._buffered_calls = []

    def tags(self, new_tags, gone_tags):
        """Handle a tags event.

        While a test is in flight the event is buffered with the test's
        other calls, so it is replayed in order and dropped when the test
        is filtered out.  Outside of a test it is forwarded immediately.
        """
        if self._current_test is not None:
            self._buffer_call('tags', new_tags, gone_tags)
        else:
            return super(TestResultFilter, self).tags(new_tags, gone_tags)

    def time(self, a_time):
        """Handle a time event: buffered during a test, else forwarded."""
        if self._current_test is not None:
            self._buffer_call('time', a_time)
        else:
            return self.decorated.time(a_time)

    def id_to_orig_id(self, id):
        """Return 'id' without a leading "subunit.RemotedTestCase." prefix."""
        if id.startswith("subunit.RemotedTestCase."):
            return id[len("subunit.RemotedTestCase."):]
        return id
436
437
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of each test to a stream, one per line.

    When 'show_times' is set, each id is followed by the test's duration in
    seconds, computed from the time() events received around the test.
    """

    def __init__(self, stream, show_times=False):
        """Create a FilterResult object outputting to stream.

        :param stream: Object with a write() method receiving the output.
        :param show_times: If true, append each test's duration to its id.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        # Most recent value given to time(); None until one is received.
        self.__time = None
        self.show_times = show_times
        # Test object from the outcome reported for the current test.
        self._test = None
        self._test_duration = 0
        # Value of the clock when the current test started.
        self._start_time = None

    # The outcome methods accept the optional 'details' keyword so that
    # callers using the extended result API are handled uniformly.

    def addError(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for 'test'.

        :param test: The test whose id() is written.
        :param duration: A timedelta; only used when show_times is set.
        """
        if self.show_times:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        # Forget the previous test so that a test which reports no outcome
        # is not printed under the previous test's id.
        self._test = None
        self._start_time = self._time()

    def stopTest(self, test):
        started = self._start_time
        finished = self._time()
        if started is None or finished is None:
            # No time events were received, so no duration is known.
            test_duration = datetime.timedelta(0)
        else:
            test_duration = finished - started
        # Fall back to the test being stopped when no outcome was recorded.
        reported = self._test if self._test is not None else test
        self.reportTest(reported, test_duration)

    def time(self, time):
        """Record 'time' as the current time."""
        self.__time = time

    def _time(self):
        """Return the most recently recorded time, or None."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0