Fix handling of addSkip in TestResultFilter.
[third_party/subunit] / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """TestResult helper classes used to by subunit."""
18
19 import datetime
20
21 import testtools
22
23 from subunit import iso8601
24
25
26 # NOT a TestResult, because we are implementing the interface, not inheriting
27 # it.
28 class TestResultDecorator(object):
29     """General pass-through decorator.
30
31     This provides a base that other TestResults can inherit from to
32     gain basic forwarding functionality. It also takes care of
33     handling the case where the target doesn't support newer methods
34     or features by degrading them.
35     """
36
37     def __init__(self, decorated):
38         """Create a TestResultDecorator forwarding to decorated."""
39         # Make every decorator degrade gracefully.
40         self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
41
42     def startTest(self, test):
43         return self.decorated.startTest(test)
44
45     def startTestRun(self):
46         return self.decorated.startTestRun()
47
48     def stopTest(self, test):
49         return self.decorated.stopTest(test)
50
51     def stopTestRun(self):
52         return self.decorated.stopTestRun()
53
54     def addError(self, test, err=None, details=None):
55         return self.decorated.addError(test, err, details=details)
56
57     def addFailure(self, test, err=None, details=None):
58         return self.decorated.addFailure(test, err, details=details)
59
60     def addSuccess(self, test, details=None):
61         return self.decorated.addSuccess(test, details=details)
62
63     def addSkip(self, test, reason=None, details=None):
64         return self.decorated.addSkip(test, reason, details=details)
65
66     def addExpectedFailure(self, test, err=None, details=None):
67         return self.decorated.addExpectedFailure(test, err, details=details)
68
69     def addUnexpectedSuccess(self, test, details=None):
70         return self.decorated.addUnexpectedSuccess(test, details=details)
71
72     def progress(self, offset, whence):
73         return self.decorated.progress(offset, whence)
74
75     def wasSuccessful(self):
76         return self.decorated.wasSuccessful()
77
78     @property
79     def shouldStop(self):
80         return self.decorated.shouldStop
81
82     def stop(self):
83         return self.decorated.stop()
84
85     @property
86     def testsRun(self):
87         return self.decorated.testsRun
88
89     def tags(self, new_tags, gone_tags):
90         return self.decorated.tags(new_tags, gone_tags)
91
92     def time(self, a_datetime):
93         return self.decorated.time(a_datetime)
94
95
96 class HookedTestResultDecorator(TestResultDecorator):
97     """A TestResult which calls a hook on every event."""
98
99     def __init__(self, decorated):
100         self.super = super(HookedTestResultDecorator, self)
101         self.super.__init__(decorated)
102
103     def startTest(self, test):
104         self._before_event()
105         return self.super.startTest(test)
106
107     def startTestRun(self):
108         self._before_event()
109         return self.super.startTestRun()
110
111     def stopTest(self, test):
112         self._before_event()
113         return self.super.stopTest(test)
114
115     def stopTestRun(self):
116         self._before_event()
117         return self.super.stopTestRun()
118
119     def addError(self, test, err=None, details=None):
120         self._before_event()
121         return self.super.addError(test, err, details=details)
122
123     def addFailure(self, test, err=None, details=None):
124         self._before_event()
125         return self.super.addFailure(test, err, details=details)
126
127     def addSuccess(self, test, details=None):
128         self._before_event()
129         return self.super.addSuccess(test, details=details)
130
131     def addSkip(self, test, reason=None, details=None):
132         self._before_event()
133         return self.super.addSkip(test, reason, details=details)
134
135     def addExpectedFailure(self, test, err=None, details=None):
136         self._before_event()
137         return self.super.addExpectedFailure(test, err, details=details)
138
139     def addUnexpectedSuccess(self, test, details=None):
140         self._before_event()
141         return self.super.addUnexpectedSuccess(test, details=details)
142
143     def progress(self, offset, whence):
144         self._before_event()
145         return self.super.progress(offset, whence)
146
147     def wasSuccessful(self):
148         self._before_event()
149         return self.super.wasSuccessful()
150
151     @property
152     def shouldStop(self):
153         self._before_event()
154         return self.super.shouldStop
155
156     def stop(self):
157         self._before_event()
158         return self.super.stop()
159
160     def time(self, a_datetime):
161         self._before_event()
162         return self.super.time(a_datetime)
163
164
165 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
166     """Decorate a TestResult to add time events to a test run.
167
168     By default this will cause a time event before every test event,
169     but if explicit time data is being provided by the test run, then
170     this decorator will turn itself off to prevent causing confusion.
171     """
172
173     def __init__(self, decorated):
174         self._time = None
175         super(AutoTimingTestResultDecorator, self).__init__(decorated)
176
177     def _before_event(self):
178         time = self._time
179         if time is not None:
180             return
181         time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
182         self.decorated.time(time)
183
184     def progress(self, offset, whence):
185         return self.decorated.progress(offset, whence)
186
187     @property
188     def shouldStop(self):
189         return self.decorated.shouldStop
190
191     def time(self, a_datetime):
192         """Provide a timestamp for the current test activity.
193
194         :param a_datetime: If None, automatically add timestamps before every
195             event (this is the default behaviour if time() is not called at
196             all).  If not None, pass the provided time onto the decorated
197             result object and disable automatic timestamps.
198         """
199         self._time = a_datetime
200         return self.decorated.time(a_datetime)
201
202
203 class TagCollapsingDecorator(TestResultDecorator):
204     """Collapses many 'tags' calls into one where possible."""
205
206     def __init__(self, result):
207         super(TagCollapsingDecorator, self).__init__(result)
208         # The (new, gone) tags for the current test.
209         self._current_test_tags = None
210
211     def startTest(self, test):
212         """Start a test.
213
214         Not directly passed to the client, but used for handling of tags
215         correctly.
216         """
217         self.decorated.startTest(test)
218         self._current_test_tags = set(), set()
219
220     def stopTest(self, test):
221         """Stop a test.
222
223         Not directly passed to the client, but used for handling of tags
224         correctly.
225         """
226         # Tags to output for this test.
227         if self._current_test_tags[0] or self._current_test_tags[1]:
228             self.decorated.tags(*self._current_test_tags)
229         self.decorated.stopTest(test)
230         self._current_test_tags = None
231
232     def tags(self, new_tags, gone_tags):
233         """Handle tag instructions.
234
235         Adds and removes tags as appropriate. If a test is currently running,
236         tags are not affected for subsequent tests.
237
238         :param new_tags: Tags to add,
239         :param gone_tags: Tags to remove.
240         """
241         if self._current_test_tags is not None:
242             # gather the tags until the test stops.
243             self._current_test_tags[0].update(new_tags)
244             self._current_test_tags[0].difference_update(gone_tags)
245             self._current_test_tags[1].update(gone_tags)
246             self._current_test_tags[1].difference_update(new_tags)
247         else:
248             return self.decorated.tags(new_tags, gone_tags)
249
250
251 class TimeCollapsingDecorator(HookedTestResultDecorator):
252     """Only pass on the first and last of a consecutive sequence of times."""
253
254     def __init__(self, decorated):
255         super(TimeCollapsingDecorator, self).__init__(decorated)
256         self._last_received_time = None
257         self._last_sent_time = None
258
259     def _before_event(self):
260         if self._last_received_time is None:
261             return
262         if self._last_received_time != self._last_sent_time:
263             self.decorated.time(self._last_received_time)
264             self._last_sent_time = self._last_received_time
265         self._last_received_time = None
266
267     def time(self, a_time):
268         # Don't upcall, because we don't want to call _before_event, it's only
269         # for non-time events.
270         if self._last_received_time is None:
271             self.decorated.time(a_time)
272             self._last_sent_time = a_time
273         self._last_received_time = a_time
274
275
276 def all_true(bools):
277     """Return True if all of 'bools' are True. False otherwise."""
278     for b in bools:
279         if not b:
280             return False
281     return True
282
283
284 class TestResultFilter(TestResultDecorator):
285     """A pyunit TestResult interface implementation which filters tests.
286
287     Tests that pass the filter are handed on to another TestResult instance
288     for further processing/reporting. To obtain the filtered results,
289     the other instance must be interrogated.
290
291     :ivar result: The result that tests are passed to after filtering.
292     :ivar filter_predicate: The callback run to decide whether to pass
293         a result.
294     """
295
296     def __init__(self, result, filter_error=False, filter_failure=False,
297         filter_success=True, filter_skip=False,
298         filter_predicate=None, fixup_expected_failures=None):
299         """Create a FilterResult object filtering to result.
300
301         :param filter_error: Filter out errors.
302         :param filter_failure: Filter out failures.
303         :param filter_success: Filter out successful tests.
304         :param filter_skip: Filter out skipped tests.
305         :param filter_predicate: A callable taking (test, outcome, err,
306             details) and returning True if the result should be passed
307             through.  err and details may be none if no error or extra
308             metadata is available. outcome is the name of the outcome such
309             as 'success' or 'failure'.
310         :param fixup_expected_failures: Set of test ids to consider known
311             failing.
312         """
313         super(TestResultFilter, self).__init__(result)
314         self.decorated = TimeCollapsingDecorator(
315             TagCollapsingDecorator(self.decorated))
316         predicates = []
317         if filter_error:
318             predicates.append(lambda t, outcome, e, d: outcome != 'error')
319         if filter_failure:
320             predicates.append(lambda t, outcome, e, d: outcome != 'failure')
321         if filter_success:
322             predicates.append(lambda t, outcome, e, d: outcome != 'success')
323         if filter_skip:
324             predicates.append(lambda t, outcome, e, d: outcome != 'skip')
325         if filter_predicate is not None:
326             predicates.append(filter_predicate)
327         self.filter_predicate = (
328             lambda test, outcome, err, details:
329                 all_true(p(test, outcome, err, details) for p in predicates))
330         # The current test (for filtering tags)
331         self._current_test = None
332         # Has the current test been filtered (for outputting test tags)
333         self._current_test_filtered = None
334         # Calls to this result that we don't know whether to forward on yet.
335         self._buffered_calls = []
336         if fixup_expected_failures is None:
337             self._fixup_expected_failures = frozenset()
338         else:
339             self._fixup_expected_failures = fixup_expected_failures
340
341     def addError(self, test, err=None, details=None):
342         if (self.filter_predicate(test, 'error', err, details)):
343             if self._failure_expected(test):
344                 self._buffered_calls.append(
345                     ('addExpectedFailure', [test, err], {'details': details}))
346             else:
347                 self._buffered_calls.append(
348                     ('addError', [test, err], {'details': details}))
349         else:
350             self._filtered()
351
352     def addFailure(self, test, err=None, details=None):
353         if (self.filter_predicate(test, 'failure', err, details)):
354             if self._failure_expected(test):
355                 self._buffered_calls.append(
356                     ('addExpectedFailure', [test, err], {'details': details}))
357             else:
358                 self._buffered_calls.append(
359                     ('addFailure', [test, err], {'details': details}))
360         else:
361             self._filtered()
362
363     def addSkip(self, test, reason=None, details=None):
364         if (self.filter_predicate(test, 'skip', reason, details)):
365             self._buffered_calls.append(
366                 ('addSkip', [test, reason], {'details': details}))
367         else:
368             self._filtered()
369
370     def addSuccess(self, test, details=None):
371         if (self.filter_predicate(test, 'success', None, details)):
372             if self._failure_expected(test):
373                 self._buffered_calls.append(
374                     ('addUnexpectedSuccess', [test], {'details': details}))
375             else:
376                 self._buffered_calls.append(
377                     ('addSuccess', [test], {'details': details}))
378         else:
379             self._filtered()
380
381     def addExpectedFailure(self, test, err=None, details=None):
382         if self.filter_predicate(test, 'expectedfailure', err, details):
383             self._buffered_calls.append(
384                 ('addExpectedFailure', [test, err], {'details': details}))
385         else:
386             self._filtered()
387
388     def addUnexpectedSuccess(self, test, details=None):
389         self._buffered_calls.append(
390             ('addUnexpectedSuccess', [test], {'details': details}))
391
392     def _filtered(self):
393         self._current_test_filtered = True
394
395     def _failure_expected(self, test):
396         return (test.id() in self._fixup_expected_failures)
397
398     def startTest(self, test):
399         """Start a test.
400
401         Not directly passed to the client, but used for handling of tags
402         correctly.
403         """
404         self._current_test = test
405         self._current_test_filtered = False
406         self._buffered_calls.append(('startTest', [test], {}))
407
408     def stopTest(self, test):
409         """Stop a test.
410
411         Not directly passed to the client, but used for handling of tags
412         correctly.
413         """
414         if not self._current_test_filtered:
415             # Tags to output for this test.
416             for method, args, kwargs in self._buffered_calls:
417                 getattr(self.decorated, method)(*args, **kwargs)
418             self.decorated.stopTest(test)
419         self._current_test = None
420         self._current_test_filtered = None
421         self._buffered_calls = []
422
423     def time(self, a_time):
424         if self._current_test is not None:
425             self._buffered_calls.append(('time', [a_time], {}))
426         else:
427             return self.decorated.time(a_time)
428
429     def id_to_orig_id(self, id):
430         if id.startswith("subunit.RemotedTestCase."):
431             return id[len("subunit.RemotedTestCase."):]
432         return id
433
434
435 class TestIdPrintingResult(testtools.TestResult):
436
437     def __init__(self, stream, show_times=False):
438         """Create a FilterResult object outputting to stream."""
439         super(TestIdPrintingResult, self).__init__()
440         self._stream = stream
441         self.failed_tests = 0
442         self.__time = 0
443         self.show_times = show_times
444         self._test = None
445         self._test_duration = 0
446
447     def addError(self, test, err):
448         self.failed_tests += 1
449         self._test = test
450
451     def addFailure(self, test, err):
452         self.failed_tests += 1
453         self._test = test
454
455     def addSuccess(self, test):
456         self._test = test
457
458     def reportTest(self, test, duration):
459         if self.show_times:
460             seconds = duration.seconds
461             seconds += duration.days * 3600 * 24
462             seconds += duration.microseconds / 1000000.0
463             self._stream.write(test.id() + ' %0.3f\n' % seconds)
464         else:
465             self._stream.write(test.id() + '\n')
466
467     def startTest(self, test):
468         self._start_time = self._time()
469
470     def stopTest(self, test):
471         test_duration = self._time() - self._start_time
472         self.reportTest(self._test, test_duration)
473
474     def time(self, time):
475         self.__time = time
476
477     def _time(self):
478         return self.__time
479
480     def wasSuccessful(self):
481         "Tells whether or not this result was a success"
482         return self.failed_tests == 0