subunit: Support formatting compatible with upstream subunit, for consistency.
[bbaumbach/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 import subunit
23 import time
24
25 VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error']
26
27 def parse_results(msg_ops, statistics, fh):
28     expected_fail = 0
29     open_tests = []
30
31     while fh:
32         l = fh.readline()
33         if l == "":
34             break
35         parts = l.split(None, 1)
36         if not len(parts) == 2 or not l.startswith(parts[0]):
37             continue
38         command = parts[0].rstrip(":")
39         arg = parts[1]
40         if command in ("test", "testing"):
41             msg_ops.control_msg(l)
42             msg_ops.start_test(arg.rstrip())
43             open_tests.append(arg.rstrip())
44         elif command == "time":
45             msg_ops.control_msg(l)
46             grp = re.match(
47                 "(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\n", arg)
48             msg_ops.report_time(time.mktime((int(grp.group(1)), int(grp.group(2)), int(grp.group(3)), int(grp.group(4)), int(grp.group(5)), int(grp.group(6)), 0, 0, 0)))
49         elif command in VALID_RESULTS:
50             msg_ops.control_msg(l)
51             result = command
52             grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
53             (testname, hasreason) = (grp.group(1), grp.group(2))
54             if hasreason:
55                 reason = ""
56                 # reason may be specified in next lines
57                 terminated = False
58                 while fh:
59                     l = fh.readline()
60                     if l == "":
61                         break
62                     msg_ops.control_msg(l)
63                     if l == "]\n":
64                         terminated = True
65                         break
66                     else:
67                         reason += l
68                 
69                 if not terminated:
70                     statistics['TESTS_ERROR']+=1
71                     msg_ops.end_test(testname, "error", True, 
72                                        "reason (%s) interrupted" % result)
73                     return 1
74             else:
75                 reason = None
76             if result in ("success", "successful"):
77                 try:
78                     open_tests.remove(testname)
79                 except KeyError:
80                     statistics['TESTS_ERROR']+=1
81                     msg_ops.end_test(testname, "error", True, 
82                             "Test was never started")
83                 else:
84                     statistics['TESTS_EXPECTED_OK']+=1
85                     msg_ops.end_test(testname, "success", False, reason)
86             elif result in ("xfail", "knownfail"):
87                 try:
88                     open_tests.remove(testname)
89                 except KeyError:
90                     statistics['TESTS_ERROR']+=1
91                     msg_ops.end_test(testname, "error", True, 
92                             "Test was never started")
93                 else:
94                     statistics['TESTS_EXPECTED_FAIL']+=1
95                     msg_ops.end_test(testname, "xfail", False, reason)
96                     expected_fail+=1
97             elif result in ("failure", "fail"):
98                 try:
99                     open_tests.remove(testname)
100                 except KeyError:
101                     statistics['TESTS_ERROR']+=1
102                     msg_ops.end_test(testname, "error", True, 
103                             "Test was never started")
104                 else:
105                     statistics['TESTS_UNEXPECTED_FAIL']+=1
106                     msg_ops.end_test(testname, "failure", True, reason)
107             elif result == "skip":
108                 statistics['TESTS_SKIP']+=1
109                 # Allow tests to be skipped without prior announcement of test
110                 last = open_tests.pop()
111                 if last is not None and last != testname:
112                     open_tests.append(testname)
113                 msg_ops.end_test(testname, "skip", False, reason)
114             elif result == "error":
115                 statistics['TESTS_ERROR']+=1
116                 try:
117                     open_tests.remove(testname)
118                 except KeyError:
119                     pass
120                 msg_ops.end_test(testname, "error", True, reason)
121             elif result == "skip-testsuite":
122                 msg_ops.skip_testsuite(testname)
123             elif result == "testsuite-success":
124                 msg_ops.end_testsuite(testname, "success", reason)
125             elif result == "testsuite-failure":
126                 msg_ops.end_testsuite(testname, "failure", reason)
127             elif result == "testsuite-xfail":
128                 msg_ops.end_testsuite(testname, "xfail", reason)
129             elif result == "testsuite-error":
130                 msg_ops.end_testsuite(testname, "error", reason)
131         elif command == "testsuite":
132             msg_ops.start_testsuite(arg.strip())
133         elif command == "progress":
134             arg = arg.strip()
135             if arg == "pop":
136                 msg_ops.progress(None, subunit.PROGRESS_POP)
137             elif arg == "push":
138                 msg_ops.progress(None, subunit.PROGRESS_PUSH)
139             elif arg[0] in '+-':
140                 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
141             else:
142                 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
143         else:
144             msg_ops.output_msg(l)
145
146     while open_tests:
147         msg_ops.end_test(open_tests.pop(), "error", True,
148                    "was started but never finished!")
149         statistics['TESTS_ERROR']+=1
150
151     if statistics['TESTS_ERROR'] > 0:
152         return 1
153     if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
154         return 1 
155     return 0
156
157
158 class SubunitOps(object):
159
160     def start_test(self, testname):
161         print "test: %s" % testname
162
163     def end_test(self, name, result, reason=None):
164         if reason:
165             print "%s: %s [" % (result, name)
166             print "%s" % reason
167             print "]"
168         else:
169             print "%s: %s" % (result, name)
170
171     def skip_test(self, name, reason=None):
172         self.end_test(name, "skip", reason)
173
174     def fail_test(self, name, reason=None):
175         self.end_test(name, "fail", reason)
176
177     def success_test(self, name, reason=None):
178         self.end_test(name, "success", reason)
179
180     def xfail_test(self, name, reason=None):
181         self.end_test(name, "xfail", reason)
182
183     def report_time(self, t):
184         (year, mon, mday, hour, min, sec, wday, yday, isdst) = time.localtime(t)
185         print "time: %04d-%02d-%02d %02d:%02d:%02d" % (year, mon, mday, hour, min, sec)
186
187     def progress(self, offset, whence):
188         if whence == subunit.PROGRESS_CUR and offset > -1:
189             prefix = "+"
190         elif whence == subunit.PROGRESS_PUSH:
191             prefix = ""
192             offset = "push"
193         elif whence == subunit.PROGRESS_POP:
194             prefix = ""
195             offset = "pop"
196         else:
197             prefix = ""
198         print "progress: %s%s" % (prefix, offset)
199
200     # The following are Samba extensions:
201     def start_testsuite(self, name):
202         print "testsuite: %s" % name
203
204     def skip_testsuite(self, name, reason=None):
205         if reason:
206             print "skip-testsuite: %s [\n%s\n]" % (name, reason)
207         else:
208             print "skip-testsuite: %s" % name
209
210     def end_testsuite(self, name, result, reason=None):
211         if reason:
212             print "testsuite-%s: %s [" % (result, name)
213             print "%s" % reason
214             print "]"
215         else:
216             print "testsuite-%s: %s" % (result, name)
217
218
219 def read_test_regexes(name):
220     ret = {}
221     f = open(name, 'r')
222     try:
223         for l in f:
224             l = l.strip()
225             if l == "" or l[0] == "#":
226                 continue
227             if "#" in l:
228                 (regex, reason) = l.split("#", 1)
229                 ret[regex.strip()] = reason.strip()
230             else:
231                 ret[l] = None
232     finally:
233         f.close()
234     return ret
235
236
237 def find_in_list(regexes, fullname):
238     for regex, reason in regexes.iteritems():
239         if re.match(regex, fullname):
240             if reason is None:
241                 return ""
242             return reason
243     return None
244
245
246 class FilterOps(object):
247
248     def control_msg(self, msg):
249         pass # We regenerate control messages, so ignore this
250
251     def report_time(self, time):
252         self._ops.report_time(time)
253
254     def progress(self, delta, whence):
255         self._ops.progress(delta, whence)
256
257     def output_msg(self, msg):
258         if self.output is None:
259             sys.stdout.write(msg)
260         else:
261             self.output+=msg
262
263     def start_test(self, testname):
264         if self.prefix is not None:
265             testname = self.prefix + testname
266
267         if self.strip_ok_output:
268            self.output = ""
269
270         self._ops.start_test(testname)
271
272     def end_test(self, testname, result, unexpected, reason):
273         if self.prefix is not None:
274             testname = self.prefix + testname
275
276         if result in ("fail", "failure") and not unexpected:
277             result = "xfail"
278             self.xfail_added+=1
279             self.total_xfail+=1
280         xfail_reason = find_in_list(self.expected_failures, testname)
281         if xfail_reason is not None and result in ("fail", "failure"):
282             result = "xfail"
283             self.xfail_added+=1
284             self.total_xfail+=1
285             reason += xfail_reason
286
287         if result in ("fail", "failure"):
288             self.fail_added+=1
289             self.total_fail+=1
290
291         if result == "error":
292             self.error_added+=1
293             self.total_error+=1
294
295         if self.strip_ok_output:
296             if result not in ("success", "xfail", "skip"):
297                 print self.output
298         self.output = None
299
300         self._ops.end_test(testname, result, reason)
301
302     def skip_testsuite(self, name, reason=None):
303         self._ops.skip_testsuite(name, reason)
304
305     def start_testsuite(self, name):
306         self._ops.start_testsuite(name)
307
308         self.error_added = 0
309         self.fail_added = 0
310         self.xfail_added = 0
311
312     def end_testsuite(self, name, result, reason=None):
313         xfail = False
314
315         if self.xfail_added > 0:
316             xfail = True
317         if self.fail_added > 0 or self.error_added > 0:
318             xfail = False
319
320         if xfail and result in ("fail", "failure"):
321             result = "xfail"
322
323         if self.fail_added > 0 and result != "failure":
324             result = "failure"
325             if reason is None:
326                 reason = "Subunit/Filter Reason"
327             reason += "\n failures[%d]" % self.fail_added
328
329         if self.error_added > 0 and result != "error":
330             result = "error"
331             if reason is None:
332                 reason = "Subunit/Filter Reason"
333             reason += "\n errors[%d]" % self.error_added
334
335         self._ops.end_testsuite(name, result, reason)
336
337     def __init__(self, prefix, expected_failures, strip_ok_output):
338         self._ops = SubunitOps()
339         self.output = None
340         self.prefix = prefix
341         self.expected_failures = expected_failures
342         self.strip_ok_output = strip_ok_output
343         self.xfail_added = 0
344         self.total_xfail = 0
345         self.total_error = 0
346         self.total_fail = 0