selftest: Support parsing progress in format-subunit/filter-subunit.
[amitay/samba.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 import subunit
23 import time
24
VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error']

# "<result>: <name>", optionally followed by " [" when a multi-line reason
# follows.  Compiled once here rather than twice for every input line.
_RESULT_LINE_RE = re.compile(
    r"^(" + "|".join(VALID_RESULTS) + r"): (.*?)( \[)?([ \t]*)( multipart)?\n")

# "time: YYYY-MM-DD HH:MM:SS"
_TIME_LINE_RE = re.compile(r"^time: (\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\n")

# result keyword -> (result passed to msg_ops.end_test, statistics key,
#                    value of the "unexpected" flag)
_TEST_OUTCOMES = {
    "success": ("success", 'TESTS_EXPECTED_OK', False),
    "successful": ("success", 'TESTS_EXPECTED_OK', False),
    "xfail": ("xfail", 'TESTS_EXPECTED_FAIL', False),
    "knownfail": ("xfail", 'TESTS_EXPECTED_FAIL', False),
    "failure": ("failure", 'TESTS_UNEXPECTED_FAIL', True),
    "fail": ("failure", 'TESTS_UNEXPECTED_FAIL', True),
    "error": ("error", 'TESTS_ERROR', True),
}

_TESTSUITE_PREFIX = "testsuite-"


def _pop_open_test(open_tests):
    """Pop and return the most recently started test, or None if none is open."""
    if open_tests:
        return open_tests.pop()
    return None


def _read_reason(msg_ops, fh):
    """Read the lines of a bracketed reason up to the closing "]" line.

    Every line read (including the terminator) is passed to
    msg_ops.control_msg.  Returns (reason, terminated); terminated is False
    when the stream ended before the closing "]" line was seen.
    """
    collected = []
    for line in iter(fh.readline, ""):
        msg_ops.control_msg(line)
        if line == "]\n":
            return "".join(collected), True
        collected.append(line)
    return "".join(collected), False


def _record_result(msg_ops, statistics, open_tests, result, testname, reason):
    """Update statistics/open_tests for one result line and notify msg_ops."""
    if result in _TEST_OUTCOMES:
        reported, counter, unexpected = _TEST_OUTCOMES[result]
        # FIXME: Check that popped value == testname
        _pop_open_test(open_tests)
        statistics[counter] += 1
        msg_ops.end_test(testname, reported, unexpected, reason)
    elif result == "skip":
        statistics['TESTS_SKIP'] += 1
        # Allow tests to be skipped without prior announcement of test: if
        # the innermost open test is a different one, it is still running,
        # so put it back instead of dropping it.
        last = _pop_open_test(open_tests)
        if last is not None and last != testname:
            open_tests.append(last)
        msg_ops.end_test(testname, "skip", False, reason)
    elif result == "skip-testsuite":
        msg_ops.skip_testsuite(testname)
    elif result.startswith(_TESTSUITE_PREFIX):
        # testsuite-success / -failure / -xfail / -error
        msg_ops.end_testsuite(testname, result[len(_TESTSUITE_PREFIX):],
                              reason)


def _report_progress(msg_ops, arg):
    """Translate the argument of a "progress: " line into msg_ops.progress."""
    if arg == "pop":
        msg_ops.progress(None, subunit.PROGRESS_POP)
    elif arg == "push":
        msg_ops.progress(None, subunit.PROGRESS_PUSH)
    elif arg.startswith(("+", "-")):
        msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
    else:
        msg_ops.progress(int(arg), subunit.PROGRESS_SET)


def parse_results(msg_ops, statistics, fh):
    """Parse a Subunit stream (with Samba extensions) read from fh.

    :param msg_ops: Object receiving the parsed events.  The methods called
        on it are control_msg, output_msg, start_test, end_test,
        report_time, progress, start_testsuite, skip_testsuite,
        end_testsuite and testsuite_count.
    :param statistics: Dictionary whose 'TESTS_EXPECTED_OK',
        'TESTS_EXPECTED_FAIL', 'TESTS_UNEXPECTED_FAIL', 'TESTS_SKIP' and
        'TESTS_ERROR' counters are incremented in place.
    :param fh: File-like object; lines are obtained with fh.readline().
    :return: 1 if a reason block was left unterminated, or if any error or
        unexpected failure has been counted in statistics; 0 otherwise.

    Lines that are not recognised (including "time: " lines that do not
    have the expected format) are forwarded to msg_ops.output_msg.  Tests
    that were started but never finished are reported as errors once the
    stream ends.
    """
    open_tests = []

    for line in iter(fh.readline, ""):
        if line.startswith("test: "):
            msg_ops.control_msg(line)
            name = line.split(":", 1)[1].strip()
            msg_ops.start_test(name)
            open_tests.append(name)
            continue

        if line.startswith("time: "):
            stamp = _TIME_LINE_RE.match(line)
            if stamp is None:
                # Not a timestamp we understand; pass it through untouched
                # rather than crashing on it.
                msg_ops.output_msg(line)
            else:
                fields = tuple(int(value) for value in stamp.groups())
                msg_ops.report_time(time.mktime(fields + (0, 0, 0)))
            continue

        parsed = _RESULT_LINE_RE.match(line)
        if parsed is not None:
            msg_ops.control_msg(line)
            result, testname, hasreason = parsed.group(1, 2, 3)
            reason = None
            if hasreason:
                # reason may be specified in next lines
                reason, terminated = _read_reason(msg_ops, fh)
                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.end_test(testname, "error", True,
                                     "reason (%s) interrupted" % result)
                    return 1
            _record_result(msg_ops, statistics, open_tests, result, testname,
                           reason)
            continue

        if line.startswith("testsuite: "):
            msg_ops.start_testsuite(line.split(":", 1)[1].strip())
        elif line.startswith("testsuite-count: "):
            msg_ops.testsuite_count(int(line.split(":", 1)[1].strip()))
        elif line.startswith("progress: "):
            _report_progress(msg_ops, line.split(":", 1)[1].strip())
        else:
            msg_ops.output_msg(line)

    while open_tests:
        msg_ops.end_test(open_tests.pop(), "error", True,
                         "was started but never finished!")
        statistics['TESTS_ERROR'] += 1

    if statistics['TESTS_ERROR'] > 0:
        return 1
    if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
        return 1
    return 0
131
132
class SubunitOps(object):
    """Write Subunit protocol lines (plus Samba extensions) to stdout.

    All output uses print with a single parenthesised argument, which
    produces the same text under Python 2 and Python 3.
    """

    def _emit_result(self, header, reason):
        """Print a result line, followed by a bracketed reason if given."""
        if reason:
            print("%s [" % header)
            print("%s" % reason)
            print("]")
        else:
            print(header)

    def start_test(self, testname):
        """Announce that the test called testname has started."""
        print("test: %s" % testname)

    def end_test(self, name, result, reason=None):
        """Report result for test name, with an optional multi-line reason."""
        self._emit_result("%s: %s" % (result, name), reason)

    def skip_test(self, name, reason=None):
        """Report test name as skipped."""
        self.end_test(name, "skip", reason)

    def fail_test(self, name, reason=None):
        """Report test name as failed."""
        self.end_test(name, "fail", reason)

    def success_test(self, name, reason=None):
        """Report test name as successful."""
        self.end_test(name, "success", reason)

    def xfail_test(self, name, reason=None):
        """Report test name as an expected failure."""
        self.end_test(name, "xfail", reason)

    def report_time(self, t):
        """Print a "time:" line for t, formatted via time.localtime(t)."""
        # The first six fields are year, month, day, hour, minute, second.
        print("time: %04d-%02d-%02d %02d:%02d:%02d"
              % tuple(time.localtime(t)[:6]))

    def progress(self, offset, whence):
        """Print a "progress:" line.

        whence is compared against subunit.PROGRESS_PUSH, PROGRESS_POP and
        PROGRESS_CUR; offset is ignored for push and pop.  A relative
        (PROGRESS_CUR) offset greater than -1 gets an explicit "+" sign.
        """
        if whence == subunit.PROGRESS_PUSH:
            text = "push"
        elif whence == subunit.PROGRESS_POP:
            text = "pop"
        elif whence == subunit.PROGRESS_CUR and offset > -1:
            text = "+%s" % offset
        else:
            text = "%s" % offset
        print("progress: %s" % text)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Announce the start of testsuite name."""
        print("testsuite: %s" % name)

    def skip_testsuite(self, name, reason=None):
        """Report testsuite name as skipped, with an optional reason."""
        if reason:
            print("skip-testsuite: %s [\n%s\n]" % (name, reason))
        else:
            print("skip-testsuite: %s" % name)

    def end_testsuite(self, name, result, reason=None):
        """Report result for testsuite name, with an optional reason."""
        self._emit_result("testsuite-%s: %s" % (result, name), reason)

    def testsuite_count(self, count):
        """Print the number of testsuites as a "testsuite-count:" line."""
        print("testsuite-count: %d" % count)
195
196
def read_test_regexes(name):
    """Yield (regex, reason) pairs read from the file called name.

    Blank lines and lines whose first non-blank character is "#" are
    skipped.  On any other line, text after the first "#" is the reason and
    text before it is the regex (both stripped of surrounding whitespace);
    a line without "#" yields the whole stripped line and a reason of None.
    """
    with open(name, 'r') as listing:
        for raw in listing:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            if "#" not in entry:
                yield entry, None
                continue
            pattern, _, why = entry.partition("#")
            yield (pattern.strip(), why.strip())
211
212
def find_in_list(regexes, fullname):
    """Look up fullname in an iterable of (regex, reason) pairs.

    Returns the reason of the first pair whose regex matches the start of
    fullname (re.match), using "" when that reason is None.  Returns None
    when no regex matches.
    """
    for pattern, why in regexes:
        if not re.match(pattern, fullname):
            continue
        return "" if why is None else why
    return None
220
221
class FilterOps(object):
    """Message handler that re-emits a filtered stream through SubunitOps.

    Test names get an optional prefix, failures whose name matches an entry
    in expected_failures are turned into "xfail", and, when strip_ok_output
    is set, output captured between start_test and end_test is only printed
    for tests that did not end in success, xfail or skip.
    """

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def report_time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.report_time(time)

    def progress(self, delta, whence):
        """Forward a progress notification unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer msg while output is being captured, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output += msg

    def start_test(self, testname):
        """Announce a test (with prefix) and, if stripping, start capturing."""
        if self.prefix is not None:
            testname = self.prefix + testname

        if self.strip_ok_output:
            self.output = ""

        self._ops.start_test(testname)

    def end_test(self, testname, result, unexpected, reason):
        """Report the end of a test, rewriting expected failures to xfail.

        Updates the per-testsuite (*_added) and overall (total_*) counters
        for xfail, failure and error results.
        """
        if self.prefix is not None:
            testname = self.prefix + testname

        if result in ("fail", "failure") and not unexpected:
            result = "xfail"
            self.xfail_added += 1
            self.total_xfail += 1
        xfail_reason = find_in_list(self.expected_failures, testname)
        if xfail_reason is not None and result in ("fail", "failure"):
            result = "xfail"
            self.xfail_added += 1
            self.total_xfail += 1
            # The failure may have been reported without any reason text.
            if reason is None:
                reason = xfail_reason
            else:
                reason += xfail_reason

        if result in ("fail", "failure"):
            self.fail_added += 1
            self.total_fail += 1

        if result == "error":
            self.error_added += 1
            self.total_error += 1

        if self.strip_ok_output:
            # Nothing was captured if the test ended without start_test
            # having been called; don't print a literal "None" then.
            if (result not in ("success", "xfail", "skip")
                    and self.output is not None):
                print(self.output)
        self.output = None

        self._ops.end_test(testname, result, reason)

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the testsuite start and reset the per-testsuite counters."""
        self._ops.start_testsuite(name)

        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Report the end of a testsuite, adjusting result to the counters.

        A "fail"/"failure" result becomes "xfail" when the testsuite only
        had expected failures.  Counted failures or errors force the result
        to "failure" or "error" respectively (errors take precedence) and
        append the count to the reason.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)

    def testsuite_count(self, count):
        """Forward the testsuite count unchanged."""
        self._ops.testsuite_count(count)

    def __init__(self, prefix, expected_failures, strip_ok_output):
        """Set up the filter.

        :param prefix: String prepended to every test name, or None.
        :param expected_failures: (regex, reason) pairs handed to
            find_in_list for every finished test.
        :param strip_ok_output: If true, capture test output and only print
            it for tests that did not end in success, xfail or skip.
        """
        self._ops = SubunitOps()
        self.output = None
        self.prefix = prefix
        self.expected_failures = expected_failures
        self.strip_ok_output = strip_ok_output
        # Per-testsuite counters; also reset by start_testsuite.  They are
        # initialised here so results arriving before any testsuite has
        # been started can be counted.
        self.xfail_added = 0
        self.fail_added = 0
        self.error_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0