Merge tags support.
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  This program is free software; you can redistribute it and/or modify
6 #  it under the terms of the GNU General Public License as published by
7 #  the Free Software Foundation; either version 2 of the License, or
8 #  (at your option) any later version.
9 #
10 #  This program is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #  GNU General Public License for more details.
14 #
15 #  You should have received a copy of the GNU General Public License
16 #  along with this program; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18 #
19
20 import os
21 from StringIO import StringIO
22 import subprocess
23 import sys
24 import unittest
25
def test_suite():
    """Return the test suite provided by the subunit.tests package."""
    # Imported lazily so that loading this module does not load the tests.
    from subunit import tests
    return tests.test_suite()
29
30
def join_dir(base_path, path):
    """
    Resolve C{path} against the directory that contains C{base_path}.

    @param base_path: A path to a file or directory; it is made absolute
    and its final component is dropped to obtain the containing directory.
    @param path: An absolute path, or a path relative to the containing
    directory of C{base_path}.

    @return: An absolute path to C{path}. If C{path} is already absolute
    it is returned unchanged, as with C{os.path.join}.
    """
    containing_dir = os.path.dirname(os.path.abspath(base_path))
    return os.path.join(containing_dir, path)
43
44
class TestProtocolServer(object):
    """A class for receiving results from a TestProtocol client.

    Protocol lines are fed in through lineReceived (or readFrom); recognised
    commands are translated into calls on the client, and every other line is
    forwarded unchanged to the pass-through stream.

    :ivar tags: The current tags associated with the protocol stream.
    """

    # Parser states.
    OUTSIDE_TEST = 0
    TEST_STARTED = 1
    READING_FAILURE = 2
    READING_ERROR = 3

    def __init__(self, client, stream=sys.stdout):
        """Create a TestProtocol server instance.

        client should be an object that provides
         - startTest
         - addSuccess
         - addFailure
         - addError
         - stopTest
        methods, i.e. a TestResult.

        stream receives every line that is not part of the protocol.
        """
        self.state = TestProtocolServer.OUTSIDE_TEST
        self.client = client
        self._stream = stream
        self.tags = set()
        # Per-test bookkeeping; populated by _startTest.
        self.current_test_description = None
        self._current_test = None
        self._message = ""

    def _addError(self, offset, line):
        """Handle an 'error' command whose argument starts at offset."""
        if (self.state == TestProtocolServer.TEST_STARTED and
            self.current_test_description == line[offset:-1]):
            # Error without details.
            self.state = TestProtocolServer.OUTSIDE_TEST
            self.current_test_description = None
            self.client.addError(self._current_test, RemoteError(""))
            self.client.stopTest(self._current_test)
            self._current_test = None
        elif (self.state == TestProtocolServer.TEST_STARTED and
            self.current_test_description + " [" == line[offset:-1]):
            # Error with a quoted detail block that follows on later lines.
            self.state = TestProtocolServer.READING_ERROR
            self._message = ""
        else:
            self.stdOutLineReceived(line)

    def _addFailure(self, offset, line):
        """Handle a 'failure' command whose argument starts at offset."""
        if (self.state == TestProtocolServer.TEST_STARTED and
            self.current_test_description == line[offset:-1]):
            # Failure without details.
            self.state = TestProtocolServer.OUTSIDE_TEST
            self.current_test_description = None
            self.client.addFailure(self._current_test, RemoteError())
            self.client.stopTest(self._current_test)
            # Forget the finished test, as _addError and _addSuccess do.
            self._current_test = None
        elif (self.state == TestProtocolServer.TEST_STARTED and
            self.current_test_description + " [" == line[offset:-1]):
            # Failure with a quoted detail block that follows on later lines.
            self.state = TestProtocolServer.READING_FAILURE
            self._message = ""
        else:
            self.stdOutLineReceived(line)

    def _addSuccess(self, offset, line):
        """Handle a 'success' command whose argument starts at offset."""
        if (self.state == TestProtocolServer.TEST_STARTED and
            self.current_test_description == line[offset:-1]):
            self.client.addSuccess(self._current_test)
            self.client.stopTest(self._current_test)
            self.current_test_description = None
            self._current_test = None
            self.state = TestProtocolServer.OUTSIDE_TEST
        else:
            self.stdOutLineReceived(line)

    def _appendMessage(self, line):
        """Add a line to the detail message currently being read."""
        if line[0:2] == " ]":
            # quoted ] start: drop the leading space that escaped it.
            self._message += line[1:]
        else:
            self._message += line

    def endQuote(self, line):
        """Handle the "]" line that terminates a quoted detail block."""
        if self.state == TestProtocolServer.READING_FAILURE:
            self.state = TestProtocolServer.OUTSIDE_TEST
            self.current_test_description = None
            self.client.addFailure(self._current_test,
                                   RemoteError(self._message))
            self.client.stopTest(self._current_test)
            self._current_test = None
        elif self.state == TestProtocolServer.READING_ERROR:
            self.state = TestProtocolServer.OUTSIDE_TEST
            self.current_test_description = None
            self.client.addError(self._current_test,
                                 RemoteError(self._message))
            self.client.stopTest(self._current_test)
            self._current_test = None
        else:
            # Not inside a detail block: this is ordinary output.
            self.stdOutLineReceived(line)

    def _handleTags(self, offset, line):
        """Process a tags command.

        The argument is a whitespace separated list of tags. A tag prefixed
        with '-' is removed, any other tag is added. Outside of a test the
        stream-level tags are updated; otherwise the tags of the current test
        are updated.
        """
        new_tags = set()
        gone_tags = set()
        for tag in line[offset:].split():
            if tag[0] == '-':
                gone_tags.add(tag[1:])
            else:
                new_tags.add(tag)
        if self.state == TestProtocolServer.OUTSIDE_TEST:
            update_tags = self.tags
        else:
            update_tags = self._current_test.tags
        update_tags.update(new_tags)
        update_tags.difference_update(gone_tags)

    def lineReceived(self, line):
        """Call the appropriate local method for the received line."""
        if line == "]\n":
            self.endQuote(line)
        elif (self.state == TestProtocolServer.READING_FAILURE or
              self.state == TestProtocolServer.READING_ERROR):
            self._appendMessage(line)
        else:
            parts = line.split(None, 1)
            if len(parts) == 2:
                cmd, rest = parts
                # The argument starts after the command word and one
                # separator character.
                offset = len(cmd) + 1
                cmd = cmd.strip(':')
                if cmd in ('test', 'testing'):
                    self._startTest(offset, line)
                elif cmd == 'error':
                    self._addError(offset, line)
                elif cmd == 'failure':
                    self._addFailure(offset, line)
                elif cmd in ('success', 'successful'):
                    self._addSuccess(offset, line)
                elif cmd == 'tags':
                    # Exact comparison: "cmd in ('tags')" would be a
                    # substring test against the string 'tags'.
                    self._handleTags(offset, line)
                else:
                    self.stdOutLineReceived(line)
            else:
                self.stdOutLineReceived(line)

    def lostConnection(self):
        """The input connection has finished.

        If a test was in progress it is reported to the client as an error
        and stopped.
        """
        if self.state == TestProtocolServer.TEST_STARTED:
            message = "lost connection during test '%s'"
        elif self.state == TestProtocolServer.READING_ERROR:
            message = "lost connection during error report of test '%s'"
        elif self.state == TestProtocolServer.READING_FAILURE:
            message = "lost connection during failure report of test '%s'"
        else:
            return
        self.client.addError(self._current_test,
                             RemoteError(message %
                                         self.current_test_description))
        self.client.stopTest(self._current_test)

    def readFrom(self, pipe):
        """Process every line of the file-like pipe, then signal its end."""
        for line in pipe.readlines():
            self.lineReceived(line)
        self.lostConnection()

    def _startTest(self, offset, line):
        """Internal call to change state machine. Override startTest()."""
        if self.state == TestProtocolServer.OUTSIDE_TEST:
            self.state = TestProtocolServer.TEST_STARTED
            self._current_test = RemotedTestCase(line[offset:-1])
            self.current_test_description = line[offset:-1]
            # Give the test its own copy of the stream-level tags before it
            # is announced, so the client never sees a test without tags.
            self._current_test.tags = set(self.tags)
            self.client.startTest(self._current_test)
        else:
            self.stdOutLineReceived(line)

    def stdOutLineReceived(self, line):
        """Forward a non-protocol line to the pass-through stream."""
        self._stream.write(line)
