166c59316565d7c9e9f229f714d667d50b373e58
[third_party/subunit] / lib / subunit / __init__.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  This program is free software; you can redistribute it and/or modify
6 #  it under the terms of the GNU General Public License as published by
7 #  the Free Software Foundation; either version 2 of the License, or
8 #  (at your option) any later version.
9 #
10 #  This program is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #  GNU General Public License for more details.
14 #
15 #  You should have received a copy of the GNU General Public License
16 #  along with this program; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18 #
19
20 import sys
21 import unittest
22
23 def test_suite():
24     import subunit.tests
25     return subunit.tests.test_suite()
26
27 class TestProtocolServer(object):
28     """A class for recieving results from a TestProtocol client."""
29
30     OUTSIDE_TEST = 0
31     TEST_STARTED = 1
32     READING_FAILURE = 2
33     READING_ERROR = 3
34
35     def __init__(self, client):
36         """Create a TestProtocol server instance. 
37
38         client should be an object that provides 
39          - startTest
40          - addSuccess
41          - addFailure
42          - addError
43          - endTest
44         methods, i.e. a TestResult.
45         """
46         self.state = TestProtocolServer.OUTSIDE_TEST
47         self.client = client
48         
49     def _addError(self, offset, line):
50         if (self.state == TestProtocolServer.TEST_STARTED and
51             self.current_test_description == line[offset:-1]):
52             self.state = TestProtocolServer.OUTSIDE_TEST
53             self.current_test_description = None
54             self.client.addError(self._current_test, RemoteError(""))
55             self.client.endTest(self._current_test)
56             self._current_test = None
57         elif (self.state == TestProtocolServer.TEST_STARTED and
58             self.current_test_description + " [" == line[offset:-1]):
59             self.state = TestProtocolServer.READING_ERROR
60             self._message = ""
61         else:
62             self.stdOutLineRecieved(line)
63
64     def _addFailure(self, offset, line):
65         if (self.state == TestProtocolServer.TEST_STARTED and
66             self.current_test_description == line[offset:-1]):
67             self.state = TestProtocolServer.OUTSIDE_TEST
68             self.current_test_description = None
69             self.client.addFailure(self._current_test, RemoteError())
70             self.client.endTest(self._current_test)
71         elif (self.state == TestProtocolServer.TEST_STARTED and
72             self.current_test_description + " [" == line[offset:-1]):
73             self.state = TestProtocolServer.READING_FAILURE
74             self._message = ""
75         else:
76             self.stdOutLineRecieved(line)
77
78     def _addSuccess(self, offset, line):
79         if (self.state == TestProtocolServer.TEST_STARTED and
80             self.current_test_description == line[offset:-1]):
81             self.client.addSuccess(self._current_test)
82             self.client.endTest(self._current_test)
83             self.current_test_description = None
84             self._current_test = None
85             self.state = TestProtocolServer.OUTSIDE_TEST
86         else:
87             self.stdOutLineRecieved(line)
88         
89     def _appendMessage(self, line):
90         if line[0:2] == " ]":
91             # quoted ] start
92             self._message += line[1:]
93         else:
94             self._message += line
95         
96     def endQuote(self, line):
97         if self.state == TestProtocolServer.READING_FAILURE:
98             self.state = TestProtocolServer.OUTSIDE_TEST
99             self.current_test_description = None
100             self.client.addFailure(self._current_test,
101                                    RemoteError(self._message))
102             self.client.endTest(self._current_test)
103         elif self.state == TestProtocolServer.READING_ERROR:
104             self.state = TestProtocolServer.OUTSIDE_TEST
105             self.current_test_description = None
106             self.client.addError(self._current_test,
107                                  RemoteError(self._message))
108             self.client.endTest(self._current_test)
109         else:
110             self.stdOutLineRecieved(line)
111         
112     def lineReceived(self, line):
113         """Call the appropriate local method for the recieved line."""
114         if line == "]\n":
115             self.endQuote(line)
116         elif (self.state == TestProtocolServer.READING_FAILURE or
117               self.state == TestProtocolServer.READING_ERROR):
118             self._appendMessage(line)
119         elif line.startswith("test:"):
120             self._startTest(6, line)
121         elif line.startswith("testing:"):
122             self._startTest(9, line)
123         elif line.startswith("testing"):
124             self._startTest(8, line)
125         elif line.startswith("test"):
126             self._startTest(5, line)
127         elif line.startswith("error:"):
128             self._addError(7, line)
129         elif line.startswith("error"):
130             self._addError(6, line)
131         elif line.startswith("failure:"):
132             self._addFailure(9, line)
133         elif line.startswith("failure"):
134             self._addFailure(8, line)
135         elif line.startswith("successful:"):
136             self._addSuccess(12, line)
137         elif line.startswith("successful"):
138             self._addSuccess(11, line)
139         elif line.startswith("success:"):
140             self._addSuccess(9, line)
141         elif line.startswith("success"):
142             self._addSuccess(8, line)
143         else:
144             self.stdOutLineRecieved(line)
145
146     def lostConnection(self):
147         """The input connection has finished."""
148         if self.state == TestProtocolServer.TEST_STARTED:
149             self.client.addError(self._current_test,
150                                  RemoteError("lost connection during test '%s'"
151                                              % self.current_test_description))
152             self.client.endTest(self._current_test)
153         elif self.state == TestProtocolServer.READING_ERROR:
154             self.client.addError(self._current_test,
155                                  RemoteError("lost connection during "
156                                              "error report of test "
157                                              "'%s'" %
158                                              self.current_test_description))
159             self.client.endTest(self._current_test)
160         elif self.state == TestProtocolServer.READING_FAILURE:
161             self.client.addError(self._current_test,
162                                  RemoteError("lost connection during "
163                                              "failure report of test "
164                                              "'%s'" % 
165                                              self.current_test_description))
166             self.client.endTest(self._current_test)
167
168     def readFrom(self, pipe):
169         for line in pipe.readlines(pipe):
170             self.lineReceived(line)
171         self.lostConnection()
172
173     def _startTest(self, offset, line):
174         """Internal call to change state machine. Override startTest()."""
175         if self.state == TestProtocolServer.OUTSIDE_TEST:
176             self.state = TestProtocolServer.TEST_STARTED
177             self._current_test = RemotedTestCase(line[offset:-1])
178             self.current_test_description = line[offset:-1]
179             self.client.startTest(self._current_test)
180         else:
181             self.stdOutLineRecieved(line)
182         
183     def stdOutLineRecieved(self, line):
184         sys.stdout.write(line)
185
186
187 class RemoteException(Exception):
188     """An exception that occured remotely to python."""
189
190     def __eq__(self, other):
191         try:
192             return self.args == other.args
193         except AttributeError:
194             return False
195     
196
197 def RemoteError(description=""):
198     if description == "":
199         description = "\n"
200     return (RemoteException("RemoteError:\n%s" % description), None, None)
201
202
203 class RemotedTestCase(unittest.TestCase):
204     """A class to represent test cases run in child processes."""
205
206     def __eq__ (self, other):
207         try:
208             return self.__description == other.__description
209         except AttributeError:
210             return False
211         
212     def __init__(self, description):
213         """Create a psuedo test case with description description."""
214         self.__description = description
215
216     def error(self, label):
217         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
218             label)
219
220     def setUp(self):
221         self.error("setUp")
222
223     def tearDown(self):
224         self.error("tearDown")
225
226     def shortDescription(self):
227         return self.__description
228
229     def id(self):
230         return "%s.%s" % (self._strclass(), self.__description)
231
232     def __str__(self):
233         return "%s (%s)" % (self.__description, self._strclass())
234
235     def __repr__(self):
236         return "<%s description='%s'>" % \
237                (self._strclass(), self.__description)
238
239     def run(self, result=None):
240         if result is None: result = self.defaultTestResult()
241         result.startTest(self)
242         result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
243         result.stopTest(self)
244         
245     def _strclass(self):
246         cls = self.__class__
247         return "%s.%s" % (cls.__module__, cls.__name__)