testtools: Import new upstream snapshot.
[nivanova/samba-autobuild/.git] / lib / testtools / testtools / deferredruntest.py
1 # Copyright (c) 2010 Jonathan M. Lange. See LICENSE for details.
2
3 """Individual test case execution for tests that return Deferreds.
4
5 This module is highly experimental and is liable to change in ways that cause
6 subtle failures in tests.  Use at your own peril.
7 """
8
# Public names of this module: this list is what ``import *`` exports.
# Helpers defined in this file but not listed here (for example
# ``run_with_log_observers`` and ``flush_logged_errors``) must be imported
# by name.
__all__ = [
    'assert_fails_with',
    'AsynchronousDeferredRunTest',
    'AsynchronousDeferredRunTestForBrokenTwisted',
    'SynchronousDeferredRunTest',
    ]
15
16 import sys
17
18 from testtools import try_imports
19 from testtools.content import (
20     Content,
21     text_content,
22     )
23 from testtools.content_type import UTF8_TEXT
24 from testtools.runtest import RunTest
25 from testtools._spinner import (
26     extract_result,
27     NoResultError,
28     Spinner,
29     TimeoutError,
30     trap_unhandled_errors,
31     )
32
33 from twisted.internet import defer
34 from twisted.python import log
35 from twisted.trial.unittest import _LogObserver
36
# Resolve a StringIO class from one of two candidate locations.
# NOTE(review): presumably try_imports returns the first candidate that can
# be imported (the Python 2 module first, then ``io``) -- confirm against
# testtools.try_imports.
StringIO = try_imports(['StringIO.StringIO', 'io.StringIO'])
38
39
class _DeferredRunTest(RunTest):
    """Shared base class for runners whose tests return Deferreds."""

    def _got_user_failure(self, failure, tb_label='traceback'):
        """Report a failure that came out of user code.

        The failure is unpacked into a ``(type, value, traceback)`` triple
        and handed to ``_got_user_exception``.

        :param failure: The failure object produced by user code.
        :param tb_label: Label passed through to ``_got_user_exception``.
        :return: Whatever ``_got_user_exception`` returns.
        """
        exc_info = (
            failure.type,
            failure.value,
            failure.getTracebackObject(),
            )
        return self._got_user_exception(exc_info, tb_label=tb_label)
48
49
class SynchronousDeferredRunTest(_DeferredRunTest):
    """Runner for tests whose Deferreds have already fired on return."""

    def _run_user(self, function, *args):
        """Call 'function' with 'args' and return its extracted result.

        The call is wrapped with ``defer.maybeDeferred``, failures are routed
        to ``_got_user_failure``, and the result is then pulled out of the
        Deferred with ``extract_result``.
        """
        deferred = defer.maybeDeferred(function, *args)
        # addErrback returns the Deferred it was called on, so this chains.
        return extract_result(deferred.addErrback(self._got_user_failure))
58
59
def run_with_log_observers(observers, function, *args, **kwargs):
    """Run 'function' with the given Twisted log observers.

    Every observer currently registered on ``log.theLogPublisher`` is removed
    for the duration of the call and replaced by 'observers'.  Afterwards
    'observers' are removed again and the original observers are re-added,
    even if 'function' raises.

    :param observers: An iterable of log observer callables to install while
        'function' runs.
    :param function: The callable to run.
    :param *args: Positional arguments passed to 'function'.
    :param **kwargs: Keyword arguments passed to 'function'.
    :return: Whatever 'function' returns.
    """
    # Take a snapshot.  Where the publisher's 'observers' attribute is the
    # live list, removeObserver/addObserver mutate it while we iterate: the
    # removal loop would skip entries and the restore loop would iterate over
    # a list it is appending to.
    real_observers = list(log.theLogPublisher.observers)
    for observer in real_observers:
        log.theLogPublisher.removeObserver(observer)
    for observer in observers:
        log.theLogPublisher.addObserver(observer)
    try:
        return function(*args, **kwargs)
    finally:
        for observer in observers:
            log.theLogPublisher.removeObserver(observer)
        for observer in real_observers:
            log.theLogPublisher.addObserver(observer)
