2622d19770388a38f118fe85b997833f65607a12
[third_party/subunit] / lib / subunit / __init__.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  This program is free software; you can redistribute it and/or modify
6 #  it under the terms of the GNU General Public License as published by
7 #  the Free Software Foundation; either version 2 of the License, or
8 #  (at your option) any later version.
9 #
10 #  This program is distributed in the hope that it will be useful,
11 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #  GNU General Public License for more details.
14 #
15 #  You should have received a copy of the GNU General Public License
16 #  along with this program; if not, write to the Free Software
17 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18 #
19
20 import sys
21 import unittest
22
23 def test_suite():
24     import subunit.tests
25     return subunit.tests.test_suite()
26
27 class TestProtocolServer(object):
28     """A class for recieving results from a TestProtocol client."""
29
30     OUTSIDE_TEST = 0
31     TEST_STARTED = 1
32     READING_FAILURE = 2
33     READING_ERROR = 3
34
35     def __init__(self):
36         self.state = TestProtocolServer.OUTSIDE_TEST
37         
38     def _addError(self, offset, line):
39         if (self.state == TestProtocolServer.TEST_STARTED and
40             self.current_test_description == line[offset:-1]):
41             self.state = TestProtocolServer.OUTSIDE_TEST
42             self.current_test_description = None
43             self.addError("")
44         elif (self.state == TestProtocolServer.TEST_STARTED and
45             self.current_test_description + " [" == line[offset:-1]):
46             self.state = TestProtocolServer.READING_ERROR
47             self._message = ""
48         else:
49             self.stdOutLineRecieved(line)
50
51     def _addFailure(self, offset, line):
52         if (self.state == TestProtocolServer.TEST_STARTED and
53             self.current_test_description == line[offset:-1]):
54             self.state = TestProtocolServer.OUTSIDE_TEST
55             self.current_test_description = None
56             self.addFailure("")
57         elif (self.state == TestProtocolServer.TEST_STARTED and
58             self.current_test_description + " [" == line[offset:-1]):
59             self.state = TestProtocolServer.READING_FAILURE
60             self._message = ""
61         else:
62             self.stdOutLineRecieved(line)
63
64     def _addSuccess(self, offset, line):
65         if (self.state == TestProtocolServer.TEST_STARTED and
66             self.current_test_description == line[offset:-1]):
67             self.addSuccess(self._current_test)
68             self.current_test_description = None
69             self._current_test = None
70             self.state = TestProtocolServer.OUTSIDE_TEST
71         else:
72             self.stdOutLineRecieved(line)
73         
74     def _appendMessage(self, line):
75         if line[0:2] == " ]":
76             # quoted ] start
77             self._message += line[1:]
78         else:
79             self._message += line
80         
81     def endQuote(self, line):
82         if self.state == TestProtocolServer.READING_FAILURE:
83             self.state = TestProtocolServer.OUTSIDE_TEST
84             self.current_test_description = None
85             self.addFailure(self._message)
86         elif self.state == TestProtocolServer.READING_ERROR:
87             self.state = TestProtocolServer.OUTSIDE_TEST
88             self.current_test_description = None
89             self.addError(self._message)
90         else:
91             self.stdOutLineRecieved(line)
92         
93     def lineReceived(self, line):
94         """Call the appropriate local method for the recieved line."""
95         if line == "]\n":
96             self.endQuote(line)
97         elif (self.state == TestProtocolServer.READING_FAILURE or
98               self.state == TestProtocolServer.READING_ERROR):
99             self._appendMessage(line)
100         elif line.startswith("test:"):
101             self._startTest(6, line)
102         elif line.startswith("testing:"):
103             self._startTest(9, line)
104         elif line.startswith("testing"):
105             self._startTest(8, line)
106         elif line.startswith("test"):
107             self._startTest(5, line)
108         elif line.startswith("error:"):
109             self._addError(7, line)
110         elif line.startswith("error"):
111             self._addError(6, line)
112         elif line.startswith("failure:"):
113             self._addFailure(9, line)
114         elif line.startswith("failure"):
115             self._addFailure(8, line)
116         elif line.startswith("successful:"):
117             self._addSuccess(12, line)
118         elif line.startswith("successful"):
119             self._addSuccess(11, line)
120         elif line.startswith("success:"):
121             self._addSuccess(9, line)
122         elif line.startswith("success"):
123             self._addSuccess(8, line)
124         else:
125             self.stdOutLineRecieved(line)
126
127     def lostConnection(self):
128         """The input connection has finished."""
129         if self.state == TestProtocolServer.TEST_STARTED:
130             self.addError("lost connection during test '%s'" 
131                           % self.current_test_description)
132         elif self.state == TestProtocolServer.READING_ERROR:
133             self.addError("lost connection during "
134                           "error report of test "
135                           "'%s'" % self.current_test_description)
136         elif self.state == TestProtocolServer.READING_FAILURE:
137             self.addError("lost connection during "
138                           "failure report of test "
139                           "'%s'" % self.current_test_description)
140
141     def _startTest(self, offset, line):
142         """Internal call to change state machine. Override startTest()."""
143         if self.state == TestProtocolServer.OUTSIDE_TEST:
144             self.state = TestProtocolServer.TEST_STARTED
145             self._current_test = RemotedTestCase(line[offset:-1])
146             self.current_test_description = line[offset:-1]
147             self.startTest(self._current_test)
148         else:
149             self.stdOutLineRecieved(line)
150         
151     def stdOutLineRecieved(self, line):
152         sys.stdout.write(line)
153
154
155 class RemoteError(Exception):
156     """An exception that occured remotely to python."""
157     
158
159 class RemotedTestCase(unittest.TestCase):
160     """A class to represent test cases run in child processes."""
161
162     def __eq__ (self, other):
163         try:
164             return self.__description == other.__description
165         except AttributeError:
166             return False
167         
168     def __init__(self, description):
169         """Create a psuedo test case with description description."""
170         self.__description = description
171
172     def error(self, label):
173         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
174             label)
175
176     def setUp(self):
177         self.error("setUp")
178
179     def tearDown(self):
180         self.error("tearDown")
181
182     def shortDescription(self):
183         return self.__description
184
185     def id(self):
186         return "%s.%s" % (self._strclass(), self.__description)
187
188     def __str__(self):
189         return "%s (%s)" % (self.__description, self._strclass())
190
191     def __repr__(self):
192         return "<%s description='%s'>" % \
193                (self._strclass(), self.__description)
194
195     def run(self, result=None):
196         if result is None: result = self.defaultTestResult()
197         result.startTest(self)
198         result.addError(self, (RemoteError("Cannot run RemotedTestCases.\n"), None, None))
199         result.stopTest(self)
200         
201     def _strclass(self):
202         cls = self.__class__
203         return "%s.%s" % (cls.__module__, cls.__name__)