26158cad33938003347684fa9076cd9d5cb7af08
[samba.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 from __future__ import print_function
19 __all__ = ['parse_results']
20
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
29
30
31 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
32                      'knownfail', 'error', 'xfail', 'skip-testsuite',
33                      'testsuite-failure', 'testsuite-xfail',
34                      'testsuite-success', 'testsuite-error',
35                      'uxsuccess', 'testsuite-uxsuccess'])
36
37
38 class TestsuiteEnabledTestResult(unittest.TestResult):
39
40     def start_testsuite(self, name):
41         raise NotImplementedError(self.start_testsuite)
42
43
44 def parse_results(msg_ops, statistics, fh):
45     exitcode = 0
46     open_tests = {}
47
48     while fh:
49         l = fh.readline()
50         if l == "":
51             break
52         parts = l.split(None, 1)
53         if not len(parts) == 2 or not l.startswith(parts[0]):
54             msg_ops.output_msg(l)
55             continue
56         command = parts[0].rstrip(":")
57         arg = parts[1]
58         if command in ("test", "testing"):
59             msg_ops.control_msg(l)
60             name = arg.rstrip()
61             test = subunit.RemotedTestCase(name)
62             if name in open_tests:
63                 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
64             msg_ops.startTest(test)
65             open_tests[name] = test
66         elif command == "time":
67             msg_ops.control_msg(l)
68             try:
69                 dt = iso8601.parse_date(arg.rstrip("\n"))
70             except TypeError as e:
71                 print("Unable to parse time line: %s" % arg.rstrip("\n"))
72             else:
73                 msg_ops.time(dt)
74         elif command in VALID_RESULTS:
75             msg_ops.control_msg(l)
76             result = command
77             grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
78             (testname, hasreason) = (grp.group(1), grp.group(2))
79             if hasreason:
80                 reason = ""
81                 # reason may be specified in next lines
82                 terminated = False
83                 while fh:
84                     l = fh.readline()
85                     if l == "":
86                         break
87                     msg_ops.control_msg(l)
88                     if l == "]\n":
89                         terminated = True
90                         break
91                     else:
92                         reason += l
93
94                 if isinstance(reason, bytes):
95                     remote_error = subunit.RemoteError(reason.decode("utf-8"))
96                 else:
97                     remote_error = subunit.RemoteError(reason)
98
99                 if not terminated:
100                     statistics['TESTS_ERROR'] += 1
101                     msg_ops.addError(subunit.RemotedTestCase(testname),
102                                      subunit.RemoteError(u"result (%s) reason (%s) interrupted" % (result, reason)))
103                     return 1
104             else:
105                 reason = None
106                 remote_error = subunit.RemoteError(u"No reason specified")
107             if result in ("success", "successful"):
108                 try:
109                     test = open_tests.pop(testname)
110                 except KeyError:
111                     statistics['TESTS_ERROR'] += 1
112                     exitcode = 1
113                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
114                 else:
115                     statistics['TESTS_EXPECTED_OK'] += 1
116                     msg_ops.addSuccess(test)
117             elif result in ("xfail", "knownfail"):
118                 try:
119                     test = open_tests.pop(testname)
120                 except KeyError:
121                     statistics['TESTS_ERROR'] += 1
122                     exitcode = 1
123                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
124                 else:
125                     statistics['TESTS_EXPECTED_FAIL'] += 1
126                     msg_ops.addExpectedFailure(test, remote_error)
127             elif result in ("uxsuccess", ):
128                 try:
129                     test = open_tests.pop(testname)
130                 except KeyError:
131                     statistics['TESTS_ERROR'] += 1
132                     exitcode = 1
133                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
134                 else:
135                     statistics['TESTS_UNEXPECTED_OK'] += 1
136                     msg_ops.addUnexpectedSuccess(test)
137                     exitcode = 1
138             elif result in ("failure", "fail"):
139                 try:
140                     test = open_tests.pop(testname)
141                 except KeyError:
142                     statistics['TESTS_ERROR'] += 1
143                     exitcode = 1
144                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
145                 else:
146                     statistics['TESTS_UNEXPECTED_FAIL'] += 1
147                     exitcode = 1
148                     msg_ops.addFailure(test, remote_error)
149             elif result == "skip":
150                 statistics['TESTS_SKIP'] += 1
151                 # Allow tests to be skipped without prior announcement of test
152                 try:
153                     test = open_tests.pop(testname)
154                 except KeyError:
155                     test = subunit.RemotedTestCase(testname)
156                 msg_ops.addSkip(test, reason)
157             elif result == "error":
158                 statistics['TESTS_ERROR'] += 1
159                 exitcode = 1
160                 try:
161                     test = open_tests.pop(testname)
162                 except KeyError:
163                     test = subunit.RemotedTestCase(testname)
164                 msg_ops.addError(test, remote_error)
165             elif result == "skip-testsuite":
166                 msg_ops.skip_testsuite(testname)
167             elif result == "testsuite-success":
168                 msg_ops.end_testsuite(testname, "success", reason)
169             elif result == "testsuite-failure":
170                 msg_ops.end_testsuite(testname, "failure", reason)
171                 exitcode = 1
172             elif result == "testsuite-xfail":
173                 msg_ops.end_testsuite(testname, "xfail", reason)
174             elif result == "testsuite-uxsuccess":
175                 msg_ops.end_testsuite(testname, "uxsuccess", reason)
176                 exitcode = 1
177             elif result == "testsuite-error":
178                 msg_ops.end_testsuite(testname, "error", reason)
179                 exitcode = 1
180             else:
181                 raise AssertionError("Recognized but unhandled result %r" %
182                                      result)
183         elif command == "testsuite":
184             msg_ops.start_testsuite(arg.strip())
185         elif command == "progress":
186             arg = arg.strip()
187             if arg == "pop":
188                 msg_ops.progress(None, subunit.PROGRESS_POP)
189             elif arg == "push":
190                 msg_ops.progress(None, subunit.PROGRESS_PUSH)
191             elif arg[0] in '+-':
192                 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
193             else:
194                 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
195         else:
196             msg_ops.output_msg(l)
197
198     while open_tests:
199         test = subunit.RemotedTestCase(open_tests.popitem()[1])
200         msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
201         statistics['TESTS_ERROR'] += 1
202         exitcode = 1
203
204     return exitcode
205
206
207 class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
208
209     def progress(self, count, whence):
210         if whence == subunit.PROGRESS_POP:
211             self._stream.write("progress: pop\n")
212         elif whence == subunit.PROGRESS_PUSH:
213             self._stream.write("progress: push\n")
214         elif whence == subunit.PROGRESS_SET:
215             self._stream.write("progress: %d\n" % count)
216         elif whence == subunit.PROGRESS_CUR:
217             raise NotImplementedError
218
219     # The following are Samba extensions:
220     def start_testsuite(self, name):
221         self._stream.write("testsuite: %s\n" % name)
222
223     def skip_testsuite(self, name, reason=None):
224         if reason:
225             self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
226         else:
227             self._stream.write("skip-testsuite: %s\n" % name)
228
229     def end_testsuite(self, name, result, reason=None):
230         if reason:
231             self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
232         else:
233             self._stream.write("testsuite-%s: %s\n" % (result, name))
234
235     def output_msg(self, msg):
236         self._stream.write(msg)
237
238
239 def read_test_regexes(*names):
240     ret = {}
241     files = []
242     for name in names:
243         # if we are given a directory, we read all the files it contains
244         # (except the ones that end with "~").
245         if os.path.isdir(name):
246             files.extend([os.path.join(name, x)
247                           for x in os.listdir(name)
248                           if x[-1] != '~'])
249         else:
250             files.append(name)
251
252     for filename in files:
253         f = open(filename, 'r')
254         try:
255             for l in f:
256                 l = l.strip()
257                 if l == "" or l[0] == "#":
258                     continue
259                 if "#" in l:
260                     (regex, reason) = l.split("#", 1)
261                     ret[regex.strip()] = reason.strip()
262                 else:
263                     ret[l] = None
264         finally:
265             f.close()
266     return ret
267
268
269 def find_in_list(regexes, fullname):
270     for regex, reason in regexes.items():
271         if re.match(regex, fullname):
272             if reason is None:
273                 return ""
274             return reason
275     return None
276
277
278 class ImmediateFail(Exception):
279     """Raised to abort immediately."""
280
281     def __init__(self):
282         super(ImmediateFail, self).__init__("test failed and fail_immediately set")
283
284
285 class FilterOps(unittest.TestResult):
286
287     def control_msg(self, msg):
288         pass  # We regenerate control messages, so ignore this
289
290     def time(self, time):
291         self._ops.time(time)
292
293     def progress(self, delta, whence):
294         self._ops.progress(delta, whence)
295
296     def output_msg(self, msg):
297         if self.output is None:
298             sys.stdout.write(msg)
299         else:
300             self.output += msg
301
302     def startTest(self, test):
303         self.seen_output = True
304         test = self._add_prefix(test)
305         if self.strip_ok_output:
306             self.output = ""
307
308         self._ops.startTest(test)
309
310     def _add_prefix(self, test):
311         return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
312
313     def addError(self, test, err=None):
314         test = self._add_prefix(test)
315         self.error_added += 1
316         self.total_error += 1
317         self._ops.addError(test, err)
318         self.output = None
319         if self.fail_immediately:
320             raise ImmediateFail()
321
322     def addSkip(self, test, reason=None):
323         self.seen_output = True
324         test = self._add_prefix(test)
325         self._ops.addSkip(test, reason)
326         self.output = None
327
328     def addExpectedFailure(self, test, err=None):
329         test = self._add_prefix(test)
330         self._ops.addExpectedFailure(test, err)
331         self.output = None
332
333     def addUnexpectedSuccess(self, test):
334         test = self._add_prefix(test)
335         self.uxsuccess_added += 1
336         self.total_uxsuccess += 1
337         self._ops.addUnexpectedSuccess(test)
338         if self.output:
339             self._ops.output_msg(self.output)
340         self.output = None
341         if self.fail_immediately:
342             raise ImmediateFail()
343
344     def addFailure(self, test, err=None):
345         test = self._add_prefix(test)
346         xfail_reason = find_in_list(self.expected_failures, test.id())
347         if xfail_reason is None:
348             xfail_reason = find_in_list(self.flapping, test.id())
349         if xfail_reason is not None:
350             self.xfail_added += 1
351             self.total_xfail += 1
352             self._ops.addExpectedFailure(test, err)
353         else:
354             self.fail_added += 1
355             self.total_fail += 1
356             self._ops.addFailure(test, err)
357             if self.output:
358                 self._ops.output_msg(self.output)
359             if self.fail_immediately:
360                 raise ImmediateFail()
361         self.output = None
362
363     def addSuccess(self, test):
364         test = self._add_prefix(test)
365         xfail_reason = find_in_list(self.expected_failures, test.id())
366         if xfail_reason is not None:
367             self.uxsuccess_added += 1
368             self.total_uxsuccess += 1
369             self._ops.addUnexpectedSuccess(test)
370             if self.output:
371                 self._ops.output_msg(self.output)
372             if self.fail_immediately:
373                 raise ImmediateFail()
374         else:
375             self._ops.addSuccess(test)
376         self.output = None
377
378     def skip_testsuite(self, name, reason=None):
379         self._ops.skip_testsuite(name, reason)
380
381     def start_testsuite(self, name):
382         self._ops.start_testsuite(name)
383         self.error_added = 0
384         self.fail_added = 0
385         self.xfail_added = 0
386         self.uxsuccess_added = 0
387
388     def end_testsuite(self, name, result, reason=None):
389         xfail = False
390
391         if self.xfail_added > 0:
392             xfail = True
393         if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
394             xfail = False
395
396         if xfail and result in ("fail", "failure"):
397             result = "xfail"
398
399         if self.uxsuccess_added > 0 and result != "uxsuccess":
400             result = "uxsuccess"
401             if reason is None:
402                 reason = "Subunit/Filter Reason"
403             reason += "\n uxsuccess[%d]" % self.uxsuccess_added
404
405         if self.fail_added > 0 and result != "failure":
406             result = "failure"
407             if reason is None:
408                 reason = "Subunit/Filter Reason"
409             reason += "\n failures[%d]" % self.fail_added
410
411         if self.error_added > 0 and result != "error":
412             result = "error"
413             if reason is None:
414                 reason = "Subunit/Filter Reason"
415             reason += "\n errors[%d]" % self.error_added
416
417         self._ops.end_testsuite(name, result, reason)
418         if result not in ("success", "xfail"):
419             if self.output:
420                 self._ops.output_msg(self.output)
421             if self.fail_immediately:
422                 raise ImmediateFail()
423         self.output = None
424
425     def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
426                  strip_ok_output=False, fail_immediately=False,
427                  flapping=None):
428         self._ops = out
429         self.seen_output = False
430         self.output = None
431         self.prefix = prefix
432         self.suffix = suffix
433         if expected_failures is not None:
434             self.expected_failures = expected_failures
435         else:
436             self.expected_failures = {}
437         if flapping is not None:
438             self.flapping = flapping
439         else:
440             self.flapping = {}
441         self.strip_ok_output = strip_ok_output
442         self.xfail_added = 0
443         self.fail_added = 0
444         self.uxsuccess_added = 0
445         self.total_xfail = 0
446         self.total_error = 0
447         self.total_fail = 0
448         self.total_uxsuccess = 0
449         self.error_added = 0
450         self.fail_immediately = fail_immediately
451
452
453 class PerfFilterOps(unittest.TestResult):
454
455     def progress(self, delta, whence):
456         pass
457
458     def output_msg(self, msg):
459         pass
460
461     def control_msg(self, msg):
462         pass
463
464     def skip_testsuite(self, name, reason=None):
465         self._ops.skip_testsuite(name, reason)
466
467     def start_testsuite(self, name):
468         self.suite_has_time = False
469
470     def end_testsuite(self, name, result, reason=None):
471         pass
472
473     def _add_prefix(self, test):
474         return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
475
476     def time(self, time):
477         self.latest_time = time
478         #self._ops.output_msg("found time %s\n" % time)
479         self.suite_has_time = True
480
481     def get_time(self):
482         if self.suite_has_time:
483             return self.latest_time
484         return datetime.datetime.utcnow()
485
486     def startTest(self, test):
487         self.seen_output = True
488         test = self._add_prefix(test)
489         self.starts[test.id()] = self.get_time()
490
491     def addSuccess(self, test):
492         test = self._add_prefix(test)
493         tid = test.id()
494         if tid not in self.starts:
495             self._ops.addError(test, "%s succeeded without ever starting!" % tid)
496         delta = self.get_time() - self.starts[tid]
497         self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
498
499     def addFailure(self, test, err=''):
500         tid = test.id()
501         delta = self.get_time() - self.starts[tid]
502         self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
503                              (tid, delta.total_seconds(), err))
504
505     def addError(self, test, err=''):
506         tid = test.id()
507         delta = self.get_time() - self.starts[tid]
508         self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
509                              (tid, delta.total_seconds(), err))
510
511     def __init__(self, out, prefix='', suffix=''):
512         self._ops = out
513         self.prefix = prefix or ''
514         self.suffix = suffix or ''
515         self.starts = {}
516         self.seen_output = False
517         self.suite_has_time = False
518
519
520 class PlainFormatter(TestsuiteEnabledTestResult):
521
522     def __init__(self, verbose, immediate, statistics,
523                  totaltests=None):
524         super(PlainFormatter, self).__init__()
525         self.verbose = verbose
526         self.immediate = immediate
527         self.statistics = statistics
528         self.start_time = None
529         self.test_output = {}
530         self.suitesfailed = []
531         self.suites_ok = 0
532         self.skips = {}
533         self.index = 0
534         self.name = None
535         self._progress_level = 0
536         self.totalsuites = totaltests
537         self.last_time = None
538
539     @staticmethod
540     def _format_time(delta):
541         minutes, seconds = divmod(delta.seconds, 60)
542         hours, minutes = divmod(minutes, 60)
543         ret = ""
544         if hours:
545             ret += "%dh" % hours
546         if minutes:
547             ret += "%dm" % minutes
548         ret += "%ds" % seconds
549         return ret
550
551     def progress(self, offset, whence):
552         if whence == subunit.PROGRESS_POP:
553             self._progress_level -= 1
554         elif whence == subunit.PROGRESS_PUSH:
555             self._progress_level += 1
556         elif whence == subunit.PROGRESS_SET:
557             if self._progress_level == 0:
558                 self.totalsuites = offset
559         elif whence == subunit.PROGRESS_CUR:
560             raise NotImplementedError
561
562     def time(self, dt):
563         if self.start_time is None:
564             self.start_time = dt
565         self.last_time = dt
566
567     def start_testsuite(self, name):
568         self.index += 1
569         self.name = name
570
571         if not self.verbose:
572             self.test_output[name] = ""
573
574         total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
575                        self.statistics['TESTS_EXPECTED_FAIL'] +
576                        self.statistics['TESTS_ERROR'] +
577                        self.statistics['TESTS_UNEXPECTED_FAIL'] +
578                        self.statistics['TESTS_UNEXPECTED_OK'])
579
580         out = "[%d(%d)" % (self.index, total_tests)
581         if self.totalsuites is not None:
582             out += "/%d" % self.totalsuites
583         if self.start_time is not None:
584             out += " at " + self._format_time(self.last_time - self.start_time)
585         if self.suitesfailed:
586             out += ", %d errors" % (len(self.suitesfailed),)
587         out += "] %s" % name
588         if self.immediate:
589             sys.stdout.write(out + "\n")
590         else:
591             sys.stdout.write(out + ": ")
592
593     def output_msg(self, output):
594         if self.verbose:
595             sys.stdout.write(output)
596         elif self.name is not None:
597             self.test_output[self.name] += output
598         else:
599             sys.stdout.write(output)
600
601     def control_msg(self, output):
602         pass
603
604     def end_testsuite(self, name, result, reason):
605         out = ""
606         unexpected = False
607
608         if name not in self.test_output:
609             print("no output for name[%s]" % name)
610
611         if result in ("success", "xfail"):
612             self.suites_ok += 1
613         else:
614             self.output_msg("ERROR: Testsuite[%s]\n" % name)
615             if reason is not None:
616                 self.output_msg("REASON: %s\n" % (reason,))
617             self.suitesfailed.append(name)
618             if self.immediate and not self.verbose and name in self.test_output:
619                 out += self.test_output[name]
620             unexpected = True
621
622         if not self.immediate:
623             if not unexpected:
624                 out += " ok\n"
625             else:
626                 out += " " + result.upper() + "\n"
627
628         sys.stdout.write(out)
629
630     def startTest(self, test):
631         pass
632
633     def addSuccess(self, test):
634         self.end_test(test.id(), "success", False)
635
636     def addError(self, test, err=None):
637         self.end_test(test.id(), "error", True, err)
638
639     def addFailure(self, test, err=None):
640         self.end_test(test.id(), "failure", True, err)
641
642     def addSkip(self, test, reason=None):
643         self.end_test(test.id(), "skip", False, reason)
644
645     def addExpectedFailure(self, test, err=None):
646         self.end_test(test.id(), "xfail", False, err)
647
648     def addUnexpectedSuccess(self, test):
649         self.end_test(test.id(), "uxsuccess", True)
650
651     def end_test(self, testname, result, unexpected, err=None):
652         if not unexpected:
653             self.test_output[self.name] = ""
654             if not self.immediate:
655                 sys.stdout.write({
656                     'failure': 'f',
657                     'xfail': 'X',
658                     'skip': 's',
659                     'success': '.'}.get(result, "?(%s)" % result))
660             return
661
662         if self.name not in self.test_output:
663             self.test_output[self.name] = ""
664
665         self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
666         if err is not None:
667             self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
668
669         if self.immediate and not self.verbose:
670             sys.stdout.write(self.test_output[self.name])
671             self.test_output[self.name] = ""
672
673         if not self.immediate:
674             sys.stdout.write({
675                 'error': 'E',
676                'failure': 'F',
677                'uxsuccess': 'U',
678                'success': 'S'}.get(result, "?"))
679
680     def write_summary(self, path):
681         f = open(path, 'w+')
682
683         if self.suitesfailed:
684             f.write("= Failed tests =\n")
685
686             for suite in self.suitesfailed:
687                 f.write("== %s ==\n" % suite)
688                 if suite in self.test_output:
689                     f.write(self.test_output[suite] + "\n\n")
690
691             f.write("\n")
692
693         if not self.immediate and not self.verbose:
694             for suite in self.suitesfailed:
695                 print("=" * 78)
696                 print("FAIL: %s" % suite)
697                 if suite in self.test_output:
698                     print(self.test_output[suite])
699                 print("")
700
701         f.write("= Skipped tests =\n")
702         for reason in self.skips.keys():
703             f.write(reason + "\n")
704             for name in self.skips[reason]:
705                 f.write("\t%s\n" % name)
706             f.write("\n")
707         f.close()
708
709         if (not self.suitesfailed and
710             not self.statistics['TESTS_UNEXPECTED_FAIL'] and
711             not self.statistics['TESTS_UNEXPECTED_OK'] and
712             not self.statistics['TESTS_ERROR']):
713             ok = (self.statistics['TESTS_EXPECTED_OK'] +
714                   self.statistics['TESTS_EXPECTED_FAIL'])
715             print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
716         else:
717             print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
718                 self.statistics['TESTS_UNEXPECTED_FAIL'],
719                 self.statistics['TESTS_ERROR'],
720                 self.statistics['TESTS_UNEXPECTED_OK'],
721                 len(self.suitesfailed)))
722
723     def skip_testsuite(self, name, reason="UNKNOWN"):
724         self.skips.setdefault(reason, []).append(name)
725         if self.totalsuites:
726             self.totalsuites -= 1