subunithelper: use set for efficient inclusion test
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 from samba import subunit
23 from samba.subunit.run import TestProtocolClient
24 from samba.subunit import iso8601
25 import unittest
26
27 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
28                      'knownfail', 'error', 'xfail', 'skip-testsuite',
29                      'testsuite-failure', 'testsuite-xfail',
30                      'testsuite-success', 'testsuite-error',
31                      'uxsuccess', 'testsuite-uxsuccess'])
32
33 class TestsuiteEnabledTestResult(unittest.TestResult):
34
35     def start_testsuite(self, name):
36         raise NotImplementedError(self.start_testsuite)
37
38
39 def parse_results(msg_ops, statistics, fh):
40     exitcode = 0
41     open_tests = {}
42
43     while fh:
44         l = fh.readline()
45         if l == "":
46             break
47         parts = l.split(None, 1)
48         if not len(parts) == 2 or not l.startswith(parts[0]):
49             msg_ops.output_msg(l)
50             continue
51         command = parts[0].rstrip(":")
52         arg = parts[1]
53         if command in ("test", "testing"):
54             msg_ops.control_msg(l)
55             name = arg.rstrip()
56             test = subunit.RemotedTestCase(name)
57             if name in open_tests:
58                 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
59             msg_ops.startTest(test)
60             open_tests[name] = test
61         elif command == "time":
62             msg_ops.control_msg(l)
63             try:
64                 dt = iso8601.parse_date(arg.rstrip("\n"))
65             except TypeError, e:
66                 print "Unable to parse time line: %s" % arg.rstrip("\n")
67             else:
68                 msg_ops.time(dt)
69         elif command in VALID_RESULTS:
70             msg_ops.control_msg(l)
71             result = command
72             grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
73             (testname, hasreason) = (grp.group(1), grp.group(2))
74             if hasreason:
75                 reason = ""
76                 # reason may be specified in next lines
77                 terminated = False
78                 while fh:
79                     l = fh.readline()
80                     if l == "":
81                         break
82                     msg_ops.control_msg(l)
83                     if l == "]\n":
84                         terminated = True
85                         break
86                     else:
87                         reason += l
88
89                 remote_error = subunit.RemoteError(reason.decode("utf-8"))
90
91                 if not terminated:
92                     statistics['TESTS_ERROR']+=1
93                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
94                     return 1
95             else:
96                 reason = None
97                 remote_error = subunit.RemoteError(u"No reason specified")
98             if result in ("success", "successful"):
99                 try:
100                     test = open_tests.pop(testname)
101                 except KeyError:
102                     statistics['TESTS_ERROR']+=1
103                     exitcode = 1
104                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
105                 else:
106                     statistics['TESTS_EXPECTED_OK']+=1
107                     msg_ops.addSuccess(test)
108             elif result in ("xfail", "knownfail"):
109                 try:
110                     test = open_tests.pop(testname)
111                 except KeyError:
112                     statistics['TESTS_ERROR']+=1
113                     exitcode = 1
114                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
115                 else:
116                     statistics['TESTS_EXPECTED_FAIL']+=1
117                     msg_ops.addExpectedFailure(test, remote_error)
118             elif result in ("uxsuccess", ):
119                 try:
120                     test = open_tests.pop(testname)
121                 except KeyError:
122                     statistics['TESTS_ERROR']+=1
123                     exitcode = 1
124                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
125                 else:
126                     statistics['TESTS_UNEXPECTED_OK']+=1
127                     msg_ops.addUnexpectedSuccess(test)
128                     exitcode = 1
129             elif result in ("failure", "fail"):
130                 try:
131                     test = open_tests.pop(testname)
132                 except KeyError:
133                     statistics['TESTS_ERROR']+=1
134                     exitcode = 1
135                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
136                 else:
137                     statistics['TESTS_UNEXPECTED_FAIL']+=1
138                     exitcode = 1
139                     msg_ops.addFailure(test, remote_error)
140             elif result == "skip":
141                 statistics['TESTS_SKIP']+=1
142                 # Allow tests to be skipped without prior announcement of test
143                 try:
144                     test = open_tests.pop(testname)
145                 except KeyError:
146                     test = subunit.RemotedTestCase(testname)
147                 msg_ops.addSkip(test, reason)
148             elif result == "error":
149                 statistics['TESTS_ERROR']+=1
150                 exitcode = 1
151                 try:
152                     test = open_tests.pop(testname)
153                 except KeyError:
154                     test = subunit.RemotedTestCase(testname)
155                 msg_ops.addError(test, remote_error)
156             elif result == "skip-testsuite":
157                 msg_ops.skip_testsuite(testname)
158             elif result == "testsuite-success":
159                 msg_ops.end_testsuite(testname, "success", reason)
160             elif result == "testsuite-failure":
161                 msg_ops.end_testsuite(testname, "failure", reason)
162                 exitcode = 1
163             elif result == "testsuite-xfail":
164                 msg_ops.end_testsuite(testname, "xfail", reason)
165             elif result == "testsuite-uxsuccess":
166                 msg_ops.end_testsuite(testname, "uxsuccess", reason)
167                 exitcode = 1
168             elif result == "testsuite-error":
169                 msg_ops.end_testsuite(testname, "error", reason)
170                 exitcode = 1
171             else:
172                 raise AssertionError("Recognized but unhandled result %r" %
173                     result)
174         elif command == "testsuite":
175             msg_ops.start_testsuite(arg.strip())
176         elif command == "progress":
177             arg = arg.strip()
178             if arg == "pop":
179                 msg_ops.progress(None, subunit.PROGRESS_POP)
180             elif arg == "push":
181                 msg_ops.progress(None, subunit.PROGRESS_PUSH)
182             elif arg[0] in '+-':
183                 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
184             else:
185                 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
186         else:
187             msg_ops.output_msg(l)
188
189     while open_tests:
190         test = subunit.RemotedTestCase(open_tests.popitem()[1])
191         msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
192         statistics['TESTS_ERROR']+=1
193         exitcode = 1
194
195     return exitcode
196
197
198 class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
199
200     def progress(self, count, whence):
201         if whence == subunit.PROGRESS_POP:
202             self._stream.write("progress: pop\n")
203         elif whence == subunit.PROGRESS_PUSH:
204             self._stream.write("progress: push\n")
205         elif whence == subunit.PROGRESS_SET:
206             self._stream.write("progress: %d\n" % count)
207         elif whence == subunit.PROGRESS_CUR:
208             raise NotImplementedError
209
210     # The following are Samba extensions:
211     def start_testsuite(self, name):
212         self._stream.write("testsuite: %s\n" % name)
213
214     def skip_testsuite(self, name, reason=None):
215         if reason:
216             self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
217         else:
218             self._stream.write("skip-testsuite: %s\n" % name)
219
220     def end_testsuite(self, name, result, reason=None):
221         if reason:
222             self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
223         else:
224             self._stream.write("testsuite-%s: %s\n" % (result, name))
225
226     def output_msg(self, msg):
227         self._stream.write(msg)
228
229
230 def read_test_regexes(name):
231     ret = {}
232     f = open(name, 'r')
233     try:
234         for l in f:
235             l = l.strip()
236             if l == "" or l[0] == "#":
237                 continue
238             if "#" in l:
239                 (regex, reason) = l.split("#", 1)
240                 ret[regex.strip()] = reason.strip()
241             else:
242                 ret[l] = None
243     finally:
244         f.close()
245     return ret
246
247
248 def find_in_list(regexes, fullname):
249     for regex, reason in regexes.iteritems():
250         if re.match(regex, fullname):
251             if reason is None:
252                 return ""
253             return reason
254     return None
255
256
257 class ImmediateFail(Exception):
258     """Raised to abort immediately."""
259
260     def __init__(self):
261         super(ImmediateFail, self).__init__("test failed and fail_immediately set")
262
263
264 class FilterOps(unittest.TestResult):
265
266     def control_msg(self, msg):
267         pass # We regenerate control messages, so ignore this
268
269     def time(self, time):
270         self._ops.time(time)
271
272     def progress(self, delta, whence):
273         self._ops.progress(delta, whence)
274
275     def output_msg(self, msg):
276         if self.output is None:
277             sys.stdout.write(msg)
278         else:
279             self.output+=msg
280
281     def startTest(self, test):
282         self.seen_output = True
283         test = self._add_prefix(test)
284         if self.strip_ok_output:
285            self.output = ""
286
287         self._ops.startTest(test)
288
289     def _add_prefix(self, test):
290         prefix = ""
291         suffix = ""
292         if self.prefix is not None:
293             prefix = self.prefix
294         if self.suffix is not None:
295             suffix = self.suffix
296
297         return subunit.RemotedTestCase(prefix + test.id() + suffix)
298
299     def addError(self, test, err=None):
300         test = self._add_prefix(test)
301         self.error_added+=1
302         self.total_error+=1
303         self._ops.addError(test, err)
304         self.output = None
305         if self.fail_immediately:
306             raise ImmediateFail()
307
308     def addSkip(self, test, reason=None):
309         self.seen_output = True
310         test = self._add_prefix(test)
311         self._ops.addSkip(test, reason)
312         self.output = None
313
314     def addExpectedFailure(self, test, err=None):
315         test = self._add_prefix(test)
316         self._ops.addExpectedFailure(test, err)
317         self.output = None
318
319     def addUnexpectedSuccess(self, test):
320         test = self._add_prefix(test)
321         self.uxsuccess_added+=1
322         self.total_uxsuccess+=1
323         self._ops.addUnexpectedSuccess(test)
324         if self.output:
325             self._ops.output_msg(self.output)
326         self.output = None
327         if self.fail_immediately:
328             raise ImmediateFail()
329
330     def addFailure(self, test, err=None):
331         test = self._add_prefix(test)
332         xfail_reason = find_in_list(self.expected_failures, test.id())
333         if xfail_reason is None:
334             xfail_reason = find_in_list(self.flapping, test.id())
335         if xfail_reason is not None:
336             self.xfail_added+=1
337             self.total_xfail+=1
338             self._ops.addExpectedFailure(test, err)
339         else:
340             self.fail_added+=1
341             self.total_fail+=1
342             self._ops.addFailure(test, err)
343             if self.output:
344                 self._ops.output_msg(self.output)
345             if self.fail_immediately:
346                 raise ImmediateFail()
347         self.output = None
348
349     def addSuccess(self, test):
350         test = self._add_prefix(test)
351         xfail_reason = find_in_list(self.expected_failures, test.id())
352         if xfail_reason is not None:
353             self.uxsuccess_added += 1
354             self.total_uxsuccess += 1
355             self._ops.addUnexpectedSuccess(test)
356             if self.output:
357                 self._ops.output_msg(self.output)
358             if self.fail_immediately:
359                 raise ImmediateFail()
360         else:
361             self._ops.addSuccess(test)
362         self.output = None
363
364     def skip_testsuite(self, name, reason=None):
365         self._ops.skip_testsuite(name, reason)
366
367     def start_testsuite(self, name):
368         self._ops.start_testsuite(name)
369         self.error_added = 0
370         self.fail_added = 0
371         self.xfail_added = 0
372         self.uxsuccess_added = 0
373
374     def end_testsuite(self, name, result, reason=None):
375         xfail = False
376
377         if self.xfail_added > 0:
378             xfail = True
379         if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
380             xfail = False
381
382         if xfail and result in ("fail", "failure"):
383             result = "xfail"
384
385         if self.uxsuccess_added > 0 and result != "uxsuccess":
386             result = "uxsuccess"
387             if reason is None:
388                 reason = "Subunit/Filter Reason"
389             reason += "\n uxsuccess[%d]" % self.uxsuccess_added
390
391         if self.fail_added > 0 and result != "failure":
392             result = "failure"
393             if reason is None:
394                 reason = "Subunit/Filter Reason"
395             reason += "\n failures[%d]" % self.fail_added
396
397         if self.error_added > 0 and result != "error":
398             result = "error"
399             if reason is None:
400                 reason = "Subunit/Filter Reason"
401             reason += "\n errors[%d]" % self.error_added
402
403         self._ops.end_testsuite(name, result, reason)
404         if result not in ("success", "xfail"):
405             if self.output:
406                 self._ops.output_msg(self.output)
407             if self.fail_immediately:
408                 raise ImmediateFail()
409         self.output = None
410
411     def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
412                  strip_ok_output=False, fail_immediately=False,
413                  flapping=None):
414         self._ops = out
415         self.seen_output = False
416         self.output = None
417         self.prefix = prefix
418         self.suffix = suffix
419         if expected_failures is not None:
420             self.expected_failures = expected_failures
421         else:
422             self.expected_failures = {}
423         if flapping is not None:
424             self.flapping = flapping
425         else:
426             self.flapping = {}
427         self.strip_ok_output = strip_ok_output
428         self.xfail_added = 0
429         self.fail_added = 0
430         self.uxsuccess_added = 0
431         self.total_xfail = 0
432         self.total_error = 0
433         self.total_fail = 0
434         self.total_uxsuccess = 0
435         self.error_added = 0
436         self.fail_immediately = fail_immediately
437
438
439 class PlainFormatter(TestsuiteEnabledTestResult):
440
441     def __init__(self, verbose, immediate, statistics,
442             totaltests=None):
443         super(PlainFormatter, self).__init__()
444         self.verbose = verbose
445         self.immediate = immediate
446         self.statistics = statistics
447         self.start_time = None
448         self.test_output = {}
449         self.suitesfailed = []
450         self.suites_ok = 0
451         self.skips = {}
452         self.index = 0
453         self.name = None
454         self._progress_level = 0
455         self.totalsuites = totaltests
456         self.last_time = None
457
458     @staticmethod
459     def _format_time(delta):
460         minutes, seconds = divmod(delta.seconds, 60)
461         hours, minutes = divmod(minutes, 60)
462         ret = ""
463         if hours:
464             ret += "%dh" % hours
465         if minutes:
466             ret += "%dm" % minutes
467         ret += "%ds" % seconds
468         return ret
469
470     def progress(self, offset, whence):
471         if whence == subunit.PROGRESS_POP:
472             self._progress_level -= 1
473         elif whence == subunit.PROGRESS_PUSH:
474             self._progress_level += 1
475         elif whence == subunit.PROGRESS_SET:
476             if self._progress_level == 0:
477                 self.totalsuites = offset
478         elif whence == subunit.PROGRESS_CUR:
479             raise NotImplementedError
480
481     def time(self, dt):
482         if self.start_time is None:
483             self.start_time = dt
484         self.last_time = dt
485
486     def start_testsuite(self, name):
487         self.index += 1
488         self.name = name
489
490         if not self.verbose:
491             self.test_output[name] = ""
492
493         total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
494                        self.statistics['TESTS_EXPECTED_FAIL'] +
495                        self.statistics['TESTS_ERROR'] +
496                        self.statistics['TESTS_UNEXPECTED_FAIL'] +
497                        self.statistics['TESTS_UNEXPECTED_OK'])
498
499         out = "[%d(%d)" % (self.index, total_tests)
500         if self.totalsuites is not None:
501             out += "/%d" % self.totalsuites
502         if self.start_time is not None:
503             out += " at " + self._format_time(self.last_time - self.start_time)
504         if self.suitesfailed:
505             out += ", %d errors" % (len(self.suitesfailed),)
506         out += "] %s" % name
507         if self.immediate:
508             sys.stdout.write(out + "\n")
509         else:
510             sys.stdout.write(out + ": ")
511
512     def output_msg(self, output):
513         if self.verbose:
514             sys.stdout.write(output)
515         elif self.name is not None:
516             self.test_output[self.name] += output
517         else:
518             sys.stdout.write(output)
519
520     def control_msg(self, output):
521         pass
522
523     def end_testsuite(self, name, result, reason):
524         out = ""
525         unexpected = False
526
527         if not name in self.test_output:
528             print "no output for name[%s]" % name
529
530         if result in ("success", "xfail"):
531             self.suites_ok+=1
532         else:
533             self.output_msg("ERROR: Testsuite[%s]\n" % name)
534             if reason is not None:
535                 self.output_msg("REASON: %s\n" % (reason,))
536             self.suitesfailed.append(name)
537             if self.immediate and not self.verbose and name in self.test_output:
538                 out += self.test_output[name]
539             unexpected = True
540
541         if not self.immediate:
542             if not unexpected:
543                 out += " ok\n"
544             else:
545                 out += " " + result.upper() + "\n"
546
547         sys.stdout.write(out)
548
549     def startTest(self, test):
550         pass
551
552     def addSuccess(self, test):
553         self.end_test(test.id(), "success", False)
554
555     def addError(self, test, err=None):
556         self.end_test(test.id(), "error", True, err)
557
558     def addFailure(self, test, err=None):
559         self.end_test(test.id(), "failure", True, err)
560
561     def addSkip(self, test, reason=None):
562         self.end_test(test.id(), "skip", False, reason)
563
564     def addExpectedFailure(self, test, err=None):
565         self.end_test(test.id(), "xfail", False, err)
566
567     def addUnexpectedSuccess(self, test):
568         self.end_test(test.id(), "uxsuccess", True)
569
570     def end_test(self, testname, result, unexpected, err=None):
571         if not unexpected:
572             self.test_output[self.name] = ""
573             if not self.immediate:
574                 sys.stdout.write({
575                     'failure': 'f',
576                     'xfail': 'X',
577                     'skip': 's',
578                     'success': '.'}.get(result, "?(%s)" % result))
579             return
580
581         if not self.name in self.test_output:
582             self.test_output[self.name] = ""
583
584         self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
585         if err is not None:
586             self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
587
588         if self.immediate and not self.verbose:
589             sys.stdout.write(self.test_output[self.name])
590             self.test_output[self.name] = ""
591
592         if not self.immediate:
593             sys.stdout.write({
594                'error': 'E',
595                'failure': 'F',
596                'uxsuccess': 'U',
597                'success': 'S'}.get(result, "?"))
598
599     def write_summary(self, path):
600         f = open(path, 'w+')
601
602         if self.suitesfailed:
603             f.write("= Failed tests =\n")
604
605             for suite in self.suitesfailed:
606                 f.write("== %s ==\n" % suite)
607                 if suite in self.test_output:
608                     f.write(self.test_output[suite]+"\n\n")
609
610             f.write("\n")
611
612         if not self.immediate and not self.verbose:
613             for suite in self.suitesfailed:
614                 print "=" * 78
615                 print "FAIL: %s" % suite
616                 if suite in self.test_output:
617                     print self.test_output[suite]
618                 print ""
619
620         f.write("= Skipped tests =\n")
621         for reason in self.skips.keys():
622             f.write(reason + "\n")
623             for name in self.skips[reason]:
624                 f.write("\t%s\n" % name)
625             f.write("\n")
626         f.close()
627
628         if (not self.suitesfailed and
629             not self.statistics['TESTS_UNEXPECTED_FAIL'] and
630             not self.statistics['TESTS_UNEXPECTED_OK'] and
631             not self.statistics['TESTS_ERROR']):
632             ok = (self.statistics['TESTS_EXPECTED_OK'] +
633                   self.statistics['TESTS_EXPECTED_FAIL'])
634             print "\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok)
635         else:
636             print "\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
637                 self.statistics['TESTS_UNEXPECTED_FAIL'],
638                 self.statistics['TESTS_ERROR'],
639                 self.statistics['TESTS_UNEXPECTED_OK'],
640                 len(self.suitesfailed))
641
642     def skip_testsuite(self, name, reason="UNKNOWN"):
643         self.skips.setdefault(reason, []).append(name)
644         if self.totalsuites:
645             self.totalsuites-=1