220
221
class RemoteException(Exception):
    """An exception that occurred remotely to Python.

    Instances compare equal to any object that carries the same C{args}.
    """

    def __eq__(self, other):
        try:
            return self.args == other.args
        except AttributeError:
            # Objects without args are never equal to a RemoteException.
            return False

    def __ne__(self, other):
        # Defined explicitly: Python 2 does not derive != from __eq__.
        return not self.__eq__(other)

    def __hash__(self):
        # Keep hashing consistent with the args-based equality above.
        return hash(self.args)
230
231
class TestProtocolClient(unittest.TestResult):
    """A TestResult look-alike that writes protocol lines to a stream.

    The lines written are those a TestProtocolServer understands.
    """

    def __init__(self, stream):
        unittest.TestResult.__init__(self)
        self._stream = stream

    def _write_details(self, command, test, error):
        """Write an outcome line for test followed by its quoted details."""
        self._stream.write("%s: %s [\n" % (command, test.id()))
        details = self._exc_info_to_string(error, test)
        for detail_line in details.splitlines():
            self._stream.write("%s\n" % detail_line)
        self._stream.write("]\n")

    def addError(self, test, error):
        """Report an error in test test."""
        self._write_details("error", test, error)

    def addFailure(self, test, error):
        """Report a failure in test test."""
        self._write_details("failure", test, error)

    def addSuccess(self, test):
        """Report a success in a test."""
        self._stream.write("successful: %s\n" % test.id())

    def startTest(self, test):
        """Mark a test as starting its test run."""
        self._stream.write("test: %s\n" % test.id())
