cde57077cf22ae33c50f01025294ec001121c588
[third_party/subunit] / python / subunit / __init__.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  This program is free software; you can redistribute it and/or modify
6 #  it under the terms of the GNU General Public License as published by
7 #  the Free Software Foundation; either version 2 of the License, or
8 #  (at your option) any later version.
9 #
10 #  This program is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #  GNU General Public License for more details.
14 #
15 #  You should have received a copy of the GNU General Public License
16 #  along with this program; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18 #
19
20 import os
21 from StringIO import StringIO
22 import subprocess
23 import sys
24 import unittest
25
def test_suite():
    """Return the suite built by C{subunit.tests.test_suite()}.

    The tests package is imported at call time rather than when this module
    is loaded.
    """
    from subunit import tests as subunit_tests
    return subunit_tests.test_suite()
29
30
def join_dir(base_path, path):
    """
    Resolve C{path} against the directory that contains C{base_path}.

    @param base_path: A path to a file or directory; it is made absolute
    and its final component is dropped to obtain the containing directory.
    @param path: An absolute path, or a path relative to the containing
    directory of C{base_path}.

    @return: C{path} joined onto the containing directory of C{base_path}
    (an absolute C{path} is returned as-is by C{os.path.join}).
    """
    containing_dir = os.path.dirname(os.path.abspath(base_path))
    return os.path.join(containing_dir, path)
