Merge lp:~jml/subunit/to-csv -- Add subunit2csv script to filter subunit output
[third_party/subunit] / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """TestResult helper classes used to by subunit."""
18
19 import csv
20 import datetime
21
22 import testtools
23 from testtools.content import (
24     text_content,
25     TracebackContent,
26     )
27
28 from subunit import iso8601
29
30
31 # NOT a TestResult, because we are implementing the interface, not inheriting
32 # it.
33 class TestResultDecorator(object):
34     """General pass-through decorator.
35
36     This provides a base that other TestResults can inherit from to
37     gain basic forwarding functionality. It also takes care of
38     handling the case where the target doesn't support newer methods
39     or features by degrading them.
40     """
41
42     def __init__(self, decorated):
43         """Create a TestResultDecorator forwarding to decorated."""
44         # Make every decorator degrade gracefully.
45         self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
46
47     def startTest(self, test):
48         return self.decorated.startTest(test)
49
50     def startTestRun(self):
51         return self.decorated.startTestRun()
52
53     def stopTest(self, test):
54         return self.decorated.stopTest(test)
55
56     def stopTestRun(self):
57         return self.decorated.stopTestRun()
58
59     def addError(self, test, err=None, details=None):
60         return self.decorated.addError(test, err, details=details)
61
62     def addFailure(self, test, err=None, details=None):
63         return self.decorated.addFailure(test, err, details=details)
64
65     def addSuccess(self, test, details=None):
66         return self.decorated.addSuccess(test, details=details)
67
68     def addSkip(self, test, reason=None, details=None):
69         return self.decorated.addSkip(test, reason, details=details)
70
71     def addExpectedFailure(self, test, err=None, details=None):
72         return self.decorated.addExpectedFailure(test, err, details=details)
73
74     def addUnexpectedSuccess(self, test, details=None):
75         return self.decorated.addUnexpectedSuccess(test, details=details)
76
77     def progress(self, offset, whence):
78         return self.decorated.progress(offset, whence)
79
80     def wasSuccessful(self):
81         return self.decorated.wasSuccessful()
82
83     @property
84     def shouldStop(self):
85         return self.decorated.shouldStop
86
87     def stop(self):
88         return self.decorated.stop()
89
90     @property
91     def testsRun(self):
92         return self.decorated.testsRun
93
94     def tags(self, new_tags, gone_tags):
95         return self.decorated.tags(new_tags, gone_tags)
96
97     def time(self, a_datetime):
98         return self.decorated.time(a_datetime)
99
100
101 class HookedTestResultDecorator(TestResultDecorator):
102     """A TestResult which calls a hook on every event."""
103
104     def __init__(self, decorated):
105         self.super = super(HookedTestResultDecorator, self)
106         self.super.__init__(decorated)
107
108     def startTest(self, test):
109         self._before_event()
110         return self.super.startTest(test)
111
112     def startTestRun(self):
113         self._before_event()
114         return self.super.startTestRun()
115
116     def stopTest(self, test):
117         self._before_event()
118         return self.super.stopTest(test)
119
120     def stopTestRun(self):
121         self._before_event()
122         return self.super.stopTestRun()
123
124     def addError(self, test, err=None, details=None):
125         self._before_event()
126         return self.super.addError(test, err, details=details)
127
128     def addFailure(self, test, err=None, details=None):
129         self._before_event()
130         return self.super.addFailure(test, err, details=details)
131
132     def addSuccess(self, test, details=None):
133         self._before_event()
134         return self.super.addSuccess(test, details=details)
135
136     def addSkip(self, test, reason=None, details=None):
137         self._before_event()
138         return self.super.addSkip(test, reason, details=details)
139
140     def addExpectedFailure(self, test, err=None, details=None):
141         self._before_event()
142         return self.super.addExpectedFailure(test, err, details=details)
143
144     def addUnexpectedSuccess(self, test, details=None):
145         self._before_event()
146         return self.super.addUnexpectedSuccess(test, details=details)
147
148     def progress(self, offset, whence):
149         self._before_event()
150         return self.super.progress(offset, whence)
151
152     def wasSuccessful(self):
153         self._before_event()
154         return self.super.wasSuccessful()
155
156     @property
157     def shouldStop(self):
158         self._before_event()
159         return self.super.shouldStop
160
161     def stop(self):
162         self._before_event()
163         return self.super.stop()
164
165     def time(self, a_datetime):
166         self._before_event()
167         return self.super.time(a_datetime)
168
169
170 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
171     """Decorate a TestResult to add time events to a test run.
172
173     By default this will cause a time event before every test event,
174     but if explicit time data is being provided by the test run, then
175     this decorator will turn itself off to prevent causing confusion.
176     """
177
178     def __init__(self, decorated):
179         self._time = None
180         super(AutoTimingTestResultDecorator, self).__init__(decorated)
181
182     def _before_event(self):
183         time = self._time
184         if time is not None:
185             return
186         time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
187         self.decorated.time(time)
188
189     def progress(self, offset, whence):
190         return self.decorated.progress(offset, whence)
191
192     @property
193     def shouldStop(self):
194         return self.decorated.shouldStop
195
196     def time(self, a_datetime):
197         """Provide a timestamp for the current test activity.
198
199         :param a_datetime: If None, automatically add timestamps before every
200             event (this is the default behaviour if time() is not called at
201             all).  If not None, pass the provided time onto the decorated
202             result object and disable automatic timestamps.
203         """
204         self._time = a_datetime
205         return self.decorated.time(a_datetime)
206
207
208 class TagCollapsingDecorator(TestResultDecorator):
209     """Collapses many 'tags' calls into one where possible."""
210
211     def __init__(self, result):
212         super(TagCollapsingDecorator, self).__init__(result)
213         # The (new, gone) tags for the current test.
214         self._current_test_tags = None
215
216     def startTest(self, test):
217         """Start a test.
218
219         Not directly passed to the client, but used for handling of tags
220         correctly.
221         """
222         self.decorated.startTest(test)
223         self._current_test_tags = set(), set()
224
225     def stopTest(self, test):
226         """Stop a test.
227
228         Not directly passed to the client, but used for handling of tags
229         correctly.
230         """
231         # Tags to output for this test.
232         if self._current_test_tags[0] or self._current_test_tags[1]:
233             self.decorated.tags(*self._current_test_tags)
234         self.decorated.stopTest(test)
235         self._current_test_tags = None
236
237     def tags(self, new_tags, gone_tags):
238         """Handle tag instructions.
239
240         Adds and removes tags as appropriate. If a test is currently running,
241         tags are not affected for subsequent tests.
242
243         :param new_tags: Tags to add,
244         :param gone_tags: Tags to remove.
245         """
246         if self._current_test_tags is not None:
247             # gather the tags until the test stops.
248             self._current_test_tags[0].update(new_tags)
249             self._current_test_tags[0].difference_update(gone_tags)
250             self._current_test_tags[1].update(gone_tags)
251             self._current_test_tags[1].difference_update(new_tags)
252         else:
253             return self.decorated.tags(new_tags, gone_tags)
254
255
256 class TimeCollapsingDecorator(HookedTestResultDecorator):
257     """Only pass on the first and last of a consecutive sequence of times."""
258
259     def __init__(self, decorated):
260         super(TimeCollapsingDecorator, self).__init__(decorated)
261         self._last_received_time = None
262         self._last_sent_time = None
263
264     def _before_event(self):
265         if self._last_received_time is None:
266             return
267         if self._last_received_time != self._last_sent_time:
268             self.decorated.time(self._last_received_time)
269             self._last_sent_time = self._last_received_time
270         self._last_received_time = None
271
272     def time(self, a_time):
273         # Don't upcall, because we don't want to call _before_event, it's only
274         # for non-time events.
275         if self._last_received_time is None:
276             self.decorated.time(a_time)
277             self._last_sent_time = a_time
278         self._last_received_time = a_time
279
280
281 def all_true(bools):
282     """Return True if all of 'bools' are True. False otherwise."""
283     for b in bools:
284         if not b:
285             return False
286     return True
287
288
289 class TestResultFilter(TestResultDecorator):
290     """A pyunit TestResult interface implementation which filters tests.
291
292     Tests that pass the filter are handed on to another TestResult instance
293     for further processing/reporting. To obtain the filtered results,
294     the other instance must be interrogated.
295
296     :ivar result: The result that tests are passed to after filtering.
297     :ivar filter_predicate: The callback run to decide whether to pass
298         a result.
299     """
300
301     def __init__(self, result, filter_error=False, filter_failure=False,
302         filter_success=True, filter_skip=False, filter_xfail=False,
303         filter_predicate=None, fixup_expected_failures=None):
304         """Create a FilterResult object filtering to result.
305
306         :param filter_error: Filter out errors.
307         :param filter_failure: Filter out failures.
308         :param filter_success: Filter out successful tests.
309         :param filter_skip: Filter out skipped tests.
310         :param filter_xfail: Filter out expected failure tests.
311         :param filter_predicate: A callable taking (test, outcome, err,
312             details) and returning True if the result should be passed
313             through.  err and details may be none if no error or extra
314             metadata is available. outcome is the name of the outcome such
315             as 'success' or 'failure'.
316         :param fixup_expected_failures: Set of test ids to consider known
317             failing.
318         """
319         super(TestResultFilter, self).__init__(result)
320         self.decorated = TimeCollapsingDecorator(
321             TagCollapsingDecorator(self.decorated))
322         predicates = []
323         if filter_error:
324             predicates.append(lambda t, outcome, e, d: outcome != 'error')
325         if filter_failure:
326             predicates.append(lambda t, outcome, e, d: outcome != 'failure')
327         if filter_success:
328             predicates.append(lambda t, outcome, e, d: outcome != 'success')
329         if filter_skip:
330             predicates.append(lambda t, outcome, e, d: outcome != 'skip')
331         if filter_xfail:
332             predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
333         if filter_predicate is not None:
334             predicates.append(filter_predicate)
335         self.filter_predicate = (
336             lambda test, outcome, err, details:
337                 all_true(p(test, outcome, err, details) for p in predicates))
338         # The current test (for filtering tags)
339         self._current_test = None
340         # Has the current test been filtered (for outputting test tags)
341         self._current_test_filtered = None
342         # Calls to this result that we don't know whether to forward on yet.
343         self._buffered_calls = []
344         if fixup_expected_failures is None:
345             self._fixup_expected_failures = frozenset()
346         else:
347             self._fixup_expected_failures = fixup_expected_failures
348
349     def addError(self, test, err=None, details=None):
350         if (self.filter_predicate(test, 'error', err, details)):
351             if self._failure_expected(test):
352                 self._buffered_calls.append(
353                     ('addExpectedFailure', [test, err], {'details': details}))
354             else:
355                 self._buffered_calls.append(
356                     ('addError', [test, err], {'details': details}))
357         else:
358             self._filtered()
359
360     def addFailure(self, test, err=None, details=None):
361         if (self.filter_predicate(test, 'failure', err, details)):
362             if self._failure_expected(test):
363                 self._buffered_calls.append(
364                     ('addExpectedFailure', [test, err], {'details': details}))
365             else:
366                 self._buffered_calls.append(
367                     ('addFailure', [test, err], {'details': details}))
368         else:
369             self._filtered()
370
371     def addSkip(self, test, reason=None, details=None):
372         if (self.filter_predicate(test, 'skip', reason, details)):
373             self._buffered_calls.append(
374                 ('addSkip', [test, reason], {'details': details}))
375         else:
376             self._filtered()
377
378     def addSuccess(self, test, details=None):
379         if (self.filter_predicate(test, 'success', None, details)):
380             if self._failure_expected(test):
381                 self._buffered_calls.append(
382                     ('addUnexpectedSuccess', [test], {'details': details}))
383             else:
384                 self._buffered_calls.append(
385                     ('addSuccess', [test], {'details': details}))
386         else:
387             self._filtered()
388
389     def addExpectedFailure(self, test, err=None, details=None):
390         if self.filter_predicate(test, 'expectedfailure', err, details):
391             self._buffered_calls.append(
392                 ('addExpectedFailure', [test, err], {'details': details}))
393         else:
394             self._filtered()
395
396     def addUnexpectedSuccess(self, test, details=None):
397         self._buffered_calls.append(
398             ('addUnexpectedSuccess', [test], {'details': details}))
399
400     def _filtered(self):
401         self._current_test_filtered = True
402
403     def _failure_expected(self, test):
404         return (test.id() in self._fixup_expected_failures)
405
406     def startTest(self, test):
407         """Start a test.
408
409         Not directly passed to the client, but used for handling of tags
410         correctly.
411         """
412         self._current_test = test
413         self._current_test_filtered = False
414         self._buffered_calls.append(('startTest', [test], {}))
415
416     def stopTest(self, test):
417         """Stop a test.
418
419         Not directly passed to the client, but used for handling of tags
420         correctly.
421         """
422         if not self._current_test_filtered:
423             # Tags to output for this test.
424             for method, args, kwargs in self._buffered_calls:
425                 getattr(self.decorated, method)(*args, **kwargs)
426             self.decorated.stopTest(test)
427         self._current_test = None
428         self._current_test_filtered = None
429         self._buffered_calls = []
430
431     def time(self, a_time):
432         if self._current_test is not None:
433             self._buffered_calls.append(('time', [a_time], {}))
434         else:
435             return self.decorated.time(a_time)
436
437     def id_to_orig_id(self, id):
438         if id.startswith("subunit.RemotedTestCase."):
439             return id[len("subunit.RemotedTestCase."):]
440         return id
441
442
443 class TestIdPrintingResult(testtools.TestResult):
444
445     def __init__(self, stream, show_times=False):
446         """Create a FilterResult object outputting to stream."""
447         super(TestIdPrintingResult, self).__init__()
448         self._stream = stream
449         self.failed_tests = 0
450         self.__time = None
451         self.show_times = show_times
452         self._test = None
453         self._test_duration = 0
454
455     def addError(self, test, err):
456         self.failed_tests += 1
457         self._test = test
458
459     def addFailure(self, test, err):
460         self.failed_tests += 1
461         self._test = test
462
463     def addSuccess(self, test):
464         self._test = test
465
466     def addSkip(self, test, reason=None, details=None):
467         self._test = test
468
469     def addUnexpectedSuccess(self, test, details=None):
470         self.failed_tests += 1
471         self._test = test
472
473     def addExpectedFailure(self, test, err=None, details=None):
474         self._test = test
475
476     def reportTest(self, test, duration):
477         if self.show_times:
478             seconds = duration.seconds
479             seconds += duration.days * 3600 * 24
480             seconds += duration.microseconds / 1000000.0
481             self._stream.write(test.id() + ' %0.3f\n' % seconds)
482         else:
483             self._stream.write(test.id() + '\n')
484
485     def startTest(self, test):
486         self._start_time = self._time()
487
488     def stopTest(self, test):
489         test_duration = self._time() - self._start_time
490         self.reportTest(self._test, test_duration)
491
492     def time(self, time):
493         self.__time = time
494
495     def _time(self):
496         return self.__time
497
498     def wasSuccessful(self):
499         "Tells whether or not this result was a success"
500         return self.failed_tests == 0
501
502
503 class TestByTestResult(testtools.TestResult):
504     """Call something every time a test completes."""
505
506     # XXX: Arguably belongs in testtools.
507
508     def __init__(self, on_test):
509         """Construct a ``TestByTestResult``.
510
511         :param on_test: A callable that take a test case, a status (one of
512             "success", "failure", "error", "skip", or "xfail"), a start time
513             (a ``datetime`` with timezone), a stop time, an iterable of tags,
514             and a details dict. Is called at the end of each test (i.e. on
515             ``stopTest``) with the accumulated values for that test.
516         """
517         super(TestByTestResult, self).__init__()
518         self._on_test = on_test
519
520     def startTest(self, test):
521         super(TestByTestResult, self).startTest(test)
522         self._start_time = self._now()
523         # There's no supported (i.e. tested) behaviour that relies on these
524         # being set, but it makes me more comfortable all the same. -- jml
525         self._status = None
526         self._details = None
527         self._stop_time = None
528
529     def stopTest(self, test):
530         self._stop_time = self._now()
531         super(TestByTestResult, self).stopTest(test)
532         self._on_test(
533             test=test,
534             status=self._status,
535             start_time=self._start_time,
536             stop_time=self._stop_time,
537             # current_tags is new in testtools 0.9.13.
538             tags=getattr(self, 'current_tags', None),
539             details=self._details)
540
541     def _err_to_details(self, test, err, details):
542         if details:
543             return details
544         return {'traceback': TracebackContent(err, test)}
545
546     def addSuccess(self, test, details=None):
547         super(TestByTestResult, self).addSuccess(test)
548         self._status = 'success'
549         self._details = details
550
551     def addFailure(self, test, err=None, details=None):
552         super(TestByTestResult, self).addFailure(test, err, details)
553         self._status = 'failure'
554         self._details = self._err_to_details(test, err, details)
555
556     def addError(self, test, err=None, details=None):
557         super(TestByTestResult, self).addError(test, err, details)
558         self._status = 'error'
559         self._details = self._err_to_details(test, err, details)
560
561     def addSkip(self, test, reason=None, details=None):
562         super(TestByTestResult, self).addSkip(test, reason, details)
563         self._status = 'skip'
564         if details is None:
565             details = {'reason': text_content(reason)}
566         elif reason:
567             # XXX: What if details already has 'reason' key?
568             details['reason'] = text_content(reason)
569         self._details = details
570
571     def addExpectedFailure(self, test, err=None, details=None):
572         super(TestByTestResult, self).addExpectedFailure(test, err, details)
573         self._status = 'xfail'
574         self._details = self._err_to_details(test, err, details)
575
576     def addUnexpectedSuccess(self, test, details=None):
577         super(TestByTestResult, self).addUnexpectedSuccess(test, details)
578         self._status = 'success'
579         self._details = details
580
581
582 class CsvResult(TestByTestResult):
583
584     def __init__(self, stream):
585         super(CsvResult, self).__init__(self._on_test)
586         self._write_row = csv.writer(stream).writerow
587
588     def _on_test(self, test, status, start_time, stop_time, tags, details):
589         self._write_row([test.id(), status, start_time, stop_time])
590
591     def startTestRun(self):
592         super(CsvResult, self).startTestRun()
593         self._write_row(['test', 'status', 'start_time', 'stop_time'])