selftest: subunithelper needs to follow the subunit spec more closely
[kai/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
# Names exported by a star-import of this module; only the stream parser
# is listed, the remaining classes and helpers must be imported by name.
__all__ = ['parse_results']
19
20 import datetime
21 import re
22 import sys
23 import os
24 from samba import subunit
25 from samba.subunit.run import TestProtocolClient
26 from samba.subunit import iso8601
27 import unittest
28
# Command words that parse_results() accepts as a test or testsuite
# outcome.  Several outcomes are listed under two spellings
# (e.g. 'success'/'successful', 'failure'/'fail', 'xfail'/'knownfail').
VALID_RESULTS = {
    # per-test outcomes
    'success', 'successful',
    'failure', 'fail',
    'skip',
    'knownfail', 'xfail',
    'error',
    'uxsuccess',
    # per-testsuite outcomes
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
    'testsuite-uxsuccess',
}
34
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A unittest.TestResult that additionally knows about testsuites.

    Subclasses are expected to override start_testsuite(); the version
    here only signals that it has not been implemented.
    """

    def start_testsuite(self, name):
        """Announce the start of the testsuite ``name`` (abstract)."""
        unimplemented = self.start_testsuite
        raise NotImplementedError(unimplemented)
39
40
def parse_results(msg_ops, statistics, fh):
    """Parse a (Samba-flavoured) subunit text stream and replay it.

    Each line read from ``fh`` is either passed through unchanged via
    ``msg_ops.output_msg()`` (when it is not a recognised command) or
    translated into the matching ``msg_ops`` callback: ``startTest``,
    ``time``, ``addSuccess``/``addFailure``/``addError``/``addSkip``/
    ``addExpectedFailure``/``addUnexpectedSuccess``, ``start_testsuite``,
    ``end_testsuite``, ``skip_testsuite`` or ``progress``.

    :param msg_ops: object receiving the callbacks listed above plus
        ``control_msg()`` and ``output_msg()``.
    :param statistics: mapping of counters, updated in place.  The keys
        ``TESTS_EXPECTED_OK``, ``TESTS_EXPECTED_FAIL``,
        ``TESTS_UNEXPECTED_OK``, ``TESTS_UNEXPECTED_FAIL``,
        ``TESTS_ERROR`` and ``TESTS_SKIP`` are incremented with ``+=`` and
        therefore have to exist already.
    :param fh: file-like object; only ``readline()`` is used.
    :return: 0 when nothing unexpected was seen, 1 otherwise.
    :raises AssertionError: if a command is listed in VALID_RESULTS but
        has no handler below.
    """
    exitcode = 0
    # Tests announced with "test:"/"testing:" that have no result yet,
    # keyed by test name.
    open_tests = {}
    # Argument of a result line: "<name>[ [][<blanks>][ multipart]\n".
    # Group 1 is the test name, group 2 is set when a bracketed reason
    # follows on the next lines.
    result_arg_re = re.compile(r"(.*?)( \[)?([ \t]*)( multipart)?\n")
    # Results that are only legal for a test that was announced first.
    needs_start = ("success", "successful", "xfail", "knownfail",
                   "uxsuccess", "failure", "fail")

    while fh:
        line = fh.readline()
        if line == "":
            break
        parts = line.split(None, 1)
        # Anything that is not "<command> <argument>" starting in the first
        # column is plain test output.
        if not len(parts) == 2 or not line.startswith(parts[0]):
            msg_ops.output_msg(line)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(line)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(line)
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                # NOTE(review): only TypeError is handled here; confirm this
                # is what iso8601.parse_date raises for a malformed stamp.
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(line)
            result = command
            # The pattern requires the terminating newline; the very last
            # line of a stream may lack it, which used to make match()
            # return None and crash below.
            if not arg.endswith("\n"):
                arg += "\n"
            grp = result_arg_re.match(arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                reason = ""
                # reason may be specified in next lines
                terminated = False
                while fh:
                    line = fh.readline()
                    if line == "":
                        break
                    msg_ops.control_msg(line)
                    if line[-2:] == "]\n":
                        reason += line[:-2]
                        terminated = True
                        break
                    else:
                        reason += line

                remote_error = subunit.RemoteError(reason.decode("utf-8"))

                if not terminated:
                    # The stream ended inside the "[ ... ]" block: report it
                    # and give up on the rest of the stream.
                    statistics['TESTS_ERROR']+=1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")

            if result in needs_start:
                test = open_tests.pop(testname, None)
                if test is None:
                    statistics['TESTS_ERROR']+=1
                    exitcode = 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
                elif result in ("success", "successful"):
                    statistics['TESTS_EXPECTED_OK']+=1
                    msg_ops.addSuccess(test)
                elif result in ("xfail", "knownfail"):
                    statistics['TESTS_EXPECTED_FAIL']+=1
                    msg_ops.addExpectedFailure(test, remote_error)
                elif result == "uxsuccess":
                    statistics['TESTS_UNEXPECTED_OK']+=1
                    msg_ops.addUnexpectedSuccess(test)
                    exitcode = 1
                else:
                    # "failure" / "fail"
                    statistics['TESTS_UNEXPECTED_FAIL']+=1
                    exitcode = 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP']+=1
                # Allow tests to be skipped without prior announcement of test
                test = open_tests.pop(testname, None)
                if test is None:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR']+=1
                exitcode = 1
                # Errors are accepted for tests that were never announced.
                test = open_tests.pop(testname, None)
                if test is None:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
                exitcode = 1
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-uxsuccess":
                msg_ops.end_testsuite(testname, "uxsuccess", reason)
                exitcode = 1
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
                exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                    result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                # A signed number is relative to the current position.
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(line)

    # Whatever is still open was started but never reported a result.
    while open_tests:
        # The stored value already is the test object created for the
        # "test:" line; report it directly instead of wrapping it in a
        # second RemotedTestCase.
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR']+=1
        exitcode = 1

    return exitcode
199
200
class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
    """Subunit writer extended with progress and Samba testsuite commands.

    All methods below write one textual command to ``self._stream``
    (presumably the output stream set up by TestProtocolClient -- confirm
    against samba.subunit.run).
    """

    def progress(self, count, whence):
        """Emit a ``progress:`` line.

        :param count: number to announce; ignored for push/pop.
        :param whence: one of subunit.PROGRESS_POP, PROGRESS_PUSH,
            PROGRESS_SET (absolute count) or PROGRESS_CUR (count relative
            to the current position).  Any other value writes nothing.
        """
        if whence == subunit.PROGRESS_POP:
            self._stream.write("progress: pop\n")
        elif whence == subunit.PROGRESS_PUSH:
            self._stream.write("progress: push\n")
        elif whence == subunit.PROGRESS_SET:
            self._stream.write("progress: %d\n" % count)
        elif whence == subunit.PROGRESS_CUR:
            # Relative progress is written with an explicit sign, which is
            # exactly what parse_results() in this module recognises as
            # PROGRESS_CUR when reading the stream back.
            self._stream.write("progress: %+d\n" % count)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Emit ``testsuite: <name>``."""
        self._stream.write("testsuite: %s\n" % name)

    def skip_testsuite(self, name, reason=None):
        """Emit ``skip-testsuite: <name>``, with a bracketed reason if given."""
        if reason:
            self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
        else:
            self._stream.write("skip-testsuite: %s\n" % name)

    def end_testsuite(self, name, result, reason=None):
        """Emit ``testsuite-<result>: <name>``, with a bracketed reason if given."""
        if reason:
            self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
        else:
            self._stream.write("testsuite-%s: %s\n" % (result, name))

    def output_msg(self, msg):
        """Copy non-command output to the stream unchanged."""
        self._stream.write(msg)
231
232
def read_test_regexes(*names):
    """Load ``regex -> reason`` entries from files and/or directories.

    Each argument is a path.  For a directory every entry whose name does
    not end in ``~`` is read; any other path is read as a file.  In a
    file, blank lines and lines starting with ``#`` are ignored.  On the
    remaining lines the text up to the first ``#`` is the regex and the
    text after it the reason; a line without ``#`` yields reason None.

    :return: dict mapping regex string to reason string (or None).
    """
    paths = []
    for name in names:
        if not os.path.isdir(name):
            paths.append(name)
            continue
        # if we are given a directory, we read all the files it contains
        # (except the ones that end with "~").
        for entry in os.listdir(name):
            if not entry.endswith('~'):
                paths.append(os.path.join(name, entry))

    regexes = {}
    for path in paths:
        with open(path, 'r') as handle:
            for raw in handle:
                entry = raw.strip()
                if not entry or entry.startswith("#"):
                    continue
                pattern, hash_mark, why = entry.partition("#")
                if hash_mark:
                    regexes[pattern.strip()] = why.strip()
                else:
                    regexes[entry] = None
    return regexes
261
262
def find_in_list(regexes, fullname):
    """Look up ``fullname`` in a ``regex -> reason`` mapping.

    :param regexes: dict mapping regex strings to a reason string or None.
    :param fullname: test name; matched with re.match(), i.e. anchored at
        the start of the name only.
    :return: the reason of the first matching regex ("" when that reason
        is None), or None when no regex matches.
    """
    # items() rather than iteritems(): the latter only exists on
    # Python 2 dictionaries.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            return "" if reason is None else reason
    return None
270
271
class ImmediateFail(Exception):
    """Signals that processing must stop right away.

    The exception always carries the same fixed message and takes no
    constructor arguments.
    """

    def __init__(self):
        Exception.__init__(self, "test failed and fail_immediately set")
277
278
class FilterOps(unittest.TestResult):
    """Filter that rewrites test results before forwarding them.

    Every event is passed on to the wrapped ``out`` object after
      * adding ``prefix``/``suffix`` to the test id,
      * turning failures listed in ``expected_failures`` or ``flapping``
        into expected failures,
      * turning successes listed in ``expected_failures`` into unexpected
        successes,
      * optionally buffering test output (``strip_ok_output``) so that it
        is only forwarded for unexpected results, and
      * adjusting the testsuite result according to the per-suite
        counters.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: object receiving the filtered events.
        :param prefix: text put in front of every test id (None: nothing).
        :param suffix: text appended to every test id (None: nothing).
        :param expected_failures: ``regex -> reason`` dict of known failures.
        :param strip_ok_output: buffer output per test and drop it unless
            the result is unexpected.
        :param fail_immediately: raise ImmediateFail on the first error,
            failure, unexpected success or bad testsuite result.
        :param flapping: ``regex -> reason`` dict of tests whose failures
            are tolerated.
        """
        # Initialise the unittest.TestResult state so that inherited
        # methods which are not overridden here remain usable.
        super(FilterOps, self).__init__()
        self._ops = out
        self.seen_output = False
        # None: pass output straight to stdout; a string: buffer of the
        # output collected since the current test started.
        self.output = None
        self.prefix = prefix
        self.suffix = suffix
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # *_added counters are reset per testsuite, total_* never.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.error_added = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer ``msg`` while a buffer is active, else print it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output+=msg

    def startTest(self, test):
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            # Start collecting this test's output.
            self.output = ""

        self._ops.startTest(test)

    def _prefixed_id(self, test_id):
        """Return ``test_id`` wrapped in the configured prefix and suffix.

        ``prefix``/``suffix`` default to None in the constructor; None is
        treated as "add nothing" instead of failing on the concatenation.
        """
        prefix = self.prefix if self.prefix is not None else ""
        suffix = self.suffix if self.suffix is not None else ""
        return prefix + test_id + suffix

    def _add_prefix(self, test):
        """Return a new RemotedTestCase carrying the decorated test id."""
        return subunit.RemotedTestCase(self._prefixed_id(test.id()))

    def addError(self, test, err=None):
        test = self._add_prefix(test)
        self.error_added+=1
        self.total_error+=1
        self._ops.addError(test, err)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test):
        test = self._add_prefix(test)
        self.uxsuccess_added+=1
        self.total_uxsuccess+=1
        self._ops.addUnexpectedSuccess(test)
        if self.output:
            self._ops.output_msg(self.output)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, downgraded to xfail if it is a known one."""
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added+=1
            self.total_xfail+=1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added+=1
            self.total_fail+=1
            self._ops.addFailure(test, err)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success for a known failure."""
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the suite start and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the suite result after reconciling it with the counters.

        A reported failure becomes "xfail" when only expected failures
        were counted.  Unexpected successes, failures and errors counted
        for the suite override the reported result (checked in that order,
        so errors win) and are appended to the reason.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.uxsuccess_added > 0 and result != "uxsuccess":
            result = "uxsuccess"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n uxsuccess[%d]" % self.uxsuccess_added

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
445
446
class PerfFilterOps(unittest.TestResult):
    """Filter that reduces a test stream to per-test timing lines.

    The start time of each test is remembered; on success an
    ``elapsed-time:`` line and on failure/error a ``failure:``/``error:``
    line containing the elapsed seconds is written through
    ``out.output_msg()``.  Progress, output, control messages and
    testsuite ends are dropped.
    """

    def __init__(self, out, prefix='', suffix=''):
        """
        :param out: object receiving the generated messages.
        :param prefix: text put in front of every test id (None: nothing).
        :param suffix: text appended to every test id (None: nothing).
        """
        # Result callbacks that are not overridden below (e.g. addSkip)
        # fall back to unittest.TestResult, which needs its own state.
        super(PerfFilterOps, self).__init__()
        self._ops = out
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        # decorated test id -> time at which the test started
        self.starts = {}
        self.seen_output = False
        self.suite_has_time = False

    def progress(self, delta, whence):
        pass

    def output_msg(self, msg):
        pass

    def control_msg(self, msg):
        pass

    def skip_testsuite(self, name, reason=None):
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        # Timestamps seen in an earlier suite must not be reused.
        self.suite_has_time = False

    def end_testsuite(self, name, result, reason=None):
        pass

    def _add_prefix(self, test):
        """Return a new RemotedTestCase carrying the decorated test id."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def time(self, time):
        """Remember the most recent timestamp from the stream."""
        self.latest_time = time
        #self._ops.output_msg("found time %s\n" % time)
        self.suite_has_time = True

    def get_time(self):
        """Return the latest stream timestamp of this suite, else "now".

        NOTE(review): the fallback is a naive UTC datetime; if the stream
        timestamps are timezone-aware, mixing both in one subtraction
        would fail -- confirm what iso8601.parse_date returns.
        """
        if self.suite_has_time:
            return self.latest_time
        return datetime.datetime.utcnow()

    def _elapsed_seconds(self, tid):
        """Return the seconds since ``tid`` started (0.0 if it never did)."""
        start = self.starts.get(tid)
        if start is None:
            return 0.0
        return (self.get_time() - start).total_seconds()

    def startTest(self, test):
        self.seen_output = True
        test = self._add_prefix(test)
        self.starts[test.id()] = self.get_time()

    def addSuccess(self, test):
        test = self._add_prefix(test)
        tid = test.id()
        if tid not in self.starts:
            self._ops.addError(test, "%s succeeded without ever starting!" % tid)
            # There is no start time to compute an elapsed time from.
            return
        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, self._elapsed_seconds(tid)))

    def addFailure(self, test, err=''):
        # startTest() stores the decorated id, so the lookup has to use
        # the same decoration.
        tid = self._add_prefix(test).id()
        self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
                             (tid, self._elapsed_seconds(tid), err))

    def addError(self, test, err=''):
        # Errors may be reported for tests that never started (see
        # parse_results); those are reported with 0 elapsed seconds.
        tid = self._add_prefix(test).id()
        self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
                             (tid, self._elapsed_seconds(tid), err))
512
513
class PlainFormatter(TestsuiteEnabledTestResult):
    """Human-readable progress and summary output for a test run.

    Progress is written to stdout while the run is in progress;
    write_summary() writes the failed and skipped suites to a file and
    prints the final verdict.
    """

    def __init__(self, verbose, immediate, statistics,
            totaltests=None):
        """
        :param verbose: write all test output to stdout as it arrives
            instead of collecting it per testsuite.
        :param immediate: print one line per testsuite and report
            unexpected results as soon as they happen, instead of printing
            one character per test.
        :param statistics: counter mapping read for the totals (the
            ``TESTS_*`` keys used below must exist).
        :param totaltests: expected number of testsuites, if known.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        # testsuite name -> output collected for that suite
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        # skip reason -> list of skipped testsuite names
        self.skips = {}
        self.index = 0
        # name of the testsuite currently running
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a timedelta as ``[<h>h][<m>m]<s>s``."""
        # Include whole days so that durations of 24h and more do not
        # wrap around.
        total_seconds = delta.days * 86400 + delta.seconds
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track push/pop nesting; a top-level absolute count sets the total."""
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Record a stream timestamp; the first one marks the run start."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the ``[index(tests)/total at time, n errors] name`` header."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            # The per-test result characters follow on the same line.
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print ``output`` (verbose / no suite yet) or collect it per suite."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        pass

    def end_testsuite(self, name, result, reason):
        """Record the suite result and finish its progress line."""
        out = ""
        unexpected = False

        if not name in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok+=1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test):
        self.end_test(test.id(), "uxsuccess", True)

    def end_test(self, testname, result, unexpected, err=None):
        """Report one test result.

        :param testname: id of the test.
        :param result: result keyword ("success", "failure", ...).
        :param unexpected: whether the result counts as a problem.
        :param err: for unexpected results, an object whose element [1]
            is printed as the reason.
        """
        if not unexpected:
            # NOTE(review): this discards everything collected for the
            # current suite so far, including reports of earlier
            # unexpected results -- confirm that this is intended.
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if not self.name in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
               'error': 'E',
               'failure': 'F',
               'uxsuccess': 'U',
               'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write the summary file at ``path`` and print the final verdict."""
        # The context manager closes the file even if a write fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite]+"\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason in self.skips.keys():
                f.write(reason + "\n")
                for name in self.skips[reason]:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if (not self.suitesfailed and
            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
            not self.statistics['TESTS_UNEXPECTED_OK'] and
            not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Record a skipped suite and take it out of the expected total."""
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites-=1