Make sure tags directives are sent before addSuccess etc.
[third_party/subunit] / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """TestResult helper classes used by subunit."""
18
19 import csv
20 import datetime
21
22 import testtools
23 from testtools.content import (
24     text_content,
25     TracebackContent,
26     )
27
28 from subunit import iso8601
29
30
31 # NOT a TestResult, because we are implementing the interface, not inheriting
32 # it.
class TestResultDecorator(object):
    """Pass-through wrapper around another TestResult.

    Every method and property simply delegates to ``self.decorated``, so
    subclasses only need to override the calls they want to intercept.
    The wrapped result is adapted on construction so that targets which
    lack newer methods or features are handled by degrading them.
    """

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated.

        :param decorated: The result object that receives forwarded calls.
        """
        # Adapting here means every decorator degrades gracefully.
        adapted = testtools.ExtendedToOriginalDecorator(decorated)
        self.decorated = adapted

    def startTest(self, test):
        """Forward startTest to the decorated result."""
        target = self.decorated
        return target.startTest(test)

    def startTestRun(self):
        """Forward startTestRun to the decorated result."""
        target = self.decorated
        return target.startTestRun()

    def stopTest(self, test):
        """Forward stopTest to the decorated result."""
        target = self.decorated
        return target.stopTest(test)

    def stopTestRun(self):
        """Forward stopTestRun to the decorated result."""
        target = self.decorated
        return target.stopTestRun()

    def addError(self, test, err=None, details=None):
        """Forward addError to the decorated result."""
        target = self.decorated
        return target.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward addFailure to the decorated result."""
        target = self.decorated
        return target.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Forward addSuccess to the decorated result."""
        target = self.decorated
        return target.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward addSkip to the decorated result."""
        target = self.decorated
        return target.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward addExpectedFailure to the decorated result."""
        target = self.decorated
        return target.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward addUnexpectedSuccess to the decorated result."""
        target = self.decorated
        return target.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        """Forward progress to the decorated result."""
        target = self.decorated
        return target.progress(offset, whence)

    def wasSuccessful(self):
        """Return whatever the decorated result's wasSuccessful returns."""
        target = self.decorated
        return target.wasSuccessful()

    @property
    def shouldStop(self):
        """The decorated result's shouldStop attribute."""
        target = self.decorated
        return target.shouldStop

    def stop(self):
        """Forward stop to the decorated result."""
        target = self.decorated
        return target.stop()

    @property
    def testsRun(self):
        """The decorated result's testsRun attribute."""
        target = self.decorated
        return target.testsRun

    def tags(self, new_tags, gone_tags):
        """Forward tags to the decorated result."""
        target = self.decorated
        return target.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward time to the decorated result."""
        target = self.decorated
        return target.time(a_datetime)
99
100
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Each overridden method calls ``self._before_event()`` and then performs
    the normal pass-through forwarding.  This class does not define
    ``_before_event`` itself; subclasses are expected to supply it.
    """

    def __init__(self, decorated):
        """Create a HookedTestResultDecorator forwarding to decorated."""
        # Keep a bound super proxy so each hook can reach the plain
        # forwarding implementation without repeating the super() call.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def startTest(self, test):
        self._before_event()
        return self.super.startTest(test)

    def startTestRun(self):
        self._before_event()
        return self.super.startTestRun()

    def stopTest(self, test):
        self._before_event()
        return self.super.stopTest(test)

    def stopTestRun(self):
        self._before_event()
        return self.super.stopTestRun()

    def addError(self, test, err=None, details=None):
        self._before_event()
        return self.super.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        self._before_event()
        return self.super.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        self._before_event()
        return self.super.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        self._before_event()
        return self.super.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        self._before_event()
        return self.super.progress(offset, whence)

    def wasSuccessful(self):
        self._before_event()
        return self.super.wasSuccessful()

    @property
    def shouldStop(self):
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        self._before_event()
        return self.super.stop()

    def tags(self, new_tags, gone_tags):
        # tags is an event like any other, so it must trigger the hook too;
        # otherwise state held back by a subclass's hook would be emitted
        # only after the tags had already been forwarded.
        self._before_event()
        return self.super.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        self._before_event()
        return self.super.time(a_datetime)