43
44
class TestProtocolServer(object):
    """A class for receiving results from a TestProtocol client.

    Lines are fed in through lineReceived() (or readFrom()) and drive a small
    state machine.  Recognised protocol events are forwarded to the client;
    every line that is not part of the protocol is written to the stream.
    """

    # Parser states.
    OUTSIDE_TEST = 0
    TEST_STARTED = 1
    READING_FAILURE = 2
    READING_ERROR = 3
    READING_SKIP = 4
    READING_XFAIL = 5
    READING_SUCCESS = 6

    # States in which the lines of a bracketed ("[" ... "]") detail block are
    # being collected into self._message.
    _READING_STATES = (
        READING_FAILURE,
        READING_ERROR,
        READING_SKIP,
        READING_XFAIL,
        READING_SUCCESS,
        )

    # Phrase inserted into the "lost connection" error message, keyed by the
    # state the parser was in when the input ended.
    _LOST_CONNECTION_CONTEXT = {
        TEST_STARTED: '',
        READING_ERROR: 'error report of ',
        READING_FAILURE: 'failure report of ',
        READING_SUCCESS: 'success report of ',
        READING_SKIP: 'skip report of ',
        READING_XFAIL: 'xfail report of ',
        }

    def __init__(self, client, stream=sys.stdout):
        """Create a TestProtocol server instance.

        client should be an object that provides
         - startTest
         - addSuccess
         - addFailure
         - addError
         - stopTest
        methods, i.e. a TestResult.

        stream receives (via its write method) every line that is not part
        of the protocol.
        """
        self.state = TestProtocolServer.OUTSIDE_TEST
        self.client = client
        self._stream = stream
        # Per-test bookkeeping; initialised here so the attributes always
        # exist, whatever state the parser is in.
        self.current_test_description = None
        self._current_test = None
        self._message = ""

    def _outcomeReceived(self, offset, line, finish, reading_state):
        """Handle an outcome line ("error: ...", "success: ...", etc.).

        offset is the index in line at which the test description starts; the
        last character of line (the newline) is dropped before comparing.
        If the line names the current test, finish() is called to report the
        outcome.  If it names the current test followed by " [", the parser
        switches to reading_state and starts collecting a detail message.
        In every other case the line is passed through to the stream.
        """
        if self.state == TestProtocolServer.TEST_STARTED:
            described = line[offset:-1]
            if described == self.current_test_description:
                finish()
                return
            if described == self.current_test_description + " [":
                self.state = reading_state
                self._message = ""
                return
        self.stdOutLineReceived(line)

    def _reportProblem(self, report, message):
        """Close the current test, reporting it through report.

        report is the client's addError or addFailure; it is given the
        current test and a RemoteError built from message.
        """
        self.state = TestProtocolServer.OUTSIDE_TEST
        self.current_test_description = None
        report(self._current_test, RemoteError(message))
        self.client.stopTest(self._current_test)
        self._current_test = None

    def _errorTest(self, message=""):
        """Report the current test to the client as an error."""
        self._reportProblem(self.client.addError, message)

    def _failTest(self, message=""):
        """Report the current test to the client as a failure."""
        self._reportProblem(self.client.addFailure, message)

    def _addError(self, offset, line):
        """Handle an "error" line."""
        self._outcomeReceived(offset, line, self._errorTest,
            TestProtocolServer.READING_ERROR)

    def _addExpectedFail(self, offset, line):
        """Handle an "xfail" line; reported to the client as a success."""
        self._outcomeReceived(offset, line, self._succeedTest,
            TestProtocolServer.READING_XFAIL)

    def _addFailure(self, offset, line):
        """Handle a "failure" line."""
        self._outcomeReceived(offset, line, self._failTest,
            TestProtocolServer.READING_FAILURE)

    def _addSkip(self, offset, line):
        """Handle a "skip" line; reported to the client as a success."""
        self._outcomeReceived(offset, line, self._succeedTest,
            TestProtocolServer.READING_SKIP)

    def _addSuccess(self, offset, line):
        """Handle a "success"/"successful" line."""
        self._outcomeReceived(offset, line, self._succeedTest,
            TestProtocolServer.READING_SUCCESS)

    def _appendMessage(self, line):
        """Add one line of a bracketed detail block to the message."""
        if line[0:2] == " ]":
            # quoted ] start: drop the escaping space.
            self._message += line[1:]
        else:
            self._message += line

    def endQuote(self, line):
        """Handle a "]" line, which closes a bracketed detail block.

        The outcome that opened the block is reported to the client together
        with the collected message.  Outside a detail block the line is
        passed through to the stream.
        """
        if self.state == TestProtocolServer.READING_FAILURE:
            self._failTest(self._message)
        elif self.state == TestProtocolServer.READING_ERROR:
            self._errorTest(self._message)
        elif self.state in (
            TestProtocolServer.READING_SKIP,
            TestProtocolServer.READING_SUCCESS,
            TestProtocolServer.READING_XFAIL,
            ):
            self._succeedTest()
        else:
            self.stdOutLineReceived(line)

    def lineReceived(self, line):
        """Call the appropriate local method for the received line."""
        if line == "]\n":
            self.endQuote(line)
            return
        if self.state in TestProtocolServer._READING_STATES:
            # Inside any detail block (including success and xfail ones) the
            # line is message text, even when it looks like a command.
            self._appendMessage(line)
            return
        parts = line.split(None, 1)
        if len(parts) != 2:
            self.stdOutLineReceived(line)
            return
        cmd = parts[0]
        # The description starts one separator character after the command.
        offset = len(cmd) + 1
        cmd = cmd.strip(':')
        if cmd in ('test', 'testing'):
            self._startTest(offset, line)
        elif cmd == 'error':
            self._addError(offset, line)
        elif cmd == 'failure':
            self._addFailure(offset, line)
        elif cmd == 'skip':
            self._addSkip(offset, line)
        elif cmd in ('success', 'successful'):
            self._addSuccess(offset, line)
        elif cmd == 'xfail':
            self._addExpectedFail(offset, line)
        else:
            self.stdOutLineReceived(line)

    def _lostConnectionInTest(self, state_string):
        """Report the current test as an error because the input ended."""
        error_string = "lost connection during %stest '%s'" % (
            state_string, self.current_test_description)
        self.client.addError(self._current_test, RemoteError(error_string))
        self.client.stopTest(self._current_test)

    def lostConnection(self):
        """The input connection has finished."""
        if self.state == TestProtocolServer.OUTSIDE_TEST:
            return
        context = TestProtocolServer._LOST_CONNECTION_CONTEXT.get(
            self.state, 'unknown state of ')
        self._lostConnectionInTest(context)

    def readFrom(self, pipe):
        """Process every line of pipe, then signal the end of the input."""
        for line in pipe.readlines():
            self.lineReceived(line)
        self.lostConnection()

    def _startTest(self, offset, line):
        """Internal call to change state machine. Override startTest()."""
        if self.state == TestProtocolServer.OUTSIDE_TEST:
            self.state = TestProtocolServer.TEST_STARTED
            self._current_test = RemotedTestCase(line[offset:-1])
            self.current_test_description = line[offset:-1]
            self.client.startTest(self._current_test)
        else:
            self.stdOutLineReceived(line)

    def stdOutLineReceived(self, line):
        """Pass a non-protocol line through to the stream."""
        self._stream.write(line)

    def _succeedTest(self):
        """Report the current test as a success and return to OUTSIDE_TEST."""
        self.client.addSuccess(self._current_test)
        self.client.stopTest(self._current_test)
        self.current_test_description = None
        self._current_test = None
        self.state = TestProtocolServer.OUTSIDE_TEST
