testtools: Fix included testtools, for systems that don't have it.
[nivanova/samba-autobuild/.git] / lib / testtools / testtools / testsuite.py
1 # Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
2
3 """Test suites and related things."""
4
5 __metaclass__ = type
6 __all__ = [
7   'ConcurrentTestSuite',
8   ]
9
10 try:
11     import Queue
12 except ImportError:
13     import queue as Queue
14 import threading
15 import unittest
16
17 import testtools
18
19
class ConcurrentTestSuite(unittest.TestSuite):
    """A TestSuite whose run() calls out to a concurrency strategy."""

    def __init__(self, suite, make_tests):
        """Create a ConcurrentTestSuite to execute suite.

        :param suite: A suite to run concurrently.
        :param make_tests: A helper function to split the tests in the
            ConcurrentTestSuite into some number of concurrently executing
            sub-suites. make_tests must take a suite, and return an iterable
            of TestCase-like object, each of which must have a run(result)
            method.
        """
        super(ConcurrentTestSuite, self).__init__([suite])
        self.make_tests = make_tests

    def run(self, result):
        """Run the tests concurrently.

        This calls out to the provided make_tests helper, and then serialises
        the results so that result only sees activity from one TestCase at
        a time.

        Each object returned by make_tests is run in its own thread against
        its own ThreadsafeForwardingResult wrapping ``result``; this method
        returns once every such thread has been joined.

        ConcurrentTestSuite provides no special mechanism to stop the tests
        returned by make_tests, it is up to the make_tests to honour the
        shouldStop attribute on the result object they are run with, which will
        be set if an exception is raised in the thread which
        ConcurrentTestSuite.run is called in.

        :param result: The result object that test activity is forwarded to.
        :raises KeyError: If the completion queue yields an object that is
            not one of the tests still being waited on.
        """
        tests = self.make_tests(self)
        # Work still in flight, as (test, thread, process_result) tuples.
        # A list searched by identity is used instead of a dict keyed by the
        # test: make_tests only promises objects with a run() method, and
        # such objects may be unhashable (unittest.TestSuite) or compare
        # equal to one another (two TestCase instances for the same method),
        # either of which would break or silently corrupt a dict.
        pending = []
        try:
            queue = Queue.Queue()
            result_semaphore = threading.Semaphore(1)
            for test in tests:
                process_result = testtools.ThreadsafeForwardingResult(
                    result, result_semaphore)
                reader_thread = threading.Thread(
                    target=self._run_test, args=(test, process_result, queue))
                # Record before starting so the handler below can stop it
                # even if start() itself fails.
                pending.append((test, reader_thread, process_result))
                reader_thread.start()
            while pending:
                finished_test = queue.get()
                for position, entry in enumerate(pending):
                    if entry[0] is finished_test:
                        break
                else:
                    raise KeyError(finished_test)
                # Join first and forget second, so an interruption during
                # join() still leaves the entry visible to the handler.
                pending[position][1].join()
                del pending[position]
        except:
            # Deliberately bare: anything raised in this thread, including
            # KeyboardInterrupt, must ask the unfinished workers to stop
            # before it propagates.
            for _test, _thread, process_result in pending:
                process_result.stop()
            raise

    def _run_test(self, test, process_result, queue):
        """Thread target: run one test and announce its completion.

        :param test: The object to run; only its run() method is used.
        :param process_result: The result object handed to test.run().
        :param queue: Queue that ``test`` is put on once test.run() has
            returned or raised, so the waiting run() is always notified.
        """
        try:
            test.run(process_result)
        finally:
            queue.put(test)