c00a2d3e9706cf3c2011b1c35856e63ff08402b7
[samba.git] / lib / subunit / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """TestResult helper classes used by subunit."""
18
19 import csv
20 import datetime
21
22 import testtools
23 from testtools.compat import all
24 from testtools.content import (
25     text_content,
26     TracebackContent,
27     )
28
29 from subunit import iso8601
30
31
32 # NOT a TestResult, because we are implementing the interface, not inheriting
33 # it.
class TestResultDecorator(object):
    """Base class that relays every TestResult call to another result.

    Subclasses inherit plain forwarding for each event and override only
    the events they need to intercept.  The target is wrapped in
    ``testtools.ExtendedToOriginalDecorator`` so that a target which lacks
    the newer methods or features still works, with those calls degraded.
    """

    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
    # we should gut this and just use that.

    def __init__(self, decorated):
        """Wrap ``decorated`` and store the wrapper as ``self.decorated``."""
        # Wrapping here gives every subclass graceful degradation for free.
        wrapped = testtools.ExtendedToOriginalDecorator(decorated)
        self.decorated = wrapped

    # -- run / test lifecycle ------------------------------------------

    def startTestRun(self):
        """Forward ``startTestRun`` and return the target's result."""
        target = self.decorated
        return target.startTestRun()

    def stopTestRun(self):
        """Forward ``stopTestRun`` and return the target's result."""
        target = self.decorated
        return target.stopTestRun()

    def startTest(self, test):
        """Forward ``startTest`` for ``test``."""
        target = self.decorated
        return target.startTest(test)

    def stopTest(self, test):
        """Forward ``stopTest`` for ``test``."""
        target = self.decorated
        return target.stopTest(test)

    # -- outcomes ------------------------------------------------------
    # ``details`` is always passed by keyword, the positional slot (if any)
    # carries the exc_info / reason.

    def addSuccess(self, test, details=None):
        """Forward a success outcome."""
        target = self.decorated
        return target.addSuccess(test, details=details)

    def addError(self, test, err=None, details=None):
        """Forward an error outcome."""
        target = self.decorated
        return target.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward a failure outcome."""
        target = self.decorated
        return target.addFailure(test, err, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward a skip outcome."""
        target = self.decorated
        return target.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward an expected-failure outcome."""
        target = self.decorated
        return target.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward an unexpected-success outcome."""
        target = self.decorated
        return target.addUnexpectedSuccess(test, details=details)

    # -- stream metadata -----------------------------------------------

    def progress(self, offset, whence):
        """Forward a progress event."""
        target = self.decorated
        return target.progress(offset, whence)

    def tags(self, new_tags, gone_tags):
        """Forward a tags event."""
        target = self.decorated
        return target.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward a time event."""
        target = self.decorated
        return target.time(a_datetime)

    # -- run state -----------------------------------------------------

    def wasSuccessful(self):
        """Return whatever the decorated result's ``wasSuccessful`` says."""
        target = self.decorated
        return target.wasSuccessful()

    def stop(self):
        """Forward a stop request."""
        target = self.decorated
        return target.stop()

    @property
    def shouldStop(self):
        """The decorated result's ``shouldStop`` attribute."""
        target = self.decorated
        return target.shouldStop

    @property
    def testsRun(self):
        """The decorated result's ``testsRun`` attribute."""
        target = self.decorated
        return target.testsRun