247
248
class RemoteException(Exception):
    """An exception that occurred remotely to python.

    Instances compare by their C{args}; anything that has no C{args}
    attribute compares unequal.
    """

    def __eq__(self, other):
        try:
            return self.args == other.args
        except AttributeError:
            return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so spell it out to keep the
        # two operators consistent.
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ removes the inherited hash on Python 3; hash on the
        # same data equality uses so equal instances hash alike.
        return hash(self.args)
257
258
class TestProtocolClient(unittest.TestResult):
    """A class that looks like a TestResult and informs a TestProtocolServer.

    Every event is written to the stream as text lines: "test: <name>",
    "successful: <name>", and "error: <name> [" / "failure: <name> ["
    followed by the traceback lines and a closing "]" line.
    """

    def __init__(self, stream):
        """Create a client that writes protocol lines to stream."""
        unittest.TestResult.__init__(self)
        self._stream = stream

    def _describe(self, test):
        """Return the name used for test on the wire.

        This is the test's short description; tests that have none (it is
        None) are identified by their id() instead of the text "None".
        """
        description = test.shortDescription()
        if description is None:
            description = test.id()
        return description

    def _writeDetails(self, label, test, error):
        """Write a bracketed outcome block for test.

        label is the outcome keyword and error the exc_info tuple whose
        formatted traceback becomes the body of the block.
        """
        self._stream.write("%s: %s [\n" % (label, self._describe(test)))
        # Split on line boundaries only: splitting on every whitespace run
        # would put each word of the traceback on a line of its own.
        for line in self._exc_info_to_string(error, test).splitlines():
            if line.startswith("]"):
                # Escape a leading "]" with a space so the line cannot be
                # taken for the "]" that closes the block.
                line = " " + line
            self._stream.write("%s\n" % line)
        self._stream.write("]\n")

    def addError(self, test, error):
        """Report an error in test test."""
        self._writeDetails("error", test, error)

    def addFailure(self, test, error):
        """Report a failure in test test."""
        self._writeDetails("failure", test, error)

    def addSuccess(self, test):
        """Report a success in a test."""
        self._stream.write("successful: %s\n" % self._describe(test))

    def startTest(self, test):
        """Mark a test as starting its test run."""
        self._stream.write("test: %s\n" % self._describe(test))
287
288
def RemoteError(description=""):
    """Build an exc_info style triple for a problem reported remotely.

    The result is C{(RemoteException, RemoteException(text), None)}, where
    text is description, or a single newline when description is the empty
    string.
    """
    text = "\n" if description == "" else description
    return (RemoteException, RemoteException(text), None)
293
294
class RemotedTestCase(unittest.TestCase):
    """A class to represent test cases run in child processes.

    Instances only carry a description: they compare (and hash) by it, and
    they cannot be run locally.
    """

    def __eq__ (self, other):
        try:
            return self.__description == other.__description
        except AttributeError:
            return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so spell it out to keep the
        # two operators consistent.
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ removes the inherited hash on Python 3; hash on the
        # description so equal instances hash alike.
        return hash(self.__description)

    def __init__(self, description):
        """Create a pseudo test case with description description."""
        # unittest.TestCase.__init__ is not called: there is no test method
        # to bind to.
        self.__description = description

    def error(self, label):
        """Raise NotImplementedError naming the refused operation label."""
        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
            label)

    def setUp(self):
        """Not permitted on a RemotedTestCase; always raises."""
        self.error("setUp")

    def tearDown(self):
        """Not permitted on a RemotedTestCase; always raises."""
        self.error("tearDown")

    def shortDescription(self):
        """Return the description given at construction."""
        return self.__description

    def id(self):
        """Return "<module>.<class>.<description>"."""
        return "%s.%s" % (self._strclass(), self.__description)

    def __str__(self):
        return "%s (%s)" % (self.__description, self._strclass())

    def __repr__(self):
        return "<%s description='%s'>" % \
               (self._strclass(), self.__description)

    def run(self, result=None):
        """Record an error on result: a RemotedTestCase cannot be run."""
        if result is None: result = self.defaultTestResult()
        result.startTest(self)
        result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
        result.stopTest(self)

    def _strclass(self):
        """Return the dotted "<module>.<class>" name of this instance."""
        cls = self.__class__
        return "%s.%s" % (cls.__module__, cls.__name__)