260
261
def RemoteError(description=""):
    """Build an exc_info style triple describing a remote error.

    The triple is (RemoteException, a RemoteException instance, None). An
    empty description is replaced by a single newline.
    """
    message = "\n" if description == "" else description
    return (RemoteException, RemoteException(message), None)
266
267
class RemotedTestCase(unittest.TestCase):
    """A class to represent test cases run in child processes.

    Instances of this class are used to provide the Python test API a TestCase
    that can be printed to the screen, introspected for metadata and so on.
    However, as they are simply a memoisation of a test that was actually
    run in the past by a separate process, they cannot perform any interactive
    actions.

    Two instances are equal, and hash alike, when their descriptions are
    equal.
    """

    def __eq__ (self, other):
        try:
            return self.__description == other.__description
        except AttributeError:
            # other is not a RemotedTestCase.
            return False

    def __ne__(self, other):
        # Defined explicitly: Python 2 does not derive != from __eq__.
        return not self.__eq__(other)

    def __hash__(self):
        # Keep hashing consistent with the description-based equality.
        return hash(self.__description)

    def __init__(self, description):
        """Create a pseudo test case with description description."""
        # unittest.TestCase.__init__ is not called here: there is no local
        # test method to look up.
        self.__description = description

    def error(self, label):
        """Raise NotImplementedError naming the unsupported action label."""
        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
            label)

    def setUp(self):
        self.error("setUp")

    def tearDown(self):
        self.error("tearDown")

    def shortDescription(self):
        return self.__description

    def id(self):
        return "%s.%s" % (self._strclass(), self.__description)

    def __str__(self):
        return "%s (%s)" % (self.__description, self._strclass())

    def __repr__(self):
        return "<%s description='%s'>" % \
               (self._strclass(), self.__description)

    def run(self, result=None):
        """Report to result that a remoted test cannot be run again."""
        if result is None: result = self.defaultTestResult()
        result.startTest(self)
        result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
        result.stopTest(self)

    def _strclass(self):
        """Return the dotted module and class name of this instance."""
        cls = self.__class__
        return "%s.%s" % (cls.__module__, cls.__name__)
