selftest: Use standard subunit command for progress reporting.
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 import subunit
23 import time
24
VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error']

# A result line: "<result>: <name>", optionally followed by " [" (a
# multi-line reason follows) and an optional " multipart" marker.
# Compiled once here instead of being rebuilt twice for every input line.
_RESULT_RE = re.compile(
    r"^(" + "|".join(VALID_RESULTS) + r"): (.*?)( \[)?([ \t]*)( multipart)?\n")

# A timestamp line: "time: YYYY-MM-DD HH:MM:SS".
_TIME_RE = re.compile(r"^time: (\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\n")

# Per-test result keyword -> (statistics key, result reported to msg_ops,
# whether the outcome is reported as unexpected).
_TEST_OUTCOMES = {
    "success": ('TESTS_EXPECTED_OK', "success", False),
    "successful": ('TESTS_EXPECTED_OK', "success", False),
    "xfail": ('TESTS_EXPECTED_FAIL', "xfail", False),
    "knownfail": ('TESTS_EXPECTED_FAIL', "xfail", False),
    "failure": ('TESTS_UNEXPECTED_FAIL', "failure", True),
    "fail": ('TESTS_UNEXPECTED_FAIL', "failure", True),
    "error": ('TESTS_ERROR', "error", True),
}

_TESTSUITE_PREFIX = "testsuite-"


def _read_reason(msg_ops, fh):
    """Read a bracketed reason up to its closing "]" line.

    Every line read, including the terminator, is forwarded to
    msg_ops.control_msg().

    :return: The reason text (possibly empty), or None if the stream ended
        before the closing "]" line was seen.
    """
    lines = []
    while True:
        l = fh.readline()
        if l == "":
            return None
        msg_ops.control_msg(l)
        if l == "]\n":
            return "".join(lines)
        lines.append(l)


def _report_result(msg_ops, statistics, open_tests, result, testname, reason):
    """Update statistics/open_tests for one result line and notify msg_ops."""
    if result in _TEST_OUTCOMES:
        (stat_key, reported, unexpected) = _TEST_OUTCOMES[result]
        # A result for a test that was never announced must not crash the
        # parser, so only pop when something is actually open.
        if open_tests:
            open_tests.pop() #FIXME: Check that popped value == $testname
        statistics[stat_key] += 1
        msg_ops.end_test(testname, reported, unexpected, reason)
    elif result == "skip":
        statistics['TESTS_SKIP'] += 1
        # Allow tests to be skipped without prior announcement of test:
        # only close the innermost open test if it is the one being skipped,
        # otherwise leave the open tests untouched.
        if open_tests and open_tests[-1] == testname:
            open_tests.pop()
        msg_ops.end_test(testname, "skip", False, reason)
    elif result == "skip-testsuite":
        msg_ops.skip_testsuite(testname)
    elif result.startswith(_TESTSUITE_PREFIX):
        # testsuite-success / -failure / -xfail / -error
        msg_ops.end_testsuite(testname, result[len(_TESTSUITE_PREFIX):],
                              reason)


def _report_progress(msg_ops, l):
    """Translate a "progress: ..." line into a msg_ops.progress() call."""
    arg = l.split(":", 1)[1].strip()
    if arg == "pop":
        msg_ops.progress(None, subunit.PROGRESS_POP)
        return
    if arg == "push":
        msg_ops.progress(None, subunit.PROGRESS_PUSH)
        return
    try:
        offset = int(arg)
    except ValueError:
        # Empty or non-numeric argument: not a progress directive we
        # understand, so pass the line through as ordinary output.
        msg_ops.output_msg(l)
        return
    if arg[0] in '+-':
        msg_ops.progress(offset, subunit.PROGRESS_CUR)
    else:
        msg_ops.progress(offset, subunit.PROGRESS_SET)