340
341
class ExecTestCase(unittest.TestCase):
    """A test case which runs external scripts for test fixtures.

    The docstring of the selected test method is used as the path of the
    script, resolved against the directory of the module that defines the
    test class.
    """

    def __init__(self, methodName='runTest'):
        """Bind this instance to the named test method and locate its script.

        Raises a ValueError if the instance does not have a method with the
        specified name.
        """
        unittest.TestCase.__init__(self, methodName)
        method = getattr(self, methodName)
        module_file = sys.modules[self.__class__.__module__].__file__
        self.script = join_dir(module_file, method.__doc__)

    def countTestCases(self):
        """An ExecTestCase always counts as a single test case."""
        return 1

    def run(self, result=None):
        """Run the script, reporting to result (a default one if omitted)."""
        if result is None:
            result = self.defaultTestResult()
        self._run(result)

    def debug(self):
        """Run the test without collecting errors in a TestResult"""
        throwaway = unittest.TestResult()
        self._run(throwaway)

    def _run(self, result):
        """Execute the script and feed its standard output to result."""
        server = TestProtocolServer(result)
        child = subprocess.Popen([self.script], stdout=subprocess.PIPE)
        # communicate() returns (stdout, stderr); only stdout is piped.
        captured = child.communicate()[0]
        server.readFrom(StringIO(captured))
371
372
class IsolatedTestCase(unittest.TestCase):
    """A TestCase whose run() is carried out through run_isolated()."""

    def run(self, result=None):
        """Run this test via run_isolated, reporting to result."""
        if result is None:
            result = self.defaultTestResult()
        run_isolated(unittest.TestCase, self, result)
379
380
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite whose run() is carried out through run_isolated()."""

    def run(self, result=None):
        """Run this suite via run_isolated, reporting to result."""
        if result is None:
            result = unittest.TestResult()
        run_isolated(unittest.TestSuite, self, result)
387
388
def run_isolated(klass, self, result):
    """Run a test suite or case in a subprocess, using the run method on klass.

    The process is forked.  The child runs C{klass.run(self, ...)} with a
    TestProtocolClient writing to its standard output, which is connected to
    a pipe; the parent decodes that pipe with a TestProtocolServer that
    reports to result.

    @param klass: The class whose C{run} method is invoked in the child.
    @param self: The test case or suite to run.
    @param result: The TestResult that receives the decoded events.

    @return: result.
    """
    c2pread, c2pwrite = os.pipe()
    # Flush before forking: anything still buffered would be copied into the
    # child and written again, this time into the pipe.
    sys.stdout.flush()
    sys.stderr.flush()
    # fixme - error -> result
    # now fork
    pid = os.fork()
    if pid == 0:
        # Child
        exit_status = 1
        try:
            # Close parent's pipe ends
            os.close(c2pread)
            # Dup fds for child
            os.dup2(c2pwrite, 1)
            # Close pipe fds.
            os.close(c2pwrite)

            # at this point, sys.stdout is redirected, now we want
            # to filter it to escape ]'s.
            ### XXX: test and write that bit.

            klass.run(self, TestProtocolClient(sys.stdout))
            sys.stdout.flush()
            sys.stderr.flush()
            exit_status = 0
        finally:
            # exit HARD, exit NOW - also when the run raised, so the child
            # can never fall through into the parent's code.
            os._exit(exit_status)
    else:
        # Parent
        # Close child pipe ends
        os.close(c2pwrite)
        # hookup a protocol engine
        protocol = TestProtocolServer(result)
        pipe = os.fdopen(c2pread, 'rU')
        try:
            protocol.readFrom(pipe)
        finally:
            # Release the read end and reap the child even if decoding fails.
            pipe.close()
            os.waitpid(pid, 0)
        # TODO return code evaluation.
    return result