Set failfast property for test reporters that need it.
[obnox/samba/samba-obnox.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
# Only parse_results is exported by a star import of this module; the other
# helpers defined below have to be imported (or referenced) by name.
__all__ = ['parse_results']
19
20 import re
21 import sys
22 from samba import subunit
23 from samba.subunit.run import TestProtocolClient
24 from samba.subunit import iso8601
25 import unittest
26
# Result keywords that parse_results() recognises as the first word of a
# line: per-test outcomes first, then the per-testsuite variants.
VALID_RESULTS = [
    'success',
    'successful',
    'failure',
    'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
    'uxsuccess',
    'testsuite-uxsuccess',
]
28
class TestsuiteEnabledTestResult(unittest.TestResult):
    """TestResult base class for reporters that understand testsuites.

    Subclasses are expected to override start_testsuite(); the version
    here only signals that no implementation was provided.
    """

    def start_testsuite(self, name):
        """Announce the start of testsuite ``name``; must be overridden.

        Raises NotImplementedError, carrying the bound method as argument.
        """
        missing = self.start_testsuite
        raise NotImplementedError(missing)
33
34
def _pop_started_test(msg_ops, statistics, open_tests, testname):
    """Remove and return the open test called ``testname``.

    When no such test was started, the problem is counted in
    statistics['TESTS_ERROR'], reported to msg_ops as a
    "Test was never started" error, and None is returned.
    """
    try:
        return open_tests.pop(testname)
    except KeyError:
        statistics['TESTS_ERROR']+=1
        msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
        return None


def parse_results(msg_ops, statistics, fh):
    """Parse a textual subunit stream and replay it on ``msg_ops``.

    :param msg_ops: Reporter receiving the events. It is called through
        control_msg/output_msg, startTest, the add* result methods, time,
        progress and the start_testsuite/skip_testsuite/end_testsuite
        testsuite methods.
    :param statistics: Mutable mapping of counters. The keys
        TESTS_EXPECTED_OK, TESTS_EXPECTED_FAIL, TESTS_UNEXPECTED_OK,
        TESTS_UNEXPECTED_FAIL, TESTS_SKIP and TESTS_ERROR are incremented
        and therefore have to exist already.
    :param fh: File-like object providing readline().
    :return: 0 if nothing unexpected was seen, 1 otherwise. 1 is returned
        immediately when a bracketed reason is cut off by end of input.
    """
    exitcode = 0
    # Tests that were announced but have not reported a result yet,
    # keyed by test name.
    open_tests = {}

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        # Anything that is not "<command> <argument>" starting in column 0
        # is ordinary test output.
        if not len(parts) == 2 or not l.startswith(parts[0]):
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # The pattern needs the terminating newline; the very last line
            # of a stream may lack it, which used to make re.match() return
            # None and crash on .group().
            if not arg.endswith("\n"):
                arg += "\n"
            grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                reason = ""
                # reason may be specified in next lines
                terminated = False
                while fh:
                    l = fh.readline()
                    if l == "":
                        break
                    msg_ops.control_msg(l)
                    # Accept a closing "]" that is the final line of the
                    # stream and therefore has no newline.
                    if l.rstrip("\n") == "]":
                        terminated = True
                        break
                    else:
                        reason += l

                remote_error = subunit.RemoteError(reason.decode("utf-8"))

                if not terminated:
                    statistics['TESTS_ERROR']+=1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")
            if result in ("success", "successful"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_OK']+=1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_FAIL']+=1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("uxsuccess", ):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_OK']+=1
                    msg_ops.addUnexpectedSuccess(test, remote_error)
                    exitcode = 1
            elif result in ("failure", "fail"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_FAIL']+=1
                    exitcode = 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP']+=1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR']+=1
                exitcode = 1
                # Errors may also be reported for tests never announced.
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
                exitcode = 1
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-uxsuccess":
                msg_ops.end_testsuite(testname, "uxsuccess", reason)
                exitcode = 1
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
                exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                    result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                # A signed number is relative to the current position.
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Whatever is still open at end of input never reported a result.
    while open_tests:
        # The stored values already are RemotedTestCase objects (see the
        # "test" branch above); report them as they are instead of wrapping
        # them in a second RemotedTestCase.
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR']+=1
        exitcode = 1

    return exitcode
192
193
class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
    """Writes progress and testsuite-level messages to ``self._stream``."""

    def progress(self, count, whence):
        """Emit a "progress:" line for pop, push and absolute updates.

        Relative updates (subunit.PROGRESS_CUR) raise NotImplementedError;
        any other ``whence`` value writes nothing.
        """
        if whence == subunit.PROGRESS_POP:
            line = "progress: pop\n"
        elif whence == subunit.PROGRESS_PUSH:
            line = "progress: push\n"
        elif whence == subunit.PROGRESS_SET:
            line = "progress: %d\n" % count
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError
        else:
            return
        self._stream.write(line)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write the "testsuite:" line opening testsuite ``name``."""
        announcement = "testsuite: %s\n" % name
        self._stream.write(announcement)

    def skip_testsuite(self, name, reason=None):
        """Write a "skip-testsuite:" line, with a bracketed reason if given."""
        if not reason:
            self._stream.write("skip-testsuite: %s\n" % name)
            return
        self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))

    def end_testsuite(self, name, result, reason=None):
        """Write a "testsuite-<result>:" line, with a bracketed reason if given."""
        if not reason:
            self._stream.write("testsuite-%s: %s\n" % (result, name))
            return
        self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))

    def output_msg(self, msg):
        """Copy ``msg`` to the stream unchanged."""
        self._stream.write(msg)
224
225
def read_test_regexes(name):
    """Load a regex list file into a dict mapping regex -> reason.

    Blank lines and lines whose first non-blank character is "#" are
    skipped. On other lines, text after the first "#" is the reason; lines
    without a "#" map to None. Regexes and reasons are whitespace-stripped.

    :param name: Path of the file to read.
    :return: dict of regex string to reason string (or None).
    """
    regexes = {}
    handle = open(name, 'r')
    try:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            pieces = entry.split("#", 1)
            if len(pieces) == 2:
                regexes[pieces[0].strip()] = pieces[1].strip()
            else:
                regexes[entry] = None
    finally:
        handle.close()
    return regexes
242
243
def find_in_list(regexes, fullname):
    """Look up the reason attached to the first regex matching ``fullname``.

    :param regexes: dict mapping regex strings to a reason string or None,
        as returned by read_test_regexes().
    :param fullname: Test name; regexes are applied with re.match(), i.e.
        anchored at the start of the name.
    :return: The reason of a matching regex ("" if that reason is None),
        or None when no regex matches.
    """
    # items() instead of the Python-2-only iteritems(): identical results,
    # but also works on interpreters where dicts have no iteritems().
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None
251
252
class ImmediateFail(Exception):
    """Signals that processing has to stop right away after a failure."""

    def __init__(self):
        # Fixed message; the constructor takes no arguments.
        message = "test failed and fail_immediately set"
        Exception.__init__(self, message)
258
259
class FilterOps(unittest.TestResult):
    """Result filter that forwards (possibly rewritten) events to ``out``.

    Test ids get an optional prefix/suffix, failures matching the
    expected-failure or flapping lists are turned into expected failures,
    successes matching the expected-failure list become unexpected
    successes, and testsuite results are adjusted to the results seen
    inside the suite.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """Create the filter.

        :param out: Reporter all events are forwarded to.
        :param prefix: Optional string prepended to every test id.
        :param suffix: Optional string appended to every test id.
        :param expected_failures: dict of regex -> reason for known failures.
        :param strip_ok_output: If true, output is buffered per test and
            only forwarded for unexpected results.
        :param fail_immediately: If true, ImmediateFail is raised on the
            first error, unexpected failure or unexpected success.
        :param flapping: dict of regex -> reason for tests whose failures
            are to be treated as expected.
        """
        # The class used to carry a second, earlier __init__ that did this
        # initialisation; being defined first it was silently replaced by
        # this one and never ran. Both are merged here.
        super(FilterOps, self).__init__()
        # Set explicitly so the attribute exists even if the base class
        # initialiser of the running unittest version does not create it.
        self.failfast = False
        self._ops = out
        self.seen_output = False
        # None means "not buffering": output goes straight to stdout.
        self.output = None
        self.prefix = prefix
        self.suffix = suffix
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # *_added counters are per testsuite (reset in start_testsuite),
        # total_* counters cover the whole run.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.error_added = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer ``msg`` while a buffer is active, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output+=msg

    def startTest(self, test):
        """Forward the (renamed) test start; begin buffering if requested."""
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(test)

    def _add_prefix(self, test):
        """Return a RemotedTestCase whose id is prefix + test.id() + suffix."""
        prefix = ""
        suffix = ""
        if self.prefix is not None:
            prefix = self.prefix
        if self.suffix is not None:
            suffix = self.suffix

        return subunit.RemotedTestCase(prefix + test.id() + suffix)

    def addError(self, test, err=None):
        """Count and forward an error; buffered output is dropped.

        Raises ImmediateFail when fail_immediately is set.
        """
        test = self._add_prefix(test)
        self.error_added+=1
        self.total_error+=1
        self._ops.addError(test, err)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        """Forward a skip; buffered output is dropped."""
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        """Forward an expected failure; buffered output is dropped."""
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test, err=None):
        """Count and forward an unexpected success plus its buffered output.

        Raises ImmediateFail when fail_immediately is set.
        """
        test = self._add_prefix(test)
        self.uxsuccess_added+=1
        self.total_uxsuccess+=1
        self._ops.addUnexpectedSuccess(test, err)
        if self.output:
            self._ops.output_msg(self.output)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, downgraded to expected if it is a known one.

        The test id is looked up in expected_failures first, then in
        flapping. Unknown failures are forwarded together with the buffered
        output and raise ImmediateFail when fail_immediately is set.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added+=1
            self.total_xfail+=1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added+=1
            self.total_fail+=1
            self._ops.addFailure(test, err)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success for known failures.

        Only expected_failures is consulted here (not flapping). Unexpected
        successes are forwarded with the buffered output and raise
        ImmediateFail when fail_immediately is set.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test, subunit.RemoteError(xfail_reason))
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a testsuite skip unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the testsuite start and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the end of a testsuite with a result matching its tests.

        A "fail"/"failure" becomes "xfail" when the suite only produced
        expected failures. Unexpected successes, failures and errors seen
        in the suite force the result to "uxsuccess", "failure" and "error"
        respectively (later ones win) and are noted in the reason. For a
        result other than "success"/"xfail" the buffered output is forwarded
        and ImmediateFail is raised when fail_immediately is set.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        # (count seen in this suite, result to force, label used in reason);
        # evaluated in this order, so an error overrides a failure, which
        # overrides an unexpected success.
        overrides = (
            (self.uxsuccess_added, "uxsuccess", "uxsuccess"),
            (self.fail_added, "failure", "failures"),
            (self.error_added, "error", "errors"),
        )
        for count, forced, label in overrides:
            if count > 0 and result != forced:
                result = forced
                if reason is None:
                    reason = "Subunit/Filter Reason"
                reason += "\n %s[%d]" % (label, count)

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
437
438
class PlainFormatter(TestsuiteEnabledTestResult):
    """Reporter that prints human-readable progress to sys.stdout.

    Output of each testsuite is collected in ``test_output`` (unless
    verbose) and a summary can be written with write_summary().
    """

    def __init__(self, verbose, immediate, statistics,
            totaltests=None):
        """Create the formatter.

        :param verbose: If true, test output is written to stdout directly
            instead of being collected per testsuite.
        :param immediate: If true, unexpected results are printed as soon
            as they occur instead of one status character per test.
        :param statistics: Mapping with the TESTS_* counters; only read here.
        :param totaltests: Optional total number of testsuites, shown in
            the progress prefix.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        self.skips = {}
        self.index = 0
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a timedelta as e.g. "1h2m3s" (zero hours/minutes omitted)."""
        # Include the days component; timedelta.seconds alone wraps around
        # after 24 hours.
        total_seconds = delta.days * 86400 + delta.seconds
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track progress nesting; a top-level SET gives the suite total.

        Relative updates (subunit.PROGRESS_CUR) raise NotImplementedError.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Remember the first and the most recent timestamp seen."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the "[index(tests)/total at time, N errors] name" header."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            # Per-test status characters follow on the same line.
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print ``output`` (verbose or no current suite) or collect it."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        """Control messages are not shown."""
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record the outcome of a testsuite and finish its progress line.

        Results other than "success"/"xfail" mark the suite as failed.
        ``reason`` defaults to None, like in the other reporters of this
        module.
        """
        out = ""
        unexpected = False

        if not name in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok+=1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        """Nothing is printed when a test starts."""
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test, err=None):
        # parse_results() and FilterOps both call this with (test, err);
        # accept the optional err like the other add* methods do.
        self.end_test(test.id(), "uxsuccess", True, err)

    def end_test(self, testname, result, unexpected, err=None):
        """Report the result of a single test.

        :param testname: Id of the test.
        :param result: Result keyword ("success", "failure", ...).
        :param unexpected: Whether the result counts as unexpected.
        :param err: Optional error; element [1] is printed as the reason.
        """
        if not unexpected:
            # Expected results discard the output collected so far.
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if not self.name in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
               'error': 'E',
               'failure': 'F',
               'uxsuccess': 'U',
               'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write the summary file to ``path`` and print the final verdict.

        The file lists failed testsuites with their collected output and
        all skipped testsuites grouped by reason.
        """
        f = open(path, 'w+')
        # Close the file even if writing or printing fails half way.
        try:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite]+"\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason in self.skips.keys():
                f.write(reason + "\n")
                for name in self.skips[reason]:
                    f.write("\t%s\n" % name)
                f.write("\n")
        finally:
            f.close()

        if (not self.suitesfailed and
            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
            not self.statistics['TESTS_UNEXPECTED_OK'] and
            not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Record a skipped testsuite and take it off the expected total."""
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites-=1
644         if self.totalsuites:
645             self.totalsuites-=1