selftest: Convert filter-subunit to Python so the subunit Python module
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 import time
23
24 VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error']
25
26 def parse_results(msg_ops, statistics, fh):
27     expected_fail = 0
28     open_tests = []
29
30     while fh:
31         l = fh.readline()
32         if l == "":
33             break
34         if l.startswith("test: "):
35             msg_ops.control_msg(l)
36             name = l.split(":", 1)[1].strip()
37             msg_ops.start_test(name)
38             open_tests.append(name)
39         elif l.startswith("time: "):
40             grp = re.match(
41                 "^time: (\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\n", l)
42             msg_ops.report_time(time.mktime((int(grp.group(1)), int(grp.group(2)), int(grp.group(3)), int(grp.group(4)), int(grp.group(5)), int(grp.group(6)), 0, 0, 0)))
43         elif re.match("^(" + "|".join(VALID_RESULTS) + "): (.*?)( \[)?([ \t]*)( multipart)?\n", l):
44             msg_ops.control_msg(l)
45             grp = re.match("^(" + "|".join(VALID_RESULTS) + "): (.*?)( \[)?([ \t]*)( multipart)?\n", l)
46             (result, testname, hasreason) = (grp.group(1), grp.group(2), grp.group(3))
47             if hasreason:
48                 reason = ""
49                 # reason may be specified in next lines
50                 terminated = False
51                 while fh:
52                     l = fh.readline()
53                     if l == "":
54                         break
55                     msg_ops.control_msg(l)
56                     if l == "]\n":
57                         terminated = True
58                         break
59                     else:
60                         reason += l
61                 
62                 if not terminated:
63                     statistics['TESTS_ERROR']+=1
64                     msg_ops.end_test(testname, "error", True, 
65                                        "reason (%s) interrupted" % result)
66                     return 1
67             else:
68                 reason = None
69             if result in ("success", "successful"):
70                 open_tests.pop() #FIXME: Check that popped value == $testname 
71                 statistics['TESTS_EXPECTED_OK']+=1
72                 msg_ops.end_test(testname, "success", False, reason)
73             elif result in ("xfail", "knownfail"):
74                 open_tests.pop() #FIXME: Check that popped value == $testname
75                 statistics['TESTS_EXPECTED_FAIL']+=1
76                 msg_ops.end_test(testname, "xfail", False, reason)
77                 expected_fail+=1
78             elif result in ("failure", "fail"):
79                 open_tests.pop() #FIXME: Check that popped value == $testname
80                 statistics['TESTS_UNEXPECTED_FAIL']+=1
81                 msg_ops.end_test(testname, "failure", True, reason)
82             elif result == "skip":
83                 statistics['TESTS_SKIP']+=1
84                 # Allow tests to be skipped without prior announcement of test
85                 last = open_tests.pop()
86                 if last is not None and last != testname:
87                     open_tests.append(testname)
88                 msg_ops.end_test(testname, "skip", False, reason)
89             elif result == "error":
90                 statistics['TESTS_ERROR']+=1
91                 open_tests.pop() #FIXME: Check that popped value == $testname
92                 msg_ops.end_test(testname, "error", True, reason)
93             elif result == "skip-testsuite":
94                 msg_ops.skip_testsuite(testname)
95             elif result == "testsuite-success":
96                 msg_ops.end_testsuite(testname, "success", reason)
97             elif result == "testsuite-failure":
98                 msg_ops.end_testsuite(testname, "failure", reason)
99             elif result == "testsuite-xfail":
100                 msg_ops.end_testsuite(testname, "xfail", reason)
101             elif result == "testsuite-error":
102                 msg_ops.end_testsuite(testname, "error", reason)
103         elif l.startswith("testsuite: "):
104             msg_ops.start_testsuite(l.split(":", 1)[1].strip())
105         elif l.startswith("testsuite-count: "):
106             msg_ops.testsuite_count(int(l.split(":", 1)[1].strip()))
107         else:
108             msg_ops.output_msg(l)
109
110     while open_tests:
111         msg_ops.end_test(open_tests.pop(), "error", True,
112                    "was started but never finished!")
113         statistics['TESTS_ERROR']+=1
114
115     if statistics['TESTS_ERROR'] > 0:
116         return 1
117     if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
118         return 1 
119     return 0
120
121
122 class SubunitOps(object):
123
124     def start_test(self, testname):
125         print "test: %s" % testname
126
127     def end_test(self, name, result, reason=None):
128         if reason:
129             print "%s: %s [" % (result, name)
130             print "%s" % reason
131             print "]"
132         else:
133             print "%s: %s" % (result, name)
134
135     def skip_test(self, name, reason=None):
136         self.end_test(name, "skip", reason)
137
138     def fail_test(self, name, reason=None):
139         self.end_test(name, "fail", reason)
140
141     def success_test(self, name, reason=None):
142         self.end_test(name, "success", reason)
143
144     def xfail_test(self, name, reason=None):
145         self.end_test(name, "xfail", reason)
146
147     def report_time(self, t):
148         (year, mon, mday, hour, min, sec, wday, yday, isdst) = time.localtime(t)
149         print "time: %04d-%02d-%02d %02d:%02d:%02d" % (year, mon, mday, hour, min, sec)
150
151     # The following are Samba extensions:
152     def start_testsuite(self, name):
153         print "testsuite: %s" % name
154
155     def skip_testsuite(self, name, reason=None):
156         if reason:
157             print "skip-testsuite: %s [\n%s\n]" % (name, reason)
158         else:
159             print "skip-testsuite: %s" % name
160
161     def end_testsuite(self, name, result, reason=None):
162         if reason:
163             print "testsuite-%s: %s [" % (result, name)
164             print "%s" % reason
165             print "]"
166         else:
167             print "testsuite-%s: %s" % (result, name)
168
169     def testsuite_count(self, count):
170         print "testsuite-count: %d" % count
171
172
173 def read_test_regexes(name):
174     f = open(name, 'r')
175     try:
176         for l in f:
177             l = l.strip()
178             if l == "" or l[0] == "#":
179                 continue
180             if "#" in l:
181                 (regex, reason) = l.split("#", 1)
182                 yield (regex.strip(), reason.strip())
183             else:
184                 yield l, None
185     finally:
186         f.close()
187
188
189 def find_in_list(regexes, fullname):
190     for regex, reason in regexes:
191         if re.match(regex, fullname):
192             if reason is None:
193                 return ""
194             return reason
195     return None
196
197
198 class FilterOps(object):
199
200     def control_msg(self, msg):
201         pass # We regenerate control messages, so ignore this
202
203     def report_time(self, time):
204         self._ops.report_time(time)
205
206     def output_msg(self, msg):
207         if self.output is None:
208             sys.stdout.write(msg)
209         else:
210             self.output+=msg
211
212     def start_test(self, testname):
213         if self.prefix is not None:
214             testname = self.prefix + testname
215
216         if self.strip_ok_output:
217            self.output = ""
218
219         self._ops.start_test(testname)
220
221     def end_test(self, testname, result, unexpected, reason):
222         if self.prefix is not None:
223             testname = self.prefix + testname
224
225         if result in ("fail", "failure") and not unexpected:
226             result = "xfail"
227             self.xfail_added+=1
228             self.total_xfail+=1
229         xfail_reason = find_in_list(self.expected_failures, testname)
230         if xfail_reason is not None and result in ("fail", "failure"):
231             result = "xfail"
232             self.xfail_added+=1
233             self.total_xfail+=1
234             reason += xfail_reason
235
236         if result in ("fail", "failure"):
237             self.fail_added+=1
238             self.total_fail+=1
239
240         if result == "error":
241             self.error_added+=1
242             self.total_error+=1
243
244         if self.strip_ok_output:
245             if result not in ("success", "xfail", "skip"):
246                 print self.output
247         self.output = None
248
249         self._ops.end_test(testname, result, reason)
250
251     def skip_testsuite(self, name, reason=None):
252         self._ops.skip_testsuite(name, reason)
253
254     def start_testsuite(self, name):
255         self._ops.start_testsuite(name)
256
257         self.error_added = 0
258         self.fail_added = 0
259         self.xfail_added = 0
260
261     def end_testsuite(self, name, result, reason=None):
262         xfail = False
263
264         if self.xfail_added > 0:
265             xfail = True
266         if self.fail_added > 0 or self.error_added > 0:
267             xfail = False
268
269         if xfail and result in ("fail", "failure"):
270             result = "xfail"
271
272         if self.fail_added > 0 and result != "failure":
273             result = "failure"
274             if reason is None:
275                 reason = "Subunit/Filter Reason"
276             reason += "\n failures[%d]" % self.fail_added
277
278         if self.error_added > 0 and result != "error":
279             result = "error"
280             if reason is None:
281                 reason = "Subunit/Filter Reason"
282             reason += "\n errors[%d]" % self.error_added
283
284         self._ops.end_testsuite(name, result, reason)
285
286     def testsuite_count(self, count):
287         self._ops.testsuite_count(count)
288
289     def __init__(self, prefix, expected_failures, strip_ok_output):
290         self._ops = SubunitOps()
291         self.output = None
292         self.prefix = prefix
293         self.expected_failures = expected_failures
294         self.strip_ok_output = strip_ok_output
295         self.xfail_added = 0
296         self.total_xfail = 0
297         self.total_error = 0
298         self.total_fail = 0