103
104
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Subclasses must provide ``_before_event()``.  It is called with no
    arguments immediately before each overridden event below is forwarded
    to the decorated result.  ``tags`` and ``testsRun`` are not overridden
    here, so they are inherited unchanged and do not trigger the hook.
    """

    def __init__(self, decorated):
        """Create a hooked decorator forwarding to ``decorated``."""
        # ``self.super`` is kept as an attribute only for backward
        # compatibility with code that reads it.  The methods below no
        # longer dispatch through it: if a subclass re-assigned
        # ``self.super`` (using the same idiom), dispatching through the
        # attribute would make every method here call itself forever.
        self.super = super(HookedTestResultDecorator, self)
        super(HookedTestResultDecorator, self).__init__(decorated)

    def startTest(self, test):
        self._before_event()
        return super(HookedTestResultDecorator, self).startTest(test)

    def startTestRun(self):
        self._before_event()
        return super(HookedTestResultDecorator, self).startTestRun()

    def stopTest(self, test):
        self._before_event()
        return super(HookedTestResultDecorator, self).stopTest(test)

    def stopTestRun(self):
        self._before_event()
        return super(HookedTestResultDecorator, self).stopTestRun()

    def addError(self, test, err=None, details=None):
        self._before_event()
        return super(HookedTestResultDecorator, self).addError(
            test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        self._before_event()
        return super(HookedTestResultDecorator, self).addFailure(
            test, err, details=details)

    def addSuccess(self, test, details=None):
        self._before_event()
        return super(HookedTestResultDecorator, self).addSuccess(
            test, details=details)

    def addSkip(self, test, reason=None, details=None):
        self._before_event()
        return super(HookedTestResultDecorator, self).addSkip(
            test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._before_event()
        return super(HookedTestResultDecorator, self).addExpectedFailure(
            test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        self._before_event()
        return super(HookedTestResultDecorator, self).addUnexpectedSuccess(
            test, details=details)

    def progress(self, offset, whence):
        self._before_event()
        return super(HookedTestResultDecorator, self).progress(
            offset, whence)

    def wasSuccessful(self):
        self._before_event()
        return super(HookedTestResultDecorator, self).wasSuccessful()

    @property
    def shouldStop(self):
        # Reading the attribute counts as an event and runs the hook too.
        self._before_event()
        return super(HookedTestResultDecorator, self).shouldStop

    def stop(self):
        self._before_event()
        return super(HookedTestResultDecorator, self).stop()

    def time(self, a_datetime):
        self._before_event()
        return super(HookedTestResultDecorator, self).time(a_datetime)
172
173
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    A time event carrying the current UTC time is sent to the decorated
    result before every hooked event.  Once ``time()`` has been called with
    a non-None value the automatic events stop, so explicit time data from
    the test run is not mixed with generated timestamps.
    """

    def __init__(self, decorated):
        # Last time passed to time(); None means "keep auto-stamping".
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Emit a timestamp unless explicit times are being provided."""
        if self._time is not None:
            return
        now = datetime.datetime.utcnow()
        stamped = now.replace(tzinfo=iso8601.Utc())
        self.decorated.time(stamped)

    def progress(self, offset, whence):
        # Goes straight to the decorated result: no timestamp is emitted.
        target = self.decorated
        return target.progress(offset, whence)

    @property
    def shouldStop(self):
        # Goes straight to the decorated result: no timestamp is emitted.
        target = self.decorated
        return target.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: If None, automatically add timestamps before every
            event (this is the default behaviour if time() is not called at
            all).  If not None, pass the provided time onto the decorated
            result object and disable automatic timestamps.
        """
        self._time = a_datetime
        target = self.decorated
        return target.time(a_datetime)
210
211
class TagsMixin(object):
    """Track tag changes at two scopes: the whole run and the current test.

    Each scope is a ``(new_tags, gone_tags)`` pair of sets.  The test scope
    is ``None`` while no test is running.
    """

    def __init__(self):
        self._clear_tags()

    def _clear_tags(self):
        """Reset to an empty global scope with no test scope."""
        self._test_tags = None
        self._global_tags = set(), set()

    def _get_active_tags(self):
        """Return a new set of the tags in effect right now."""
        global_new = self._global_tags[0]
        test_scope = self._test_tags
        if test_scope is None:
            return set(global_new)
        test_new, test_gone = test_scope
        surviving = global_new.difference(test_gone)
        return surviving.union(test_new)

    def _get_current_scope(self):
        """Return the test scope if there is one, else the global scope."""
        return self._test_tags or self._global_tags

    def _flush_current_scope(self, tag_receiver):
        """Send the current scope's pending tags on, then empty the scope.

        ``tag_receiver.tags`` is only called when there is something to
        report.
        """
        added, removed = self._get_current_scope()
        if added or removed:
            tag_receiver.tags(added, removed)
        emptied = set(), set()
        if self._test_tags:
            self._test_tags = emptied
        else:
            self._global_tags = emptied

    def startTestRun(self):
        self._clear_tags()

    def startTest(self, test):
        # Open a fresh per-test scope.
        self._test_tags = set(), set()

    def stopTest(self, test):
        # Drop the per-test scope; global tags are untouched.
        self._test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        added, removed = self._get_current_scope()
        # A tag ends up in exactly one of the two sets: the latest
        # instruction for it wins.
        added.update(new_tags)
        added.difference_update(gone_tags)
        removed.update(gone_tags)
        removed.difference_update(new_tags)
