bb67df0a3526304d4e4e6d251a73d470fe1d95c9
[third_party/subunit] / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
"""TestResult helper classes used by subunit."""
18
19 import csv
20 import datetime
21
22 import testtools
23 from testtools.compat import all
24 from testtools.content import (
25     text_content,
26     TracebackContent,
27     )
28
29 from subunit import iso8601
30
31
32 # NOT a TestResult, because we are implementing the interface, not inheriting
33 # it.
class TestResultDecorator(object):
    """Pass-through wrapper around another TestResult-like object.

    Every method simply forwards to ``self.decorated`` and returns whatever
    the wrapped object returns.  The wrapped object is itself placed inside
    ``testtools.ExtendedToOriginalDecorator`` so that targets lacking the
    newer methods or features are degraded for rather than failing.

    Subclasses override only the events they care about and inherit plain
    forwarding for everything else.
    """

    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
    # we should gut this and just use that.

    def __init__(self, decorated):
        """Wrap ``decorated`` so that all events are forwarded to it."""
        # Make every decorator degrade gracefully.
        self.decorated = testtools.ExtendedToOriginalDecorator(decorated)

    # -- Run and test lifecycle ------------------------------------------

    def startTestRun(self):
        return self.decorated.startTestRun()

    def stopTestRun(self):
        return self.decorated.stopTestRun()

    def startTest(self, test):
        return self.decorated.startTest(test)

    def stopTest(self, test):
        return self.decorated.stopTest(test)

    # -- Outcomes --------------------------------------------------------

    def addSuccess(self, test, details=None):
        return self.decorated.addSuccess(test, details=details)

    def addError(self, test, err=None, details=None):
        return self.decorated.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        return self.decorated.addFailure(test, err, details=details)

    def addSkip(self, test, reason=None, details=None):
        return self.decorated.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        return self.decorated.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        return self.decorated.addUnexpectedSuccess(test, details=details)

    # -- Run state and control -------------------------------------------

    def wasSuccessful(self):
        return self.decorated.wasSuccessful()

    @property
    def shouldStop(self):
        return self.decorated.shouldStop

    @property
    def testsRun(self):
        return self.decorated.testsRun

    def stop(self):
        return self.decorated.stop()

    # -- Metadata events -------------------------------------------------

    def progress(self, offset, whence):
        return self.decorated.progress(offset, whence)

    def tags(self, new_tags, gone_tags):
        return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        return self.decorated.time(a_datetime)
