Implement the tag collapsing logic by stealing stuff from TRF and fixing it.
[third_party/subunit] / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """TestResult helper classes used to by subunit."""
18
19 import datetime
20
21 import iso8601
22 import testtools
23
24
25 # NOT a TestResult, because we are implementing the interface, not inheriting
26 # it.
27 class TestResultDecorator(object):
28     """General pass-through decorator.
29
30     This provides a base that other TestResults can inherit from to
31     gain basic forwarding functionality. It also takes care of
32     handling the case where the target doesn't support newer methods
33     or features by degrading them.
34     """
35
36     def __init__(self, decorated):
37         """Create a TestResultDecorator forwarding to decorated."""
38         # Make every decorator degrade gracefully.
39         self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
40
41     def startTest(self, test):
42         return self.decorated.startTest(test)
43
44     def startTestRun(self):
45         return self.decorated.startTestRun()
46
47     def stopTest(self, test):
48         return self.decorated.stopTest(test)
49
50     def stopTestRun(self):
51         return self.decorated.stopTestRun()
52
53     def addError(self, test, err=None, details=None):
54         return self.decorated.addError(test, err, details=details)
55
56     def addFailure(self, test, err=None, details=None):
57         return self.decorated.addFailure(test, err, details=details)
58
59     def addSuccess(self, test, details=None):
60         return self.decorated.addSuccess(test, details=details)
61
62     def addSkip(self, test, reason=None, details=None):
63         return self.decorated.addSkip(test, reason, details=details)
64
65     def addExpectedFailure(self, test, err=None, details=None):
66         return self.decorated.addExpectedFailure(test, err, details=details)
67
68     def addUnexpectedSuccess(self, test, details=None):
69         return self.decorated.addUnexpectedSuccess(test, details=details)
70
71     def progress(self, offset, whence):
72         return self.decorated.progress(offset, whence)
73
74     def wasSuccessful(self):
75         return self.decorated.wasSuccessful()
76
77     @property
78     def shouldStop(self):
79         return self.decorated.shouldStop
80
81     def stop(self):
82         return self.decorated.stop()
83
84     def tags(self, new_tags, gone_tags):
85         return self.decorated.tags(new_tags, gone_tags)
86
87     def time(self, a_datetime):
88         return self.decorated.time(a_datetime)
89
90
91 class HookedTestResultDecorator(TestResultDecorator):
92     """A TestResult which calls a hook on every event."""
93
94     def __init__(self, decorated):
95         self.super = super(HookedTestResultDecorator, self)
96         self.super.__init__(decorated)
97
98     def startTest(self, test):
99         self._before_event()
100         return self.super.startTest(test)
101
102     def startTestRun(self):
103         self._before_event()
104         return self.super.startTestRun()
105
106     def stopTest(self, test):
107         self._before_event()
108         return self.super.stopTest(test)
109
110     def stopTestRun(self):
111         self._before_event()
112         return self.super.stopTestRun()
113
114     def addError(self, test, err=None, details=None):
115         self._before_event()
116         return self.super.addError(test, err, details=details)
117
118     def addFailure(self, test, err=None, details=None):
119         self._before_event()
120         return self.super.addFailure(test, err, details=details)
121
122     def addSuccess(self, test, details=None):
123         self._before_event()
124         return self.super.addSuccess(test, details=details)
125
126     def addSkip(self, test, reason=None, details=None):
127         self._before_event()
128         return self.super.addSkip(test, reason, details=details)
129
130     def addExpectedFailure(self, test, err=None, details=None):
131         self._before_event()
132         return self.super.addExpectedFailure(test, err, details=details)
133
134     def addUnexpectedSuccess(self, test, details=None):
135         self._before_event()
136         return self.super.addUnexpectedSuccess(test, details=details)
137
138     def progress(self, offset, whence):
139         self._before_event()
140         return self.super.progress(offset, whence)
141
142     def wasSuccessful(self):
143         self._before_event()
144         return self.super.wasSuccessful()
145
146     @property
147     def shouldStop(self):
148         self._before_event()
149         return self.super.shouldStop
150
151     def stop(self):
152         self._before_event()
153         return self.super.stop()
154
155     def time(self, a_datetime):
156         self._before_event()
157         return self.super.time(a_datetime)
158
159
160 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
161     """Decorate a TestResult to add time events to a test run.
162
163     By default this will cause a time event before every test event,
164     but if explicit time data is being provided by the test run, then
165     this decorator will turn itself off to prevent causing confusion.
166     """
167
168     def __init__(self, decorated):
169         self._time = None
170         super(AutoTimingTestResultDecorator, self).__init__(decorated)
171
172     def _before_event(self):
173         time = self._time
174         if time is not None:
175             return
176         time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
177         self.decorated.time(time)
178
179     def progress(self, offset, whence):
180         return self.decorated.progress(offset, whence)
181
182     @property
183     def shouldStop(self):
184         return self.decorated.shouldStop
185
186     def time(self, a_datetime):
187         """Provide a timestamp for the current test activity.
188
189         :param a_datetime: If None, automatically add timestamps before every
190             event (this is the default behaviour if time() is not called at
191             all).  If not None, pass the provided time onto the decorated
192             result object and disable automatic timestamps.
193         """
194         self._time = a_datetime
195         return self.decorated.time(a_datetime)
196
197
198 class TagCollapsingDecorator(TestResultDecorator):
199     """Collapses many 'tags' calls into one where possible."""
200
201     def __init__(self, result):
202         TestResultDecorator.__init__(self, result)
203         # The current test (for filtering tags)
204         self._current_test = None
205         # The (new, gone) tags for the current test.
206         self._current_test_tags = None
207
208     def startTest(self, test):
209         """Start a test.
210
211         Not directly passed to the client, but used for handling of tags
212         correctly.
213         """
214         self.decorated.startTest(test)
215         self._current_test = test
216         self._current_test_tags = set(), set()
217
218     def stopTest(self, test):
219         """Stop a test.
220
221         Not directly passed to the client, but used for handling of tags
222         correctly.
223         """
224         # Tags to output for this test.
225         if self._current_test_tags[0] or self._current_test_tags[1]:
226             self.decorated.tags(*self._current_test_tags)
227         self.decorated.stopTest(test)
228         self._current_test = None
229         self._current_test_tags = None
230
231     def tags(self, new_tags, gone_tags):
232         """Handle tag instructions.
233
234         Adds and removes tags as appropriate. If a test is currently running,
235         tags are not affected for subsequent tests.
236
237         :param new_tags: Tags to add,
238         :param gone_tags: Tags to remove.
239         """
240         if self._current_test is not None:
241             # gather the tags until the test stops.
242             self._current_test_tags[0].update(new_tags)
243             self._current_test_tags[0].difference_update(gone_tags)
244             self._current_test_tags[1].update(gone_tags)
245             self._current_test_tags[1].difference_update(new_tags)
246         else:
247             return self.decorated.tags(new_tags, gone_tags)
248
249
250 class TestResultFilter(TestResultDecorator):
251     """A pyunit TestResult interface implementation which filters tests.
252
253     Tests that pass the filter are handed on to another TestResult instance
254     for further processing/reporting. To obtain the filtered results,
255     the other instance must be interrogated.
256
257     :ivar result: The result that tests are passed to after filtering.
258     :ivar filter_predicate: The callback run to decide whether to pass
259         a result.
260     """
261
262     def __init__(self, result, filter_error=False, filter_failure=False,
263         filter_success=True, filter_skip=False,
264         filter_predicate=None):
265         """Create a FilterResult object filtering to result.
266
267         :param filter_error: Filter out errors.
268         :param filter_failure: Filter out failures.
269         :param filter_success: Filter out successful tests.
270         :param filter_skip: Filter out skipped tests.
271         :param filter_predicate: A callable taking (test, outcome, err,
272             details) and returning True if the result should be passed
273             through.  err and details may be none if no error or extra
274             metadata is available. outcome is the name of the outcome such
275             as 'success' or 'failure'.
276         """
277         TestResultDecorator.__init__(self, result)
278         self._filter_error = filter_error
279         self._filter_failure = filter_failure
280         self._filter_success = filter_success
281         self._filter_skip = filter_skip
282         if filter_predicate is None:
283             filter_predicate = lambda test, outcome, err, details: True
284         self.filter_predicate = filter_predicate
285         # The current test (for filtering tags)
286         self._current_test = None
287         # Has the current test been filtered (for outputting test tags)
288         self._current_test_filtered = None
289         # The (new, gone) tags for the current test.
290         self._current_test_tags = None
291         # Calls to this result that we don't know whether to forward on yet.
292         self._buffered_calls = []
293
294     def addError(self, test, err=None, details=None):
295         if (not self._filter_error and
296             self.filter_predicate(test, 'error', err, details)):
297             self._buffered_calls.append(
298                 ('addError', [test, err], {'details': details}))
299         else:
300             self._filtered()
301
302     def addFailure(self, test, err=None, details=None):
303         if (not self._filter_failure and
304             self.filter_predicate(test, 'failure', err, details)):
305             self._buffered_calls.append(
306                 ('addFailure', [test, err], {'details': details}))
307         else:
308             self._filtered()
309
310     def addSkip(self, test, reason=None, details=None):
311         if (not self._filter_skip and
312             self.filter_predicate(test, 'skip', reason, details)):
313             self._buffered_calls.append(
314                 ('addSkip', [reason], {'details': details}))
315         else:
316             self._filtered()
317
318     def addSuccess(self, test, details=None):
319         if (not self._filter_success and
320             self.filter_predicate(test, 'success', None, details)):
321             self._buffered_calls.append(
322                 ('addSuccess', [test], {'details': details}))
323         else:
324             self._filtered()
325
326     def addExpectedFailure(self, test, err=None, details=None):
327         if self.filter_predicate(test, 'expectedfailure', err, details):
328             self._buffered_calls.append(
329                 ('addExpectedFailure', [test, err], {'details': details}))
330         else:
331             self._filtered()
332
333     def addUnexpectedSuccess(self, test, details=None):
334         self._buffered_calls.append(
335             ('addUnexpectedSuccess', [test], {'details': details}))
336
337     def _filtered(self):
338         self._current_test_filtered = True
339
340     def startTest(self, test):
341         """Start a test.
342
343         Not directly passed to the client, but used for handling of tags
344         correctly.
345         """
346         self._current_test = test
347         self._current_test_filtered = False
348         self._current_test_tags = set(), set()
349         self._buffered_calls.append(('startTest', [test], {}))
350
351     def stopTest(self, test):
352         """Stop a test.
353
354         Not directly passed to the client, but used for handling of tags
355         correctly.
356         """
357         if not self._current_test_filtered:
358             # Tags to output for this test.
359             for method, args, kwargs in self._buffered_calls:
360                 getattr(self.decorated, method)(*args, **kwargs)
361             if self._current_test_tags[0] or self._current_test_tags[1]:
362                 self.decorated.tags(*self._current_test_tags)
363             self.decorated.stopTest(test)
364         self._current_test = None
365         self._current_test_filtered = None
366         self._current_test_tags = None
367         self._buffered_calls = []
368
369     def tags(self, new_tags, gone_tags):
370         """Handle tag instructions.
371
372         Adds and removes tags as appropriate. If a test is currently running,
373         tags are not affected for subsequent tests.
374
375         :param new_tags: Tags to add,
376         :param gone_tags: Tags to remove.
377         """
378         if self._current_test is not None:
379             # gather the tags until the test stops.
380             self._current_test_tags[0].update(new_tags)
381             self._current_test_tags[0].difference_update(gone_tags)
382             self._current_test_tags[1].update(gone_tags)
383             self._current_test_tags[1].difference_update(new_tags)
384         return self.decorated.tags(new_tags, gone_tags)
385
386     def time(self, a_time):
387         if self._current_test is not None:
388             self._buffered_calls.append(('time', [a_time], {}))
389         else:
390             return self.decorated.time(a_time)
391
392     def id_to_orig_id(self, id):
393         if id.startswith("subunit.RemotedTestCase."):
394             return id[len("subunit.RemotedTestCase."):]
395         return id
396
397
398 class TestIdPrintingResult(testtools.TestResult):
399
400     def __init__(self, stream, show_times=False):
401         """Create a FilterResult object outputting to stream."""
402         testtools.TestResult.__init__(self)
403         self._stream = stream
404         self.failed_tests = 0
405         self.__time = 0
406         self.show_times = show_times
407         self._test = None
408         self._test_duration = 0
409
410     def addError(self, test, err):
411         self.failed_tests += 1
412         self._test = test
413
414     def addFailure(self, test, err):
415         self.failed_tests += 1
416         self._test = test
417
418     def addSuccess(self, test):
419         self._test = test
420
421     def reportTest(self, test, duration):
422         if self.show_times:
423             seconds = duration.seconds
424             seconds += duration.days * 3600 * 24
425             seconds += duration.microseconds / 1000000.0
426             self._stream.write(test.id() + ' %0.3f\n' % seconds)
427         else:
428             self._stream.write(test.id() + '\n')
429
430     def startTest(self, test):
431         self._start_time = self._time()
432
433     def stopTest(self, test):
434         test_duration = self._time() - self._start_time
435         self.reportTest(self._test, test_duration)
436
437     def time(self, time):
438         self.__time = time
439
440     def _time(self):
441         return self.__time
442
443     def wasSuccessful(self):
444         "Tells whether or not this result was a success"
445         return self.failed_tests == 0