168
169
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Insert time events into a test run automatically.

    Until an explicit (non-None) time is supplied through time(), the
    current UTC time is sent to the decorated result ahead of each hooked
    event.  Once the test run provides its own timestamps the automatic
    ones are switched off, to prevent causing confusion.
    """

    def __init__(self, decorated):
        # Last value passed to time(); None means automatic timing is on.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Send the current UTC time onwards unless explicit times are in use."""
        if self._time is None:
            now = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
            self.decorated.time(now)

    def progress(self, offset, whence):
        """Forward progress directly, without emitting a timestamp first."""
        target = self.decorated
        return target.progress(offset, whence)

    @property
    def shouldStop(self):
        """The decorated result's shouldStop, read without a timestamp."""
        target = self.decorated
        return target.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: When None, timestamps are added automatically
            before every event (also the behaviour if time() is never
            called).  Any other value is passed on to the decorated result
            and turns automatic timestamps off.
        """
        target = self.decorated
        self._time = a_datetime
        return target.time(a_datetime)
206
207
class TagCollapsingDecorator(HookedTestResultDecorator):
    """Collapses many 'tags' calls into one where possible."""

    def __init__(self, result):
        super(TagCollapsingDecorator, self).__init__(result)
        # (new, gone) tag sets gathered for the running test, or None when
        # no test is in progress.
        self._current_test_tags = None

    def startTest(self, test):
        """Start a test and begin gathering its tags.

        The call is forwarded straight to the decorated result; from here
        until stopTest, tags calls are accumulated instead of forwarded.
        """
        self.decorated.startTest(test)
        self._current_test_tags = set(), set()

    def stopTest(self, test):
        """Stop a test, sending any gathered tags first."""
        # The inherited stopTest runs _before_event, which emits whatever
        # tags are still pending, before forwarding stopTest.
        super(TagCollapsingDecorator, self).stopTest(test)
        self._current_test_tags = None

    def _before_event(self):
        """Emit the gathered tags, if any, as a single tags call."""
        pending = self._current_test_tags
        if not pending:
            return
        added, removed = pending
        if added or removed:
            self.decorated.tags(added, removed)
        self._current_test_tags = set(), set()

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        While a test is running the changes are merged into the pending
        sets and sent later; outside a test they are forwarded at once.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        pending = self._current_test_tags
        if pending is None:
            return self.decorated.tags(new_tags, gone_tags)
        # gather the tags until the next event flushes them.
        added, removed = pending
        added.update(new_tags)
        added.difference_update(gone_tags)
        removed.update(gone_tags)
        removed.difference_update(new_tags)
253
254
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times."""

    def __init__(self, decorated):
        super(TimeCollapsingDecorator, self).__init__(decorated)
        # Most recent time seen since the last non-time event, if any.
        self._last_received_time = None
        # Most recent time actually forwarded to the decorated result.
        self._last_sent_time = None

    def _before_event(self):
        """Forward the latest held-back time, if it has not been sent yet."""
        pending = self._last_received_time
        if pending is None:
            return
        if pending != self._last_sent_time:
            self.decorated.time(pending)
            self._last_sent_time = pending
        self._last_received_time = None

    def time(self, a_time):
        """Record a time, forwarding it only if it starts a new sequence."""
        # Deliberately no upcall: _before_event is only meant to run for
        # non-time events.
        starts_sequence = self._last_received_time is None
        if starts_sequence:
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
278
279
def all_true(bools):
    """Return True if all of 'bools' are true. False otherwise.

    :param bools: An iterable of values tested for truthiness.  Iteration
        stops at the first false item.
    :return: True when no item is false (including when 'bools' is empty).
    """
    return all(bools)