103
104
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Each method defined here first invokes ``self._before_event()`` and then
    forwards the event to the parent class implementation.  Subclasses
    customise behaviour by overriding ``_before_event``.

    Note that ``tags`` and ``testsRun`` are not overridden here, so they are
    inherited from the parent class and do not trigger the hook.
    """

    def __init__(self, decorated):
        """Create a hooked decorator forwarding to ``decorated``."""
        # The bound super object is kept on the instance (as ``self.super``)
        # because every forwarding method below uses it; it stays a public
        # attribute so existing subclasses that reference it keep working.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def _before_event(self):
        """Hook run before each forwarded event.

        The default implementation does nothing, so this class can be used
        directly; subclasses override it to inject behaviour (for example
        emitting a timestamp or flushing pending state) ahead of the event.
        """

    def startTest(self, test):
        self._before_event()
        return self.super.startTest(test)

    def startTestRun(self):
        self._before_event()
        return self.super.startTestRun()

    def stopTest(self, test):
        self._before_event()
        return self.super.stopTest(test)

    def stopTestRun(self):
        self._before_event()
        return self.super.stopTestRun()

    def addError(self, test, err=None, details=None):
        self._before_event()
        return self.super.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        self._before_event()
        return self.super.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        self._before_event()
        return self.super.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        self._before_event()
        return self.super.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        self._before_event()
        return self.super.progress(offset, whence)

    def wasSuccessful(self):
        self._before_event()
        return self.super.wasSuccessful()

    @property
    def shouldStop(self):
        # Reading the property counts as an event: the hook runs first.
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        self._before_event()
        return self.super.stop()

    def time(self, a_datetime):
        self._before_event()
        return self.super.time(a_datetime)
172
173
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Inject time events into a test run that does not supply its own.

    While no explicit time has been provided, a UTC "now" time event is sent
    to the decorated result ahead of every hooked event.  As soon as the run
    supplies a real (non-None) time through ``time()``, automatic timing is
    switched off so the two sources do not get mixed.
    """

    def __init__(self, decorated):
        # None means "no explicit time seen yet": automatic timing is active.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Send the current UTC time unless explicit times are in use."""
        if self._time is not None:
            return
        now = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
        self.decorated.time(now)

    def progress(self, offset, whence):
        # Forwarded straight to the decorated result, without the hook.
        return self.decorated.progress(offset, whence)

    @property
    def shouldStop(self):
        # Read straight from the decorated result, without the hook.
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Record and forward an explicit timestamp.

        :param a_datetime: When None, automatic timestamps are (re-)enabled
            and are emitted before every event, which is also what happens
            if time() is never called.  Any other value is passed to the
            decorated result and turns automatic timestamps off.
        """
        self._time = a_datetime
        return self.decorated.time(a_datetime)
210
211
class TagsMixin(object):
    """Track tag additions/removals at global and per-test scope.

    Each scope is a ``(new_tags, gone_tags)`` pair of sets.  The per-test
    scope is None whenever no test is running.
    """

    def __init__(self):
        self._clear_tags()

    def _clear_tags(self):
        """Reset to an empty global scope and no test scope."""
        self._test_tags = None
        self._global_tags = (set(), set())

    def _get_active_tags(self):
        """Return a new set of the tags currently in effect."""
        global_added = self._global_tags[0]
        if self._test_tags is None:
            return set(global_added)
        test_added, test_removed = self._test_tags
        return (global_added - test_removed) | test_added

    def _get_current_scope(self):
        """Return the test scope if one is set, else the global scope."""
        return self._test_tags or self._global_tags

    def _flush_current_scope(self, tag_receiver):
        """Send the current scope's pending tags on, then empty the scope.

        ``tag_receiver.tags`` is only called when there is something to send.
        """
        added, removed = self._get_current_scope()
        if added or removed:
            tag_receiver.tags(added, removed)
        # Replace (rather than clear) the sets: the receiver was handed the
        # old set objects.
        fresh_scope = (set(), set())
        if self._test_tags:
            self._test_tags = fresh_scope
        else:
            self._global_tags = fresh_scope

    def startTestRun(self):
        self._clear_tags()

    def startTest(self, test):
        self._test_tags = (set(), set())

    def stopTest(self, test):
        self._test_tags = None

    def tags(self, new_tags, gone_tags):
        """Apply tag instructions to the current scope.

        Tags are added and removed as requested.  When a test is running the
        change lands in that test's scope, so later tests are unaffected.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        added, removed = self._get_current_scope()
        added.update(new_tags)
        added.difference_update(gone_tags)
        removed.update(gone_tags)
        removed.difference_update(new_tags)
265
266
class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
    """Collapses many 'tags' calls into one where possible.

    Tag instructions are accumulated locally and only sent to the decorated
    result, as a single ``tags`` call, right before the next hooked event.
    """

    def __init__(self, result):
        super(TagCollapsingDecorator, self).__init__(result)
        # Set up the tag state explicitly, in addition to the super() call.
        self._clear_tags()

    def _before_event(self):
        """Push any accumulated tags to the decorated result."""
        receiver = self.decorated
        self._flush_current_scope(receiver)

    def tags(self, new_tags, gone_tags):
        # Accumulate only; nothing is forwarded until the next event.
        TagsMixin.tags(self, new_tags, gone_tags)
