1 # Copyright (c) 2010 Jonathan M. Lange. See LICENSE for details.
3 """Individual test case execution for tests that return Deferreds.
5 This module is highly experimental and is liable to change in ways that cause
6 subtle failures in tests. Use at your own peril.
11 'AsynchronousDeferredRunTest',
12 'AsynchronousDeferredRunTestForBrokenTwisted',
13 'SynchronousDeferredRunTest',
18 from testtools import try_imports
19 from testtools.content import (
23 from testtools.content_type import UTF8_TEXT
24 from testtools.runtest import RunTest
25 from testtools._spinner import (
30 trap_unhandled_errors,
33 from twisted.internet import defer
34 from twisted.python import log
35 from twisted.trial.unittest import _LogObserver
# Resolve a StringIO class from the first importable candidate so the module
# works whether or not the Python 2 ``StringIO`` module exists.
# NOTE(review): assumes `try_imports` tries the names in order and returns the
# first object that imports -- confirm against testtools.
StringIO = try_imports(['StringIO.StringIO', 'io.StringIO'])
class _DeferredRunTest(RunTest):
    """Base for tests that return Deferreds."""

    def _got_user_failure(self, failure, tb_label='traceback'):
        """We got a failure from user code.

        Unpacks ``failure`` into an ``exc_info``-style triple and hands it to
        `_got_user_exception`.

        :param failure: The failure to report. Its ``type``, ``value`` and
            ``getTracebackObject()`` are read.
        :param tb_label: Forwarded to `_got_user_exception` as ``tb_label``.
            Defaults to 'traceback'.
        :return: Whatever `_got_user_exception` returns.
        """
        exc_info = (
            failure.type, failure.value, failure.getTracebackObject())
        return self._got_user_exception(exc_info, tb_label=tb_label)
class SynchronousDeferredRunTest(_DeferredRunTest):
    """Runner for tests that return synchronous Deferreds."""

    def _run_user(self, function, *args):
        """Call ``function(*args)`` and return its result synchronously.

        The call is wrapped with `defer.maybeDeferred`, so 'function' may
        return either a plain value or a Deferred. Failures are routed to
        `_got_user_failure`, and the final value is pulled out of the
        Deferred with `extract_result`.

        :param function: The user-supplied callable to run.
        :param args: Positional arguments passed to 'function'.
        :return: The value obtained from `extract_result`.
        """
        d = defer.maybeDeferred(function, *args)
        d.addErrback(self._got_user_failure)
        return extract_result(d)
def run_with_log_observers(observers, function, *args, **kwargs):
    """Run 'function' with the given Twisted log observers.

    Every observer currently registered on `log.theLogPublisher` is removed
    and 'observers' are installed in their place for the duration of the
    call. Afterwards the previous observers are put back, whether 'function'
    returned or raised.

    :param observers: An iterable of log observers to install temporarily.
    :param function: The callable to run.
    :param args: Positional arguments for 'function'.
    :param kwargs: Keyword arguments for 'function'.
    :return: Whatever 'function' returns.
    """
    publisher = log.theLogPublisher
    # Work on snapshots. If `publisher.observers` is the publisher's live
    # list, removing entries while iterating over it would skip observers and
    # leave nothing to restore afterwards. 'observers' is walked twice, so it
    # must survive being a one-shot iterable.
    real_observers = list(publisher.observers)
    observers = list(observers)
    for observer in real_observers:
        publisher.removeObserver(observer)
    for observer in observers:
        publisher.addObserver(observer)
    try:
        return function(*args, **kwargs)
    finally:
        for observer in observers:
            publisher.removeObserver(observer)
        for observer in real_observers:
            publisher.addObserver(observer)
# Observer of the Twisted log that we install during tests.
# A single module-level instance is used: `AsynchronousDeferredRunTest`
# installs its `gotEvent` while a test runs and reads back what it collected
# with `flushErrors`, and `flush_logged_errors` flushes the same instance.
_log_observer = _LogObserver()
class AsynchronousDeferredRunTest(_DeferredRunTest):
    """Runner for tests that return Deferreds that fire asynchronously.

    That is, this test runner assumes that the Deferreds will only fire if the
    reactor is left to spin for a while.

    Do not rely too heavily on the nuances of the behaviour of this class.
    What it does to the reactor is black magic, and if we can find nicer ways
    of doing it we will gladly break backwards compatibility.

    This is highly experimental code. Use at your own risk.
    """

    def __init__(self, case, handlers=None, reactor=None, timeout=0.005,
                 debug=False):
        """Construct an `AsynchronousDeferredRunTest`.

        :param case: The `testtools.TestCase` to run.
        :param handlers: A list of exception handlers (ExceptionType, handler)
            where 'handler' is a callable that takes a `TestCase`, a
            `TestResult` and the exception raised.
        :param reactor: The Twisted reactor to use. If not given, we use the
            reactor imported from `twisted.internet`.
        :param timeout: The maximum time allowed for running a test. The
            default is 0.005.
        :param debug: Whether or not to enable Twisted's debugging. Use this
            to get information about unhandled Deferreds and left-over
            DelayedCalls. Defaults to False.
        """
        super(AsynchronousDeferredRunTest, self).__init__(case, handlers)
        if reactor is None:
            # Imported lazily so that merely importing this module does not
            # import (and thereby select) a reactor.
            from twisted.internet import reactor
        self._reactor = reactor
        self._timeout = timeout
        self._debug = debug

    @classmethod
    def make_factory(cls, reactor=None, timeout=0.005, debug=False):
        """Make a factory that conforms to the RunTest factory interface.

        :param reactor: Passed to the constructor as 'reactor'.
        :param timeout: Passed to the constructor as 'timeout'.
        :param debug: Passed to the constructor as 'debug'.
        :return: An object callable as ``factory(case, handlers=None)`` that
            builds an instance of this class with the settings above.
        """
        # This is horrible, but it means that the return value of the method
        # will be able to be assigned to a class variable *and* also be
        # invoked directly.
        class AsynchronousDeferredRunTestFactory:
            def __call__(self, case, handlers=None):
                return cls(case, handlers, reactor, timeout, debug)
        return AsynchronousDeferredRunTestFactory()

    @defer.deferredGenerator
    def _run_cleanups(self):
        """Run the cleanups on the test case.

        We expect that the cleanups on the test case can also return
        asynchronous Deferreds. As such, we take the responsibility for
        running the cleanups, rather than letting TestCase do it.

        :return: A Deferred that fires with the exception raised by the last
            failing cleanup, or None if every cleanup succeeded.
        """
        # Bind this up front: when no cleanup fails the except branch below
        # never runs, and the final yield still needs a value.
        last_exception = None
        while self.case._cleanups:
            f, args, kwargs = self.case._cleanups.pop()
            d = defer.maybeDeferred(f, *args, **kwargs)
            thing = defer.waitForDeferred(d)
            # deferredGenerator protocol: yield the wrapper, then ask it for
            # the result, which re-raises if the Deferred failed.
            yield thing
            try:
                thing.getResult()
            except Exception:
                exc_info = sys.exc_info()
                self.case._report_traceback(exc_info)
                last_exception = exc_info[1]
        yield last_exception

    def _make_spinner(self):
        """Make the `Spinner` to be used to run the tests."""
        return Spinner(self._reactor, debug=self._debug)

    def _run_deferred(self):
        """Run the test, assuming everything in it is Deferred-returning.

        This should return a Deferred that fires with True if the test was
        successful and False if the test was not successful. It should *not*
        call addSuccess on the result, because there's reactor clean up that
        needs to be done afterwards.
        """
        # One entry is appended per recorded failure; the final result is
        # simply whether this list is still empty.
        fails = []

        def fail_if_exception_caught(exception_caught):
            if self.exception_caught == exception_caught:
                fails.append(None)

        def clean_up(ignored=None):
            """Run the cleanups."""
            d = self._run_cleanups()
            def clean_up_done(result):
                if result is not None:
                    self._exceptions.append(result)
                    fails.append(None)
            return d.addCallback(clean_up_done)

        def set_up_done(exception_caught):
            """Set up is done, either clean up or run the test."""
            if self.exception_caught == exception_caught:
                # Set up failed: skip the test and tear down, but still run
                # the cleanups.
                fails.append(None)
                return clean_up()
            d = self._run_user(self.case._run_test_method, self.result)
            d.addCallback(fail_if_exception_caught)
            d.addBoth(tear_down)
            return d

        def tear_down(ignored):
            d = self._run_user(self.case._run_teardown, self.result)
            d.addCallback(fail_if_exception_caught)
            d.addBoth(clean_up)
            return d

        d = self._run_user(self.case._run_setup, self.result)
        d.addCallback(set_up_done)
        d.addBoth(lambda ignored: not fails)
        return d

    def _log_user_exception(self, e):
        """Raise 'e' and report it as a user exception."""
        # Raising and immediately catching gives sys.exc_info() a real
        # traceback to hand to the reporting machinery.
        try:
            raise e
        except e.__class__:
            self._got_user_exception(sys.exc_info())

    def _blocking_run_deferred(self, spinner):
        """Spin the reactor until `_run_deferred` has finished.

        :param spinner: The spinner whose ``run`` method drives the test.
        :return: A ``(successful, unhandled)`` pair. On timeout the timeout
            is reported as a user exception and ``(False, [])`` is returned.
        :raises KeyboardInterrupt: If the spinner produced no result at all.
        """
        try:
            return trap_unhandled_errors(
                spinner.run, self._timeout, self._run_deferred)
        except NoResultError:
            # We didn't get a result at all! This could be for any number of
            # reasons, but most likely someone hit Ctrl-C during the test.
            raise KeyboardInterrupt
        except TimeoutError:
            # The function took too long to run.
            self._log_user_exception(TimeoutError(self.case, self._timeout))
            return False, []

    # NOTE(review): the method name follows the `RunTest._run_core` hook --
    # confirm against testtools.runtest.
    def _run_core(self):
        """Run the test under log observation and report the outcome.

        The test only counts as successful if it passed, nothing was logged
        as an error, no Deferred was left with an unhandled error and the
        reactor was left clean. Only then is ``addSuccess`` called.
        """
        # Add an observer to trap all logged errors.
        error_observer = _log_observer
        full_log = StringIO()
        full_observer = log.FileLogObserver(full_log)
        spinner = self._make_spinner()
        successful, unhandled = run_with_log_observers(
            [error_observer.gotEvent, full_observer.emit],
            self._blocking_run_deferred, spinner)

        def read_full_log():
            # Rewind first: after the observer's writes the stream position
            # is at the end, so a bare readlines() would return nothing.
            full_log.seek(0)
            # The detail is declared as UTF-8 text, so hand back bytes even
            # when the StringIO in use holds text.
            return [
                line if isinstance(line, bytes) else line.encode('utf8')
                for line in full_log.readlines()]

        self.case.addDetail(
            'twisted-log', Content(UTF8_TEXT, read_full_log))

        logged_errors = error_observer.flushErrors()
        for logged_error in logged_errors:
            successful = False
            self._got_user_failure(logged_error, tb_label='logged-error')

        if unhandled:
            successful = False
            for debug_info in unhandled:
                f = debug_info.failResult
                info = debug_info._getDebugTracebacks()
                if info:
                    # NOTE(review): assumes `text_content` is imported from
                    # testtools.content alongside `Content` -- confirm.
                    self.case.addDetail(
                        'unhandled-error-in-deferred-debug',
                        text_content(info))
                self._got_user_failure(f, 'unhandled-error-in-deferred')

        junk = spinner.clear_junk()
        if junk:
            successful = False
            self._log_user_exception(UncleanReactorError(junk))

        if successful:
            self.result.addSuccess(self.case, details=self.case.getDetails())

    def _run_user(self, function, *args):
        """Run a user-supplied function.

        This just makes sure that it returns a Deferred, regardless of how the
        user wrote it.

        :param function: The user-supplied callable to run.
        :param args: Positional arguments passed to 'function'.
        :return: A Deferred whose failures are routed to `_got_user_failure`.
        """
        d = defer.maybeDeferred(function, *args)
        return d.addErrback(self._got_user_failure)
class AsynchronousDeferredRunTestForBrokenTwisted(AsynchronousDeferredRunTest):
    """Test runner that works around Twisted brokenness re reactor junk.

    There are many APIs within Twisted itself where a Deferred fires but
    leaves cleanup work scheduled for the reactor to do. Arguably, many of
    these are bugs. This runner iterates the reactor event loop a number of
    times after every test, in order to shake out these buggy-but-commonplace
    events.
    """

    def _make_spinner(self):
        """Make the spinner, configured with two obligatory iterations.

        :return: The spinner built by the parent class, with its
            ``_OBLIGATORY_REACTOR_ITERATIONS`` attribute set to 2.
        """
        spinner = super(
            AsynchronousDeferredRunTestForBrokenTwisted, self)._make_spinner()
        spinner._OBLIGATORY_REACTOR_ITERATIONS = 2
        return spinner
def assert_fails_with(d, *exc_types, **kwargs):
    """Assert that 'd' will fail with one of 'exc_types'.

    The normal way to use this is to return the result of 'assert_fails_with'
    from your test.

    Note that this function is experimental and unstable. Use at your own
    peril; expect the API to change.

    :param d: A Deferred that is expected to fail.
    :param *exc_types: The exception types that the Deferred is expected to
        fail with.
    :param failureException: An optional keyword argument. If provided, will
        raise that exception instead of `testtools.TestCase.failureException`.
    :return: A Deferred that will fail with an `AssertionError` if 'd' does
        not fail with one of the exception types. If 'd' fails as expected,
        the returned Deferred fires with the exception instance instead.
    """
    failureException = kwargs.pop('failureException', None)
    if failureException is None:
        # Avoid circular imports.
        from testtools import TestCase
        failureException = TestCase.failureException
    expected_names = ", ".join(exc_type.__name__ for exc_type in exc_types)

    def got_success(result):
        # 'd' succeeded, which is exactly what was not supposed to happen.
        raise failureException(
            "%s not raised (%r returned)" % (expected_names, result))

    def got_failure(failure):
        if failure.check(*exc_types):
            # Expected failure: returning a plain value turns the errback
            # chain back into a success.
            return failure.value
        raise failureException("%s raised instead of %s:\n %s" % (
            failure.type.__name__, expected_names, failure.getTraceback()))

    return d.addCallbacks(got_success, got_failure)
def flush_logged_errors(*error_types):
    """Flush errors held by the module-level Twisted log observer.

    :param *error_types: Passed through unchanged to
        `_log_observer.flushErrors`.
    :return: Whatever `_log_observer.flushErrors` returns.
    """
    flushed = _log_observer.flushErrors(*error_types)
    return flushed
class UncleanReactorError(Exception):
    """Raised when the reactor has junk in it."""

    def __init__(self, junk):
        """Build the error message from the left-over reactor items.

        :param junk: An iterable of the things still left in the reactor.
            Each one is rendered on its own line by `_get_junk_info`.
        """
        Exception.__init__(self,
            "The reactor still thinks it needs to do things. Close all "
            "connections, kill all processes and make sure all delayed "
            "calls have either fired or been cancelled:\n%s"
            % ''.join(self._get_junk_info(item) for item in junk))

    def _get_junk_info(self, junk):
        """Describe a single piece of reactor junk as one line of text.

        :param junk: One item left over in the reactor.
        :return: The description, indented and newline-terminated.
        """
        # Imported here so Twisted's reactor base is only needed once there
        # is actually junk to describe.
        from twisted.internet.base import DelayedCall
        # NOTE(review): str() for DelayedCall and repr() for everything else
        # is restored from context -- confirm the intended rendering.
        if isinstance(junk, DelayedCall):
            ret = str(junk)
        else:
            ret = repr(junk)
        return ' %s\n' % (ret,)