286
287
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    Calls received between startTest and stopTest are buffered.  At
    stopTest the buffer is replayed to the decorated result unless an
    outcome for the test was rejected by the filter, in which case the
    whole test is dropped.

    :ivar decorated: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a TestResultFilter object filtering to result.

        :param result: The result that unfiltered tests are forwarded to.
        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        super(TestResultFilter, self).__init__(result)
        # Collapse redundant time and tags calls on the way out.
        self.decorated = TimeCollapsingDecorator(
            TagCollapsingDecorator(self.decorated))
        predicates = []
        if filter_error:
            predicates.append(lambda t, outcome, e, d: outcome != 'error')
        if filter_failure:
            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
        if filter_success:
            predicates.append(lambda t, outcome, e, d: outcome != 'success')
        if filter_skip:
            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
        if filter_xfail:
            predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
        if filter_predicate is not None:
            predicates.append(filter_predicate)
        # A result passes only if every individual predicate accepts it.
        self.filter_predicate = (
            lambda test, outcome, err, details:
                all_true(p(test, outcome, err, details) for p in predicates))
        # The current test, or None when no test is running.
        self._current_test = None
        # Has the current test been filtered out?
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []
        if fixup_expected_failures is None:
            self._fixup_expected_failures = frozenset()
        else:
            self._fixup_expected_failures = fixup_expected_failures

    def addError(self, test, err=None, details=None):
        """Buffer an error, or mark the test filtered if rejected.

        Errors from tests listed in fixup_expected_failures are buffered as
        expected failures instead.
        """
        if self.filter_predicate(test, 'error', err, details):
            if self._failure_expected(test):
                self._buffered_calls.append(
                    ('addExpectedFailure', [test, err], {'details': details}))
            else:
                self._buffered_calls.append(
                    ('addError', [test, err], {'details': details}))
        else:
            self._filtered()

    def addFailure(self, test, err=None, details=None):
        """Buffer a failure, or mark the test filtered if rejected.

        Failures from tests listed in fixup_expected_failures are buffered
        as expected failures instead.
        """
        if self.filter_predicate(test, 'failure', err, details):
            if self._failure_expected(test):
                self._buffered_calls.append(
                    ('addExpectedFailure', [test, err], {'details': details}))
            else:
                self._buffered_calls.append(
                    ('addFailure', [test, err], {'details': details}))
        else:
            self._filtered()

    def addSkip(self, test, reason=None, details=None):
        """Buffer a skip, or mark the test filtered if rejected."""
        if self.filter_predicate(test, 'skip', reason, details):
            self._buffered_calls.append(
                ('addSkip', [test, reason], {'details': details}))
        else:
            self._filtered()

    def addSuccess(self, test, details=None):
        """Buffer a success, or mark the test filtered if rejected.

        Successes from tests listed in fixup_expected_failures are buffered
        as unexpected successes instead.
        """
        if self.filter_predicate(test, 'success', None, details):
            if self._failure_expected(test):
                self._buffered_calls.append(
                    ('addUnexpectedSuccess', [test], {'details': details}))
            else:
                self._buffered_calls.append(
                    ('addSuccess', [test], {'details': details}))
        else:
            self._filtered()

    def addExpectedFailure(self, test, err=None, details=None):
        """Buffer an expected failure, or mark the test filtered if rejected."""
        if self.filter_predicate(test, 'expectedfailure', err, details):
            self._buffered_calls.append(
                ('addExpectedFailure', [test, err], {'details': details}))
        else:
            self._filtered()

    def addUnexpectedSuccess(self, test, details=None):
        """Buffer an unexpected success.

        Unlike the other outcomes this is always buffered; the filter
        predicate is not consulted.
        """
        self._buffered_calls.append(
            ('addUnexpectedSuccess', [test], {'details': details}))

    def _filtered(self):
        """Record that the current test must not be forwarded."""
        self._current_test_filtered = True

    def _failure_expected(self, test):
        """Return True if test's id is in the known-failing set."""
        return (test.id() in self._fixup_expected_failures)

    def startTest(self, test):
        """Start a test.

        Not passed to the decorated result immediately: the call is
        buffered until stopTest decides whether the test is forwarded.
        """
        self._current_test = test
        self._current_test_filtered = False
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Replays the buffered calls followed by stopTest on the decorated
        result, unless the test was filtered, then resets per-test state.
        """
        if not self._current_test_filtered:
            # Replay everything held back for this test, in order.
            for method, args, kwargs in self._buffered_calls:
                getattr(self.decorated, method)(*args, **kwargs)
            self.decorated.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._buffered_calls = []

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        While a test is running the call is buffered with that test, so it
        is replayed after the test's startTest and discarded if the test is
        filtered out.  Outside a test it is forwarded immediately.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        if self._current_test is not None:
            self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
        else:
            return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_time):
        """Buffer the time while a test is running, else forward it."""
        if self._current_test is not None:
            self._buffered_calls.append(('time', [a_time], {}))
        else:
            return self.decorated.time(a_time)

    def id_to_orig_id(self, id):
        """Return id with any 'subunit.RemotedTestCase.' prefix removed."""
        if id.startswith("subunit.RemotedTestCase."):
            return id[len("subunit.RemotedTestCase."):]
        return id
440
441
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of each test to a stream as the test stops.

    One line is written per test; when show_times is set the line also
    carries the test's duration in seconds, computed from the times
    supplied through time().
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult object outputting to stream.

        :param stream: Object with a write() method that receives the lines.
        :param show_times: If true, append each test's duration to its line.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        # Most recent timestamp received through time().
        self.__time = None
        self.show_times = show_times
        # Test named by the most recent outcome call for the running test.
        self._test = None
        self._test_duration = 0
        # Timestamp in effect when the running test started.
        self._start_time = None

    # The outcome methods accept the optional err/details arguments so that
    # callers using the extended TestResult signatures do not fail.

    def addError(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write the line for one test.

        :param test: The test whose id() is written.
        :param duration: A timedelta; only used when show_times is set.
        """
        if self.show_times:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()
        # Forget the previous test so a test that records no outcome is not
        # reported under the earlier test's id.
        self._test = None

    def stopTest(self, test):
        started = self._start_time
        stopped = self._time()
        if started is None or stopped is None:
            # No timing information was supplied; report a zero duration
            # rather than failing on None arithmetic.
            test_duration = datetime.timedelta(0)
        else:
            test_duration = stopped - started
        # Fall back to the test being stopped if no outcome named one.
        reported = self._test if self._test is not None else test
        self.reportTest(reported, test_duration)

    def time(self, time):
        """Record the timestamp to use for subsequent start/stop events."""
        self.__time = time

    def _time(self):
        """Return the most recently recorded timestamp, or None."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0
500
501
class TestByTestResult(testtools.TestResult):
    """Call something every time a test completes."""

    # XXX: Arguably belongs in testtools.

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that take a test case, a status (one of
            "success", "failure", "error", "skip", or "xfail"), a start time
            (a ``datetime`` with timezone), a stop time, an iterable of tags,
            and a details dict. Is called at the end of each test (i.e. on
            ``stopTest``) with the accumulated values for that test.  The
            values are passed as the keyword arguments ``test``, ``status``,
            ``start_time``, ``stop_time``, ``tags`` and ``details``.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        """Record the start time and clear the per-test state."""
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._status = None
        self._details = None
        self._stop_time = None

    def stopTest(self, test):
        """Record the stop time and report the finished test to on_test."""
        self._stop_time = self._now()
        super(TestByTestResult, self).stopTest(test)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            # current_tags is new in testtools 0.9.13.
            tags=getattr(self, 'current_tags', None),
            details=self._details)

    def _err_to_details(self, test, err, details):
        """Return details if non-empty, else a dict holding err's traceback."""
        if details:
            return details
        return {'traceback': TracebackContent(err, test)}

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._status = 'success'
        self._details = details

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            details = {'reason': text_content(reason)}
        elif reason:
            # Work on a copy so the caller's dict is left untouched.
            # XXX: An existing 'reason' key is overwritten here.
            details = dict(details)
            details['reason'] = text_content(reason)
        self._details = details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Reported to on_test with the same status as an ordinary success.
        self._status = 'success'
        self._details = details
579
580
class CsvResult(TestByTestResult):
    """Write one CSV row per completed test to a stream."""

    def __init__(self, stream):
        """Create a CsvResult writing rows to stream via csv.writer."""
        super(CsvResult, self).__init__(self._on_test)
        writer = csv.writer(stream)
        self._write_row = writer.writerow

    def _on_test(self, test, status, start_time, stop_time, tags, details):
        """Emit the row for one test; tags and details are not written."""
        row = [test.id(), status, start_time, stop_time]
        self._write_row(row)

    def startTestRun(self):
        """Start the run and write the header row."""
        super(CsvResult, self).startTestRun()
        header = ['test', 'status', 'start_time', 'stop_time']
        self._write_row(header)
591         super(CsvResult, self).startTestRun()
592         self._write_row(['test', 'status', 'start_time', 'stop_time'])