def parse_results(msg_ops, statistics, fh):
    """Parse a Subunit stream (with Samba extensions) and dispatch events.

    :param msg_ops: Receiver of the parsed events.  The methods called on it
        are control_msg(line), output_msg(line), start_test(name),
        end_test(name, result, unexpected, reason), report_time(seconds),
        progress(offset, whence), start_testsuite(name),
        skip_testsuite(name) and end_testsuite(name, result, reason).
    :param statistics: Mapping whose 'TESTS_EXPECTED_OK',
        'TESTS_EXPECTED_FAIL', 'TESTS_UNEXPECTED_FAIL', 'TESTS_SKIP' and
        'TESTS_ERROR' counters are incremented in place; the keys used must
        already exist.
    :param fh: Object with a readline() method returning "" at end of input.
    :return: 1 if a reason block was cut off by end of input, or if
        statistics records any error or unexpected failure; 0 otherwise.
    """
    open_tests = []

    while fh:
        l = fh.readline()
        if l == "":
            break

        if l.startswith("test: "):
            msg_ops.control_msg(l)
            name = l.split(":", 1)[1].strip()
            msg_ops.start_test(name)
            open_tests.append(name)
            continue

        if l.startswith("time: "):
            grp = _TIME_RE.match(l)
            if grp is None:
                # Timestamp in a format we do not parse: keep the line as
                # plain output rather than failing on it.
                msg_ops.output_msg(l)
            else:
                fields = tuple(int(x) for x in grp.groups())
                msg_ops.report_time(time.mktime(fields + (0, 0, 0)))
            continue

        grp = _RESULT_RE.match(l)
        if grp is not None:
            msg_ops.control_msg(l)
            (result, testname, hasreason) = grp.group(1, 2, 3)
            reason = None
            if hasreason:
                # reason may be specified in next lines
                reason = _read_reason(msg_ops, fh)
                if reason is None:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.end_test(testname, "error", True,
                                     "reason (%s) interrupted" % result)
                    return 1
            _report_result(msg_ops, statistics, open_tests, result, testname,
                           reason)
            continue

        if l.startswith("testsuite: "):
            msg_ops.start_testsuite(l.split(":", 1)[1].strip())
        elif l.startswith("progress: "):
            _report_progress(msg_ops, l)
        else:
            msg_ops.output_msg(l)

    # Anything still open at end of input never reported a result.
    while open_tests:
        msg_ops.end_test(open_tests.pop(), "error", True,
                   "was started but never finished!")
        statistics['TESTS_ERROR'] += 1

    if statistics['TESTS_ERROR'] > 0:
        return 1
    if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
        return 1
    return 0
