Factor a TagsMixin out of TagCollapsingDecorator
[third_party/subunit] / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
"""TestResult helper classes used by subunit."""
18
19 import csv
20 import datetime
21
22 import testtools
23 from testtools.compat import all
24 from testtools.content import (
25     text_content,
26     TracebackContent,
27     )
28
29 from subunit import iso8601
30
31
32 # NOT a TestResult, because we are implementing the interface, not inheriting
33 # it.
class TestResultDecorator(object):
    """Forward every TestResult call to a wrapped result.

    Other result classes can inherit from this to get plain forwarding
    for the whole interface and then override only the calls they care
    about. The wrapped result is adapted so that newer methods or
    features degrade when the target does not support them.
    """

    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
    # we should gut this and just use that.

    def __init__(self, decorated):
        """Wrap ``decorated`` and keep the wrapper as ``self.decorated``."""
        # The adapter is what lets every decorator degrade gracefully.
        adapted = testtools.ExtendedToOriginalDecorator(decorated)
        self.decorated = adapted

    # -- run and test lifecycle ------------------------------------------

    def startTestRun(self):
        target = self.decorated
        return target.startTestRun()

    def stopTestRun(self):
        target = self.decorated
        return target.stopTestRun()

    def startTest(self, test):
        target = self.decorated
        return target.startTest(test)

    def stopTest(self, test):
        target = self.decorated
        return target.stopTest(test)

    # -- outcomes --------------------------------------------------------

    def addError(self, test, err=None, details=None):
        target = self.decorated
        return target.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        target = self.decorated
        return target.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        target = self.decorated
        return target.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        target = self.decorated
        return target.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        target = self.decorated
        return target.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        target = self.decorated
        return target.addUnexpectedSuccess(test, details=details)

    # -- metadata and control --------------------------------------------

    def progress(self, offset, whence):
        target = self.decorated
        return target.progress(offset, whence)

    def tags(self, new_tags, gone_tags):
        target = self.decorated
        return target.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        target = self.decorated
        return target.time(a_datetime)

    def wasSuccessful(self):
        target = self.decorated
        return target.wasSuccessful()

    def stop(self):
        target = self.decorated
        return target.stop()

    @property
    def shouldStop(self):
        """The wrapped result's ``shouldStop`` value."""
        return self.decorated.shouldStop

    @property
    def testsRun(self):
        """The wrapped result's ``testsRun`` value."""
        return self.decorated.testsRun