320
321
class ExecTestCase(unittest.TestCase):
    """A test case which runs external scripts for test fixtures.

    The docstring of the test method names the script to execute, relative
    to the directory of the module that defines the test class.
    """

    def __init__(self, methodName='runTest'):
        """Create an instance of the class that will use the named test
           method when executed. Raises a ValueError if the instance does
           not have a method with the specified name.
        """
        unittest.TestCase.__init__(self, methodName)
        test_method = getattr(self, methodName)
        defining_module = sys.modules[self.__class__.__module__]
        self.script = join_dir(defining_module.__file__, test_method.__doc__)

    def countTestCases(self):
        return 1

    def run(self, result=None):
        """Run the script, reporting its outcomes to result."""
        if result is None:
            result = self.defaultTestResult()
        self._run(result)

    def debug(self):
        """Run the test without collecting errors in a TestResult"""
        throwaway = unittest.TestResult()
        self._run(throwaway)

    def _run(self, result):
        """Execute the script and feed its standard output to result."""
        server = TestProtocolServer(result)
        child = subprocess.Popen([self.script], stdout=subprocess.PIPE)
        # communicate() returns (stdout, stderr); only stdout is captured.
        captured = child.communicate()[0]
        server.readFrom(StringIO(captured))
351
352
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which runs its tests in a forked process."""

    def run(self, result=None):
        """Run this test via run_isolated, reporting to result."""
        if result is None:
            result = self.defaultTestResult()
        run_isolated(unittest.TestCase, self, result)
359
360
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process."""

    def run(self, result=None):
        """Run this suite via run_isolated, reporting to result."""
        if result is None:
            result = unittest.TestResult()
        run_isolated(unittest.TestSuite, self, result)
367
368
def run_isolated(klass, self, result):
    """Run a test suite or case in a subprocess, using the run method on klass.

    The process is forked. The child redirects its standard output into a
    pipe, runs C{klass.run(self, ...)} against a TestProtocolClient writing
    to that output, and exits. The parent reads the pipe through a
    TestProtocolServer that reports to C{result}, then waits for the child.

    @param klass: The class whose C{run} method is invoked in the child.
    @param self: The test case or suite instance to run.
    @param result: The result object that receives the outcomes in the parent.

    @return: C{result}.
    """
    # Flush pending output before forking; otherwise the child inherits the
    # buffered data and would emit it a second time, into the protocol pipe.
    sys.stdout.flush()
    sys.stderr.flush()
    c2pread, c2pwrite = os.pipe()
    # fixme - error -> result
    # now fork
    pid = os.fork()
    if pid == 0:
        # Child
        exit_code = 1
        try:
            try:
                # Close parent's pipe ends
                os.close(c2pread)
                # Dup fds for child
                os.dup2(c2pwrite, 1)
                # Close pipe fds.
                os.close(c2pwrite)

                # at this point, sys.stdout is redirected, now we want
                # to filter it to escape ]'s.
                ### XXX: test and write that bit.

                result = TestProtocolClient(sys.stdout)
                klass.run(self, result)
                exit_code = 0
            except Exception:
                # Report the problem on stderr; the child must not unwind
                # into the code the parent was running.
                import traceback
                traceback.print_exc()
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                # exit HARD, exit NOW - whatever happened above.
                os._exit(exit_code)
    else:
        # Parent
        # Close child pipe ends
        os.close(c2pwrite)
        # hookup a protocol engine
        protocol = TestProtocolServer(result)
        pipe = os.fdopen(c2pread, 'rU')
        try:
            protocol.readFrom(pipe)
        finally:
            try:
                pipe.close()
            finally:
                # Always reap the child, even if reading failed.
                os.waitpid(pid, 0)
        # TODO return code evaluation.
    return result
403         # TODO return code evaluation.
404     return result