129
130
class SubunitOps(object):
    """Write Subunit protocol lines, plus Samba's testsuite extensions, to stdout.

    All output uses the single-argument ``print(...)`` form, which produces
    the same text whether ``print`` is a statement or a function.
    """

    def start_test(self, testname):
        """Announce that the test *testname* has started."""
        print("test: %s" % testname)

    def end_test(self, name, result, reason=None):
        """Report *result* for test *name*.

        A non-empty *reason* is emitted as a bracketed block on the
        following lines.
        """
        if reason:
            print("%s: %s [" % (result, name))
            print("%s" % reason)
            print("]")
        else:
            print("%s: %s" % (result, name))

    def skip_test(self, name, reason=None):
        """Report test *name* as skipped."""
        self.end_test(name, "skip", reason)

    def fail_test(self, name, reason=None):
        """Report test *name* as failed."""
        self.end_test(name, "fail", reason)

    def success_test(self, name, reason=None):
        """Report test *name* as successful."""
        self.end_test(name, "success", reason)

    def xfail_test(self, name, reason=None):
        """Report test *name* as an expected failure."""
        self.end_test(name, "xfail", reason)

    def report_time(self, t):
        """Emit a "time:" line for *t* (seconds since the epoch), in local time."""
        # Only the first six fields are printed; slicing also avoids binding
        # a local called "min", which would shadow the builtin.
        (year, mon, mday, hour, minute, sec) = time.localtime(t)[:6]
        print("time: %04d-%02d-%02d %02d:%02d:%02d" % (year, mon, mday, hour, minute, sec))

    def progress(self, offset, whence):
        """Emit a "progress:" line.

        :param offset: Integer count; ignored for push/pop.
        :param whence: One of subunit.PROGRESS_CUR, PROGRESS_PUSH,
            PROGRESS_POP; any other value prints *offset* unprefixed.
        """
        if whence == subunit.PROGRESS_CUR and offset > -1:
            # Non-negative relative offsets need an explicit "+"; negative
            # ones already carry their sign and take the final branch.
            prefix = "+"
        elif whence == subunit.PROGRESS_PUSH:
            prefix = ""
            offset = "push"
        elif whence == subunit.PROGRESS_POP:
            prefix = ""
            offset = "pop"
        else:
            prefix = ""
        print("progress: %s%s" % (prefix, offset))

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Announce that the testsuite *name* has started."""
        print("testsuite: %s" % name)

    def skip_testsuite(self, name, reason=None):
        """Report testsuite *name* as skipped, with an optional *reason*."""
        if reason:
            print("skip-testsuite: %s [\n%s\n]" % (name, reason))
        else:
            print("skip-testsuite: %s" % name)

    def end_testsuite(self, name, result, reason=None):
        """Report *result* for testsuite *name*, with an optional *reason*."""
        if reason:
            print("testsuite-%s: %s [" % (result, name))
            print("%s" % reason)
            print("]")
        else:
            print("testsuite-%s: %s" % (result, name))
190
191
def read_test_regexes(name):
    """Yield (regex, reason) pairs parsed from the file called *name*.

    Blank lines and lines whose first non-blank character is "#" are
    ignored.  On any other line, the text before the first "#" is the regex
    and the text after it is the reason; both are stripped of surrounding
    whitespace.  Lines without a "#" yield None as the reason.
    """
    handle = open(name, 'r')
    try:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            pattern, marker, why = entry.partition("#")
            if marker:
                yield (pattern.strip(), why.strip())
            else:
                yield entry, None
    finally:
        # Runs when the generator is exhausted or closed early.
        handle.close()
206
207
def find_in_list(regexes, fullname):
    """Return the reason of the first entry whose regex matches *fullname*.

    :param regexes: Iterable of (regex, reason) pairs; matching is anchored
        at the start of *fullname* (re.match).
    :return: The matching entry's reason, "" if that entry has no reason,
        or None when no regex matches.
    """
    for pattern, why in regexes:
        if not re.match(pattern, fullname):
            continue
        return "" if why is None else why
    return None
215
216
class FilterOps(object):
    """Event receiver for parse_results() that re-emits a filtered stream.

    Events are forwarded to a SubunitOps instance after optionally prefixing
    test names, turning failures that match the expected-failure list into
    "xfail", and (with strip_ok_output) holding back the plain output of
    tests that end in success, xfail or skip.
    """

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def report_time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.report_time(time)

    def progress(self, delta, whence):
        """Forward a progress directive unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Write *msg* to stdout, or buffer it while output is being captured."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output+=msg

    def start_test(self, testname):
        """Announce a test, starting output capture if strip_ok_output is set."""
        if self.prefix is not None:
            testname = self.prefix + testname

        if self.strip_ok_output:
            self.output = ""

        self._ops.start_test(testname)

    def end_test(self, testname, result, unexpected, reason):
        """Report a test result, reclassifying expected failures as "xfail".

        :param testname: Test name, without the configured prefix.
        :param result: Result keyword as produced by parse_results().
        :param unexpected: False if the caller already knows a failure was
            expected.
        :param reason: Reason text or None.
        """
        if self.prefix is not None:
            testname = self.prefix + testname

        if result in ("fail", "failure"):
            if not unexpected:
                result = "xfail"
                self.xfail_added+=1
                self.total_xfail+=1
            else:
                # Only failures can be reclassified, so the regex list is
                # consulted here rather than for every test.
                xfail_reason = find_in_list(self.expected_failures, testname)
                if xfail_reason is not None:
                    result = "xfail"
                    self.xfail_added+=1
                    self.total_xfail+=1
                    # reason is None when the failure carried no reason
                    # block; appending to None would raise TypeError.
                    if reason is None:
                        reason = xfail_reason
                    else:
                        reason += xfail_reason

        if result in ("fail", "failure"):
            self.fail_added+=1
            self.total_fail+=1

        if result == "error":
            self.error_added+=1
            self.total_error+=1

        if self.strip_ok_output:
            # Captured output is only shown for tests that went wrong.  It
            # is None when the test was never started, in which case there
            # is nothing to show.
            if (result not in ("success", "xfail", "skip")
                    and self.output is not None):
                print(self.output)
        self.output = None

        self._ops.end_test(testname, result, reason)

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Announce a testsuite and reset the per-testsuite counters."""
        self._ops.start_testsuite(name)

        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Report a testsuite result adjusted by what its tests reported.

        A failed testsuite whose tests only produced expected failures is
        reported as "xfail".  Any counted failure or error forces the result
        to "failure" or "error" (errors take precedence) and appends the
        count to the reason.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)

    def __init__(self, prefix, expected_failures, strip_ok_output):
        """Set up the filter.

        :param prefix: String prepended to every test name, or None.
        :param expected_failures: (regex, reason) pairs as accepted by
            find_in_list(); it is searched on every unexpected failure, so
            it must be re-iterable.
        :param strip_ok_output: If true, buffer each test's plain output and
            only print it when the test does not end in success, xfail or
            skip.
        """
        self._ops = SubunitOps()
        self.output = None
        self.prefix = prefix
        self.expected_failures = expected_failures
        self.strip_ok_output = strip_ok_output
        # Per-testsuite counters; also reset by start_testsuite().  They are
        # initialised here so results reported outside any testsuite work.
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0