PEP8: fix E302: expected 2 blank lines, found 1
[samba.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 from __future__ import print_function
19 __all__ = ['parse_results']
20
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
29
30 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
31                      'knownfail', 'error', 'xfail', 'skip-testsuite',
32                      'testsuite-failure', 'testsuite-xfail',
33                      'testsuite-success', 'testsuite-error',
34                      'uxsuccess', 'testsuite-uxsuccess'])
35
36
37 class TestsuiteEnabledTestResult(unittest.TestResult):
38
39     def start_testsuite(self, name):
40         raise NotImplementedError(self.start_testsuite)
41
42
43 def parse_results(msg_ops, statistics, fh):
44     exitcode = 0
45     open_tests = {}
46
47     while fh:
48         l = fh.readline()
49         if l == "":
50             break
51         parts = l.split(None, 1)
52         if not len(parts) == 2 or not l.startswith(parts[0]):
53             msg_ops.output_msg(l)
54             continue
55         command = parts[0].rstrip(":")
56         arg = parts[1]
57         if command in ("test", "testing"):
58             msg_ops.control_msg(l)
59             name = arg.rstrip()
60             test = subunit.RemotedTestCase(name)
61             if name in open_tests:
62                 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
63             msg_ops.startTest(test)
64             open_tests[name] = test
65         elif command == "time":
66             msg_ops.control_msg(l)
67             try:
68                 dt = iso8601.parse_date(arg.rstrip("\n"))
69             except TypeError as e:
70                 print("Unable to parse time line: %s" % arg.rstrip("\n"))
71             else:
72                 msg_ops.time(dt)
73         elif command in VALID_RESULTS:
74             msg_ops.control_msg(l)
75             result = command
76             grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
77             (testname, hasreason) = (grp.group(1), grp.group(2))
78             if hasreason:
79                 reason = ""
80                 # reason may be specified in next lines
81                 terminated = False
82                 while fh:
83                     l = fh.readline()
84                     if l == "":
85                         break
86                     msg_ops.control_msg(l)
87                     if l[-2:] == "]\n":
88                         reason += l[:-2]
89                         terminated = True
90                         break
91                     else:
92                         reason += l
93
94                 remote_error = subunit.RemoteError(reason.decode("utf-8"))
95
96                 if not terminated:
97                     statistics['TESTS_ERROR'] += 1
98                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
99                     return 1
100             else:
101                 reason = None
102                 remote_error = subunit.RemoteError(u"No reason specified")
103             if result in ("success", "successful"):
104                 try:
105                     test = open_tests.pop(testname)
106                 except KeyError:
107                     statistics['TESTS_ERROR'] += 1
108                     exitcode = 1
109                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
110                 else:
111                     statistics['TESTS_EXPECTED_OK'] += 1
112                     msg_ops.addSuccess(test)
113             elif result in ("xfail", "knownfail"):
114                 try:
115                     test = open_tests.pop(testname)
116                 except KeyError:
117                     statistics['TESTS_ERROR'] += 1
118                     exitcode = 1
119                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
120                 else:
121                     statistics['TESTS_EXPECTED_FAIL'] += 1
122                     msg_ops.addExpectedFailure(test, remote_error)
123             elif result in ("uxsuccess", ):
124                 try:
125                     test = open_tests.pop(testname)
126                 except KeyError:
127                     statistics['TESTS_ERROR'] += 1
128                     exitcode = 1
129                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
130                 else:
131                     statistics['TESTS_UNEXPECTED_OK'] += 1
132                     msg_ops.addUnexpectedSuccess(test)
133                     exitcode = 1
134             elif result in ("failure", "fail"):
135                 try:
136                     test = open_tests.pop(testname)
137                 except KeyError:
138                     statistics['TESTS_ERROR'] += 1
139                     exitcode = 1
140                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
141                 else:
142                     statistics['TESTS_UNEXPECTED_FAIL'] += 1
143                     exitcode = 1
144                     msg_ops.addFailure(test, remote_error)
145             elif result == "skip":
146                 statistics['TESTS_SKIP'] += 1
147                 # Allow tests to be skipped without prior announcement of test
148                 try:
149                     test = open_tests.pop(testname)
150                 except KeyError:
151                     test = subunit.RemotedTestCase(testname)
152                 msg_ops.addSkip(test, reason)
153             elif result == "error":
154                 statistics['TESTS_ERROR'] += 1
155                 exitcode = 1
156                 try:
157                     test = open_tests.pop(testname)
158                 except KeyError:
159                     test = subunit.RemotedTestCase(testname)
160                 msg_ops.addError(test, remote_error)
161             elif result == "skip-testsuite":
162                 msg_ops.skip_testsuite(testname)
163             elif result == "testsuite-success":
164                 msg_ops.end_testsuite(testname, "success", reason)
165             elif result == "testsuite-failure":
166                 msg_ops.end_testsuite(testname, "failure", reason)
167                 exitcode = 1
168             elif result == "testsuite-xfail":
169                 msg_ops.end_testsuite(testname, "xfail", reason)
170             elif result == "testsuite-uxsuccess":
171                 msg_ops.end_testsuite(testname, "uxsuccess", reason)
172                 exitcode = 1
173             elif result == "testsuite-error":
174                 msg_ops.end_testsuite(testname, "error", reason)
175                 exitcode = 1
176             else:
177                 raise AssertionError("Recognized but unhandled result %r" %
178                                      result)
179         elif command == "testsuite":
180             msg_ops.start_testsuite(arg.strip())
181         elif command == "progress":
182             arg = arg.strip()
183             if arg == "pop":
184                 msg_ops.progress(None, subunit.PROGRESS_POP)
185             elif arg == "push":
186                 msg_ops.progress(None, subunit.PROGRESS_PUSH)
187             elif arg[0] in '+-':
188                 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
189             else:
190                 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
191         else:
192             msg_ops.output_msg(l)
193
194     while open_tests:
195         test = subunit.RemotedTestCase(open_tests.popitem()[1])
196         msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
197         statistics['TESTS_ERROR'] += 1
198         exitcode = 1
199
200     return exitcode
201
202
203 class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
204
205     def progress(self, count, whence):
206         if whence == subunit.PROGRESS_POP:
207             self._stream.write("progress: pop\n")
208         elif whence == subunit.PROGRESS_PUSH:
209             self._stream.write("progress: push\n")
210         elif whence == subunit.PROGRESS_SET:
211             self._stream.write("progress: %d\n" % count)
212         elif whence == subunit.PROGRESS_CUR:
213             raise NotImplementedError
214
215     # The following are Samba extensions:
216     def start_testsuite(self, name):
217         self._stream.write("testsuite: %s\n" % name)
218
219     def skip_testsuite(self, name, reason=None):
220         if reason:
221             self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
222         else:
223             self._stream.write("skip-testsuite: %s\n" % name)
224
225     def end_testsuite(self, name, result, reason=None):
226         if reason:
227             self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
228         else:
229             self._stream.write("testsuite-%s: %s\n" % (result, name))
230
231     def output_msg(self, msg):
232         self._stream.write(msg)
233
234
235 def read_test_regexes(*names):
236     ret = {}
237     files = []
238     for name in names:
239         # if we are given a directory, we read all the files it contains
240         # (except the ones that end with "~").
241         if os.path.isdir(name):
242             files.extend([os.path.join(name, x)
243                           for x in os.listdir(name)
244                           if x[-1] != '~'])
245         else:
246             files.append(name)
247
248     for filename in files:
249         f = open(filename, 'r')
250         try:
251             for l in f:
252                 l = l.strip()
253                 if l == "" or l[0] == "#":
254                     continue
255                 if "#" in l:
256                     (regex, reason) = l.split("#", 1)
257                     ret[regex.strip()] = reason.strip()
258                 else:
259                     ret[l] = None
260         finally:
261             f.close()
262     return ret
263
264
265 def find_in_list(regexes, fullname):
266     for regex, reason in regexes.items():
267         if re.match(regex, fullname):
268             if reason is None:
269                 return ""
270             return reason
271     return None
272
273
274 class ImmediateFail(Exception):
275     """Raised to abort immediately."""
276
277     def __init__(self):
278         super(ImmediateFail, self).__init__("test failed and fail_immediately set")
279
280
281 class FilterOps(unittest.TestResult):
282
283     def control_msg(self, msg):
284         pass  # We regenerate control messages, so ignore this
285
286     def time(self, time):
287         self._ops.time(time)
288
289     def progress(self, delta, whence):
290         self._ops.progress(delta, whence)
291
292     def output_msg(self, msg):
293         if self.output is None:
294             sys.stdout.write(msg)
295         else:
296             self.output += msg
297
298     def startTest(self, test):
299         self.seen_output = True
300         test = self._add_prefix(test)
301         if self.strip_ok_output:
302             self.output = ""
303
304         self._ops.startTest(test)
305
306     def _add_prefix(self, test):
307         return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
308
309     def addError(self, test, err=None):
310         test = self._add_prefix(test)
311         self.error_added += 1
312         self.total_error += 1
313         self._ops.addError(test, err)
314         self.output = None
315         if self.fail_immediately:
316             raise ImmediateFail()
317
318     def addSkip(self, test, reason=None):
319         self.seen_output = True
320         test = self._add_prefix(test)
321         self._ops.addSkip(test, reason)
322         self.output = None
323
324     def addExpectedFailure(self, test, err=None):
325         test = self._add_prefix(test)
326         self._ops.addExpectedFailure(test, err)
327         self.output = None
328
329     def addUnexpectedSuccess(self, test):
330         test = self._add_prefix(test)
331         self.uxsuccess_added += 1
332         self.total_uxsuccess += 1
333         self._ops.addUnexpectedSuccess(test)
334         if self.output:
335             self._ops.output_msg(self.output)
336         self.output = None
337         if self.fail_immediately:
338             raise ImmediateFail()
339
340     def addFailure(self, test, err=None):
341         test = self._add_prefix(test)
342         xfail_reason = find_in_list(self.expected_failures, test.id())
343         if xfail_reason is None:
344             xfail_reason = find_in_list(self.flapping, test.id())
345         if xfail_reason is not None:
346             self.xfail_added += 1
347             self.total_xfail += 1
348             self._ops.addExpectedFailure(test, err)
349         else:
350             self.fail_added += 1
351             self.total_fail += 1
352             self._ops.addFailure(test, err)
353             if self.output:
354                 self._ops.output_msg(self.output)
355             if self.fail_immediately:
356                 raise ImmediateFail()
357         self.output = None
358
359     def addSuccess(self, test):
360         test = self._add_prefix(test)
361         xfail_reason = find_in_list(self.expected_failures, test.id())
362         if xfail_reason is not None:
363             self.uxsuccess_added += 1
364             self.total_uxsuccess += 1
365             self._ops.addUnexpectedSuccess(test)
366             if self.output:
367                 self._ops.output_msg(self.output)
368             if self.fail_immediately:
369                 raise ImmediateFail()
370         else:
371             self._ops.addSuccess(test)
372         self.output = None
373
374     def skip_testsuite(self, name, reason=None):
375         self._ops.skip_testsuite(name, reason)
376
377     def start_testsuite(self, name):
378         self._ops.start_testsuite(name)
379         self.error_added = 0
380         self.fail_added = 0
381         self.xfail_added = 0
382         self.uxsuccess_added = 0
383
384     def end_testsuite(self, name, result, reason=None):
385         xfail = False
386
387         if self.xfail_added > 0:
388             xfail = True
389         if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
390             xfail = False
391
392         if xfail and result in ("fail", "failure"):
393             result = "xfail"
394
395         if self.uxsuccess_added > 0 and result != "uxsuccess":
396             result = "uxsuccess"
397             if reason is None:
398                 reason = "Subunit/Filter Reason"
399             reason += "\n uxsuccess[%d]" % self.uxsuccess_added
400
401         if self.fail_added > 0 and result != "failure":
402             result = "failure"
403             if reason is None:
404                 reason = "Subunit/Filter Reason"
405             reason += "\n failures[%d]" % self.fail_added
406
407         if self.error_added > 0 and result != "error":
408             result = "error"
409             if reason is None:
410                 reason = "Subunit/Filter Reason"
411             reason += "\n errors[%d]" % self.error_added
412
413         self._ops.end_testsuite(name, result, reason)
414         if result not in ("success", "xfail"):
415             if self.output:
416                 self._ops.output_msg(self.output)
417             if self.fail_immediately:
418                 raise ImmediateFail()
419         self.output = None
420
421     def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
422                  strip_ok_output=False, fail_immediately=False,
423                  flapping=None):
424         self._ops = out
425         self.seen_output = False
426         self.output = None
427         self.prefix = prefix
428         self.suffix = suffix
429         if expected_failures is not None:
430             self.expected_failures = expected_failures
431         else:
432             self.expected_failures = {}
433         if flapping is not None:
434             self.flapping = flapping
435         else:
436             self.flapping = {}
437         self.strip_ok_output = strip_ok_output
438         self.xfail_added = 0
439         self.fail_added = 0
440         self.uxsuccess_added = 0
441         self.total_xfail = 0
442         self.total_error = 0
443         self.total_fail = 0
444         self.total_uxsuccess = 0
445         self.error_added = 0
446         self.fail_immediately = fail_immediately
447
448
449 class PerfFilterOps(unittest.TestResult):
450
451     def progress(self, delta, whence):
452         pass
453
454     def output_msg(self, msg):
455         pass
456
457     def control_msg(self, msg):
458         pass
459
460     def skip_testsuite(self, name, reason=None):
461         self._ops.skip_testsuite(name, reason)
462
463     def start_testsuite(self, name):
464         self.suite_has_time = False
465
466     def end_testsuite(self, name, result, reason=None):
467         pass
468
469     def _add_prefix(self, test):
470         return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
471
472     def time(self, time):
473         self.latest_time = time
474         #self._ops.output_msg("found time %s\n" % time)
475         self.suite_has_time = True
476
477     def get_time(self):
478         if self.suite_has_time:
479             return self.latest_time
480         return datetime.datetime.utcnow()
481
482     def startTest(self, test):
483         self.seen_output = True
484         test = self._add_prefix(test)
485         self.starts[test.id()] = self.get_time()
486
487     def addSuccess(self, test):
488         test = self._add_prefix(test)
489         tid = test.id()
490         if tid not in self.starts:
491             self._ops.addError(test, "%s succeeded without ever starting!" % tid)
492         delta = self.get_time() - self.starts[tid]
493         self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
494
495     def addFailure(self, test, err=''):
496         tid = test.id()
497         delta = self.get_time() - self.starts[tid]
498         self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
499                              (tid, delta.total_seconds(), err))
500
501     def addError(self, test, err=''):
502         tid = test.id()
503         delta = self.get_time() - self.starts[tid]
504         self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
505                              (tid, delta.total_seconds(), err))
506
507     def __init__(self, out, prefix='', suffix=''):
508         self._ops = out
509         self.prefix = prefix or ''
510         self.suffix = suffix or ''
511         self.starts = {}
512         self.seen_output = False
513         self.suite_has_time = False
514
515
516 class PlainFormatter(TestsuiteEnabledTestResult):
517
518     def __init__(self, verbose, immediate, statistics,
519                  totaltests=None):
520         super(PlainFormatter, self).__init__()
521         self.verbose = verbose
522         self.immediate = immediate
523         self.statistics = statistics
524         self.start_time = None
525         self.test_output = {}
526         self.suitesfailed = []
527         self.suites_ok = 0
528         self.skips = {}
529         self.index = 0
530         self.name = None
531         self._progress_level = 0
532         self.totalsuites = totaltests
533         self.last_time = None
534
535     @staticmethod
536     def _format_time(delta):
537         minutes, seconds = divmod(delta.seconds, 60)
538         hours, minutes = divmod(minutes, 60)
539         ret = ""
540         if hours:
541             ret += "%dh" % hours
542         if minutes:
543             ret += "%dm" % minutes
544         ret += "%ds" % seconds
545         return ret
546
547     def progress(self, offset, whence):
548         if whence == subunit.PROGRESS_POP:
549             self._progress_level -= 1
550         elif whence == subunit.PROGRESS_PUSH:
551             self._progress_level += 1
552         elif whence == subunit.PROGRESS_SET:
553             if self._progress_level == 0:
554                 self.totalsuites = offset
555         elif whence == subunit.PROGRESS_CUR:
556             raise NotImplementedError
557
558     def time(self, dt):
559         if self.start_time is None:
560             self.start_time = dt
561         self.last_time = dt
562
563     def start_testsuite(self, name):
564         self.index += 1
565         self.name = name
566
567         if not self.verbose:
568             self.test_output[name] = ""
569
570         total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
571                        self.statistics['TESTS_EXPECTED_FAIL'] +
572                        self.statistics['TESTS_ERROR'] +
573                        self.statistics['TESTS_UNEXPECTED_FAIL'] +
574                        self.statistics['TESTS_UNEXPECTED_OK'])
575
576         out = "[%d(%d)" % (self.index, total_tests)
577         if self.totalsuites is not None:
578             out += "/%d" % self.totalsuites
579         if self.start_time is not None:
580             out += " at " + self._format_time(self.last_time - self.start_time)
581         if self.suitesfailed:
582             out += ", %d errors" % (len(self.suitesfailed),)
583         out += "] %s" % name
584         if self.immediate:
585             sys.stdout.write(out + "\n")
586         else:
587             sys.stdout.write(out + ": ")
588
589     def output_msg(self, output):
590         if self.verbose:
591             sys.stdout.write(output)
592         elif self.name is not None:
593             self.test_output[self.name] += output
594         else:
595             sys.stdout.write(output)
596
597     def control_msg(self, output):
598         pass
599
600     def end_testsuite(self, name, result, reason):
601         out = ""
602         unexpected = False
603
604         if not name in self.test_output:
605             print("no output for name[%s]" % name)
606
607         if result in ("success", "xfail"):
608             self.suites_ok += 1
609         else:
610             self.output_msg("ERROR: Testsuite[%s]\n" % name)
611             if reason is not None:
612                 self.output_msg("REASON: %s\n" % (reason,))
613             self.suitesfailed.append(name)
614             if self.immediate and not self.verbose and name in self.test_output:
615                 out += self.test_output[name]
616             unexpected = True
617
618         if not self.immediate:
619             if not unexpected:
620                 out += " ok\n"
621             else:
622                 out += " " + result.upper() + "\n"
623
624         sys.stdout.write(out)
625
626     def startTest(self, test):
627         pass
628
629     def addSuccess(self, test):
630         self.end_test(test.id(), "success", False)
631
632     def addError(self, test, err=None):
633         self.end_test(test.id(), "error", True, err)
634
635     def addFailure(self, test, err=None):
636         self.end_test(test.id(), "failure", True, err)
637
638     def addSkip(self, test, reason=None):
639         self.end_test(test.id(), "skip", False, reason)
640
641     def addExpectedFailure(self, test, err=None):
642         self.end_test(test.id(), "xfail", False, err)
643
644     def addUnexpectedSuccess(self, test):
645         self.end_test(test.id(), "uxsuccess", True)
646
647     def end_test(self, testname, result, unexpected, err=None):
648         if not unexpected:
649             self.test_output[self.name] = ""
650             if not self.immediate:
651                 sys.stdout.write({
652                     'failure': 'f',
653                     'xfail': 'X',
654                     'skip': 's',
655                     'success': '.'}.get(result, "?(%s)" % result))
656             return
657
658         if not self.name in self.test_output:
659             self.test_output[self.name] = ""
660
661         self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
662         if err is not None:
663             self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
664
665         if self.immediate and not self.verbose:
666             sys.stdout.write(self.test_output[self.name])
667             self.test_output[self.name] = ""
668
669         if not self.immediate:
670             sys.stdout.write({
671                 'error': 'E',
672                'failure': 'F',
673                'uxsuccess': 'U',
674                'success': 'S'}.get(result, "?"))
675
676     def write_summary(self, path):
677         f = open(path, 'w+')
678
679         if self.suitesfailed:
680             f.write("= Failed tests =\n")
681
682             for suite in self.suitesfailed:
683                 f.write("== %s ==\n" % suite)
684                 if suite in self.test_output:
685                     f.write(self.test_output[suite] + "\n\n")
686
687             f.write("\n")
688
689         if not self.immediate and not self.verbose:
690             for suite in self.suitesfailed:
691                 print("=" * 78)
692                 print("FAIL: %s" % suite)
693                 if suite in self.test_output:
694                     print(self.test_output[suite])
695                 print("")
696
697         f.write("= Skipped tests =\n")
698         for reason in self.skips.keys():
699             f.write(reason + "\n")
700             for name in self.skips[reason]:
701                 f.write("\t%s\n" % name)
702             f.write("\n")
703         f.close()
704
705         if (not self.suitesfailed and
706             not self.statistics['TESTS_UNEXPECTED_FAIL'] and
707             not self.statistics['TESTS_UNEXPECTED_OK'] and
708             not self.statistics['TESTS_ERROR']):
709             ok = (self.statistics['TESTS_EXPECTED_OK'] +
710                   self.statistics['TESTS_EXPECTED_FAIL'])
711             print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
712         else:
713             print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
714                 self.statistics['TESTS_UNEXPECTED_FAIL'],
715                 self.statistics['TESTS_ERROR'],
716                 self.statistics['TESTS_UNEXPECTED_OK'],
717                 len(self.suitesfailed)))
718
719     def skip_testsuite(self, name, reason="UNKNOWN"):
720         self.skips.setdefault(reason, []).append(name)
721         if self.totalsuites:
722             self.totalsuites -= 1