testtools: Merge in new upstream.
[nivanova/samba-autobuild/.git] / lib / testtools / testtools / testresult / real.py
1 # Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
2
3 """Test results and related things."""
4
5 __metaclass__ = type
6 __all__ = [
7     'ExtendedToOriginalDecorator',
8     'MultiTestResult',
9     'TestResult',
10     'ThreadsafeForwardingResult',
11     ]
12
13 import datetime
14 import sys
15 import unittest
16
17 from testtools.compat import all, _format_exc_info, str_is_unicode, _u
18
19 # From http://docs.python.org/library/datetime.html
20 _ZERO = datetime.timedelta(0)
21
22 # A UTC class.
23
24 class UTC(datetime.tzinfo):
25     """UTC"""
26
27     def utcoffset(self, dt):
28         return _ZERO
29
30     def tzname(self, dt):
31         return "UTC"
32
33     def dst(self, dt):
34         return _ZERO
35
36 utc = UTC()
37
38
39 class TestResult(unittest.TestResult):
40     """Subclass of unittest.TestResult extending the protocol for flexability.
41
42     This test result supports an experimental protocol for providing additional
43     data to in test outcomes. All the outcome methods take an optional dict
44     'details'. If supplied any other detail parameters like 'err' or 'reason'
45     should not be provided. The details dict is a mapping from names to
46     MIME content objects (see testtools.content). This permits attaching
47     tracebacks, log files, or even large objects like databases that were
48     part of the test fixture. Until this API is accepted into upstream
49     Python it is considered experimental: it may be replaced at any point
50     by a newer version more in line with upstream Python. Compatibility would
51     be aimed for in this case, but may not be possible.
52
53     :ivar skip_reasons: A dict of skip-reasons -> list of tests. See addSkip.
54     """
55
56     def __init__(self):
57         # startTestRun resets all attributes, and older clients don't know to
58         # call startTestRun, so it is called once here.
59         # Because subclasses may reasonably not expect this, we call the 
60         # specific version we want to run.
61         TestResult.startTestRun(self)
62
63     def addExpectedFailure(self, test, err=None, details=None):
64         """Called when a test has failed in an expected manner.
65
66         Like with addSuccess and addError, testStopped should still be called.
67
68         :param test: The test that has been skipped.
69         :param err: The exc_info of the error that was raised.
70         :return: None
71         """
72         # This is the python 2.7 implementation
73         self.expectedFailures.append(
74             (test, self._err_details_to_string(test, err, details)))
75
76     def addError(self, test, err=None, details=None):
77         """Called when an error has occurred. 'err' is a tuple of values as
78         returned by sys.exc_info().
79
80         :param details: Alternative way to supply details about the outcome.
81             see the class docstring for more information.
82         """
83         self.errors.append((test,
84             self._err_details_to_string(test, err, details)))
85
86     def addFailure(self, test, err=None, details=None):
87         """Called when an error has occurred. 'err' is a tuple of values as
88         returned by sys.exc_info().
89
90         :param details: Alternative way to supply details about the outcome.
91             see the class docstring for more information.
92         """
93         self.failures.append((test,
94             self._err_details_to_string(test, err, details)))
95
96     def addSkip(self, test, reason=None, details=None):
97         """Called when a test has been skipped rather than running.
98
99         Like with addSuccess and addError, testStopped should still be called.
100
101         This must be called by the TestCase. 'addError' and 'addFailure' will
102         not call addSkip, since they have no assumptions about the kind of
103         errors that a test can raise.
104
105         :param test: The test that has been skipped.
106         :param reason: The reason for the test being skipped. For instance,
107             u"pyGL is not available".
108         :param details: Alternative way to supply details about the outcome.
109             see the class docstring for more information.
110         :return: None
111         """
112         if reason is None:
113             reason = details.get('reason')
114             if reason is None:
115                 reason = 'No reason given'
116             else:
117                 reason = ''.join(reason.iter_text())
118         skip_list = self.skip_reasons.setdefault(reason, [])
119         skip_list.append(test)
120
121     def addSuccess(self, test, details=None):
122         """Called when a test succeeded."""
123
124     def addUnexpectedSuccess(self, test, details=None):
125         """Called when a test was expected to fail, but succeed."""
126         self.unexpectedSuccesses.append(test)
127
128     def wasSuccessful(self):
129         """Has this result been successful so far?
130
131         If there have been any errors, failures or unexpected successes,
132         return False.  Otherwise, return True.
133
134         Note: This differs from standard unittest in that we consider
135         unexpected successes to be equivalent to failures, rather than
136         successes.
137         """
138         return not (self.errors or self.failures or self.unexpectedSuccesses)
139
140     if str_is_unicode:
141         # Python 3 and IronPython strings are unicode, use parent class method
142         _exc_info_to_unicode = unittest.TestResult._exc_info_to_string
143     else:
144         # For Python 2, need to decode components of traceback according to
145         # their source, so can't use traceback.format_exception
146         # Here follows a little deep magic to copy the existing method and
147         # replace the formatter with one that returns unicode instead
148         from types import FunctionType as __F, ModuleType as __M
149         __f = unittest.TestResult._exc_info_to_string.im_func
150         __g = dict(__f.func_globals)
151         __m = __M("__fake_traceback")
152         __m.format_exception = _format_exc_info
153         __g["traceback"] = __m
154         _exc_info_to_unicode = __F(__f.func_code, __g, "_exc_info_to_unicode")
155         del __F, __M, __f, __g, __m
156
157     def _err_details_to_string(self, test, err=None, details=None):
158         """Convert an error in exc_info form or a contents dict to a string."""
159         if err is not None:
160             return self._exc_info_to_unicode(err, test)
161         return _details_to_str(details)
162
163     def _now(self):
164         """Return the current 'test time'.
165
166         If the time() method has not been called, this is equivalent to
167         datetime.now(), otherwise its the last supplied datestamp given to the
168         time() method.
169         """
170         if self.__now is None:
171             return datetime.datetime.now(utc)
172         else:
173             return self.__now
174
175     def startTestRun(self):
176         """Called before a test run starts.
177
178         New in python 2.7. The testtools version resets the result to a
179         pristine condition ready for use in another test run.
180         """
181         super(TestResult, self).__init__()
182         self.skip_reasons = {}
183         self.__now = None
184         # -- Start: As per python 2.7 --
185         self.expectedFailures = []
186         self.unexpectedSuccesses = []
187         # -- End:   As per python 2.7 --
188
189     def stopTestRun(self):
190         """Called after a test run completes
191
192         New in python 2.7
193         """
194
195     def time(self, a_datetime):
196         """Provide a timestamp to represent the current time.
197
198         This is useful when test activity is time delayed, or happening
199         concurrently and getting the system time between API calls will not
200         accurately represent the duration of tests (or the whole run).
201
202         Calling time() sets the datetime used by the TestResult object.
203         Time is permitted to go backwards when using this call.
204
205         :param a_datetime: A datetime.datetime object with TZ information or
206             None to reset the TestResult to gathering time from the system.
207         """
208         self.__now = a_datetime
209
210     def done(self):
211         """Called when the test runner is done.
212
213         deprecated in favour of stopTestRun.
214         """
215
216
217 class MultiTestResult(TestResult):
218     """A test result that dispatches to many test results."""
219
220     def __init__(self, *results):
221         TestResult.__init__(self)
222         self._results = list(map(ExtendedToOriginalDecorator, results))
223
224     def _dispatch(self, message, *args, **kwargs):
225         return tuple(
226             getattr(result, message)(*args, **kwargs)
227             for result in self._results)
228
229     def startTest(self, test):
230         return self._dispatch('startTest', test)
231
232     def stopTest(self, test):
233         return self._dispatch('stopTest', test)
234
235     def addError(self, test, error=None, details=None):
236         return self._dispatch('addError', test, error, details=details)
237
238     def addExpectedFailure(self, test, err=None, details=None):
239         return self._dispatch(
240             'addExpectedFailure', test, err, details=details)
241
242     def addFailure(self, test, err=None, details=None):
243         return self._dispatch('addFailure', test, err, details=details)
244
245     def addSkip(self, test, reason=None, details=None):
246         return self._dispatch('addSkip', test, reason, details=details)
247
248     def addSuccess(self, test, details=None):
249         return self._dispatch('addSuccess', test, details=details)
250
251     def addUnexpectedSuccess(self, test, details=None):
252         return self._dispatch('addUnexpectedSuccess', test, details=details)
253
254     def startTestRun(self):
255         return self._dispatch('startTestRun')
256
257     def stopTestRun(self):
258         return self._dispatch('stopTestRun')
259
260     def time(self, a_datetime):
261         return self._dispatch('time', a_datetime)
262
263     def done(self):
264         return self._dispatch('done')
265
266     def wasSuccessful(self):
267         """Was this result successful?
268
269         Only returns True if every constituent result was successful.
270         """
271         return all(self._dispatch('wasSuccessful'))
272
273
274 class TextTestResult(TestResult):
275     """A TestResult which outputs activity to a text stream."""
276
277     def __init__(self, stream):
278         """Construct a TextTestResult writing to stream."""
279         super(TextTestResult, self).__init__()
280         self.stream = stream
281         self.sep1 = '=' * 70 + '\n'
282         self.sep2 = '-' * 70 + '\n'
283
284     def _delta_to_float(self, a_timedelta):
285         return (a_timedelta.days * 86400.0 + a_timedelta.seconds +
286             a_timedelta.microseconds / 1000000.0)
287
288     def _show_list(self, label, error_list):
289         for test, output in error_list:
290             self.stream.write(self.sep1)
291             self.stream.write("%s: %s\n" % (label, test.id()))
292             self.stream.write(self.sep2)
293             self.stream.write(output)
294
295     def startTestRun(self):
296         super(TextTestResult, self).startTestRun()
297         self.__start = self._now()
298         self.stream.write("Tests running...\n")
299
300     def stopTestRun(self):
301         if self.testsRun != 1:
302             plural = 's'
303         else:
304             plural = ''
305         stop = self._now()
306         self._show_list('ERROR', self.errors)
307         self._show_list('FAIL', self.failures)
308         for test in self.unexpectedSuccesses:
309             self.stream.write(
310                 "%sUNEXPECTED SUCCESS: %s\n%s" % (
311                     self.sep1, test.id(), self.sep2))
312         self.stream.write("Ran %d test%s in %.3fs\n\n" %
313             (self.testsRun, plural,
314              self._delta_to_float(stop - self.__start)))
315         if self.wasSuccessful():
316             self.stream.write("OK\n")
317         else:
318             self.stream.write("FAILED (")
319             details = []
320             details.append("failures=%d" % (
321                 sum(map(len, (
322                     self.failures, self.errors, self.unexpectedSuccesses)))))
323             self.stream.write(", ".join(details))
324             self.stream.write(")\n")
325         super(TextTestResult, self).stopTestRun()
326
327
328 class ThreadsafeForwardingResult(TestResult):
329     """A TestResult which ensures the target does not receive mixed up calls.
330
331     This is used when receiving test results from multiple sources, and batches
332     up all the activity for a single test into a thread-safe batch where all
333     other ThreadsafeForwardingResult objects sharing the same semaphore will be
334     locked out.
335
336     Typical use of ThreadsafeForwardingResult involves creating one
337     ThreadsafeForwardingResult per thread in a ConcurrentTestSuite. These
338     forward to the TestResult that the ConcurrentTestSuite run method was
339     called with.
340
341     target.done() is called once for each ThreadsafeForwardingResult that
342     forwards to the same target. If the target's done() takes special action,
343     care should be taken to accommodate this.
344     """
345
346     def __init__(self, target, semaphore):
347         """Create a ThreadsafeForwardingResult forwarding to target.
348
349         :param target: A TestResult.
350         :param semaphore: A threading.Semaphore with limit 1.
351         """
352         TestResult.__init__(self)
353         self.result = ExtendedToOriginalDecorator(target)
354         self.semaphore = semaphore
355
356     def _add_result_with_semaphore(self, method, test, *args, **kwargs):
357         self.semaphore.acquire()
358         try:
359             self.result.time(self._test_start)
360             self.result.startTest(test)
361             self.result.time(self._now())
362             try:
363                 method(test, *args, **kwargs)
364             finally:
365                 self.result.stopTest(test)
366         finally:
367             self.semaphore.release()
368
369     def addError(self, test, err=None, details=None):
370         self._add_result_with_semaphore(self.result.addError,
371             test, err, details=details)
372
373     def addExpectedFailure(self, test, err=None, details=None):
374         self._add_result_with_semaphore(self.result.addExpectedFailure,
375             test, err, details=details)
376
377     def addFailure(self, test, err=None, details=None):
378         self._add_result_with_semaphore(self.result.addFailure,
379             test, err, details=details)
380
381     def addSkip(self, test, reason=None, details=None):
382         self._add_result_with_semaphore(self.result.addSkip,
383             test, reason, details=details)
384
385     def addSuccess(self, test, details=None):
386         self._add_result_with_semaphore(self.result.addSuccess,
387             test, details=details)
388
389     def addUnexpectedSuccess(self, test, details=None):
390         self._add_result_with_semaphore(self.result.addUnexpectedSuccess,
391             test, details=details)
392
393     def startTestRun(self):
394         self.semaphore.acquire()
395         try:
396             self.result.startTestRun()
397         finally:
398             self.semaphore.release()
399
400     def stopTestRun(self):
401         self.semaphore.acquire()
402         try:
403             self.result.stopTestRun()
404         finally:
405             self.semaphore.release()
406
407     def done(self):
408         self.semaphore.acquire()
409         try:
410             self.result.done()
411         finally:
412             self.semaphore.release()
413
414     def startTest(self, test):
415         self._test_start = self._now()
416         super(ThreadsafeForwardingResult, self).startTest(test)
417
418     def wasSuccessful(self):
419         return self.result.wasSuccessful()
420
421
422 class ExtendedToOriginalDecorator(object):
423     """Permit new TestResult API code to degrade gracefully with old results.
424
425     This decorates an existing TestResult and converts missing outcomes
426     such as addSkip to older outcomes such as addSuccess. It also supports
427     the extended details protocol. In all cases the most recent protocol
428     is attempted first, and fallbacks only occur when the decorated result
429     does not support the newer style of calling.
430     """
431
432     def __init__(self, decorated):
433         self.decorated = decorated
434
435     def __getattr__(self, name):
436         return getattr(self.decorated, name)
437
438     def addError(self, test, err=None, details=None):
439         self._check_args(err, details)
440         if details is not None:
441             try:
442                 return self.decorated.addError(test, details=details)
443             except TypeError:
444                 # have to convert
445                 err = self._details_to_exc_info(details)
446         return self.decorated.addError(test, err)
447
448     def addExpectedFailure(self, test, err=None, details=None):
449         self._check_args(err, details)
450         addExpectedFailure = getattr(
451             self.decorated, 'addExpectedFailure', None)
452         if addExpectedFailure is None:
453             return self.addSuccess(test)
454         if details is not None:
455             try:
456                 return addExpectedFailure(test, details=details)
457             except TypeError:
458                 # have to convert
459                 err = self._details_to_exc_info(details)
460         return addExpectedFailure(test, err)
461
462     def addFailure(self, test, err=None, details=None):
463         self._check_args(err, details)
464         if details is not None:
465             try:
466                 return self.decorated.addFailure(test, details=details)
467             except TypeError:
468                 # have to convert
469                 err = self._details_to_exc_info(details)
470         return self.decorated.addFailure(test, err)
471
472     def addSkip(self, test, reason=None, details=None):
473         self._check_args(reason, details)
474         addSkip = getattr(self.decorated, 'addSkip', None)
475         if addSkip is None:
476             return self.decorated.addSuccess(test)
477         if details is not None:
478             try:
479                 return addSkip(test, details=details)
480             except TypeError:
481                 # extract the reason if it's available
482                 try:
483                     reason = ''.join(details['reason'].iter_text())
484                 except KeyError:
485                     reason = _details_to_str(details)
486         return addSkip(test, reason)
487
488     def addUnexpectedSuccess(self, test, details=None):
489         outcome = getattr(self.decorated, 'addUnexpectedSuccess', None)
490         if outcome is None:
491             try:
492                 test.fail("")
493             except test.failureException:
494                 return self.addFailure(test, sys.exc_info())
495         if details is not None:
496             try:
497                 return outcome(test, details=details)
498             except TypeError:
499                 pass
500         return outcome(test)
501
502     def addSuccess(self, test, details=None):
503         if details is not None:
504             try:
505                 return self.decorated.addSuccess(test, details=details)
506             except TypeError:
507                 pass
508         return self.decorated.addSuccess(test)
509
510     def _check_args(self, err, details):
511         param_count = 0
512         if err is not None:
513             param_count += 1
514         if details is not None:
515             param_count += 1
516         if param_count != 1:
517             raise ValueError("Must pass only one of err '%s' and details '%s"
518                 % (err, details))
519
520     def _details_to_exc_info(self, details):
521         """Convert a details dict to an exc_info tuple."""
522         return (_StringException,
523             _StringException(_details_to_str(details)), None)
524
525     def done(self):
526         try:
527             return self.decorated.done()
528         except AttributeError:
529             return
530
531     def progress(self, offset, whence):
532         method = getattr(self.decorated, 'progress', None)
533         if method is None:
534             return
535         return method(offset, whence)
536
537     @property
538     def shouldStop(self):
539         return self.decorated.shouldStop
540
541     def startTest(self, test):
542         return self.decorated.startTest(test)
543
544     def startTestRun(self):
545         try:
546             return self.decorated.startTestRun()
547         except AttributeError:
548             return
549
550     def stop(self):
551         return self.decorated.stop()
552
553     def stopTest(self, test):
554         return self.decorated.stopTest(test)
555
556     def stopTestRun(self):
557         try:
558             return self.decorated.stopTestRun()
559         except AttributeError:
560             return
561
562     def tags(self, new_tags, gone_tags):
563         method = getattr(self.decorated, 'tags', None)
564         if method is None:
565             return
566         return method(new_tags, gone_tags)
567
568     def time(self, a_datetime):
569         method = getattr(self.decorated, 'time', None)
570         if method is None:
571             return
572         return method(a_datetime)
573
574     def wasSuccessful(self):
575         return self.decorated.wasSuccessful()
576
577
578 class _StringException(Exception):
579     """An exception made from an arbitrary string."""
580
581     if not str_is_unicode:
582         def __init__(self, string):
583             if type(string) is not unicode:
584                 raise TypeError("_StringException expects unicode, got %r" %
585                     (string,))
586             Exception.__init__(self, string)
587
588         def __str__(self):
589             return self.args[0].encode("utf-8")
590
591         def __unicode__(self):
592             return self.args[0]
593     # For 3.0 and above the default __str__ is fine, so we don't define one.
594
595     def __hash__(self):
596         return id(self)
597
598     def __eq__(self, other):
599         try:
600             return self.args == other.args
601         except AttributeError:
602             return False
603
604
605 def _details_to_str(details):
606     """Convert a details dict to a string."""
607     chars = []
608     # sorted is for testing, may want to remove that and use a dict
609     # subclass with defined order for items instead.
610     for key, content in sorted(details.items()):
611         if content.content_type.type != 'text':
612             chars.append('Binary content: %s\n' % key)
613             continue
614         chars.append('Text attachment: %s\n' % key)
615         chars.append('------------\n')
616         chars.extend(content.iter_text())
617         if not chars[-1].endswith('\n'):
618             chars.append('\n')
619         chars.append('------------\n')
620     return _u('').join(chars)