545178ea9966d0769cb962ff18e1f2f520688d03
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 import subunit
23 import time
24
VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error']

# Both patterns are compiled once at import time instead of being rebuilt
# for every input line.
_TIME_RE = re.compile(r"^time: (\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\n")
_RESULT_RE = re.compile(
    "^(" + "|".join(VALID_RESULTS) +
    r"): (.*?)( \[)?([ \t]*)( multipart)?\n")

# Result keyword -> (statistics key, result handed to msg_ops.end_test,
# value of the "unexpected" flag).  Only results that require the test to
# have been announced with "test: " are listed here.
_TEST_OUTCOMES = {
    "success": ("TESTS_EXPECTED_OK", "success", False),
    "successful": ("TESTS_EXPECTED_OK", "success", False),
    "xfail": ("TESTS_EXPECTED_FAIL", "xfail", False),
    "knownfail": ("TESTS_EXPECTED_FAIL", "xfail", False),
    "failure": ("TESTS_UNEXPECTED_FAIL", "failure", True),
    "fail": ("TESTS_UNEXPECTED_FAIL", "failure", True),
}

# Samba testsuite-level result keyword -> result handed to end_testsuite.
_TESTSUITE_OUTCOMES = {
    "testsuite-success": "success",
    "testsuite-failure": "failure",
    "testsuite-xfail": "xfail",
    "testsuite-error": "error",
}


def _read_reason(msg_ops, fh):
    """Read the lines of a bracketed reason up to the closing "]" line.

    Every line read (including the closing "]") is passed to
    msg_ops.control_msg.  Returns the reason text, or None if the input
    ended before the closing "]" line was seen.
    """
    collected = []
    for line in iter(fh.readline, ""):
        msg_ops.control_msg(line)
        if line == "]\n":
            return "".join(collected)
        collected.append(line)
    return None


def _forget_test(open_tests, testname):
    """Drop testname from open_tests; return False if it was not open."""
    try:
        open_tests.remove(testname)
    except ValueError:
        # list.remove() signals a missing item with ValueError (not KeyError).
        return False
    return True


