selftest: Remove python2.4-isms
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 import subunit
23 import time
24
# Result keywords recognised at the start of a line ("<keyword>: <name>").
# The names are joined with "|" to build the result-line regular expression.
VALID_RESULTS = [
    # per-test outcomes
    'success', 'successful',
    'failure', 'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    # per-testsuite outcomes (Samba extensions)
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
]
26
def parse_results(msg_ops, statistics, fh):
    """Parse a Subunit stream (with Samba extensions) and drive ``msg_ops``.

    Lines are read from ``fh`` until EOF and translated into callbacks:

    * ``test: NAME``         -> ``control_msg(line)``, ``start_test(NAME)``
    * ``time: Y-M-D h:m:s``  -> ``report_time(seconds)``
    * ``RESULT: NAME`` (optionally followed by `` [`` and reason lines up to
      a line containing only ``]``) -> ``control_msg`` for every consumed
      line, then ``end_test``, ``skip_testsuite`` or ``end_testsuite``
    * ``testsuite: NAME``    -> ``start_testsuite(NAME)``
    * ``progress: ARG``      -> ``progress(offset, whence)``
    * anything else          -> ``output_msg(line)``

    Malformed ``time:`` and ``progress:`` lines are forwarded through
    ``output_msg`` instead of raising.

    :param msg_ops: object implementing the callbacks listed above.
    :param statistics: mapping whose ``TESTS_ERROR``, ``TESTS_EXPECTED_OK``,
        ``TESTS_EXPECTED_FAIL``, ``TESTS_UNEXPECTED_FAIL`` and ``TESTS_SKIP``
        counters are incremented in place; the keys must already exist.
    :param fh: file-like object providing ``readline()``.
    :return: 1 if a reason block was cut off by EOF, or if any error or
        unexpected failure has been counted in ``statistics``; otherwise 0.
    """
    # Compiled once per call instead of being rebuilt (twice) for every line.
    result_re = re.compile(
        r"^(" + "|".join(VALID_RESULTS) +
        r"): (.*?)( \[)?([ \t]*)( multipart)?\n")
    time_re = re.compile(r"^time: (\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\n")

    # Names of tests that were announced with "test:" and not yet finished.
    open_tests = []

    def close_test():
        # list.pop() raises IndexError on an empty list, so a result that
        # arrives without a preceding "test:" line must not pop blindly.
        if open_tests:
            return open_tests.pop()
        return None

    while fh:
        l = fh.readline()
        if l == "":
            break

        if l.startswith("test: "):
            msg_ops.control_msg(l)
            name = l.split(":", 1)[1].strip()
            msg_ops.start_test(name)
            open_tests.append(name)
            continue

        if l.startswith("time: "):
            grp = time_re.match(l)
            if grp is None:
                # Unrecognised timestamp format: pass the line through
                # rather than crashing on a failed match.
                msg_ops.output_msg(l)
            else:
                stamp = [int(field) for field in grp.groups()]
                # wday, yday and isdst are passed as 0.
                msg_ops.report_time(time.mktime(tuple(stamp + [0, 0, 0])))
            continue

        grp = result_re.match(l)
        if grp is not None:
            msg_ops.control_msg(l)
            (result, testname, hasreason) = grp.group(1, 2, 3)
            reason = None
            if hasreason:
                # The reason is given on the following lines, up to "]".
                reason_lines = []
                terminated = False
                while fh:
                    l = fh.readline()
                    if l == "":
                        break
                    msg_ops.control_msg(l)
                    if l == "]\n":
                        terminated = True
                        break
                    reason_lines.append(l)

                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.end_test(testname, "error", True,
                                     "reason (%s) interrupted" % result)
                    return 1
                reason = "".join(reason_lines)

            if result in ("success", "successful"):
                close_test()  # FIXME: Check that popped value == testname
                statistics['TESTS_EXPECTED_OK'] += 1
                msg_ops.end_test(testname, "success", False, reason)
            elif result in ("xfail", "knownfail"):
                close_test()  # FIXME: Check that popped value == testname
                statistics['TESTS_EXPECTED_FAIL'] += 1
                msg_ops.end_test(testname, "xfail", False, reason)
            elif result in ("failure", "fail"):
                close_test()  # FIXME: Check that popped value == testname
                statistics['TESTS_UNEXPECTED_FAIL'] += 1
                msg_ops.end_test(testname, "failure", True, reason)
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of
                # test: if the innermost open test is a different one, it is
                # still running and goes back on the stack.
                last = close_test()
                if last is not None and last != testname:
                    open_tests.append(last)
                msg_ops.end_test(testname, "skip", False, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                close_test()  # FIXME: Check that popped value == testname
                msg_ops.end_test(testname, "error", True, reason)
            elif result == "skip-testsuite":
                # NOTE(review): a parsed reason is not forwarded here;
                # confirm whether every msg_ops accepts a reason argument.
                msg_ops.skip_testsuite(testname)
            elif result.startswith("testsuite-"):
                # testsuite-success / -failure / -xfail / -error
                msg_ops.end_testsuite(testname, result[len("testsuite-"):],
                                      reason)
            continue

        if l.startswith("testsuite: "):
            msg_ops.start_testsuite(l.split(":", 1)[1].strip())
        elif l.startswith("progress: "):
            arg = l.split(":", 1)[1].strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            else:
                try:
                    offset = int(arg)
                except ValueError:
                    # Empty or non-numeric argument: not a progress line
                    # we understand, so hand it on as plain output.
                    msg_ops.output_msg(l)
                else:
                    # An explicit sign means "relative to current position".
                    if arg[:1] in ("+", "-"):
                        whence = subunit.PROGRESS_CUR
                    else:
                        whence = subunit.PROGRESS_SET
                    msg_ops.progress(offset, whence)
        else:
            msg_ops.output_msg(l)

    # Anything still open at EOF never reported a result.
    while open_tests:
        msg_ops.end_test(open_tests.pop(), "error", True,
                         "was started but never finished!")
        statistics['TESTS_ERROR'] += 1

    if statistics['TESTS_ERROR'] > 0:
        return 1
    if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
        return 1
    return 0
129
130
class SubunitOps(object):
    """Emit Subunit protocol lines (plus Samba extensions) on stdout.

    All output uses the single-argument ``print(...)`` form, which Python 2
    parses as a print statement with a parenthesised expression and
    Python 3 parses as a function call; the text written is the same.
    """

    def _print_result(self, keyword, name, reason):
        """Print ``keyword: name``, with a bracketed reason block if given."""
        if reason:
            print("%s: %s [" % (keyword, name))
            print("%s" % reason)
            print("]")
        else:
            print("%s: %s" % (keyword, name))

    def start_test(self, testname):
        """Announce that ``testname`` is starting."""
        print("test: %s" % testname)

    def end_test(self, name, result, reason=None):
        """Report ``result`` for test ``name``; ``reason`` is optional."""
        self._print_result(result, name, reason)

    def skip_test(self, name, reason=None):
        """Report test ``name`` as skipped."""
        self.end_test(name, "skip", reason)

    def fail_test(self, name, reason=None):
        """Report test ``name`` as failed."""
        self.end_test(name, "fail", reason)

    def success_test(self, name, reason=None):
        """Report test ``name`` as successful."""
        self.end_test(name, "success", reason)

    def xfail_test(self, name, reason=None):
        """Report test ``name`` as an expected failure."""
        self.end_test(name, "xfail", reason)

    def report_time(self, t):
        """Print a ``time:`` line for ``t``, converted with time.localtime."""
        # Only the first six fields (date and clock time) are printed.
        (year, mon, mday, hour, minute, sec) = time.localtime(t)[:6]
        print("time: %04d-%02d-%02d %02d:%02d:%02d" % (
            year, mon, mday, hour, minute, sec))

    def progress(self, offset, whence):
        """Print a ``progress:`` line.

        ``whence`` is one of the ``subunit.PROGRESS_*`` constants.  For
        PROGRESS_PUSH / PROGRESS_POP the offset is ignored and the word
        ``push`` / ``pop`` is printed; a non-negative PROGRESS_CUR offset
        gets an explicit ``+`` sign; anything else is printed as given.
        """
        if whence == subunit.PROGRESS_CUR and offset > -1:
            prefix = "+"
        elif whence == subunit.PROGRESS_PUSH:
            prefix = ""
            offset = "push"
        elif whence == subunit.PROGRESS_POP:
            prefix = ""
            offset = "pop"
        else:
            prefix = ""
        print("progress: %s%s" % (prefix, offset))

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Announce that testsuite ``name`` is starting."""
        print("testsuite: %s" % name)

    def skip_testsuite(self, name, reason=None):
        """Report testsuite ``name`` as skipped."""
        self._print_result("skip-testsuite", name, reason)

    def end_testsuite(self, name, result, reason=None):
        """Report ``result`` for testsuite ``name`` as ``testsuite-<result>``."""
        self._print_result("testsuite-%s" % result, name, reason)
190
191
def read_test_regexes(name):
    """Load a regex list from the file ``name``.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored.  On any other line, text after the first ``#`` is taken as
    the reason for that regex.

    :param name: path of the file to read.
    :return: dict mapping each stripped regex to its stripped reason, or
        to None when the line contains no ``#``.
    """
    patterns = {}
    handle = open(name, 'r')
    try:
        for raw in handle:
            entry = raw.strip()
            if entry == "" or entry.startswith("#"):
                continue
            pieces = entry.split("#", 1)
            if len(pieces) == 2:
                patterns[pieces[0].strip()] = pieces[1].strip()
            else:
                patterns[entry] = None
    finally:
        handle.close()
    return patterns
208
209
def find_in_list(regexes, fullname):
    """Look up ``fullname`` in a mapping of regex -> reason.

    Each key is tried with ``re.match`` (anchored at the start of
    ``fullname``); the first key that matches decides the result.

    :param regexes: mapping of regular expression to reason (or None).
    :param fullname: test name to look up.
    :return: the reason of the first matching regex, ``""`` if that reason
        is None, or None when no regex matches.
    """
    # items() rather than iteritems(): the latter exists only on Python 2
    # dicts, while items() is available on both Python 2 and Python 3.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None
217
218
class FilterOps(object):
    """Message handler that filters results and re-emits them via SubunitOps.

    Test names are optionally prefixed, failures matching
    ``expected_failures`` are downgraded to ``xfail``, per-testsuite and
    total counters are kept, and (with ``strip_ok_output``) plain output is
    buffered per test and only printed when the test did not end in
    success, xfail or skip.
    """

    def __init__(self, prefix, expected_failures, strip_ok_output):
        """
        :param prefix: string prepended to every test name, or None.
        :param expected_failures: mapping of regex -> reason, as consumed
            by find_in_list().
        :param strip_ok_output: if true, buffer test output and drop it
            for tests that end in success, xfail or skip.
        """
        self._ops = SubunitOps()
        self.output = None
        self.prefix = prefix
        self.expected_failures = expected_failures
        self.strip_ok_output = strip_ok_output
        # Per-testsuite counters.  All three must exist before the first
        # start_testsuite() call, because end_test() and end_testsuite()
        # read them unconditionally.
        self.xfail_added = 0
        self.fail_added = 0
        self.error_added = 0
        # Whole-run counters.
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def report_time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.report_time(time)

    def progress(self, delta, whence):
        """Forward a progress update unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Write ``msg`` to stdout, or buffer it while output is captured."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output += msg

    def start_test(self, testname):
        """Start a test, applying the prefix and (re)starting the buffer."""
        if self.prefix is not None:
            testname = self.prefix + testname

        if self.strip_ok_output:
            self.output = ""

        self._ops.start_test(testname)

    def end_test(self, testname, result, unexpected, reason):
        """Finish a test, downgrading expected failures to ``xfail``.

        :param testname: unprefixed test name.
        :param result: result keyword as parsed from the stream.
        :param unexpected: false if the caller already knows the failure
            was expected.
        :param reason: reason text, or None.
        """
        if self.prefix is not None:
            testname = self.prefix + testname

        if result in ("fail", "failure"):
            if not unexpected:
                result = "xfail"
                self.xfail_added += 1
                self.total_xfail += 1
            else:
                # Only failures can be expected failures, so the regex
                # scan is skipped for every other result.
                xfail_reason = find_in_list(self.expected_failures, testname)
                if xfail_reason is not None:
                    result = "xfail"
                    self.xfail_added += 1
                    self.total_xfail += 1
                    # reason is None when the failure carried no reason
                    # block; None + str would raise TypeError.
                    if reason is None:
                        reason = xfail_reason
                    else:
                        reason += xfail_reason

        if result in ("fail", "failure"):
            self.fail_added += 1
            self.total_fail += 1

        if result == "error":
            self.error_added += 1
            self.total_error += 1

        if self.strip_ok_output:
            # output is None when the test was never started; printing it
            # would emit the literal text "None".
            if (result not in ("success", "xfail", "skip")
                    and self.output is not None):
                print(self.output)
        self.output = None

        self._ops.end_test(testname, result, reason)

    def skip_testsuite(self, name, reason=None):
        """Forward a testsuite skip unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Start a testsuite and reset the per-testsuite counters."""
        self._ops.start_testsuite(name)

        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Finish a testsuite, adjusting its result to the counted outcomes.

        A failed suite whose only problems were expected failures becomes
        ``xfail``; a suite with counted failures or errors is forced to
        ``failure`` / ``error`` and the count is appended to the reason.
        """
        # Expected failures only excuse the suite if nothing else went wrong.
        xfail = (self.xfail_added > 0
                 and not (self.fail_added > 0 or self.error_added > 0))

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)