103
104
class HookedTestResultDecorator(TestResultDecorator):
    """Run ``self._before_event()`` ahead of every forwarded event.

    This class calls ``_before_event`` but does not define it; the
    subclasses in this module each supply their own. ``tags`` and
    ``testsRun`` are inherited unchanged and do not trigger the hook.
    """

    def __init__(self, decorated):
        # Keep a bound super object around; every hooked call upcalls
        # through it.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def _forward(self, name, *args, **kwargs):
        """Fire the hook, then upcall the parent's method called ``name``."""
        self._before_event()
        upcall = getattr(self.super, name)
        return upcall(*args, **kwargs)

    def startTest(self, test):
        return self._forward('startTest', test)

    def startTestRun(self):
        return self._forward('startTestRun')

    def stopTest(self, test):
        return self._forward('stopTest', test)

    def stopTestRun(self):
        return self._forward('stopTestRun')

    def addError(self, test, err=None, details=None):
        return self._forward('addError', test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        return self._forward('addFailure', test, err, details=details)

    def addSuccess(self, test, details=None):
        return self._forward('addSuccess', test, details=details)

    def addSkip(self, test, reason=None, details=None):
        return self._forward('addSkip', test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        return self._forward('addExpectedFailure', test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        return self._forward('addUnexpectedSuccess', test, details=details)

    def progress(self, offset, whence):
        return self._forward('progress', offset, whence)

    def wasSuccessful(self):
        return self._forward('wasSuccessful')

    @property
    def shouldStop(self):
        """Fire the hook, then read the parent's ``shouldStop``."""
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        return self._forward('stop')

    def time(self, a_datetime):
        return self._forward('time', a_datetime)
172
173
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult so that time events are added to a test run.

    Unless told otherwise, a time event is sent to the decorated result
    before each hooked test event. As soon as the test run supplies its
    own time data, automatic timestamps are switched off so the two
    sources do not get mixed up.
    """

    def __init__(self, decorated):
        # None means "no explicit time received yet": keep auto-stamping.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Send the current UTC time unless an explicit time is in force."""
        if self._time is None:
            utc_now = datetime.datetime.utcnow()
            stamp = utc_now.replace(tzinfo=iso8601.Utc())
            self.decorated.time(stamp)

    def progress(self, offset, whence):
        # Forwarded directly, so no timestamp is generated for progress.
        target = self.decorated
        return target.progress(offset, whence)

    @property
    def shouldStop(self):
        # Read directly, so no timestamp is generated for this lookup.
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Record and forward a timestamp for the current test activity.

        :param a_datetime: When None, timestamps are added automatically
            before every event (which is also what happens if time() is
            never called). When not None, the value is handed to the
            decorated result and automatic timestamps are disabled.
        """
        self._time = a_datetime
        target = self.decorated
        return target.time(a_datetime)
210
211
class TagsMixin(object):
    """Keep track of tag changes at run scope and at per-test scope.

    Each scope is a ``(new_tags, gone_tags)`` pair of sets.
    ``_global_tags`` always holds the run-level pair. ``_test_tags``
    holds the pair for the test in progress, or None when no test is
    running.
    """

    def __init__(self):
        self._clear_tags()

    def _clear_tags(self):
        """Reset to an empty run scope and no test scope."""
        self._test_tags = None
        self._global_tags = set(), set()

    def _get_active_tags(self):
        """Return a fresh set of the tags currently in effect."""
        run_added = self._global_tags[0]
        if self._test_tags is None:
            return set(run_added)
        test_added, test_removed = self._test_tags
        return (run_added - test_removed) | test_added

    def _get_current_scope(self):
        """Return the test pair while a test is running, else the run pair."""
        return self._test_tags or self._global_tags

    def _flush_current_scope(self, tag_receiver):
        """Send the current scope's pending tags on, then empty that scope.

        :param tag_receiver: Object whose ``tags(new_tags, gone_tags)`` is
            called when either pending set is non-empty.
        """
        added, removed = self._get_current_scope()
        if added or removed:
            tag_receiver.tags(added, removed)
        emptied = set(), set()
        if self._test_tags:
            self._test_tags = emptied
        else:
            self._global_tags = emptied

    def startTestRun(self):
        self._clear_tags()

    def startTest(self, test):
        # Open an empty per-test scope.
        self._test_tags = set(), set()

    def stopTest(self, test):
        # Close the per-test scope.
        self._test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Tags are added and removed in the current scope. While a test is
        running, the changes do not carry over to subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        scope_added, scope_removed = self._get_current_scope()
        # A tag ends up in exactly one of the two sets: whichever of
        # new_tags / gone_tags mentions it wins over earlier state.
        scope_added.update(new_tags)
        scope_added.difference_update(gone_tags)
        scope_removed.update(gone_tags)
        scope_removed.difference_update(new_tags)
265
266
class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
    """Collapses many 'tags' calls into one where possible.

    Incoming tag changes are accumulated rather than forwarded; the
    accumulated changes are flushed to the decorated result just before
    the next hooked event.
    """

    def __init__(self, result):
        super(TagCollapsingDecorator, self).__init__(result)
        # Set up the tag state explicitly after the decorator chain's
        # own initialisation.
        self._clear_tags()

    def _before_event(self):
        receiver = self.decorated
        self._flush_current_scope(receiver)

    def tags(self, new_tags, gone_tags):
        # Name TagsMixin explicitly so the accumulating implementation is
        # used instead of the forwarding one inherited via the decorator.
        TagsMixin.tags(self, new_tags, gone_tags)
279
280
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times."""

    def __init__(self, decorated):
        super(TimeCollapsingDecorator, self).__init__(decorated)
        self._last_sent_time = None
        self._last_received_time = None

    def _before_event(self):
        """Close a run of times by sending its last value if still unsent."""
        pending = self._last_received_time
        if pending is None:
            return
        if pending != self._last_sent_time:
            self.decorated.time(pending)
            self._last_sent_time = pending
        self._last_received_time = None

    def time(self, a_time):
        # Deliberately no upcall: _before_event is only meant to run for
        # non-time events.
        starts_new_run = self._last_received_time is None
        if starts_new_run:
            # First time of a run goes straight through.
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
304
305
def and_predicates(predicates):
    """Combine predicates into one that holds only when every one holds.

    The returned callable passes its arguments unchanged to each
    predicate in turn and stops at the first false result.
    """
    # XXX: Should probably be in testtools to be better used by matchers. jml
    def combined(*args, **kwargs):
        for predicate in predicates:
            if not predicate(*args, **kwargs):
                return False
        return True

    return combined
310
311
312 def _make_tag_filter(with_tags, without_tags):
313     """Make a callback that checks tests against tags."""
314
315     with_tags = with_tags and set(with_tags) or None
316     without_tags = without_tags and set(without_tags) or None
317
318     def check_tags(test, outcome, err, details, tags):
319         if with_tags and not with_tags <= tags:
320             return False
321         if without_tags and bool(without_tags & tags):
322             return False
323         return True
324
325     return check_tags
326
327
class _PredicateFilter(TestResultDecorator):
    """Forward only those tests for which a predicate returns true.

    Calls received between ``startTest`` and ``stopTest`` are buffered.
    At ``stopTest`` the buffer is replayed onto the decorated result
    unless one of the test's outcomes was rejected by the predicate, in
    which case the whole test is dropped.

    Tags received while a test is running are visible to the predicate
    for that test only; the tag set is restored when the test stops.
    """

    def __init__(self, result, predicate):
        """Create a filter forwarding to ``result``.

        :param result: The result that surviving tests are passed to.
        :param predicate: A callable taking (test, outcome, err, details,
            tags) and returning true if the outcome should be forwarded.
        """
        super(_PredicateFilter, self).__init__(result)
        self.decorated = TimeCollapsingDecorator(
            TagCollapsingDecorator(self.decorated))
        self._predicate = predicate
        self._current_tags = set()
        # Copy of _current_tags taken at startTest, so that tag changes
        # made during the test can be undone at stopTest.
        self._tags_before_test = None
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []

    def filter_predicate(self, test, outcome, error, details):
        """Return the predicate's verdict for this outcome and current tags."""
        # XXX: ExtendedToOriginalDecorator doesn't properly wrap current_tags.
        # https://bugs.launchpad.net/testtools/+bug/978027
        return self._predicate(
            test, outcome, error, details, self._current_tags)

    def addError(self, test, err=None, details=None):
        if (self.filter_predicate(test, 'error', err, details)):
            self._buffered_calls.append(
                ('addError', [test, err], {'details': details}))
        else:
            self._filtered()

    def addFailure(self, test, err=None, details=None):
        if (self.filter_predicate(test, 'failure', err, details)):
            self._buffered_calls.append(
                ('addFailure', [test, err], {'details': details}))
        else:
            self._filtered()

    def addSkip(self, test, reason=None, details=None):
        if (self.filter_predicate(test, 'skip', reason, details)):
            self._buffered_calls.append(
                ('addSkip', [test, reason], {'details': details}))
        else:
            self._filtered()

    def addExpectedFailure(self, test, err=None, details=None):
        if self.filter_predicate(test, 'expectedfailure', err, details):
            self._buffered_calls.append(
                ('addExpectedFailure', [test, err], {'details': details}))
        else:
            self._filtered()

    def addUnexpectedSuccess(self, test, details=None):
        # Always buffered: the predicate is not consulted for this outcome.
        self._buffered_calls.append(
            ('addUnexpectedSuccess', [test], {'details': details}))

    def addSuccess(self, test, details=None):
        if (self.filter_predicate(test, 'success', None, details)):
            self._buffered_calls.append(
                ('addSuccess', [test], {'details': details}))
        else:
            self._filtered()

    def _filtered(self):
        """Mark the current test as rejected so stopTest drops it."""
        self._current_test_filtered = True

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        self._current_test = test
        self._current_test_filtered = False
        # Remember the tags in force before the test so that test-scoped
        # tag changes do not leak into the tests that follow.
        self._tags_before_test = set(self._current_tags)
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        if not self._current_test_filtered:
            for method, args, kwargs in self._buffered_calls:
                getattr(self.decorated, method)(*args, **kwargs)
            self.decorated.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._buffered_calls = []
        # Drop tag changes that were scoped to the test that just ended.
        # The snapshot is None if stopTest arrives without a startTest.
        if self._tags_before_test is not None:
            self._current_tags = self._tags_before_test
            self._tags_before_test = None

    def tags(self, new_tags, gone_tags):
        """Apply a tag change, buffering it if a test is in progress."""
        new_tags, gone_tags = set(new_tags), set(gone_tags)
        self._current_tags.update(new_tags)
        self._current_tags.difference_update(gone_tags)
        if self._current_test is not None:
            self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
        else:
            return super(_PredicateFilter, self).tags(new_tags, gone_tags)

    def time(self, a_time):
        """Forward a time event, buffering it if a test is in progress."""
        if self._current_test is not None:
            self._buffered_calls.append(('time', [a_time], {}))
        else:
            return self.decorated.time(a_time)

    def id_to_orig_id(self, id):
        """Return ``id`` without a leading "subunit.RemotedTestCase."."""
        if id.startswith("subunit.RemotedTestCase."):
            return id[len("subunit.RemotedTestCase."):]
        return id
434
435
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that get through the filter are passed to another TestResult
    instance for further processing/reporting. The filtered results are
    obtained by interrogating that other instance.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a FilterResult object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable returning True if the result
            should be passed through. It is first called with (test,
            outcome, err, details, tags); if that raises TypeError it is
            called again with (test, outcome, err, details). err and
            details may be none if no error or extra metadata is
            available. outcome is the name of the outcome such as
            'success' or 'failure'.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        def reject(unwanted):
            # One predicate per unwanted outcome name.
            return lambda t, outcome, e, d, tags: outcome != unwanted

        switches = (
            (filter_error, 'error'),
            (filter_failure, 'failure'),
            (filter_success, 'success'),
            (filter_skip, 'skip'),
            (filter_xfail, 'expectedfailure'),
        )
        predicates = [reject(name) for enabled, name in switches if enabled]

        if filter_predicate is not None:
            def compat(test, outcome, error, details, tags):
                # 0.0.7 and earlier did not support the 'tags' parameter.
                try:
                    return filter_predicate(
                        test, outcome, error, details, tags)
                except TypeError:
                    return filter_predicate(test, outcome, error, details)
            predicates.append(compat)

        combined = and_predicates(predicates)
        super(TestResultFilter, self).__init__(
            _PredicateFilter(result, combined))

        known_failing = fixup_expected_failures
        if known_failing is None:
            known_failing = frozenset()
        self._fixup_expected_failures = known_failing

    def addError(self, test, err=None, details=None):
        # Known-failing tests are reported as expected failures instead.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addError(
            test, err=err, details=details)

    def addFailure(self, test, err=None, details=None):
        # Known-failing tests are reported as expected failures instead.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addFailure(
            test, err=err, details=details)

    def addSuccess(self, test, details=None):
        # A known-failing test that passes is an unexpected success.
        if self._failure_expected(test):
            self.addUnexpectedSuccess(test, details=details)
            return
        super(TestResultFilter, self).addSuccess(test, details=details)

    def _failure_expected(self, test):
        """Return whether the test's id is in the known-failing set."""
        test_id = test.id()
        return test_id in self._fixup_expected_failures
521
522
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of each completed test to a stream, one per line.

    With ``show_times`` set, each id is followed by the test's duration
    in seconds, computed from the time events received around the test.
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult object outputting to stream.

        :param stream: Object with a ``write`` method that receives the
            output lines.
        :param show_times: If true, append each test's duration to its id.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        self.__time = None
        self.show_times = show_times
        self._test = None
        self._test_duration = 0
        # Time in force when the current test started; None until known.
        self._start_time = None

    # The outcome methods accept the same optional err/details arguments
    # as addSkip, addExpectedFailure and addUnexpectedSuccess below, so
    # callers may pass details directly.

    def addError(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for ``test``.

        :param test: The test whose ``id()`` is written.
        :param duration: A timedelta; only used when ``show_times`` is set.
        """
        if self.show_times:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()
        # Forget the previous test so stopTest never reports a stale one.
        self._test = None

    def stopTest(self, test):
        start_time = self._start_time
        stop_time = self._time()
        if start_time is None or stop_time is None:
            # No time events were received; report a zero duration rather
            # than failing on arithmetic with None.
            test_duration = datetime.timedelta(0)
        else:
            test_duration = stop_time - start_time
        # Fall back to the test being stopped if no outcome was recorded.
        reported = self._test if self._test is not None else test
        self.reportTest(reported, test_duration)

    def time(self, time):
        """Remember the most recent time event."""
        self.__time = time

    def _time(self):
        """Return the most recent time event, or None if there was none."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0
581
582
class TestByTestResult(testtools.TestResult):
    """Call something every time a test completes."""

    # XXX: Arguably belongs in testtools.

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that takes a test case, a status (one
            of "success", "failure", "error", "skip", or "xfail"), a start
            time (a ``datetime`` with timezone), a stop time, an iterable
            of tags, and a details dict. It is invoked at the end of each
            test (i.e. on ``stopTest``) with the values accumulated for
            that test.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._stop_time = None
        self._details = None
        self._status = None

    def stopTest(self, test):
        self._stop_time = self._now()
        super(TestByTestResult, self).stopTest(test)
        # current_tags is new in testtools 0.9.13.
        current_tags = getattr(self, 'current_tags', None)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            tags=current_tags,
            details=self._details)

    def _err_to_details(self, test, err, details):
        """Return ``details`` if true, else a traceback dict built from err."""
        return details if details else {
            'traceback': TracebackContent(err, test)}

    def _record(self, status, details):
        """Store the outcome that stopTest will hand to the callback."""
        self._status = status
        self._details = details

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._record('success', details)

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            skip_details = {'reason': text_content(reason)}
        else:
            skip_details = details
            if reason:
                # XXX: What if details already has 'reason' key?
                skip_details['reason'] = text_content(reason)
        self._details = skip_details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Reported to the callback with the same status as a plain success.
        self._record('success', details)
660
661
class CsvResult(TestByTestResult):
    """Write a header row, then one CSV row per completed test."""

    def __init__(self, stream):
        super(CsvResult, self).__init__(self._on_test)
        writer = csv.writer(stream)
        self._write_row = writer.writerow

    def _on_test(self, test, status, start_time, stop_time, tags, details):
        # tags and details are accepted but not written out.
        row = [test.id(), status, start_time, stop_time]
        self._write_row(row)

    def startTestRun(self):
        super(CsvResult, self).startTestRun()
        header = ['test', 'status', 'start_time', 'stop_time']
        self._write_row(header)