def parse_results(msg_ops, statistics, fh):
    """Parse a (Samba-flavoured) subunit stream and dispatch it to msg_ops.

    :param msg_ops: Object receiving the parsed events through the methods
        control_msg, output_msg, start_test, end_test(name, result,
        unexpected, reason), report_time, progress, start_testsuite,
        skip_testsuite and end_testsuite.
    :param statistics: Mapping whose 'TESTS_ERROR', 'TESTS_EXPECTED_OK',
        'TESTS_EXPECTED_FAIL', 'TESTS_UNEXPECTED_FAIL' and 'TESTS_SKIP'
        counters are incremented in place.
    :param fh: Object providing readline(); read until it returns "".
    :return: 1 if a reason block was cut off by the end of input, or if
        statistics records any error or unexpected failure; 0 otherwise.
    """
    open_tests = []

    # A falsy fh yields no input at all, like the former "while fh:" guard.
    if fh:
        lines = iter(fh.readline, "")
    else:
        lines = iter(())

    for line in lines:
        if line.startswith("test: "):
            msg_ops.control_msg(line)
            name = line.split(":", 1)[1].strip()
            msg_ops.start_test(name)
            open_tests.append(name)
            continue

        if line.startswith("time: "):
            stamp = _TIME_RE.match(line)
            if stamp is None:
                # Not in the supported "Y-M-D h:m:s" form: pass the line
                # through as plain output rather than crashing on it.
                msg_ops.output_msg(line)
            else:
                fields = tuple(int(value) for value in stamp.groups())
                msg_ops.report_time(time.mktime(fields + (0, 0, 0)))
            continue

        outcome = _RESULT_RE.match(line)
        if outcome is not None:
            msg_ops.control_msg(line)
            (result, testname, hasreason) = outcome.group(1, 2, 3)
            reason = None
            if hasreason:
                # The reason is given on the following lines.
                reason = _read_reason(msg_ops, fh)
                if reason is None:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.end_test(testname, "error", True,
                                     "reason (%s) interrupted" % result)
                    return 1

            if result in _TEST_OUTCOMES:
                (stat_key, reported, unexpected) = _TEST_OUTCOMES[result]
                if _forget_test(open_tests, testname):
                    statistics[stat_key] += 1
                    msg_ops.end_test(testname, reported, unexpected, reason)
                else:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.end_test(testname, "error", True,
                                     "Test was never started")
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of
                # test: only forget the skipped test if it was announced,
                # and leave every other open test untouched.
                _forget_test(open_tests, testname)
                msg_ops.end_test(testname, "skip", False, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                # An error may be reported for a test that was never started.
                _forget_test(open_tests, testname)
                msg_ops.end_test(testname, "error", True, reason)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            else:
                msg_ops.end_testsuite(testname, _TESTSUITE_OUTCOMES[result],
                                      reason)
            continue

        if line.startswith("testsuite: "):
            msg_ops.start_testsuite(line.split(":", 1)[1].strip())
            continue

        if line.startswith("progress: "):
            arg = line.split(":", 1)[1].strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg.startswith(("+", "-")):
                # A signed value is relative to the current position.
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
            continue

        msg_ops.output_msg(line)

    # Anything still open at the end of the stream never reported a result.
    while open_tests:
        msg_ops.end_test(open_tests.pop(), "error", True,
                         "was started but never finished!")
        statistics['TESTS_ERROR'] += 1

    if statistics['TESTS_ERROR'] > 0:
        return 1
    if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
        return 1
    return 0
150
151
class SubunitOps(object):
    """Writes subunit protocol lines (plus Samba extensions) to stdout.

    All output uses the single-argument, parenthesised form of print, which
    behaves the same as a print statement on Python 2 and as the print
    function on Python 3.
    """

    def _emit_result(self, label, name, reason):
        """Print "label: name", followed by a bracketed reason if given."""
        if reason:
            print("%s: %s [" % (label, name))
            print("%s" % reason)
            print("]")
        else:
            print("%s: %s" % (label, name))

    def start_test(self, testname):
        """Announce that the test called testname has started."""
        print("test: %s" % testname)

    def end_test(self, name, result, reason=None):
        """Report result for test name, with an optional reason block."""
        self._emit_result(result, name, reason)

    def skip_test(self, name, reason=None):
        """Report test name as skipped."""
        self.end_test(name, "skip", reason)

    def fail_test(self, name, reason=None):
        """Report test name as failed."""
        self.end_test(name, "fail", reason)

    def success_test(self, name, reason=None):
        """Report test name as successful."""
        self.end_test(name, "success", reason)

    def xfail_test(self, name, reason=None):
        """Report test name as an expected failure."""
        self.end_test(name, "xfail", reason)

    def report_time(self, t):
        """Print a "time:" line for t (seconds since the epoch), local time."""
        # The first six struct_time fields are year, month, day, hour,
        # minute and second.
        print("time: %04d-%02d-%02d %02d:%02d:%02d" % time.localtime(t)[:6])

    def progress(self, offset, whence):
        """Print a "progress:" line.

        :param offset: Numeric offset; ignored for push and pop.
        :param whence: One of the subunit.PROGRESS_* constants.
        """
        if whence == subunit.PROGRESS_PUSH:
            text = "push"
        elif whence == subunit.PROGRESS_POP:
            text = "pop"
        elif whence == subunit.PROGRESS_CUR and offset > -1:
            # Non-negative relative offsets need an explicit "+" sign;
            # negative ones already carry their "-".
            text = "+%s" % offset
        else:
            text = "%s" % offset
        print("progress: %s" % text)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Announce that the testsuite called name has started."""
        print("testsuite: %s" % name)

    def skip_testsuite(self, name, reason=None):
        """Report testsuite name as skipped, with an optional reason block."""
        self._emit_result("skip-testsuite", name, reason)

    def end_testsuite(self, name, result, reason=None):
        """Report result for testsuite name, with an optional reason block."""
        self._emit_result("testsuite-%s" % result, name, reason)
211
212
def read_test_regexes(name):
    """Load a regex list from the file called name.

    Blank lines and lines whose first non-blank character is "#" are
    skipped.  On any other line, text after the first "#" is the reason
    attached to the regex before it; a line without "#" maps the whole
    (stripped) line to None.

    :param name: Path of the file to read.
    :return: Dictionary mapping each regex string to its reason or None.
    """
    patterns = {}
    with open(name, 'r') as source:
        for raw in source:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            (pattern, marker, comment) = entry.partition("#")
            if marker:
                patterns[pattern.strip()] = comment.strip()
            else:
                patterns[entry] = None
    return patterns
229
230
def find_in_list(regexes, fullname):
    """Look up fullname in a regex -> reason mapping.

    :param regexes: Dictionary mapping regex strings to a reason string or
        None (the shape returned by read_test_regexes).
    :param fullname: Name to test; each regex is applied with re.match, so
        it is anchored at the start of the name.
    :return: The reason of a matching regex ("" when that reason is None),
        or None if no regex matches.
    """
    # items() exists on both Python 2 and Python 3 dictionaries, unlike
    # iteritems().
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None
238
239
class FilterOps(object):
    """Message handler that filters a subunit stream and re-emits it.

    Events are forwarded to a SubunitOps instance after optionally
    prefixing test names, turning failures listed in expected_failures
    into "xfail" results, and withholding the output of tests that did
    not fail.
    """

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def report_time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.report_time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Write msg to stdout, or buffer it while output is being held."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output += msg

    def start_test(self, testname):
        """Forward the start of a test, beginning to buffer its output."""
        if self.prefix is not None:
            testname = self.prefix + testname

        if self.strip_ok_output:
            self.output = ""

        self._ops.start_test(testname)

    def end_test(self, testname, result, unexpected, reason):
        """Forward a test result, reclassifying expected failures.

        :param testname: Test name (the configured prefix is prepended).
        :param result: Result keyword reported for the test.
        :param unexpected: False if the producer already considers a
            failure to be expected.
        :param reason: Reason text, or None.
        """
        if self.prefix is not None:
            testname = self.prefix + testname

        if result in ("fail", "failure"):
            if not unexpected:
                result = "xfail"
                self.xfail_added += 1
                self.total_xfail += 1
            else:
                # Only failures need to be checked against the list of
                # expected failures.
                xfail_reason = find_in_list(self.expected_failures, testname)
                if xfail_reason is not None:
                    result = "xfail"
                    self.xfail_added += 1
                    self.total_xfail += 1
                    # reason is None when the failure came without a
                    # bracketed reason; concatenating to None would raise.
                    reason = (reason or "") + xfail_reason

        if result in ("fail", "failure"):
            self.fail_added += 1
            self.total_fail += 1

        if result == "error":
            self.error_added += 1
            self.total_error += 1

        if self.strip_ok_output and result not in ("success", "xfail", "skip"):
            # output is still None if the test was never started; there is
            # nothing buffered to show in that case.
            if self.output is not None:
                print(self.output)
        self.output = None

        self._ops.end_test(testname, result, reason)

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the start of a testsuite and reset the per-suite counters."""
        self._ops.start_testsuite(name)

        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward a testsuite result, adjusted by the per-suite counters.

        A failing suite whose only problems were expected failures is
        reported as "xfail".  Failures or errors counted since the suite
        started force the result to "failure" or "error" and add a count
        line to the reason.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)

    def __init__(self, prefix, expected_failures, strip_ok_output):
        """Create a filter.

        :param prefix: String prepended to every test name, or None.
        :param expected_failures: Dictionary of regex -> reason, looked up
            with find_in_list for failing tests.
        :param strip_ok_output: If true, output of a test is buffered and
            only printed when the test did not succeed, xfail or skip.
        """
        self._ops = SubunitOps()
        self.output = None
        self.prefix = prefix
        self.expected_failures = expected_failures
        self.strip_ok_output = strip_ok_output
        # Per-testsuite counters; also initialised here so that results
        # arriving before any start_testsuite() can be counted.
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0