277     def tags(self, new_tags, gone_tags):
278         TagsMixin.tags(self, new_tags, gone_tags)
279
280
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times.

    The first time of a run is forwarded immediately.  Later times in the
    same run are remembered, and the most recent one is forwarded just before
    the next non-time event if it differs from the last time sent.
    """

    def __init__(self, decorated):
        super(TimeCollapsingDecorator, self).__init__(decorated)
        self._last_sent_time = None
        self._last_received_time = None

    def _before_event(self):
        """Close the current run of times before a non-time event."""
        pending = self._last_received_time
        if pending is None:
            # No time arrived since the previous non-time event.
            return
        if pending != self._last_sent_time:
            self.decorated.time(pending)
            self._last_sent_time = pending
        self._last_received_time = None

    def time(self, a_time):
        # Don't upcall, because we don't want to call _before_event, it's only
        # for non-time events.
        starts_new_run = self._last_received_time is None
        if starts_new_run:
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
304
305
def and_predicates(predicates):
    """Combine predicates into one that holds only when every one holds.

    The returned callable passes its arguments to each predicate in order,
    stopping at the first one that gives a false result.  With no
    predicates at all the result is True.
    """
    # XXX: Should probably be in testtools to be better used by matchers. jml
    def combined(*args, **kwargs):
        for predicate in predicates:
            if not predicate(*args, **kwargs):
                return False
        return True
    return combined
310
311
312 def _make_tag_filter(with_tags, without_tags):
313     """Make a callback that checks tests against tags."""
314
315     with_tags = with_tags and set(with_tags) or None
316     without_tags = without_tags and set(without_tags) or None
317
318     def check_tags(test, outcome, err, details, tags):
319         if with_tags and not with_tags <= tags:
320             return False
321         if without_tags and bool(without_tags & tags):
322             return False
323         return True
324
325     return check_tags
326
327
class _PredicateFilter(TestResultDecorator, TagsMixin):
    """Forward only those tests whose outcome satisfies a predicate.

    Events received between startTest and stopTest are buffered.  At
    stopTest the buffer is replayed to the decorated result, followed by
    stopTest itself, unless an outcome of the test was rejected by the
    predicate, in which case the buffered events are discarded.
    """

    def __init__(self, result, predicate):
        super(_PredicateFilter, self).__init__(result)
        self._clear_tags()
        # Wrap the target so forwarded tags and times get collapsed.
        tag_collapser = TagCollapsingDecorator(self.decorated)
        self.decorated = TimeCollapsingDecorator(tag_collapser)
        self._predicate = predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []

    def filter_predicate(self, test, outcome, error, details):
        """Evaluate the predicate, supplying the currently active tags."""
        active_tags = self._get_active_tags()
        return self._predicate(test, outcome, error, details, active_tags)

    def _buffer_or_filter(self, accepted, method, args, details):
        """Buffer an outcome call if accepted, else mark the test filtered."""
        if accepted:
            self._buffered_calls.append((method, args, {'details': details}))
        else:
            self._filtered()

    def addError(self, test, err=None, details=None):
        accepted = self.filter_predicate(test, 'error', err, details)
        self._buffer_or_filter(accepted, 'addError', [test, err], details)

    def addFailure(self, test, err=None, details=None):
        accepted = self.filter_predicate(test, 'failure', err, details)
        self._buffer_or_filter(accepted, 'addFailure', [test, err], details)

    def addSkip(self, test, reason=None, details=None):
        accepted = self.filter_predicate(test, 'skip', reason, details)
        self._buffer_or_filter(accepted, 'addSkip', [test, reason], details)

    def addExpectedFailure(self, test, err=None, details=None):
        accepted = self.filter_predicate(
            test, 'expectedfailure', err, details)
        self._buffer_or_filter(
            accepted, 'addExpectedFailure', [test, err], details)

    def addUnexpectedSuccess(self, test, details=None):
        # Buffered unconditionally: the predicate is not consulted here.
        self._buffered_calls.append(
            ('addUnexpectedSuccess', [test], {'details': details}))

    def addSuccess(self, test, details=None):
        accepted = self.filter_predicate(test, 'success', None, details)
        self._buffer_or_filter(accepted, 'addSuccess', [test], details)

    def _filtered(self):
        """Mark the current test as rejected."""
        self._current_test_filtered = True

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        TagsMixin.startTest(self, test)
        self._current_test = test
        self._current_test_filtered = False
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        if not self._current_test_filtered:
            # Replay everything that was held back, then close the test.
            for method, args, kwargs in self._buffered_calls:
                forward = getattr(self.decorated, method)
                forward(*args, **kwargs)
            self.decorated.stopTest(test)
        self._buffered_calls = []
        self._current_test_filtered = None
        self._current_test = None
        TagsMixin.stopTest(self, test)

    def tags(self, new_tags, gone_tags):
        TagsMixin.tags(self, new_tags, gone_tags)
        if self._current_test is None:
            # Outside a test there is nothing to wait for: forward now.
            return super(_PredicateFilter, self).tags(new_tags, gone_tags)
        self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))

    def time(self, a_time):
        if self._current_test is None:
            return self.decorated.time(a_time)
        self._buffered_calls.append(('time', [a_time], {}))

    def id_to_orig_id(self, id):
        """Return ``id`` without a leading "subunit.RemotedTestCase."."""
        prefix = "subunit.RemotedTestCase."
        if not id.startswith(prefix):
            return id
        return id[len(prefix):]
432
433
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a TestResultFilter that filters events on to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details, tags) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
            are still supported but should be updated to accept the tags
            parameter for efficiency.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        def reject_outcome(unwanted):
            # A factory, so each predicate binds its own outcome name.
            return lambda t, outcome, e, d, tags: outcome != unwanted

        switches = (
            (filter_error, 'error'),
            (filter_failure, 'failure'),
            (filter_success, 'success'),
            (filter_skip, 'skip'),
            (filter_xfail, 'expectedfailure'),
            )
        predicates = [
            reject_outcome(name) for enabled, name in switches if enabled]
        if filter_predicate is not None:
            def compat(test, outcome, error, details, tags):
                # 0.0.7 and earlier did not support the 'tags' parameter.
                try:
                    return filter_predicate(
                        test, outcome, error, details, tags)
                except TypeError:
                    return filter_predicate(test, outcome, error, details)
            predicates.append(compat)
        combined = and_predicates(predicates)
        super(TestResultFilter, self).__init__(
            _PredicateFilter(result, combined))
        if fixup_expected_failures is None:
            fixup_expected_failures = frozenset()
        self._fixup_expected_failures = fixup_expected_failures

    def addError(self, test, err=None, details=None):
        # Errors from known-failing tests are reported as expected failures.
        if not self._failure_expected(test):
            super(TestResultFilter, self).addError(
                test, err=err, details=details)
            return
        self.addExpectedFailure(test, err=err, details=details)

    def addFailure(self, test, err=None, details=None):
        # Failures of known-failing tests are reported as expected failures.
        if not self._failure_expected(test):
            super(TestResultFilter, self).addFailure(
                test, err=err, details=details)
            return
        self.addExpectedFailure(test, err=err, details=details)

    def addSuccess(self, test, details=None):
        # A known-failing test that passes is an unexpected success.
        if not self._failure_expected(test):
            super(TestResultFilter, self).addSuccess(test, details=details)
            return
        self.addUnexpectedSuccess(test, details=details)

    def _failure_expected(self, test):
        """Return True if the test's id is in the known-failing set."""
        known_failing = self._fixup_expected_failures
        return (test.id() in known_failing)
