1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 __all__ = ['parse_results']
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
31 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
32 'knownfail', 'error', 'xfail', 'skip-testsuite',
33 'testsuite-failure', 'testsuite-xfail',
34 'testsuite-success', 'testsuite-error',
35 'uxsuccess', 'testsuite-uxsuccess'])
38 class TestsuiteEnabledTestResult(unittest.TestResult):
40 def start_testsuite(self, name):
41 raise NotImplementedError(self.start_testsuite)
44 def parse_results(msg_ops, statistics, fh):
52 parts = l.split(None, 1)
53 if not len(parts) == 2 or not l.startswith(parts[0]):
56 command = parts[0].rstrip(":")
58 if command in ("test", "testing"):
59 msg_ops.control_msg(l)
61 test = subunit.RemotedTestCase(name)
62 if name in open_tests:
63 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
64 msg_ops.startTest(test)
65 open_tests[name] = test
66 elif command == "time":
67 msg_ops.control_msg(l)
69 dt = iso8601.parse_date(arg.rstrip("\n"))
70 except TypeError as e:
71 print("Unable to parse time line: %s" % arg.rstrip("\n"))
74 elif command in VALID_RESULTS:
75 msg_ops.control_msg(l)
77 grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
78 (testname, hasreason) = (grp.group(1), grp.group(2))
81 # reason may be specified in next lines
87 msg_ops.control_msg(l)
94 if isinstance(reason, bytes):
95 remote_error = subunit.RemoteError(reason.decode("utf-8"))
97 remote_error = subunit.RemoteError(reason)
100 statistics['TESTS_ERROR'] += 1
101 msg_ops.addError(subunit.RemotedTestCase(testname),
102 subunit.RemoteError(u"result (%s) reason (%s) interrupted" % (result, reason)))
106 remote_error = subunit.RemoteError(u"No reason specified")
107 if result in ("success", "successful"):
109 test = open_tests.pop(testname)
111 statistics['TESTS_ERROR'] += 1
113 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
115 statistics['TESTS_EXPECTED_OK'] += 1
116 msg_ops.addSuccess(test)
117 elif result in ("xfail", "knownfail"):
119 test = open_tests.pop(testname)
121 statistics['TESTS_ERROR'] += 1
123 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
125 statistics['TESTS_EXPECTED_FAIL'] += 1
126 msg_ops.addExpectedFailure(test, remote_error)
127 elif result in ("uxsuccess", ):
129 test = open_tests.pop(testname)
131 statistics['TESTS_ERROR'] += 1
133 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
135 statistics['TESTS_UNEXPECTED_OK'] += 1
136 msg_ops.addUnexpectedSuccess(test)
138 elif result in ("failure", "fail"):
140 test = open_tests.pop(testname)
142 statistics['TESTS_ERROR'] += 1
144 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
146 statistics['TESTS_UNEXPECTED_FAIL'] += 1
148 msg_ops.addFailure(test, remote_error)
149 elif result == "skip":
150 statistics['TESTS_SKIP'] += 1
151 # Allow tests to be skipped without prior announcement of test
153 test = open_tests.pop(testname)
155 test = subunit.RemotedTestCase(testname)
156 msg_ops.addSkip(test, reason)
157 elif result == "error":
158 statistics['TESTS_ERROR'] += 1
161 test = open_tests.pop(testname)
163 test = subunit.RemotedTestCase(testname)
164 msg_ops.addError(test, remote_error)
165 elif result == "skip-testsuite":
166 msg_ops.skip_testsuite(testname)
167 elif result == "testsuite-success":
168 msg_ops.end_testsuite(testname, "success", reason)
169 elif result == "testsuite-failure":
170 msg_ops.end_testsuite(testname, "failure", reason)
172 elif result == "testsuite-xfail":
173 msg_ops.end_testsuite(testname, "xfail", reason)
174 elif result == "testsuite-uxsuccess":
175 msg_ops.end_testsuite(testname, "uxsuccess", reason)
177 elif result == "testsuite-error":
178 msg_ops.end_testsuite(testname, "error", reason)
181 raise AssertionError("Recognized but unhandled result %r" %
183 elif command == "testsuite":
184 msg_ops.start_testsuite(arg.strip())
185 elif command == "progress":
188 msg_ops.progress(None, subunit.PROGRESS_POP)
190 msg_ops.progress(None, subunit.PROGRESS_PUSH)
192 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
194 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
196 msg_ops.output_msg(l)
199 test = subunit.RemotedTestCase(open_tests.popitem()[1])
200 msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
201 statistics['TESTS_ERROR'] += 1
207 class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
209 def progress(self, count, whence):
210 if whence == subunit.PROGRESS_POP:
211 self._stream.write("progress: pop\n")
212 elif whence == subunit.PROGRESS_PUSH:
213 self._stream.write("progress: push\n")
214 elif whence == subunit.PROGRESS_SET:
215 self._stream.write("progress: %d\n" % count)
216 elif whence == subunit.PROGRESS_CUR:
217 raise NotImplementedError
219 # The following are Samba extensions:
220 def start_testsuite(self, name):
221 self._stream.write("testsuite: %s\n" % name)
223 def skip_testsuite(self, name, reason=None):
225 self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
227 self._stream.write("skip-testsuite: %s\n" % name)
229 def end_testsuite(self, name, result, reason=None):
231 self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
233 self._stream.write("testsuite-%s: %s\n" % (result, name))
235 def output_msg(self, msg):
236 self._stream.write(msg)
239 def read_test_regexes(*names):
243 # if we are given a directory, we read all the files it contains
244 # (except the ones that end with "~").
245 if os.path.isdir(name):
246 files.extend([os.path.join(name, x)
247 for x in os.listdir(name)
252 for filename in files:
253 f = open(filename, 'r')
257 if l == "" or l[0] == "#":
260 (regex, reason) = l.split("#", 1)
261 ret[regex.strip()] = reason.strip()
269 def find_in_list(regexes, fullname):
270 for regex, reason in regexes.items():
271 if re.match(regex, fullname):
278 class ImmediateFail(Exception):
279 """Raised to abort immediately."""
282 super(ImmediateFail, self).__init__("test failed and fail_immediately set")
285 class FilterOps(unittest.TestResult):
287 def control_msg(self, msg):
288 pass # We regenerate control messages, so ignore this
290 def time(self, time):
293 def progress(self, delta, whence):
294 self._ops.progress(delta, whence)
296 def output_msg(self, msg):
297 if self.output is None:
298 sys.stdout.write(msg)
302 def startTest(self, test):
303 self.seen_output = True
304 test = self._add_prefix(test)
305 if self.strip_ok_output:
308 self._ops.startTest(test)
310 def _add_prefix(self, test):
311 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
313 def addError(self, test, err=None):
314 test = self._add_prefix(test)
315 self.error_added += 1
316 self.total_error += 1
317 self._ops.addError(test, err)
319 if self.fail_immediately:
320 raise ImmediateFail()
322 def addSkip(self, test, reason=None):
323 self.seen_output = True
324 test = self._add_prefix(test)
325 self._ops.addSkip(test, reason)
328 def addExpectedFailure(self, test, err=None):
329 test = self._add_prefix(test)
330 self._ops.addExpectedFailure(test, err)
333 def addUnexpectedSuccess(self, test):
334 test = self._add_prefix(test)
335 self.uxsuccess_added += 1
336 self.total_uxsuccess += 1
337 self._ops.addUnexpectedSuccess(test)
339 self._ops.output_msg(self.output)
341 if self.fail_immediately:
342 raise ImmediateFail()
344 def addFailure(self, test, err=None):
345 test = self._add_prefix(test)
346 xfail_reason = find_in_list(self.expected_failures, test.id())
347 if xfail_reason is None:
348 xfail_reason = find_in_list(self.flapping, test.id())
349 if xfail_reason is not None:
350 self.xfail_added += 1
351 self.total_xfail += 1
352 self._ops.addExpectedFailure(test, err)
356 self._ops.addFailure(test, err)
358 self._ops.output_msg(self.output)
359 if self.fail_immediately:
360 raise ImmediateFail()
363 def addSuccess(self, test):
364 test = self._add_prefix(test)
365 xfail_reason = find_in_list(self.expected_failures, test.id())
366 if xfail_reason is not None:
367 self.uxsuccess_added += 1
368 self.total_uxsuccess += 1
369 self._ops.addUnexpectedSuccess(test)
371 self._ops.output_msg(self.output)
372 if self.fail_immediately:
373 raise ImmediateFail()
375 self._ops.addSuccess(test)
378 def skip_testsuite(self, name, reason=None):
379 self._ops.skip_testsuite(name, reason)
381 def start_testsuite(self, name):
382 self._ops.start_testsuite(name)
386 self.uxsuccess_added = 0
388 def end_testsuite(self, name, result, reason=None):
391 if self.xfail_added > 0:
393 if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
396 if xfail and result in ("fail", "failure"):
399 if self.uxsuccess_added > 0 and result != "uxsuccess":
402 reason = "Subunit/Filter Reason"
403 reason += "\n uxsuccess[%d]" % self.uxsuccess_added
405 if self.fail_added > 0 and result != "failure":
408 reason = "Subunit/Filter Reason"
409 reason += "\n failures[%d]" % self.fail_added
411 if self.error_added > 0 and result != "error":
414 reason = "Subunit/Filter Reason"
415 reason += "\n errors[%d]" % self.error_added
417 self._ops.end_testsuite(name, result, reason)
418 if result not in ("success", "xfail"):
420 self._ops.output_msg(self.output)
421 if self.fail_immediately:
422 raise ImmediateFail()
425 def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
426 strip_ok_output=False, fail_immediately=False,
429 self.seen_output = False
433 if expected_failures is not None:
434 self.expected_failures = expected_failures
436 self.expected_failures = {}
437 if flapping is not None:
438 self.flapping = flapping
441 self.strip_ok_output = strip_ok_output
444 self.uxsuccess_added = 0
448 self.total_uxsuccess = 0
450 self.fail_immediately = fail_immediately
453 class PerfFilterOps(unittest.TestResult):
455 def progress(self, delta, whence):
458 def output_msg(self, msg):
461 def control_msg(self, msg):
464 def skip_testsuite(self, name, reason=None):
465 self._ops.skip_testsuite(name, reason)
467 def start_testsuite(self, name):
468 self.suite_has_time = False
470 def end_testsuite(self, name, result, reason=None):
473 def _add_prefix(self, test):
474 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
476 def time(self, time):
477 self.latest_time = time
478 #self._ops.output_msg("found time %s\n" % time)
479 self.suite_has_time = True
482 if self.suite_has_time:
483 return self.latest_time
484 return datetime.datetime.utcnow()
486 def startTest(self, test):
487 self.seen_output = True
488 test = self._add_prefix(test)
489 self.starts[test.id()] = self.get_time()
491 def addSuccess(self, test):
492 test = self._add_prefix(test)
494 if tid not in self.starts:
495 self._ops.addError(test, "%s succeeded without ever starting!" % tid)
496 delta = self.get_time() - self.starts[tid]
497 self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
499 def addFailure(self, test, err=''):
501 delta = self.get_time() - self.starts[tid]
502 self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
503 (tid, delta.total_seconds(), err))
505 def addError(self, test, err=''):
507 delta = self.get_time() - self.starts[tid]
508 self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
509 (tid, delta.total_seconds(), err))
511 def __init__(self, out, prefix='', suffix=''):
513 self.prefix = prefix or ''
514 self.suffix = suffix or ''
516 self.seen_output = False
517 self.suite_has_time = False
520 class PlainFormatter(TestsuiteEnabledTestResult):
522 def __init__(self, verbose, immediate, statistics,
524 super(PlainFormatter, self).__init__()
525 self.verbose = verbose
526 self.immediate = immediate
527 self.statistics = statistics
528 self.start_time = None
529 self.test_output = {}
530 self.suitesfailed = []
535 self._progress_level = 0
536 self.totalsuites = totaltests
537 self.last_time = None
540 def _format_time(delta):
541 minutes, seconds = divmod(delta.seconds, 60)
542 hours, minutes = divmod(minutes, 60)
547 ret += "%dm" % minutes
548 ret += "%ds" % seconds
551 def progress(self, offset, whence):
552 if whence == subunit.PROGRESS_POP:
553 self._progress_level -= 1
554 elif whence == subunit.PROGRESS_PUSH:
555 self._progress_level += 1
556 elif whence == subunit.PROGRESS_SET:
557 if self._progress_level == 0:
558 self.totalsuites = offset
559 elif whence == subunit.PROGRESS_CUR:
560 raise NotImplementedError
563 if self.start_time is None:
567 def start_testsuite(self, name):
572 self.test_output[name] = ""
574 total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
575 self.statistics['TESTS_EXPECTED_FAIL'] +
576 self.statistics['TESTS_ERROR'] +
577 self.statistics['TESTS_UNEXPECTED_FAIL'] +
578 self.statistics['TESTS_UNEXPECTED_OK'])
580 out = "[%d(%d)" % (self.index, total_tests)
581 if self.totalsuites is not None:
582 out += "/%d" % self.totalsuites
583 if self.start_time is not None:
584 out += " at " + self._format_time(self.last_time - self.start_time)
585 if self.suitesfailed:
586 out += ", %d errors" % (len(self.suitesfailed),)
589 sys.stdout.write(out + "\n")
591 sys.stdout.write(out + ": ")
593 def output_msg(self, output):
595 sys.stdout.write(output)
596 elif self.name is not None:
597 self.test_output[self.name] += output
599 sys.stdout.write(output)
601 def control_msg(self, output):
604 def end_testsuite(self, name, result, reason):
608 if name not in self.test_output:
609 print("no output for name[%s]" % name)
611 if result in ("success", "xfail"):
614 self.output_msg("ERROR: Testsuite[%s]\n" % name)
615 if reason is not None:
616 self.output_msg("REASON: %s\n" % (reason,))
617 self.suitesfailed.append(name)
618 if self.immediate and not self.verbose and name in self.test_output:
619 out += self.test_output[name]
622 if not self.immediate:
626 out += " " + result.upper() + "\n"
628 sys.stdout.write(out)
630 def startTest(self, test):
633 def addSuccess(self, test):
634 self.end_test(test.id(), "success", False)
636 def addError(self, test, err=None):
637 self.end_test(test.id(), "error", True, err)
639 def addFailure(self, test, err=None):
640 self.end_test(test.id(), "failure", True, err)
642 def addSkip(self, test, reason=None):
643 self.end_test(test.id(), "skip", False, reason)
645 def addExpectedFailure(self, test, err=None):
646 self.end_test(test.id(), "xfail", False, err)
648 def addUnexpectedSuccess(self, test):
649 self.end_test(test.id(), "uxsuccess", True)
651 def end_test(self, testname, result, unexpected, err=None):
653 self.test_output[self.name] = ""
654 if not self.immediate:
659 'success': '.'}.get(result, "?(%s)" % result))
662 if self.name not in self.test_output:
663 self.test_output[self.name] = ""
665 self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
667 self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
669 if self.immediate and not self.verbose:
670 sys.stdout.write(self.test_output[self.name])
671 self.test_output[self.name] = ""
673 if not self.immediate:
678 'success': 'S'}.get(result, "?"))
680 def write_summary(self, path):
683 if self.suitesfailed:
684 f.write("= Failed tests =\n")
686 for suite in self.suitesfailed:
687 f.write("== %s ==\n" % suite)
688 if suite in self.test_output:
689 f.write(self.test_output[suite] + "\n\n")
693 if not self.immediate and not self.verbose:
694 for suite in self.suitesfailed:
696 print("FAIL: %s" % suite)
697 if suite in self.test_output:
698 print(self.test_output[suite])
701 f.write("= Skipped tests =\n")
702 for reason in self.skips.keys():
703 f.write(reason + "\n")
704 for name in self.skips[reason]:
705 f.write("\t%s\n" % name)
709 if (not self.suitesfailed and
710 not self.statistics['TESTS_UNEXPECTED_FAIL'] and
711 not self.statistics['TESTS_UNEXPECTED_OK'] and
712 not self.statistics['TESTS_ERROR']):
713 ok = (self.statistics['TESTS_EXPECTED_OK'] +
714 self.statistics['TESTS_EXPECTED_FAIL'])
715 print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
717 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
718 self.statistics['TESTS_UNEXPECTED_FAIL'],
719 self.statistics['TESTS_ERROR'],
720 self.statistics['TESTS_UNEXPECTED_OK'],
721 len(self.suitesfailed)))
723 def skip_testsuite(self, name, reason="UNKNOWN"):
724 self.skips.setdefault(reason, []).append(name)
726 self.totalsuites -= 1