265
266
class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
    """Collapses many 'tags' calls into one where possible.

    Incoming tag changes are accumulated by ``TagsMixin`` rather than
    forwarded; the accumulated change is sent to the decorated result just
    before the next hooked event.
    """

    def __init__(self, result):
        super(TagCollapsingDecorator, self).__init__(result)
        # Set up the tag scopes explicitly here.
        self._clear_tags()

    def _before_event(self):
        """Send any pending tag changes on before the event is forwarded."""
        receiver = self.decorated
        self._flush_current_scope(receiver)

    def tags(self, new_tags, gone_tags):
        # Record only; nothing is forwarded until the next flush.
        TagsMixin.tags(self, new_tags, gone_tags)
279
280
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times."""

    def __init__(self, decorated):
        super(TimeCollapsingDecorator, self).__init__(decorated)
        # Most recent time forwarded to the decorated result.
        self._last_sent_time = None
        # Most recent time seen in the current run of time events, or None
        # when the previous event was not a time event.
        self._last_received_time = None

    def _before_event(self):
        """Close the current run of time events before a non-time event."""
        pending = self._last_received_time
        if pending is None:
            return
        if pending != self._last_sent_time:
            # The run had more than its first time: forward the final one.
            self.decorated.time(pending)
            self._last_sent_time = pending
        self._last_received_time = None

    def time(self, a_time):
        # Don't upcall, because we don't want to call _before_event, it's only
        # for non-time events.
        starts_new_run = self._last_received_time is None
        if starts_new_run:
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
304
305
def and_predicates(predicates):
    """Return a predicate that is true iff all predicates are true.

    The returned callable passes its arguments unchanged to each predicate
    in turn and stops at the first one that returns a false value.
    """
    # XXX: Should probably be in testtools to be better used by matchers. jml
    def combined(*args, **kwargs):
        for predicate in predicates:
            if not predicate(*args, **kwargs):
                return False
        return True
    return combined
310
311
def make_tag_filter(with_tags, without_tags):
    """Make a callback that checks tests against tags.

    :param with_tags: Iterable of tags that must all be present, or a false
        value to require nothing.
    :param without_tags: Iterable of tags of which none may be present, or
        a false value to exclude nothing.
    :return: A callable taking ``(test, outcome, err, details, tags)`` and
        returning True when ``tags`` satisfies both constraints.  ``tags``
        may be any iterable of tags; it is only examined when at least one
        constraint is in force.
    """
    required = set(with_tags) if with_tags else None
    excluded = set(without_tags) if without_tags else None

    def check_tags(test, outcome, err, details, tags):
        if not (required or excluded):
            # Nothing to check, so ``tags`` is deliberately left untouched.
            return True
        if not isinstance(tags, (set, frozenset)):
            # Accept lists, tuples and iterators: materialise once so both
            # checks below see the same contents.
            tags = set(tags)
        if required and not required <= tags:
            return False
        if excluded and excluded & tags:
            return False
        return True

    return check_tags
326
327
class _PredicateFilter(TestResultDecorator, TagsMixin):
    """Forward a test's events only if its outcome passes a predicate.

    Events for the running test are buffered.  At ``stopTest`` they are
    replayed to the decorated result, followed by ``stopTest`` itself,
    unless one of the test's outcomes was rejected by the predicate, in
    which case the whole buffer is discarded.
    """

    def __init__(self, result, predicate):
        super(_PredicateFilter, self).__init__(result)
        self._clear_tags()
        tag_collapser = TagCollapsingDecorator(self.decorated)
        self.decorated = TimeCollapsingDecorator(tag_collapser)
        self._predicate = predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []

    def filter_predicate(self, test, outcome, error, details):
        """Run the predicate, supplying the currently active tags."""
        active_tags = self._get_active_tags()
        return self._predicate(test, outcome, error, details, active_tags)

    def _buffer_outcome(self, method, outcome, args, error, details):
        """Buffer ``method(*args, details=details)`` if the predicate agrees.

        ``args[0]`` is the test.  When the predicate rejects the outcome the
        current test is marked as filtered instead.
        """
        if self.filter_predicate(args[0], outcome, error, details):
            self._buffered_calls.append(
                (method, args, {'details': details}))
        else:
            self._filtered()

    def addError(self, test, err=None, details=None):
        self._buffer_outcome('addError', 'error', [test, err], err, details)

    def addFailure(self, test, err=None, details=None):
        self._buffer_outcome(
            'addFailure', 'failure', [test, err], err, details)

    def addSkip(self, test, reason=None, details=None):
        self._buffer_outcome(
            'addSkip', 'skip', [test, reason], reason, details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._buffer_outcome(
            'addExpectedFailure', 'expectedfailure', [test, err], err,
            details)

    def addUnexpectedSuccess(self, test, details=None):
        # Buffered unconditionally: the predicate is not consulted here.
        call = ('addUnexpectedSuccess', [test], {'details': details})
        self._buffered_calls.append(call)

    def addSuccess(self, test, details=None):
        self._buffer_outcome('addSuccess', 'success', [test], None, details)

    def _filtered(self):
        """Mark the current test as rejected by the predicate."""
        self._current_test_filtered = True

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        TagsMixin.startTest(self, test)
        self._current_test = test
        self._current_test_filtered = False
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        if not self._current_test_filtered:
            target = self.decorated
            for method, args, kwargs in self._buffered_calls:
                replay = getattr(target, method)
                replay(*args, **kwargs)
            target.stopTest(test)
        # Reset the per-test state whether or not anything was forwarded.
        self._current_test = None
        self._current_test_filtered = None
        self._buffered_calls = []
        TagsMixin.stopTest(self, test)

    def tags(self, new_tags, gone_tags):
        TagsMixin.tags(self, new_tags, gone_tags)
        if self._current_test is None:
            # Outside any test there is nothing to wait for: forward now.
            return super(_PredicateFilter, self).tags(new_tags, gone_tags)
        self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))

    def time(self, a_time):
        # Time events are never buffered.
        target = self.decorated
        return target.time(a_time)

    def id_to_orig_id(self, id):
        """Strip a leading "subunit.RemotedTestCase." from ``id``, if any."""
        prefix = "subunit.RemotedTestCase."
        if id.startswith(prefix):
            return id[len(prefix):]
        return id
429
430
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a FilterResult object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details, tags) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
            are still supported but should be updated to accept the tags
            parameter for efficiency.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        def drop_outcome(unwanted):
            # A factory, so each predicate keeps its own outcome name.
            return lambda t, outcome, e, d, tags: outcome != unwanted

        switches = (
            (filter_error, 'error'),
            (filter_failure, 'failure'),
            (filter_success, 'success'),
            (filter_skip, 'skip'),
            (filter_xfail, 'expectedfailure'),
            )
        predicates = [
            drop_outcome(outcome_name)
            for enabled, outcome_name in switches if enabled]
        if filter_predicate is not None:
            def compat(test, outcome, error, details, tags):
                # 0.0.7 and earlier did not support the 'tags' parameter.
                try:
                    return filter_predicate(
                        test, outcome, error, details, tags)
                except TypeError:
                    return filter_predicate(test, outcome, error, details)
            predicates.append(compat)
        combined = and_predicates(predicates)
        super(TestResultFilter, self).__init__(
            _PredicateFilter(result, combined))
        if fixup_expected_failures is None:
            fixup_expected_failures = frozenset()
        self._fixup_expected_failures = fixup_expected_failures

    def addError(self, test, err=None, details=None):
        # Errors from known-failing tests are reported as expected failures.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addError(
            test, err=err, details=details)

    def addFailure(self, test, err=None, details=None):
        # Failures of known-failing tests are reported as expected failures.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addFailure(
            test, err=err, details=details)

    def addSuccess(self, test, details=None):
        # A known-failing test that passes is an unexpected success.
        if self._failure_expected(test):
            self.addUnexpectedSuccess(test, details=details)
            return
        super(TestResultFilter, self).addSuccess(test, details=details)

    def _failure_expected(self, test):
        """Return True if ``test.id()`` is in the known-failing set."""
        test_id = test.id()
        return test_id in self._fixup_expected_failures
518
519
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of every finished test to a stream, one per line.

    With ``show_times`` each id is followed by the test's duration in
    seconds, computed from the ``time()`` events received around the test.
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult outputting to stream.

        :param stream: Object with a ``write`` method that receives the
            lines.
        :param show_times: If true, append each test's duration.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        self.__time = None
        self.show_times = show_times
        self._test = None
        self._test_duration = 0
        # Set by startTest; initialised so stopTest never hits a missing
        # attribute.
        self._start_time = None

    # The outcome methods all accept the same optional ``err``/``reason``
    # and ``details`` arguments, so callers may pass ``details`` by keyword
    # to any of them.  None of the values are used beyond counting.

    def addError(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for ``test``.

        :param test: The test whose ``id()`` is written.
        :param duration: A ``datetime.timedelta``; only used when
            ``show_times`` is set.
        """
        if self.show_times:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()

    def stopTest(self, test):
        stop_time = self._time()
        if stop_time is None or self._start_time is None:
            # No time events were received around this test, so there is
            # nothing to subtract: report a zero duration instead of
            # failing on None - None.
            test_duration = datetime.timedelta(0)
        else:
            test_duration = stop_time - self._start_time
        # Fall back to the test being stopped when no outcome was recorded
        # for it, and forget the recorded test so it cannot be reported
        # again for a later test.
        reported = self._test if self._test is not None else test
        self._test = None
        self.reportTest(reported, test_duration)

    def time(self, time):
        """Record the most recent timestamp from the stream."""
        self.__time = time

    def _time(self):
        """Return the most recent timestamp, or None if none was received."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0
578
579
class TestByTestResult(testtools.TestResult):
    """Call something every time a test completes."""

    # XXX: In testtools since lp:testtools r249.  Once that's released, just
    # import that.

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that take a test case, a status (one of
            "success", "failure", "error", "skip", or "xfail"), a start time
            (a ``datetime`` with timezone), a stop time, an iterable of tags,
            and a details dict. Is called at the end of each test (i.e. on
            ``stopTest``) with the accumulated values for that test.  The
            values are passed by keyword: ``test``, ``status``,
            ``start_time``, ``stop_time``, ``tags`` and ``details``.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._status = None
        self._details = None
        self._stop_time = None

    def stopTest(self, test):
        self._stop_time = self._now()
        super(TestByTestResult, self).stopTest(test)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            # current_tags is new in testtools 0.9.13.
            tags=getattr(self, 'current_tags', None),
            details=self._details)

    def _err_to_details(self, test, err, details):
        """Return ``details`` if non-empty, else a traceback-only dict."""
        if details:
            return details
        return {'traceback': TracebackContent(err, test)}

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._status = 'success'
        self._details = details

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            details = {'reason': text_content(reason)}
        elif reason:
            # Work on a copy so the caller's dict is not modified behind
            # its back.
            # XXX: What if details already has 'reason' key?
            details = dict(details)
            details['reason'] = text_content(reason)
        self._details = details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Reported to on_test with the same status as an ordinary success.
        self._status = 'success'
        self._details = details
658
659
class CsvResult(TestByTestResult):
    """Write a CSV row of id, status, start and stop time for each test.

    A header row is written when the test run starts.
    """

    def __init__(self, stream):
        super(CsvResult, self).__init__(self._on_test)
        writer = csv.writer(stream)
        self._write_row = writer.writerow

    def _on_test(self, test, status, start_time, stop_time, tags, details):
        # tags and details are accepted but not written out.
        row = [test.id(), status, start_time, stop_time]
        self._write_row(row)

    def startTestRun(self):
        super(CsvResult, self).startTestRun()
        header = ['test', 'status', 'start_time', 'stop_time']
        self._write_row(header)