c17036defbab6a24925d753bfeae7485257da4b9
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import datetime
21 import re
22 import sys
23 from samba import subunit
24 from samba.subunit.run import TestProtocolClient
25 from samba.subunit import iso8601
26 import unittest
27
28 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
29                      'knownfail', 'error', 'xfail', 'skip-testsuite',
30                      'testsuite-failure', 'testsuite-xfail',
31                      'testsuite-success', 'testsuite-error',
32                      'uxsuccess', 'testsuite-uxsuccess'])
33
34 class TestsuiteEnabledTestResult(unittest.TestResult):
35
36     def start_testsuite(self, name):
37         raise NotImplementedError(self.start_testsuite)
38
39
40 def parse_results(msg_ops, statistics, fh):
41     exitcode = 0
42     open_tests = {}
43
44     while fh:
45         l = fh.readline()
46         if l == "":
47             break
48         parts = l.split(None, 1)
49         if not len(parts) == 2 or not l.startswith(parts[0]):
50             msg_ops.output_msg(l)
51             continue
52         command = parts[0].rstrip(":")
53         arg = parts[1]
54         if command in ("test", "testing"):
55             msg_ops.control_msg(l)
56             name = arg.rstrip()
57             test = subunit.RemotedTestCase(name)
58             if name in open_tests:
59                 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
60             msg_ops.startTest(test)
61             open_tests[name] = test
62         elif command == "time":
63             msg_ops.control_msg(l)
64             try:
65                 dt = iso8601.parse_date(arg.rstrip("\n"))
66             except TypeError, e:
67                 print "Unable to parse time line: %s" % arg.rstrip("\n")
68             else:
69                 msg_ops.time(dt)
70         elif command in VALID_RESULTS:
71             msg_ops.control_msg(l)
72             result = command
73             grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
74             (testname, hasreason) = (grp.group(1), grp.group(2))
75             if hasreason:
76                 reason = ""
77                 # reason may be specified in next lines
78                 terminated = False
79                 while fh:
80                     l = fh.readline()
81                     if l == "":
82                         break
83                     msg_ops.control_msg(l)
84                     if l == "]\n":
85                         terminated = True
86                         break
87                     else:
88                         reason += l
89
90                 remote_error = subunit.RemoteError(reason.decode("utf-8"))
91
92                 if not terminated:
93                     statistics['TESTS_ERROR']+=1
94                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
95                     return 1
96             else:
97                 reason = None
98                 remote_error = subunit.RemoteError(u"No reason specified")
99             if result in ("success", "successful"):
100                 try:
101                     test = open_tests.pop(testname)
102                 except KeyError:
103                     statistics['TESTS_ERROR']+=1
104                     exitcode = 1
105                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
106                 else:
107                     statistics['TESTS_EXPECTED_OK']+=1
108                     msg_ops.addSuccess(test)
109             elif result in ("xfail", "knownfail"):
110                 try:
111                     test = open_tests.pop(testname)
112                 except KeyError:
113                     statistics['TESTS_ERROR']+=1
114                     exitcode = 1
115                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
116                 else:
117                     statistics['TESTS_EXPECTED_FAIL']+=1
118                     msg_ops.addExpectedFailure(test, remote_error)
119             elif result in ("uxsuccess", ):
120                 try:
121                     test = open_tests.pop(testname)
122                 except KeyError:
123                     statistics['TESTS_ERROR']+=1
124                     exitcode = 1
125                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
126                 else:
127                     statistics['TESTS_UNEXPECTED_OK']+=1
128                     msg_ops.addUnexpectedSuccess(test)
129                     exitcode = 1
130             elif result in ("failure", "fail"):
131                 try:
132                     test = open_tests.pop(testname)
133                 except KeyError:
134                     statistics['TESTS_ERROR']+=1
135                     exitcode = 1
136                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
137                 else:
138                     statistics['TESTS_UNEXPECTED_FAIL']+=1
139                     exitcode = 1
140                     msg_ops.addFailure(test, remote_error)
141             elif result == "skip":
142                 statistics['TESTS_SKIP']+=1
143                 # Allow tests to be skipped without prior announcement of test
144                 try:
145                     test = open_tests.pop(testname)
146                 except KeyError:
147                     test = subunit.RemotedTestCase(testname)
148                 msg_ops.addSkip(test, reason)
149             elif result == "error":
150                 statistics['TESTS_ERROR']+=1
151                 exitcode = 1
152                 try:
153                     test = open_tests.pop(testname)
154                 except KeyError:
155                     test = subunit.RemotedTestCase(testname)
156                 msg_ops.addError(test, remote_error)
157             elif result == "skip-testsuite":
158                 msg_ops.skip_testsuite(testname)
159             elif result == "testsuite-success":
160                 msg_ops.end_testsuite(testname, "success", reason)
161             elif result == "testsuite-failure":
162                 msg_ops.end_testsuite(testname, "failure", reason)
163                 exitcode = 1
164             elif result == "testsuite-xfail":
165                 msg_ops.end_testsuite(testname, "xfail", reason)
166             elif result == "testsuite-uxsuccess":
167                 msg_ops.end_testsuite(testname, "uxsuccess", reason)
168                 exitcode = 1
169             elif result == "testsuite-error":
170                 msg_ops.end_testsuite(testname, "error", reason)
171                 exitcode = 1
172             else:
173                 raise AssertionError("Recognized but unhandled result %r" %
174                     result)
175         elif command == "testsuite":
176             msg_ops.start_testsuite(arg.strip())
177         elif command == "progress":
178             arg = arg.strip()
179             if arg == "pop":
180                 msg_ops.progress(None, subunit.PROGRESS_POP)
181             elif arg == "push":
182                 msg_ops.progress(None, subunit.PROGRESS_PUSH)
183             elif arg[0] in '+-':
184                 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
185             else:
186                 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
187         else:
188             msg_ops.output_msg(l)
189
190     while open_tests:
191         test = subunit.RemotedTestCase(open_tests.popitem()[1])
192         msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
193         statistics['TESTS_ERROR']+=1
194         exitcode = 1
195
196     return exitcode
197
198
199 class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
200
201     def progress(self, count, whence):
202         if whence == subunit.PROGRESS_POP:
203             self._stream.write("progress: pop\n")
204         elif whence == subunit.PROGRESS_PUSH:
205             self._stream.write("progress: push\n")
206         elif whence == subunit.PROGRESS_SET:
207             self._stream.write("progress: %d\n" % count)
208         elif whence == subunit.PROGRESS_CUR:
209             raise NotImplementedError
210
211     # The following are Samba extensions:
212     def start_testsuite(self, name):
213         self._stream.write("testsuite: %s\n" % name)
214
215     def skip_testsuite(self, name, reason=None):
216         if reason:
217             self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
218         else:
219             self._stream.write("skip-testsuite: %s\n" % name)
220
221     def end_testsuite(self, name, result, reason=None):
222         if reason:
223             self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
224         else:
225             self._stream.write("testsuite-%s: %s\n" % (result, name))
226
227     def output_msg(self, msg):
228         self._stream.write(msg)
229
230
231 def read_test_regexes(name):
232     ret = {}
233     f = open(name, 'r')
234     try:
235         for l in f:
236             l = l.strip()
237             if l == "" or l[0] == "#":
238                 continue
239             if "#" in l:
240                 (regex, reason) = l.split("#", 1)
241                 ret[regex.strip()] = reason.strip()
242             else:
243                 ret[l] = None
244     finally:
245         f.close()
246     return ret
247
248
249 def find_in_list(regexes, fullname):
250     for regex, reason in regexes.iteritems():
251         if re.match(regex, fullname):
252             if reason is None:
253                 return ""
254             return reason
255     return None
256
257
258 class ImmediateFail(Exception):
259     """Raised to abort immediately."""
260
261     def __init__(self):
262         super(ImmediateFail, self).__init__("test failed and fail_immediately set")
263
264
265 class FilterOps(unittest.TestResult):
266
267     def control_msg(self, msg):
268         pass # We regenerate control messages, so ignore this
269
270     def time(self, time):
271         self._ops.time(time)
272
273     def progress(self, delta, whence):
274         self._ops.progress(delta, whence)
275
276     def output_msg(self, msg):
277         if self.output is None:
278             sys.stdout.write(msg)
279         else:
280             self.output+=msg
281
282     def startTest(self, test):
283         self.seen_output = True
284         test = self._add_prefix(test)
285         if self.strip_ok_output:
286            self.output = ""
287
288         self._ops.startTest(test)
289
290     def _add_prefix(self, test):
291         return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
292
293     def addError(self, test, err=None):
294         test = self._add_prefix(test)
295         self.error_added+=1
296         self.total_error+=1
297         self._ops.addError(test, err)
298         self.output = None
299         if self.fail_immediately:
300             raise ImmediateFail()
301
302     def addSkip(self, test, reason=None):
303         self.seen_output = True
304         test = self._add_prefix(test)
305         self._ops.addSkip(test, reason)
306         self.output = None
307
308     def addExpectedFailure(self, test, err=None):
309         test = self._add_prefix(test)
310         self._ops.addExpectedFailure(test, err)
311         self.output = None
312
313     def addUnexpectedSuccess(self, test):
314         test = self._add_prefix(test)
315         self.uxsuccess_added+=1
316         self.total_uxsuccess+=1
317         self._ops.addUnexpectedSuccess(test)
318         if self.output:
319             self._ops.output_msg(self.output)
320         self.output = None
321         if self.fail_immediately:
322             raise ImmediateFail()
323
324     def addFailure(self, test, err=None):
325         test = self._add_prefix(test)
326         xfail_reason = find_in_list(self.expected_failures, test.id())
327         if xfail_reason is None:
328             xfail_reason = find_in_list(self.flapping, test.id())
329         if xfail_reason is not None:
330             self.xfail_added+=1
331             self.total_xfail+=1
332             self._ops.addExpectedFailure(test, err)
333         else:
334             self.fail_added+=1
335             self.total_fail+=1
336             self._ops.addFailure(test, err)
337             if self.output:
338                 self._ops.output_msg(self.output)
339             if self.fail_immediately:
340                 raise ImmediateFail()
341         self.output = None
342
343     def addSuccess(self, test):
344         test = self._add_prefix(test)
345         xfail_reason = find_in_list(self.expected_failures, test.id())
346         if xfail_reason is not None:
347             self.uxsuccess_added += 1
348             self.total_uxsuccess += 1
349             self._ops.addUnexpectedSuccess(test)
350             if self.output:
351                 self._ops.output_msg(self.output)
352             if self.fail_immediately:
353                 raise ImmediateFail()
354         else:
355             self._ops.addSuccess(test)
356         self.output = None
357
358     def skip_testsuite(self, name, reason=None):
359         self._ops.skip_testsuite(name, reason)
360
361     def start_testsuite(self, name):
362         self._ops.start_testsuite(name)
363         self.error_added = 0
364         self.fail_added = 0
365         self.xfail_added = 0
366         self.uxsuccess_added = 0
367
368     def end_testsuite(self, name, result, reason=None):
369         xfail = False
370
371         if self.xfail_added > 0:
372             xfail = True
373         if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
374             xfail = False
375
376         if xfail and result in ("fail", "failure"):
377             result = "xfail"
378
379         if self.uxsuccess_added > 0 and result != "uxsuccess":
380             result = "uxsuccess"
381             if reason is None:
382                 reason = "Subunit/Filter Reason"
383             reason += "\n uxsuccess[%d]" % self.uxsuccess_added
384
385         if self.fail_added > 0 and result != "failure":
386             result = "failure"
387             if reason is None:
388                 reason = "Subunit/Filter Reason"
389             reason += "\n failures[%d]" % self.fail_added
390
391         if self.error_added > 0 and result != "error":
392             result = "error"
393             if reason is None:
394                 reason = "Subunit/Filter Reason"
395             reason += "\n errors[%d]" % self.error_added
396
397         self._ops.end_testsuite(name, result, reason)
398         if result not in ("success", "xfail"):
399             if self.output:
400                 self._ops.output_msg(self.output)
401             if self.fail_immediately:
402                 raise ImmediateFail()
403         self.output = None
404
405     def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
406                  strip_ok_output=False, fail_immediately=False,
407                  flapping=None):
408         self._ops = out
409         self.seen_output = False
410         self.output = None
411         self.prefix = prefix
412         self.suffix = suffix
413         if expected_failures is not None:
414             self.expected_failures = expected_failures
415         else:
416             self.expected_failures = {}
417         if flapping is not None:
418             self.flapping = flapping
419         else:
420             self.flapping = {}
421         self.strip_ok_output = strip_ok_output
422         self.xfail_added = 0
423         self.fail_added = 0
424         self.uxsuccess_added = 0
425         self.total_xfail = 0
426         self.total_error = 0
427         self.total_fail = 0
428         self.total_uxsuccess = 0
429         self.error_added = 0
430         self.fail_immediately = fail_immediately
431
432
433 class PerfFilterOps(unittest.TestResult):
434
435     def progress(self, delta, whence):
436         pass
437
438     def output_msg(self, msg):
439         pass
440
441     def control_msg(self, msg):
442         pass
443
444     def skip_testsuite(self, name, reason=None):
445         self._ops.skip_testsuite(name, reason)
446
447     def start_testsuite(self, name):
448         self.suite_has_time = False
449
450     def end_testsuite(self, name, result, reason=None):
451         pass
452
453     def _add_prefix(self, test):
454         return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
455
456     def time(self, time):
457         self.latest_time = time
458         #self._ops.output_msg("found time %s\n" % time)
459         self.suite_has_time = True
460
461     def get_time(self):
462         if self.suite_has_time:
463             return self.latest_time
464         return datetime.datetime.utcnow()
465
466     def startTest(self, test):
467         self.seen_output = True
468         test = self._add_prefix(test)
469         self.starts[test.id()] = self.get_time()
470
471     def addSuccess(self, test):
472         test = self._add_prefix(test)
473         tid = test.id()
474         if tid not in self.starts:
475             self._ops.addError(test, "%s succeeded without ever starting!" % tid)
476         delta = self.get_time() - self.starts[tid]
477         self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
478
479     def addFailure(self, test, err=''):
480         tid = test.id()
481         delta = self.get_time() - self.starts[tid]
482         self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
483                              (tid, delta.total_seconds(), err))
484
485     def addError(self, test, err=''):
486         tid = test.id()
487         delta = self.get_time() - self.starts[tid]
488         self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
489                              (tid, delta.total_seconds(), err))
490
491     def __init__(self, out, prefix='', suffix=''):
492         self._ops = out
493         self.prefix = prefix or ''
494         self.suffix = suffix or ''
495         self.starts = {}
496         self.seen_output = False
497         self.suite_has_time = False
498
499
500 class PlainFormatter(TestsuiteEnabledTestResult):
501
502     def __init__(self, verbose, immediate, statistics,
503             totaltests=None):
504         super(PlainFormatter, self).__init__()
505         self.verbose = verbose
506         self.immediate = immediate
507         self.statistics = statistics
508         self.start_time = None
509         self.test_output = {}
510         self.suitesfailed = []
511         self.suites_ok = 0
512         self.skips = {}
513         self.index = 0
514         self.name = None
515         self._progress_level = 0
516         self.totalsuites = totaltests
517         self.last_time = None
518
519     @staticmethod
520     def _format_time(delta):
521         minutes, seconds = divmod(delta.seconds, 60)
522         hours, minutes = divmod(minutes, 60)
523         ret = ""
524         if hours:
525             ret += "%dh" % hours
526         if minutes:
527             ret += "%dm" % minutes
528         ret += "%ds" % seconds
529         return ret
530
531     def progress(self, offset, whence):
532         if whence == subunit.PROGRESS_POP:
533             self._progress_level -= 1
534         elif whence == subunit.PROGRESS_PUSH:
535             self._progress_level += 1
536         elif whence == subunit.PROGRESS_SET:
537             if self._progress_level == 0:
538                 self.totalsuites = offset
539         elif whence == subunit.PROGRESS_CUR:
540             raise NotImplementedError
541
542     def time(self, dt):
543         if self.start_time is None:
544             self.start_time = dt
545         self.last_time = dt
546
547     def start_testsuite(self, name):
548         self.index += 1
549         self.name = name
550
551         if not self.verbose:
552             self.test_output[name] = ""
553
554         total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
555                        self.statistics['TESTS_EXPECTED_FAIL'] +
556                        self.statistics['TESTS_ERROR'] +
557                        self.statistics['TESTS_UNEXPECTED_FAIL'] +
558                        self.statistics['TESTS_UNEXPECTED_OK'])
559
560         out = "[%d(%d)" % (self.index, total_tests)
561         if self.totalsuites is not None:
562             out += "/%d" % self.totalsuites
563         if self.start_time is not None:
564             out += " at " + self._format_time(self.last_time - self.start_time)
565         if self.suitesfailed:
566             out += ", %d errors" % (len(self.suitesfailed),)
567         out += "] %s" % name
568         if self.immediate:
569             sys.stdout.write(out + "\n")
570         else:
571             sys.stdout.write(out + ": ")
572
573     def output_msg(self, output):
574         if self.verbose:
575             sys.stdout.write(output)
576         elif self.name is not None:
577             self.test_output[self.name] += output
578         else:
579             sys.stdout.write(output)
580
581     def control_msg(self, output):
582         pass
583
584     def end_testsuite(self, name, result, reason):
585         out = ""
586         unexpected = False
587
588         if not name in self.test_output:
589             print "no output for name[%s]" % name
590
591         if result in ("success", "xfail"):
592             self.suites_ok+=1
593         else:
594             self.output_msg("ERROR: Testsuite[%s]\n" % name)
595             if reason is not None:
596                 self.output_msg("REASON: %s\n" % (reason,))
597             self.suitesfailed.append(name)
598             if self.immediate and not self.verbose and name in self.test_output:
599                 out += self.test_output[name]
600             unexpected = True
601
602         if not self.immediate:
603             if not unexpected:
604                 out += " ok\n"
605             else:
606                 out += " " + result.upper() + "\n"
607
608         sys.stdout.write(out)
609
610     def startTest(self, test):
611         pass
612
613     def addSuccess(self, test):
614         self.end_test(test.id(), "success", False)
615
616     def addError(self, test, err=None):
617         self.end_test(test.id(), "error", True, err)
618
619     def addFailure(self, test, err=None):
620         self.end_test(test.id(), "failure", True, err)
621
622     def addSkip(self, test, reason=None):
623         self.end_test(test.id(), "skip", False, reason)
624
625     def addExpectedFailure(self, test, err=None):
626         self.end_test(test.id(), "xfail", False, err)
627
628     def addUnexpectedSuccess(self, test):
629         self.end_test(test.id(), "uxsuccess", True)
630
631     def end_test(self, testname, result, unexpected, err=None):
632         if not unexpected:
633             self.test_output[self.name] = ""
634             if not self.immediate:
635                 sys.stdout.write({
636                     'failure': 'f',
637                     'xfail': 'X',
638                     'skip': 's',
639                     'success': '.'}.get(result, "?(%s)" % result))
640             return
641
642         if not self.name in self.test_output:
643             self.test_output[self.name] = ""
644
645         self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
646         if err is not None:
647             self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
648
649         if self.immediate and not self.verbose:
650             sys.stdout.write(self.test_output[self.name])
651             self.test_output[self.name] = ""
652
653         if not self.immediate:
654             sys.stdout.write({
655                'error': 'E',
656                'failure': 'F',
657                'uxsuccess': 'U',
658                'success': 'S'}.get(result, "?"))
659
660     def write_summary(self, path):
661         f = open(path, 'w+')
662
663         if self.suitesfailed:
664             f.write("= Failed tests =\n")
665
666             for suite in self.suitesfailed:
667                 f.write("== %s ==\n" % suite)
668                 if suite in self.test_output:
669                     f.write(self.test_output[suite]+"\n\n")
670
671             f.write("\n")
672
673         if not self.immediate and not self.verbose:
674             for suite in self.suitesfailed:
675                 print "=" * 78
676                 print "FAIL: %s" % suite
677                 if suite in self.test_output:
678                     print self.test_output[suite]
679                 print ""
680
681         f.write("= Skipped tests =\n")
682         for reason in self.skips.keys():
683             f.write(reason + "\n")
684             for name in self.skips[reason]:
685                 f.write("\t%s\n" % name)
686             f.write("\n")
687         f.close()
688
689         if (not self.suitesfailed and
690             not self.statistics['TESTS_UNEXPECTED_FAIL'] and
691             not self.statistics['TESTS_UNEXPECTED_OK'] and
692             not self.statistics['TESTS_ERROR']):
693             ok = (self.statistics['TESTS_EXPECTED_OK'] +
694                   self.statistics['TESTS_EXPECTED_FAIL'])
695             print "\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok)
696         else:
697             print "\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
698                 self.statistics['TESTS_UNEXPECTED_FAIL'],
699                 self.statistics['TESTS_ERROR'],
700                 self.statistics['TESTS_UNEXPECTED_OK'],
701                 len(self.suitesfailed))
702
703     def skip_testsuite(self, name, reason="UNKNOWN"):
704         self.skips.setdefault(reason, []).append(name)
705         if self.totalsuites:
706             self.totalsuites-=1