519     def _failure_expected(self, test):
520         return (test.id() in self._fixup_expected_failures)
521
522
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of each completed test to a stream, one per line.

    With ``show_times`` set, each id is followed by the test's duration in
    seconds, computed from the time events received via ``time()``.
    Errors, failures and unexpected successes are counted in
    ``failed_tests``.
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult outputting to stream.

        :param stream: Object with a ``write`` method receiving the output.
        :param show_times: If true, append each test's duration to its line.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        self.__time = None
        self.show_times = show_times
        self._test = None
        self._test_duration = 0
        # Defined up front so stopTest never sees a missing attribute.
        self._start_time = None

    # All outcome methods accept the optional ``details`` keyword so the
    # signatures are consistent and callers using details are supported.

    def addError(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for ``test``.

        :param test: The test whose ``id()`` is written.
        :param duration: A ``datetime.timedelta``; only used when
            ``show_times`` is true.
        """
        if self.show_times:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()
        # Forget the previous test so a test that ends without an outcome
        # is not reported under the previous test's id.
        self._test = None

    def stopTest(self, test):
        start_time = self._start_time
        stop_time = self._time()
        if start_time is None or stop_time is None:
            # No usable time events were received: report a zero duration
            # rather than failing on None arithmetic.
            test_duration = datetime.timedelta(0)
        else:
            test_duration = stop_time - start_time
        reported = self._test
        if reported is None:
            # No outcome was recorded for this test; fall back to the test
            # handed to stopTest.
            reported = test
        self.reportTest(reported, test_duration)

    def time(self, time):
        """Record the most recent timestamp received."""
        self.__time = time

    def _time(self):
        """Return the most recent timestamp, or None if none was received."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0
579         "Tells whether or not this result was a success"
580         return self.failed_tests == 0
581
582
class TestByTestResult(testtools.TestResult):
    """Call something every time a test completes."""

    # XXX: In testtools since lp:testtools r249.  Once that's released, just
    # import that.

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that take a test case, a status (one of
            "success", "failure", "error", "skip", or "xfail"), a start time
            (a ``datetime`` with timezone), a stop time, an iterable of tags,
            and a details dict. Is called at the end of each test (i.e. on
            ``stopTest``) with the accumulated values for that test.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._stop_time = None
        self._details = None
        self._status = None

    def stopTest(self, test):
        self._stop_time = self._now()
        super(TestByTestResult, self).stopTest(test)
        # current_tags is new in testtools 0.9.13.
        current_tags = getattr(self, 'current_tags', None)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            tags=current_tags,
            details=self._details)

    def _err_to_details(self, test, err, details):
        """Return details if non-empty, else a traceback-only details dict."""
        if not details:
            return {'traceback': TracebackContent(err, test)}
        return details

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._status = 'success'
        self._details = details

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            # No details supplied: the reason becomes the only detail.
            self._details = {'reason': text_content(reason)}
            return
        if reason:
            # XXX: What if details already has 'reason' key?
            details['reason'] = text_content(reason)
        self._details = details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Recorded with the same status string as an ordinary success.
        self._status = 'success'
        self._details = details
659         self._status = 'success'
660         self._details = details
661
662
class CsvResult(TestByTestResult):
    """Write one CSV row per completed test to a stream.

    A header row is written at startTestRun; each test then contributes its
    id, status, start time and stop time.
    """

    def __init__(self, stream):
        super(CsvResult, self).__init__(self._on_test)
        writer = csv.writer(stream)
        self._write_row = writer.writerow

    def _on_test(self, test, status, start_time, stop_time, tags, details):
        # tags and details are accepted but not written out.
        row = [test.id(), status, start_time, stop_time]
        self._write_row(row)

    def startTestRun(self):
        super(CsvResult, self).startTestRun()
        header = ['test', 'status', 'start_time', 'stop_time']
        self._write_row(header)
673         super(CsvResult, self).startTestRun()
674         self._write_row(['test', 'status', 'start_time', 'stop_time'])