subunithelper: Fix progress support.
[sfrench/samba-autobuild/.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 from samba import subunit
23 from samba.subunit.run import TestProtocolClient
24 import iso8601
25 import unittest
26
27 VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error', 'uxsuccess', 'testsuite-uxsuccess']
28
29 class TestsuiteEnabledTestResult(unittest.TestResult):
30
31     def start_testsuite(self, name):
32         raise NotImplementedError(self.start_testsuite)
33
34
35 def parse_results(msg_ops, statistics, fh):
36     exitcode = 0
37     open_tests = {}
38
39     while fh:
40         l = fh.readline()
41         if l == "":
42             break
43         parts = l.split(None, 1)
44         if not len(parts) == 2 or not l.startswith(parts[0]):
45             msg_ops.output_msg(l)
46             continue
47         command = parts[0].rstrip(":")
48         arg = parts[1]
49         if command in ("test", "testing"):
50             msg_ops.control_msg(l)
51             name = arg.rstrip()
52             test = subunit.RemotedTestCase(name)
53             if name in open_tests:
54                 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
55             msg_ops.startTest(test)
56             open_tests[name] = test
57         elif command == "time":
58             msg_ops.control_msg(l)
59             try:
60                 dt = iso8601.parse_date(arg.rstrip("\n"))
61             except TypeError, e:
62                 print "Unable to parse time line: %s" % arg.rstrip("\n")
63             else:
64                 msg_ops.time(dt)
65         elif command in VALID_RESULTS:
66             msg_ops.control_msg(l)
67             result = command
68             grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
69             (testname, hasreason) = (grp.group(1), grp.group(2))
70             if hasreason:
71                 reason = ""
72                 # reason may be specified in next lines
73                 terminated = False
74                 while fh:
75                     l = fh.readline()
76                     if l == "":
77                         break
78                     msg_ops.control_msg(l)
79                     if l == "]\n":
80                         terminated = True
81                         break
82                     else:
83                         reason += l
84
85                 remote_error = subunit.RemoteError(reason.decode("utf-8"))
86
87                 if not terminated:
88                     statistics['TESTS_ERROR']+=1
89                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
90                     return 1
91             else:
92                 reason = None
93                 remote_error = subunit.RemoteError(u"No reason specified")
94             if result in ("success", "successful"):
95                 try:
96                     test = open_tests.pop(testname)
97                 except KeyError:
98                     statistics['TESTS_ERROR']+=1
99                     exitcode = 1
100                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
101                 else:
102                     statistics['TESTS_EXPECTED_OK']+=1
103                     msg_ops.addSuccess(test)
104             elif result in ("xfail", "knownfail"):
105                 try:
106                     test = open_tests.pop(testname)
107                 except KeyError:
108                     statistics['TESTS_ERROR']+=1
109                     exitcode = 1
110                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
111                 else:
112                     statistics['TESTS_EXPECTED_FAIL']+=1
113                     msg_ops.addExpectedFailure(test, remote_error)
114             elif result in ("uxsuccess", ):
115                 try:
116                     test = open_tests.pop(testname)
117                 except KeyError:
118                     statistics['TESTS_ERROR']+=1
119                     exitcode = 1
120                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
121                 else:
122                     statistics['TESTS_UNEXPECTED_OK']+=1
123                     msg_ops.addUnexpectedSuccess(test, remote_error)
124                     exitcode = 1
125             elif result in ("failure", "fail"):
126                 try:
127                     test = open_tests.pop(testname)
128                 except KeyError:
129                     statistics['TESTS_ERROR']+=1
130                     exitcode = 1
131                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
132                 else:
133                     statistics['TESTS_UNEXPECTED_FAIL']+=1
134                     exitcode = 1
135                     msg_ops.addFailure(test, remote_error)
136             elif result == "skip":
137                 statistics['TESTS_SKIP']+=1
138                 # Allow tests to be skipped without prior announcement of test
139                 try:
140                     test = open_tests.pop(testname)
141                 except KeyError:
142                     test = subunit.RemotedTestCase(testname)
143                 msg_ops.addSkip(test, reason)
144             elif result == "error":
145                 statistics['TESTS_ERROR']+=1
146                 exitcode = 1
147                 try:
148                     test = open_tests.pop(testname)
149                 except KeyError:
150                     test = subunit.RemotedTestCase(testname)
151                 msg_ops.addError(test, remote_error)
152             elif result == "skip-testsuite":
153                 msg_ops.skip_testsuite(testname)
154             elif result == "testsuite-success":
155                 msg_ops.end_testsuite(testname, "success", reason)
156             elif result == "testsuite-failure":
157                 msg_ops.end_testsuite(testname, "failure", reason)
158                 exitcode = 1
159             elif result == "testsuite-xfail":
160                 msg_ops.end_testsuite(testname, "xfail", reason)
161             elif result == "testsuite-uxsuccess":
162                 msg_ops.end_testsuite(testname, "uxsuccess", reason)
163                 exitcode = 1
164             elif result == "testsuite-error":
165                 msg_ops.end_testsuite(testname, "error", reason)
166                 exitcode = 1
167             else:
168                 raise AssertionError("Recognized but unhandled result %r" %
169                     result)
170         elif command == "testsuite":
171             msg_ops.start_testsuite(arg.strip())
172         elif command == "progress":
173             arg = arg.strip()
174             if arg == "pop":
175                 msg_ops.progress(None, subunit.PROGRESS_POP)
176             elif arg == "push":
177                 msg_ops.progress(None, subunit.PROGRESS_PUSH)
178             elif arg[0] in '+-':
179                 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
180             else:
181                 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
182         else:
183             msg_ops.output_msg(l)
184
185     while open_tests:
186         test = subunit.RemotedTestCase(open_tests.popitem()[1])
187         msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
188         statistics['TESTS_ERROR']+=1
189         exitcode = 1
190
191     return exitcode
192
193
194 class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
195
196     def progress(self, count, whence):
197         if whence == subunit.PROGRESS_POP:
198             self._stream.write("progress: pop\n")
199         elif whence == subunit.PROGRESS_PUSH:
200             self._stream.write("progress: push\n")
201         elif whence == subunit.PROGRESS_SET:
202             self._stream.write("progress: %d\n" % count)
203         elif whence == subunit.PROGRESS_CUR:
204             raise NotImplementedError
205
206     # The following are Samba extensions:
207     def start_testsuite(self, name):
208         self._stream.write("testsuite: %s\n" % name)
209
210     def skip_testsuite(self, name, reason=None):
211         if reason:
212             self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
213         else:
214             self._stream.write("skip-testsuite: %s\n" % name)
215
216     def end_testsuite(self, name, result, reason=None):
217         if reason:
218             self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
219         else:
220             self._stream.write("testsuite-%s: %s\n" % (result, name))
221
222     def output_msg(self, msg):
223         self._stream.write(msg)
224
225
226 def read_test_regexes(name):
227     ret = {}
228     f = open(name, 'r')
229     try:
230         for l in f:
231             l = l.strip()
232             if l == "" or l[0] == "#":
233                 continue
234             if "#" in l:
235                 (regex, reason) = l.split("#", 1)
236                 ret[regex.strip()] = reason.strip()
237             else:
238                 ret[l] = None
239     finally:
240         f.close()
241     return ret
242
243
244 def find_in_list(regexes, fullname):
245     for regex, reason in regexes.iteritems():
246         if re.match(regex, fullname):
247             if reason is None:
248                 return ""
249             return reason
250     return None
251
252
253 class ImmediateFail(Exception):
254     """Raised to abort immediately."""
255
256     def __init__(self):
257         super(ImmediateFail, self).__init__("test failed and fail_immediately set")
258
259
260 class FilterOps(unittest.TestResult):
261
262     def control_msg(self, msg):
263         pass # We regenerate control messages, so ignore this
264
265     def time(self, time):
266         self._ops.time(time)
267
268     def progress(self, delta, whence):
269         self._ops.progress(delta, whence)
270
271     def output_msg(self, msg):
272         if self.output is None:
273             sys.stdout.write(msg)
274         else:
275             self.output+=msg
276
277     def startTest(self, test):
278         self.seen_output = True
279         test = self._add_prefix(test)
280         if self.strip_ok_output:
281            self.output = ""
282
283         self._ops.startTest(test)
284
285     def _add_prefix(self, test):
286         prefix = ""
287         suffix = ""
288         if self.prefix is not None:
289             prefix = self.prefix
290         if self.suffix is not None:
291             suffix = self.suffix
292
293         return subunit.RemotedTestCase(prefix + test.id() + suffix)
294
295     def addError(self, test, err=None):
296         test = self._add_prefix(test)
297         self.error_added+=1
298         self.total_error+=1
299         self._ops.addError(test, err)
300         self.output = None
301         if self.fail_immediately:
302             raise ImmediateFail()
303
304     def addSkip(self, test, reason=None):
305         self.seen_output = True
306         test = self._add_prefix(test)
307         self._ops.addSkip(test, reason)
308         self.output = None
309
310     def addExpectedFailure(self, test, err=None):
311         test = self._add_prefix(test)
312         self._ops.addExpectedFailure(test, err)
313         self.output = None
314
315     def addUnexpectedSuccess(self, test, err=None):
316         test = self._add_prefix(test)
317         self.uxsuccess_added+=1
318         self.total_uxsuccess+=1
319         self._ops.addUnexpectedSuccess(test, err)
320         if self.output:
321             self._ops.output_msg(self.output)
322         self.output = None
323         if self.fail_immediately:
324             raise ImmediateFail()
325
326     def addFailure(self, test, err=None):
327         test = self._add_prefix(test)
328         xfail_reason = find_in_list(self.expected_failures, test.id())
329         if xfail_reason is None:
330             xfail_reason = find_in_list(self.flapping, test.id())
331         if xfail_reason is not None:
332             self.xfail_added+=1
333             self.total_xfail+=1
334             self._ops.addExpectedFailure(test, err)
335         else:
336             self.fail_added+=1
337             self.total_fail+=1
338             self._ops.addFailure(test, err)
339             if self.output:
340                 self._ops.output_msg(self.output)
341             if self.fail_immediately:
342                 raise ImmediateFail()
343         self.output = None
344
345     def addSuccess(self, test):
346         test = self._add_prefix(test)
347         xfail_reason = find_in_list(self.expected_failures, test.id())
348         if xfail_reason is not None:
349             self.uxsuccess_added += 1
350             self.total_uxsuccess += 1
351             self._ops.addUnexpectedSuccess(test, subunit.RemoteError(xfail_reason))
352             if self.output:
353                 self._ops.output_msg(self.output)
354             if self.fail_immediately:
355                 raise ImmediateFail()
356         else:
357             self._ops.addSuccess(test)
358         self.output = None
359
360     def skip_testsuite(self, name, reason=None):
361         self._ops.skip_testsuite(name, reason)
362
363     def start_testsuite(self, name):
364         self._ops.start_testsuite(name)
365         self.error_added = 0
366         self.fail_added = 0
367         self.xfail_added = 0
368         self.uxsuccess_added = 0
369
370     def end_testsuite(self, name, result, reason=None):
371         xfail = False
372
373         if self.xfail_added > 0:
374             xfail = True
375         if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
376             xfail = False
377
378         if xfail and result in ("fail", "failure"):
379             result = "xfail"
380
381         if self.uxsuccess_added > 0 and result != "uxsuccess":
382             result = "uxsuccess"
383             if reason is None:
384                 reason = "Subunit/Filter Reason"
385             reason += "\n uxsuccess[%d]" % self.uxsuccess_added
386
387         if self.fail_added > 0 and result != "failure":
388             result = "failure"
389             if reason is None:
390                 reason = "Subunit/Filter Reason"
391             reason += "\n failures[%d]" % self.fail_added
392
393         if self.error_added > 0 and result != "error":
394             result = "error"
395             if reason is None:
396                 reason = "Subunit/Filter Reason"
397             reason += "\n errors[%d]" % self.error_added
398
399         self._ops.end_testsuite(name, result, reason)
400         if result not in ("success", "xfail"):
401             if self.output:
402                 self._ops.output_msg(self.output)
403             if self.fail_immediately:
404                 raise ImmediateFail()
405         self.output = None
406
407     def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
408                  strip_ok_output=False, fail_immediately=False,
409                  flapping=None):
410         self._ops = out
411         self.seen_output = False
412         self.output = None
413         self.prefix = prefix
414         self.suffix = suffix
415         if expected_failures is not None:
416             self.expected_failures = expected_failures
417         else:
418             self.expected_failures = {}
419         if flapping is not None:
420             self.flapping = flapping
421         else:
422             self.flapping = {}
423         self.strip_ok_output = strip_ok_output
424         self.xfail_added = 0
425         self.fail_added = 0
426         self.uxsuccess_added = 0
427         self.total_xfail = 0
428         self.total_error = 0
429         self.total_fail = 0
430         self.total_uxsuccess = 0
431         self.error_added = 0
432         self.fail_immediately = fail_immediately
433
434
435 class PlainFormatter(TestsuiteEnabledTestResult):
436
437     def __init__(self, verbose, immediate, statistics,
438             totaltests=None):
439         super(PlainFormatter, self).__init__()
440         self.verbose = verbose
441         self.immediate = immediate
442         self.statistics = statistics
443         self.start_time = None
444         self.test_output = {}
445         self.suitesfailed = []
446         self.suites_ok = 0
447         self.skips = {}
448         self.index = 0
449         self.name = None
450         self._progress_level = 0
451         self.totalsuites = totaltests
452         self.last_time = None
453
454     @staticmethod
455     def _format_time(delta):
456         minutes, seconds = divmod(delta.seconds, 60)
457         hours, minutes = divmod(minutes, 60)
458         ret = ""
459         if hours:
460             ret += "%dh" % hours
461         if minutes:
462             ret += "%dm" % minutes
463         ret += "%ds" % seconds
464         return ret
465
466     def progress(self, offset, whence):
467         if whence == subunit.PROGRESS_POP:
468             self._progress_level -= 1
469         elif whence == subunit.PROGRESS_PUSH:
470             self._progress_level += 1
471         elif whence == subunit.PROGRESS_SET:
472             if self._progress_level == 0:
473                 self.totalsuites = offset
474         elif whence == subunit.PROGRESS_CUR:
475             raise NotImplementedError
476
477     def time(self, dt):
478         if self.start_time is None:
479             self.start_time = dt
480         self.last_time = dt
481
482     def start_testsuite(self, name):
483         self.index += 1
484         self.name = name
485
486         if not self.verbose:
487             self.test_output[name] = ""
488
489         total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
490                        self.statistics['TESTS_EXPECTED_FAIL'] +
491                        self.statistics['TESTS_ERROR'] +
492                        self.statistics['TESTS_UNEXPECTED_FAIL'] +
493                        self.statistics['TESTS_UNEXPECTED_OK'])
494
495         out = "[%d(%d)" % (self.index, total_tests)
496         if self.totalsuites is not None:
497             out += "/%d" % self.totalsuites
498         if self.start_time is not None:
499             out += " at " + self._format_time(self.last_time - self.start_time)
500         if self.suitesfailed:
501             out += ", %d errors" % (len(self.suitesfailed),)
502         out += "] %s" % name
503         if self.immediate:
504             sys.stdout.write(out + "\n")
505         else:
506             sys.stdout.write(out + ": ")
507
508     def output_msg(self, output):
509         if self.verbose:
510             sys.stdout.write(output)
511         elif self.name is not None:
512             self.test_output[self.name] += output
513         else:
514             sys.stdout.write(output)
515
516     def control_msg(self, output):
517         pass
518
519     def end_testsuite(self, name, result, reason):
520         out = ""
521         unexpected = False
522
523         if not name in self.test_output:
524             print "no output for name[%s]" % name
525
526         if result in ("success", "xfail"):
527             self.suites_ok+=1
528         else:
529             self.output_msg("ERROR: Testsuite[%s]\n" % name)
530             if reason is not None:
531                 self.output_msg("REASON: %s\n" % (reason,))
532             self.suitesfailed.append(name)
533             if self.immediate and not self.verbose and name in self.test_output:
534                 out += self.test_output[name]
535             unexpected = True
536
537         if not self.immediate:
538             if not unexpected:
539                 out += " ok\n"
540             else:
541                 out += " " + result.upper() + "\n"
542
543         sys.stdout.write(out)
544
545     def startTest(self, test):
546         pass
547
548     def addSuccess(self, test):
549         self.end_test(test.id(), "success", False)
550
551     def addError(self, test, err=None):
552         self.end_test(test.id(), "error", True, err)
553
554     def addFailure(self, test, err=None):
555         self.end_test(test.id(), "failure", True, err)
556
557     def addSkip(self, test, reason=None):
558         self.end_test(test.id(), "skip", False, reason)
559
560     def addExpectedFailure(self, test, err=None):
561         self.end_test(test.id(), "xfail", False, err)
562
563     def addUnexpectedSuccess(self, test):
564         self.end_test(test.id(), "uxsuccess", True)
565
566     def end_test(self, testname, result, unexpected, err=None):
567         if not unexpected:
568             self.test_output[self.name] = ""
569             if not self.immediate:
570                 sys.stdout.write({
571                     'failure': 'f',
572                     'xfail': 'X',
573                     'skip': 's',
574                     'success': '.'}.get(result, "?(%s)" % result))
575             return
576
577         if not self.name in self.test_output:
578             self.test_output[self.name] = ""
579
580         self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
581         if err is not None:
582             self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
583
584         if self.immediate and not self.verbose:
585             sys.stdout.write(self.test_output[self.name])
586             self.test_output[self.name] = ""
587
588         if not self.immediate:
589             sys.stdout.write({
590                'error': 'E',
591                'failure': 'F',
592                'uxsuccess': 'U',
593                'success': 'S'}.get(result, "?"))
594
595     def write_summary(self, path):
596         f = open(path, 'w+')
597
598         if self.suitesfailed:
599             f.write("= Failed tests =\n")
600
601             for suite in self.suitesfailed:
602                 f.write("== %s ==\n" % suite)
603                 if suite in self.test_output:
604                     f.write(self.test_output[suite]+"\n\n")
605
606             f.write("\n")
607
608         if not self.immediate and not self.verbose:
609             for suite in self.suitesfailed:
610                 print "=" * 78
611                 print "FAIL: %s" % suite
612                 if suite in self.test_output:
613                     print self.test_output[suite]
614                 print ""
615
616         f.write("= Skipped tests =\n")
617         for reason in self.skips.keys():
618             f.write(reason + "\n")
619             for name in self.skips[reason]:
620                 f.write("\t%s\n" % name)
621             f.write("\n")
622         f.close()
623
624         if (not self.suitesfailed and
625             not self.statistics['TESTS_UNEXPECTED_FAIL'] and
626             not self.statistics['TESTS_UNEXPECTED_OK'] and
627             not self.statistics['TESTS_ERROR']):
628             ok = (self.statistics['TESTS_EXPECTED_OK'] +
629                   self.statistics['TESTS_EXPECTED_FAIL'])
630             print "\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok)
631         else:
632             print "\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
633                 self.statistics['TESTS_UNEXPECTED_FAIL'],
634                 self.statistics['TESTS_ERROR'],
635                 self.statistics['TESTS_UNEXPECTED_OK'],
636                 len(self.suitesfailed))
637
638     def skip_testsuite(self, name, reason="UNKNOWN"):
639         self.skips.setdefault(reason, []).append(name)
640         if self.totalsuites:
641             self.totalsuites-=1