74
75
# Observer of the Twisted log that we install during tests.  This single
# module-level instance is shared: AsynchronousDeferredRunTest._run_core
# installs it and flushes its errors after each run, and
# flush_logged_errors() exposes the same flush to callers.
_log_observer = _LogObserver()
78
79
80
class AsynchronousDeferredRunTest(_DeferredRunTest):
    """Runner for tests that return Deferreds that fire asynchronously.

    That is, this test runner assumes that the Deferreds will only fire if the
    reactor is left to spin for a while.

    Do not rely too heavily on the nuances of the behaviour of this class.
    What it does to the reactor is black magic, and if we can find nicer ways
    of doing it we will gladly break backwards compatibility.

    This is highly experimental code.  Use at your own risk.
    """

    def __init__(self, case, handlers=None, reactor=None, timeout=0.005,
                 debug=False):
        """Construct an `AsynchronousDeferredRunTest`.

        :param case: The `testtools.TestCase` to run.
        :param handlers: A list of exception handlers (ExceptionType, handler)
            where 'handler' is a callable that takes a `TestCase`, a
            `TestResult` and the exception raised.
        :param reactor: The Twisted reactor to use.  If not given, we use the
            default reactor.
        :param timeout: The maximum time allowed for running a test.  The
            default is 0.005s.
        :param debug: Whether or not to enable Twisted's debugging.  Use this
            to get information about unhandled Deferreds and left-over
            DelayedCalls.  Defaults to False.
        """
        super(AsynchronousDeferredRunTest, self).__init__(case, handlers)
        if reactor is None:
            # Imported lazily so that the default reactor is only installed
            # when the caller did not supply one.
            from twisted.internet import reactor
        self._reactor = reactor
        self._timeout = timeout
        self._debug = debug

    @classmethod
    def make_factory(cls, reactor=None, timeout=0.005, debug=False):
        """Make a factory that conforms to the RunTest factory interface.

        :param reactor: Passed through to the constructor.
        :param timeout: Passed through to the constructor.
        :param debug: Passed through to the constructor.
        :return: A callable taking ``(case, handlers=None)`` that builds an
            instance of this class with the options given here.
        """
        # This is horrible, but it means that the return value of the method
        # will be able to be assigned to a class variable *and* also be
        # invoked directly.
        class AsynchronousDeferredRunTestFactory:
            def __call__(self, case, handlers=None):
                return cls(case, handlers, reactor, timeout, debug)
        return AsynchronousDeferredRunTestFactory()

    @defer.deferredGenerator
    def _run_cleanups(self):
        """Run the cleanups on the test case.

        We expect that the cleanups on the test case can also return
        asynchronous Deferreds.  As such, we take the responsibility for
        running the cleanups, rather than letting TestCase do it.

        :return: A Deferred that fires with the exception raised by the last
            failing cleanup, or with None if no cleanup failed.
        """
        # Bind this up front.  It used to be assigned only in the 'except'
        # branch below, so the final yield raised UnboundLocalError whenever
        # every cleanup succeeded (or there were no cleanups at all).
        last_exception = None
        while self.case._cleanups:
            f, args, kwargs = self.case._cleanups.pop()
            d = defer.maybeDeferred(f, *args, **kwargs)
            thing = defer.waitForDeferred(d)
            yield thing
            try:
                thing.getResult()
            except Exception:
                exc_info = sys.exc_info()
                self.case._report_traceback(exc_info)
                last_exception = exc_info[1]
        yield last_exception

    def _make_spinner(self):
        """Make the `Spinner` to be used to run the tests."""
        return Spinner(self._reactor, debug=self._debug)

    def _run_deferred(self):
        """Run the test, assuming everything in it is Deferred-returning.

        This should return a Deferred that fires with True if the test was
        successful and False if the test was not successful.  It should *not*
        call addSuccess on the result, because there's reactor clean up that
        needs to be done afterwards.
        """
        # Any entry in this list means the run failed; only its length
        # matters, never its contents.
        fails = []

        def fail_if_exception_caught(exception_caught):
            if self.exception_caught == exception_caught:
                fails.append(None)

        def clean_up(ignored=None):
            """Run the cleanups."""
            d = self._run_cleanups()
            def clean_up_done(result):
                if result is not None:
                    self._exceptions.append(result)
                    fails.append(None)
            return d.addCallback(clean_up_done)

        def set_up_done(exception_caught):
            """Set up is done, either clean up or run the test."""
            if self.exception_caught == exception_caught:
                fails.append(None)
                return clean_up()
            else:
                d = self._run_user(self.case._run_test_method, self.result)
                d.addCallback(fail_if_exception_caught)
                d.addBoth(tear_down)
                return d

        def tear_down(ignored):
            """Run the tear down, then always run the cleanups."""
            d = self._run_user(self.case._run_teardown, self.result)
            d.addCallback(fail_if_exception_caught)
            d.addBoth(clean_up)
            return d

        d = self._run_user(self.case._run_setup, self.result)
        d.addCallback(set_up_done)
        d.addBoth(lambda ignored: len(fails) == 0)
        return d

    def _log_user_exception(self, e):
        """Raise 'e' and report it as a user exception."""
        # Raising and catching gives us a real exc_info triple to report.
        try:
            raise e
        except e.__class__:
            self._got_user_exception(sys.exc_info())

    def _blocking_run_deferred(self, spinner):
        """Spin the reactor until `_run_deferred` finishes or times out.

        :param spinner: The `Spinner` used to run the reactor.
        :return: Whatever `trap_unhandled_errors` returns for the run, or
            ``(False, [])`` if the run timed out.
        :raises KeyboardInterrupt: If the spinner produced no result at all.
        """
        try:
            return trap_unhandled_errors(
                spinner.run, self._timeout, self._run_deferred)
        except NoResultError:
            # We didn't get a result at all!  This could be for any number of
            # reasons, but most likely someone hit Ctrl-C during the test.
            raise KeyboardInterrupt
        except TimeoutError:
            # The function took too long to run.
            self._log_user_exception(TimeoutError(self.case, self._timeout))
            return False, []

    def _run_core(self):
        """Run the test under log observers and report the outcome.

        The test counts as failed if the run itself reported failure, if any
        error was logged, if any Deferred was left with an unhandled error,
        or if the reactor still has junk in it afterwards.  Only when none of
        those happened is addSuccess called on the result.
        """
        # Add an observer to trap all logged errors.
        error_observer = _log_observer
        full_log = StringIO()
        full_observer = log.FileLogObserver(full_log)
        spinner = self._make_spinner()
        successful, unhandled = run_with_log_observers(
            [error_observer.gotEvent, full_observer.emit],
            self._blocking_run_deferred, spinner)

        # Use getvalue() rather than readlines(): after the observer has
        # written to it, the buffer's position is at the end, so readlines()
        # yields nothing and the attached log would always be empty.
        # getvalue() is also safe to call more than once.
        self.case.addDetail(
            'twisted-log',
            Content(UTF8_TEXT, lambda: [full_log.getvalue()]))

        logged_errors = error_observer.flushErrors()
        for logged_error in logged_errors:
            successful = False
            self._got_user_failure(logged_error, tb_label='logged-error')

        if unhandled:
            successful = False
            for debug_info in unhandled:
                f = debug_info.failResult
                info = debug_info._getDebugTracebacks()
                if info:
                    self.case.addDetail(
                        'unhandled-error-in-deferred-debug',
                        text_content(info))
                self._got_user_failure(f, 'unhandled-error-in-deferred')

        junk = spinner.clear_junk()
        if junk:
            successful = False
            self._log_user_exception(UncleanReactorError(junk))

        if successful:
            self.result.addSuccess(self.case, details=self.case.getDetails())

    def _run_user(self, function, *args):
        """Run a user-supplied function.

        This just makes sure that it returns a Deferred, regardless of how the
        user wrote it.
        """
        d = defer.maybeDeferred(function, *args)
        return d.addErrback(self._got_user_failure)
