Fix handling of unexpected failures in subunithelper.
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
# Names exported by ``from subunithelper import *``.
__all__ = ['parse_results']
19
20 import re
21 import sys
22 from samba import subunit
23 from samba.subunit.run import TestProtocolClient
24 from subunit import iso8601
25 import unittest
26
# Commands that parse_results() treats as the outcome of a single test or of
# a whole testsuite.  Order is kept exactly as it has always been.
VALID_RESULTS = [
    # per-test outcomes
    'success',
    'successful',
    'failure',
    'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    # per-testsuite outcomes
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
    # unexpected successes (test and testsuite level)
    'uxsuccess',
    'testsuite-uxsuccess',
]
28
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A ``unittest.TestResult`` that can additionally be told when a
    testsuite starts.

    This base class only declares the hook; its implementation always
    raises, so subclasses that want testsuite notifications override it.
    """

    def start_testsuite(self, name):
        """Hook called with the name of a testsuite that is starting.

        :param name: Name of the testsuite.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(self.start_testsuite)
33
34
def _pop_started_test(open_tests, testname, msg_ops, statistics):
    """Remove and return the running test called ``testname``.

    If no test of that name is currently open, the problem is counted in
    ``statistics['TESTS_ERROR']``, a "Test was never started" error is
    reported to ``msg_ops`` and None is returned.
    """
    try:
        return open_tests.pop(testname)
    except KeyError:
        statistics['TESTS_ERROR'] += 1
        msg_ops.addError(subunit.RemotedTestCase(testname),
                         subunit.RemoteError(u"Test was never started"))
        return None


def _read_reason(msg_ops, fh):
    """Read the lines of a bracketed reason block from ``fh``.

    Every line read (including the closing ``]``) is passed to
    ``msg_ops.control_msg``.

    :return: ``(reason, terminated)``; ``terminated`` is False when the
        stream ended before the closing ``]`` line was seen.
    """
    collected = []
    while fh:
        l = fh.readline()
        if l == "":
            break
        msg_ops.control_msg(l)
        if l == "]\n":
            return "".join(collected), True
        collected.append(l)
    return "".join(collected), False