263
264
class AsynchronousDeferredRunTestForBrokenTwisted(AsynchronousDeferredRunTest):
    """Test runner that works around Twisted brokenness re reactor junk.

    There are many APIs within Twisted itself where a Deferred fires but
    leaves cleanup work scheduled for the reactor to do.  Arguably, many of
    these are bugs.  This runner iterates the reactor event loop a number of
    times after every test, in order to shake out these buggy-but-commonplace
    events.
    """

    def _make_spinner(self):
        """Build the parent's spinner, then require two reactor iterations."""
        parent = super(AsynchronousDeferredRunTestForBrokenTwisted, self)
        broken_twisted_spinner = parent._make_spinner()
        broken_twisted_spinner._OBLIGATORY_REACTOR_ITERATIONS = 2
        return broken_twisted_spinner
280
281
def assert_fails_with(d, *exc_types, **kwargs):
    """Assert that 'd' will fail with one of 'exc_types'.

    Typical usage is to return the result of this function from a unit test,
    so that the test runner waits on the returned Deferred.

    Note that this function is experimental and unstable.  Use at your own
    peril; expect the API to change.

    :param d: A Deferred that is expected to fail.
    :param *exc_types: The exception types that the Deferred is expected to
        fail with.
    :param failureException: An optional keyword argument.  If provided, will
        raise that exception instead of `testtools.TestCase.failureException`.
    :return: The result of calling ``d.addCallbacks`` with the two handlers
        below: success raises 'failureException', an expected failure is
        converted into its exception value, and an unexpected failure raises
        'failureException'.
    """
    failureException = kwargs.pop('failureException', None)
    if failureException is None:
        # Avoid circular imports.
        from testtools import TestCase
        failureException = TestCase.failureException
    expected_names = ", ".join([expected.__name__ for expected in exc_types])

    def got_success(result):
        # The Deferred fired successfully, which is the one outcome the
        # caller said should not happen.
        message = "%s not raised (%r returned)" % (expected_names, result)
        raise failureException(message)

    def got_failure(failure):
        if not failure.check(*exc_types):
            raise failureException("%s raised instead of %s:\n %s" % (
                failure.type.__name__, expected_names,
                failure.getTraceback()))
        return failure.value

    return d.addCallbacks(got_success, got_failure)
314
315
def flush_logged_errors(*error_types):
    """Flush errors from the module-level Twisted log observer.

    :param *error_types: Passed straight through to the observer's
        ``flushErrors`` method.
    :return: Whatever ``flushErrors`` returns.
    """
    flushed = _log_observer.flushErrors(*error_types)
    return flushed
318
319
class UncleanReactorError(Exception):
    """Raised when the reactor has junk in it."""

    def __init__(self, junk):
        """Build the error message from the left-over reactor 'junk'.

        :param junk: An iterable of objects left in the reactor; each one
            contributes one indented line to the message.
        """
        details = ''.join([self._get_junk_info(item) for item in junk])
        message = (
            "The reactor still thinks it needs to do things. Close all "
            "connections, kill all processes and make sure all delayed "
            "calls have either fired or been cancelled:\n%s" % details)
        Exception.__init__(self, message)

    def _get_junk_info(self, junk):
        """Return one indented, newline-terminated line describing 'junk'."""
        from twisted.internet.base import DelayedCall
        # DelayedCalls are rendered with str(); everything else with repr().
        describe = str if isinstance(junk, DelayedCall) else repr
        return '  %s\n' % (describe(junk),)
335             ret = repr(junk)
336         return '  %s\n' % (ret,)