def parse_results(msg_ops, statistics, fh):
    """Parse a subunit stream and forward its events to ``msg_ops``.

    :param msg_ops: Object receiving the parsed events.  The methods called
        on it are ``output_msg``, ``control_msg``, ``startTest``, ``time``,
        ``progress``, ``start_testsuite``, ``end_testsuite``,
        ``skip_testsuite`` and the ``add*`` result methods.
    :param statistics: Mapping of counters that is updated in place with
        ``+=``; the keys ``TESTS_ERROR``, ``TESTS_EXPECTED_OK``,
        ``TESTS_EXPECTED_FAIL``, ``TESTS_UNEXPECTED_OK``,
        ``TESTS_UNEXPECTED_FAIL`` and ``TESTS_SKIP`` are used.
    :param fh: File-like object providing ``readline()``.
    :return: 0 if nothing unexpected was seen, 1 otherwise.
    """
    exitcode = 0
    open_tests = {}

    # testsuite-level result command -> (result name passed on, is a failure)
    suite_results = {
        "testsuite-success": ("success", False),
        "testsuite-failure": ("failure", True),
        "testsuite-xfail": ("xfail", False),
        "testsuite-uxsuccess": ("uxsuccess", True),
        "testsuite-error": ("error", True),
    }

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        if not len(parts) == 2 or not l.startswith(parts[0]):
            # Not a "command: argument" line; pass it through as output.
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # The pattern requires a trailing newline; a stream that was cut
            # off in the middle of a line would otherwise yield no match.
            if not arg.endswith("\n"):
                arg += "\n"
            grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                # reason may be specified in next lines
                reason, terminated = _read_reason(msg_ops, fh)

                remote_error = subunit.RemoteError(reason.decode("utf-8"))

                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")
            if result in ("success", "successful"):
                test = _pop_started_test(open_tests, testname, msg_ops, statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_OK'] += 1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                test = _pop_started_test(open_tests, testname, msg_ops, statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_FAIL'] += 1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("uxsuccess", ):
                test = _pop_started_test(open_tests, testname, msg_ops, statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_OK'] += 1
                    msg_ops.addUnexpectedSuccess(test, remote_error)
                    exitcode = 1
            elif result in ("failure", "fail"):
                test = _pop_started_test(open_tests, testname, msg_ops, statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_FAIL'] += 1
                    exitcode = 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                exitcode = 1
                # Errors may also be reported for tests that were never
                # announced.
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            elif result in suite_results:
                suite_result, is_failure = suite_results[result]
                msg_ops.end_testsuite(testname, suite_result, reason)
                if is_failure:
                    exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                    result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Anything still open at end of stream never reported a result.
    while open_tests:
        # open_tests already holds test objects (stored when the test was
        # announced), so report them directly instead of wrapping them in a
        # second RemotedTestCase.
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR'] += 1
        exitcode = 1

    return exitcode
192
193
class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
    """Subunit protocol writer extended with Samba's testsuite commands.

    All extension methods write directly to ``self._stream``
    (presumably set up by ``TestProtocolClient`` -- not visible here).
    """

    def progress(self, count, whence):
        """Progress notifications are accepted and ignored."""
        pass

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write a ``testsuite:`` line announcing ``name``."""
        self._stream.write("testsuite: %s\n" % name)

    def skip_testsuite(self, name, reason=None):
        """Write a ``skip-testsuite:`` line, with a bracketed reason block
        when ``reason`` is non-empty."""
        if not reason:
            self._stream.write("skip-testsuite: %s\n" % name)
            return
        self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))

    def end_testsuite(self, name, result, reason=None):
        """Write a ``testsuite-<result>:`` line, with a bracketed reason
        block when ``reason`` is non-empty."""
        if not reason:
            self._stream.write("testsuite-%s: %s\n" % (result, name))
            return
        self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))

    def output_msg(self, msg):
        """Write ``msg`` to the stream unchanged."""
        self._stream.write(msg)
217
218
def read_test_regexes(name):
    """Load a mapping of test-name regexes from the file ``name``.

    Blank lines and lines starting with ``#`` are ignored.  On any other
    line, text after the first ``#`` is taken as the reason for that regex.

    :param name: Path of the file to read.
    :return: Dict mapping each regex to its reason, or to None when the
        line carries no ``#`` reason.
    """
    regexes = {}
    with open(name, 'r') as f:
        for raw in f:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            pattern, marker, why = entry.partition("#")
            if marker:
                regexes[pattern.strip()] = why.strip()
            else:
                regexes[entry] = None
    return regexes
235
236
def find_in_list(regexes, fullname):
    """Look up ``fullname`` in a mapping of regexes to reasons.

    Each key is tried with ``re.match`` (anchored at the start of
    ``fullname``); the first key that matches wins.

    :param regexes: Dict mapping regex strings to a reason string or None.
    :param fullname: Full test name to look up.
    :return: The reason of the matching regex (``""`` when that reason is
        None), or None when no regex matches.
    """
    # items() rather than iteritems(): same result, and not tied to the
    # Python 2 dict API.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None
244
245
class ImmediateFail(Exception):
    """Raised to abort immediately."""

    def __init__(self):
        # The message is fixed; callers construct this without arguments.
        message = "test failed and fail_immediately set"
        Exception.__init__(self, message)
251
252
class FilterOps(unittest.TestResult):
    """Test result that filters events before passing them to ``out``.

    Test ids get the configured prefix/suffix, failures matching
    ``expected_failures`` or ``flapping`` are forwarded as expected
    failures, successes matching ``expected_failures`` are forwarded as
    unexpected successes, and (with ``strip_ok_output``) output is buffered
    per test and only forwarded for unexpected results.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: Result object every (filtered) event is forwarded to.
        :param prefix: Optional string prepended to each test id.
        :param suffix: Optional string appended to each test id.
        :param expected_failures: Dict of regex -> reason for known failures.
        :param strip_ok_output: Buffer output from startTest onwards and
            drop it unless the result turns out to be unexpected.
        :param fail_immediately: Raise ImmediateFail on the first
            unexpected result.
        :param flapping: Dict of regex -> reason for tests whose failures
            are tolerated.
        """
        self._ops = out
        self.prefix = prefix
        self.suffix = suffix
        self.expected_failures = {} if expected_failures is None else expected_failures
        self.flapping = {} if flapping is None else flapping
        self.strip_ok_output = strip_ok_output
        self.fail_immediately = fail_immediately
        self.seen_output = False
        # None: output goes straight to stdout; a string: output is buffered.
        self.output = None
        # Counters for the current testsuite ...
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0
        # ... and for the whole run.
        self.total_error = 0
        self.total_fail = 0
        self.total_xfail = 0
        self.total_uxsuccess = 0

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress notification unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer ``msg`` if buffering is active, else write it to stdout."""
        if self.output is not None:
            self.output += msg
        else:
            sys.stdout.write(msg)

    def startTest(self, test):
        """Forward the start of ``test`` and, if requested, begin buffering."""
        self.seen_output = True
        prefixed = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(prefixed)

    def _add_prefix(self, test):
        """Return a new test whose id is prefix + original id + suffix."""
        lead = self.prefix if self.prefix is not None else ""
        tail = self.suffix if self.suffix is not None else ""
        return subunit.RemotedTestCase(lead + test.id() + tail)

    def addError(self, test, err=None):
        """Count and forward an error; buffered output is discarded."""
        prefixed = self._add_prefix(test)
        self.error_added += 1
        self.total_error += 1
        self._ops.addError(prefixed, err)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        """Forward a skip; buffered output is discarded."""
        self.seen_output = True
        prefixed = self._add_prefix(test)
        self._ops.addSkip(prefixed, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        """Forward an expected failure; buffered output is discarded."""
        prefixed = self._add_prefix(test)
        self._ops.addExpectedFailure(prefixed, err)
        self.output = None

    def addUnexpectedSuccess(self, test, err=None):
        """Count and forward an unexpected success together with any
        buffered output."""
        prefixed = self._add_prefix(test)
        self.uxsuccess_added += 1
        self.total_uxsuccess += 1
        self._ops.addUnexpectedSuccess(prefixed, err)
        if self.output:
            self._ops.output_msg(self.output)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, downgraded to an expected failure when the
        test id matches ``expected_failures`` or ``flapping``."""
        prefixed = self._add_prefix(test)
        known_reason = find_in_list(self.expected_failures, prefixed.id())
        if known_reason is None:
            known_reason = find_in_list(self.flapping, prefixed.id())
        if known_reason is None:
            # A genuine, unexpected failure.
            self.fail_added += 1
            self.total_fail += 1
            self._ops.addFailure(prefixed, err)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self.xfail_added += 1
            self.total_xfail += 1
            self._ops.addExpectedFailure(prefixed, err)
        self.output = None

    def addSuccess(self, test):
        """Forward a success, upgraded to an unexpected success when the
        test id matches ``expected_failures``."""
        prefixed = self._add_prefix(test)
        known_reason = find_in_list(self.expected_failures, prefixed.id())
        if known_reason is None:
            self._ops.addSuccess(prefixed)
        else:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(prefixed, subunit.RemoteError(known_reason))
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the testsuite start and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the end of a testsuite, adjusting ``result`` to match
        what was actually counted while the suite ran."""
        any_unexpected = (self.fail_added > 0 or self.error_added > 0 or
                          self.uxsuccess_added > 0)
        only_xfail = self.xfail_added > 0 and not any_unexpected

        if only_xfail and result in ("fail", "failure"):
            result = "xfail"

        # Applied in this order, so the last matching entry decides the
        # final result while every entry that fires extends the reason.
        overrides = (
            (self.uxsuccess_added, "uxsuccess", "\n uxsuccess[%d]"),
            (self.fail_added, "failure", "\n failures[%d]"),
            (self.error_added, "error", "\n errors[%d]"),
        )
        for count, forced_result, note in overrides:
            if count > 0 and result != forced_result:
                result = forced_result
                if reason is None:
                    reason = "Subunit/Filter Reason"
                reason += note % count

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
426
427
class PlainFormatter(TestsuiteEnabledTestResult):
    """Test result that prints a human-readable progress report to stdout
    and can write a summary file at the end of the run."""

    def __init__(self, verbose, immediate, statistics,
            totaltests=None):
        """
        :param verbose: Write test output to stdout as it arrives instead
            of collecting it per testsuite.
        :param immediate: Print unexpected results as soon as they happen
            instead of one status character per test.
        :param statistics: Mapping of ``TESTS_*`` counters; only read here.
        :param totaltests: Optional expected number of testsuites.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        self.skips = {}
        self.index = 0
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format the ``seconds`` field of a timedelta as ``[Nh][Nm]Ns``."""
        minutes, seconds = divmod(delta.seconds, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track progress nesting; a SET at the outermost level defines the
        total number of testsuites.

        :raises NotImplementedError: for ``subunit.PROGRESS_CUR``.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Record a timestamp; the first one seen becomes the start time."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the ``[index(tests)/total at elapsed, N errors] name``
        header for a testsuite that is starting."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print ``output`` (verbose mode, or no testsuite active) or
        append it to the current testsuite's collected output."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        """Control messages are ignored."""
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record and print the outcome of a testsuite.

        Any ``result`` other than ``"success"`` or ``"xfail"`` marks the
        testsuite as failed.  ``reason`` defaults to None so the signature
        matches the other result classes in this module.
        """
        out = ""
        unexpected = False

        if not name in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok+=1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        """Nothing is printed when a test starts."""
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test, err=None):
        # ``err`` is optional so that callers passing the reason as a second
        # argument (as parse_results() and FilterOps do) no longer fail with
        # a TypeError; it is reported like the reason of an error/failure.
        self.end_test(test.id(), "uxsuccess", True, err)

    def end_test(self, testname, result, unexpected, err=None):
        """Report the result of a single test.

        Expected results clear the collected output of the current
        testsuite and print one status character (non-immediate mode).
        Unexpected results are appended to the collected output, together
        with ``err[1]`` when ``err`` is given.
        """
        if not unexpected:
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if not self.name in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
               'error': 'E',
               'failure': 'F',
               'uxsuccess': 'U',
               'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write the failed/skipped summary to ``path`` and print the final
        verdict (plus, in non-immediate non-verbose mode, the collected
        output of every failed testsuite) to stdout."""
        # The with block closes the file even if one of the writes fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite]+"\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason in self.skips.keys():
                f.write(reason + "\n")
                for name in self.skips[reason]:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if (not self.suitesfailed and
            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
            not self.statistics['TESTS_UNEXPECTED_OK'] and
            not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Remember ``name`` as skipped for ``reason`` and, if a total is
        known, reduce the expected number of testsuites by one."""
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites-=1