tests/replica_sync_rodc: Test conflict handling on an RODC
[samba.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
# Names exported by a star-import of this module; every other definition in
# this file has to be referenced explicitly through the module.
__all__ = ['parse_results']
19
20 import datetime
21 import re
22 import sys
23 import os
24 from samba import subunit
25 from samba.subunit.run import TestProtocolClient
26 from samba.subunit import iso8601
27 import unittest
28
# Every command word that parse_results() treats as the outcome of a test or
# of a whole testsuite.
VALID_RESULTS = {
    # outcomes of a single test
    'success', 'successful',
    'failure', 'fail',
    'error',
    'skip',
    'xfail', 'knownfail',
    'uxsuccess',
    # outcomes of a whole testsuite
    'skip-testsuite',
    'testsuite-success',
    'testsuite-failure',
    'testsuite-error',
    'testsuite-xfail',
    'testsuite-uxsuccess',
}
34
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A unittest.TestResult that can also be told when a testsuite starts.

    This base class only declares the hook; calling it without overriding
    it in a subclass raises NotImplementedError.
    """

    def start_testsuite(self, name):
        """Called when the testsuite called *name* starts; must be overridden.

        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(self.start_testsuite)
39
40
def _read_reason(msg_ops, fh):
    """Read the lines of a bracketed reason block from *fh*.

    Every line that is read, including the closing "]" line, is passed to
    msg_ops.control_msg().

    :return: a (reason, terminated) tuple.  *terminated* is False when the
        end of the stream was reached before the closing "]" line.
    """
    collected = []
    while fh:
        line = fh.readline()
        if line == "":
            break
        msg_ops.control_msg(line)
        if line == "]\n":
            return "".join(collected), True
        collected.append(line)
    return "".join(collected), False


def _pop_started_test(open_tests, testname, msg_ops, statistics):
    """Remove and return the running test called *testname*.

    When no such test was announced, the problem is counted in
    statistics['TESTS_ERROR'], reported through msg_ops.addError() and
    None is returned.
    """
    try:
        return open_tests.pop(testname)
    except KeyError:
        statistics['TESTS_ERROR'] += 1
        msg_ops.addError(subunit.RemotedTestCase(testname),
                         subunit.RemoteError(u"Test was never started"))
        return None


def parse_results(msg_ops, statistics, fh):
    """Parse a line-based subunit stream and replay it onto *msg_ops*.

    :param msg_ops: receiver of the parsed events.  The methods called on it
        are output_msg, control_msg, startTest, time, progress, the add*
        result methods, start_testsuite, skip_testsuite and end_testsuite.
    :param statistics: mapping whose 'TESTS_ERROR', 'TESTS_EXPECTED_OK',
        'TESTS_EXPECTED_FAIL', 'TESTS_UNEXPECTED_OK', 'TESTS_UNEXPECTED_FAIL'
        and 'TESTS_SKIP' counters are incremented in place (the keys must
        already exist).
    :param fh: stream to read; only readline() is used.
    :return: 0 when nothing unexpected was seen, 1 otherwise.
    """
    exitcode = 0
    # Tests that were announced but have not reported a result yet,
    # keyed by test name.
    open_tests = {}

    while fh:
        line = fh.readline()
        if line == "":
            break
        parts = line.split(None, 1)
        # Lines without an argument, or that start with whitespace, are
        # plain test output rather than protocol commands.
        if not len(parts) == 2 or not line.startswith(parts[0]):
            msg_ops.output_msg(line)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(line)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name),
                                 subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(line)
            # NOTE(review): only TypeError is handled here; confirm whether
            # iso8601.parse_date can raise other exceptions on bad input.
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(line)
            result = command
            # The last line of a truncated stream may lack its newline; the
            # pattern below requires one, so supply it instead of crashing
            # on a failed match.
            if not arg.endswith("\n"):
                arg += "\n"
            grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
            testname, hasreason = grp.group(1), grp.group(2)
            if hasreason:
                # The reason is given on the following lines, up to "]".
                reason, terminated = _read_reason(msg_ops, fh)
                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.addError(
                        subunit.RemotedTestCase(testname),
                        subunit.RemoteError(
                            u"reason (%s) interrupted" % result))
                    return 1
                remote_error = subunit.RemoteError(reason.decode("utf-8"))
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")

            if result in ("success", "successful"):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_OK'] += 1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_FAIL'] += 1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("uxsuccess", ):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                exitcode = 1
                if test is not None:
                    statistics['TESTS_UNEXPECTED_OK'] += 1
                    msg_ops.addUnexpectedSuccess(test)
            elif result in ("failure", "fail"):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                exitcode = 1
                if test is not None:
                    statistics['TESTS_UNEXPECTED_FAIL'] += 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                exitcode = 1
                # Errors may also be reported for tests never announced.
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
                exitcode = 1
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-uxsuccess":
                msg_ops.end_testsuite(testname, "uxsuccess", reason)
                exitcode = 1
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
                exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                                     result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                # A signed number is relative to the current position.
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(line)

    # Anything still open at end of stream never reported a result.  The
    # stored values are already test case objects, so report them as they
    # are instead of wrapping them in another RemotedTestCase.
    while open_tests:
        test = open_tests.popitem()[1]
        msg_ops.addError(
            test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR'] += 1
        exitcode = 1

    return exitcode
198
199
class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
    """Protocol client that also writes the Samba testsuite-level lines.

    Everything is written to self._stream.
    """

    def progress(self, count, whence):
        """Write a "progress:" line for a pop, push or absolute count.

        :raises NotImplementedError: for subunit.PROGRESS_CUR.

        Any other *whence* value is silently ignored.
        """
        if whence == subunit.PROGRESS_POP:
            line = "progress: pop\n"
        elif whence == subunit.PROGRESS_PUSH:
            line = "progress: push\n"
        elif whence == subunit.PROGRESS_SET:
            line = "progress: %d\n" % count
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError
        else:
            return
        self._stream.write(line)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Announce the start of the testsuite called *name*."""
        line = "testsuite: %s\n" % name
        self._stream.write(line)

    def skip_testsuite(self, name, reason=None):
        """Record that testsuite *name* was skipped, with an optional reason."""
        line = ("skip-testsuite: %s [\n%s\n]\n" % (name, reason)
                if reason else
                "skip-testsuite: %s\n" % name)
        self._stream.write(line)

    def end_testsuite(self, name, result, reason=None):
        """Record the *result* of testsuite *name*, with an optional reason."""
        line = ("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason)
                if reason else
                "testsuite-%s: %s\n" % (result, name))
        self._stream.write(line)

    def output_msg(self, msg):
        """Copy *msg* to the stream unchanged."""
        self._stream.write(msg)
230
231
def read_test_regexes(*names):
    """Load test-name regexes, with optional reasons, from files.

    Each name is either a file or a directory; for a directory every entry
    whose name does not end in "~" is read.  In a file, blank lines and
    lines starting with "#" are ignored.  On any other line the text before
    the first "#" is the regex and the text after it is the reason.

    :return: dict mapping each regex to its reason, or to None when the
        line carried no reason.
    """
    paths = []
    for name in names:
        if not os.path.isdir(name):
            paths.append(name)
            continue
        # if we are given a directory, we read all the files it contains
        # (except the ones that end with "~").
        for entry in os.listdir(name):
            if not entry.endswith('~'):
                paths.append(os.path.join(name, entry))

    regexes = {}
    for path in paths:
        with open(path, 'r') as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                regex, found, reason = line.partition("#")
                regexes[regex.strip()] = reason.strip() if found else None
    return regexes
260
261
def find_in_list(regexes, fullname):
    """Return the reason attached to the first regex matching *fullname*.

    :param regexes: dict mapping regex to reason (or None), in the shape
        returned by read_test_regexes().
    :param fullname: test name; each regex is tried with re.match(), so it
        is anchored at the start of the name only.
    :return: the reason of the first matching entry ("" when that entry has
        no reason), or None when nothing matches.
    """
    # items() rather than iteritems() so this runs on Python 2 and 3 alike.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            return "" if reason is None else reason
    return None
269
270
class ImmediateFail(Exception):
    """Raised to abort immediately."""

    def __init__(self):
        # The message is fixed; callers raise this without arguments.
        message = "test failed and fail_immediately set"
        Exception.__init__(self, message)
276
277
class FilterOps(unittest.TestResult):
    """Filter that rewrites results before forwarding them to another object.

    Test names get a prefix and suffix, failures matching the expected
    failure or flapping lists are turned into expected failures, successes
    matching the expected failure list are turned into unexpected
    successes, and (with strip_ok_output) the output of a test is held back
    and only forwarded when the test did not end as expected.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: object all (rewritten) events are forwarded to.
        :param prefix: text put in front of every test id (None for none).
        :param suffix: text put behind every test id (None for none).
        :param expected_failures: dict mapping regex to reason, in the shape
            find_in_list() consumes; failures matching it are expected.
        :param strip_ok_output: buffer test output and drop it for tests
            that end as expected.
        :param fail_immediately: raise ImmediateFail on the first
            unexpected result.
        :param flapping: dict like expected_failures; failures matching it
            are reported as expected failures too.
        """
        super(FilterOps, self).__init__()
        self._ops = out
        self.seen_output = False
        # None means "not buffering"; a string collects held-back output.
        self.output = None
        # _add_prefix() concatenates these, so None must become "".
        self.prefix = prefix or ""
        self.suffix = suffix or ""
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # The *_added counters are reset for every testsuite, the total_*
        # counters are never reset.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.error_added = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer *msg* while a buffer is active, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output += msg

    def startTest(self, test):
        """Forward the start of *test*; begin buffering if output is stripped."""
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(test)

    def _add_prefix(self, test):
        """Return a new test case whose id has the prefix and suffix applied."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def _flush_output(self):
        """Forward any held-back output of the current test."""
        if self.output:
            self._ops.output_msg(self.output)

    def addError(self, test, err=None):
        """Forward an error; always counts as unexpected."""
        test = self._add_prefix(test)
        self.error_added += 1
        self.total_error += 1
        self._ops.addError(test, err)
        # Like a failure, an error is not an OK result, so its output is
        # forwarded rather than dropped.
        self._flush_output()
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        """Forward a skip and drop the buffered output."""
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        """Forward an expected failure and drop the buffered output."""
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test):
        """Forward an unexpected success together with the buffered output."""
        test = self._add_prefix(test)
        self.uxsuccess_added += 1
        self.total_uxsuccess += 1
        self._ops.addUnexpectedSuccess(test)
        self._flush_output()
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, downgraded to expected if the test is listed.

        A test counts as listed when its full id matches an entry of the
        expected failures or, failing that, of the flapping list.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added += 1
            self.total_xfail += 1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added += 1
            self.total_fail += 1
            self._ops.addFailure(test, err)
            self._flush_output()
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success if failure was expected.

        Only the expected failures list is consulted here; the flapping
        list is not.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test)
            self._flush_output()
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the start of a testsuite and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the end of a testsuite with a result adjusted to the counters.

        A reported failure becomes "xfail" when the suite only produced
        expected failures.  Unexpected successes, failures and errors seen
        in the suite override the reported result, in that order (so errors
        win), and are appended to the reason.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.uxsuccess_added > 0 and result != "uxsuccess":
            result = "uxsuccess"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n uxsuccess[%d]" % self.uxsuccess_added

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            self._flush_output()
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
444
445
class PerfFilterOps(unittest.TestResult):
    """Filter that reports how long each test took.

    Timing lines are written through the wrapped object's output_msg().
    Times come from the stream's timestamps once the current testsuite has
    provided one, and from the local clock otherwise.
    """

    def __init__(self, out, prefix='', suffix=''):
        """
        :param out: object the timing lines and errors are forwarded to.
        :param prefix: text put in front of every test id (None for none).
        :param suffix: text put behind every test id (None for none).
        """
        super(PerfFilterOps, self).__init__()
        self._ops = out
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        # Start time of every test seen so far, keyed by full test id.
        self.starts = {}
        self.seen_output = False
        self.suite_has_time = False
        self.latest_time = None

    def progress(self, delta, whence):
        pass

    def output_msg(self, msg):
        pass

    def control_msg(self, msg):
        pass

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forget the previous suite's timestamps."""
        self.suite_has_time = False

    def end_testsuite(self, name, result, reason=None):
        pass

    def _full_id(self, test):
        """Return the id of *test* with the prefix and suffix applied."""
        return self.prefix + test.id() + self.suffix

    def _add_prefix(self, test):
        """Return a new test case whose id has the prefix and suffix applied."""
        return subunit.RemotedTestCase(self._full_id(test))

    def _elapsed_seconds(self, tid):
        """Return the seconds since test *tid* started, or None if it never did."""
        start = self.starts.get(tid)
        if start is None:
            return None
        return (self.get_time() - start).total_seconds()

    def time(self, time):
        """Remember the most recent timestamp from the stream."""
        self.latest_time = time
        self.suite_has_time = True

    def get_time(self):
        """Return the latest stream timestamp of this suite, else the UTC now."""
        if self.suite_has_time:
            return self.latest_time
        return datetime.datetime.utcnow()

    def startTest(self, test):
        """Record when *test* started, under its full id."""
        self.seen_output = True
        self.starts[self._full_id(test)] = self.get_time()

    def addSuccess(self, test):
        """Write the elapsed time of a successful test.

        A success for a test that never started is reported as an error
        instead, and no elapsed time is written.
        """
        tid = self._full_id(test)
        elapsed = self._elapsed_seconds(tid)
        if elapsed is None:
            self._ops.addError(self._add_prefix(test),
                               "%s succeeded without ever starting!" % tid)
            return
        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, elapsed))

    def addFailure(self, test, err=''):
        """Write a line saying after how long *test* failed."""
        # Use the same prefixed id that startTest() stored the start under.
        tid = self._full_id(test)
        elapsed = self._elapsed_seconds(tid)
        if elapsed is None:
            self._ops.output_msg(
                "failure: %s failed without ever starting (%s)\n" % (tid, err))
            return
        self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
                             (tid, elapsed, err))

    def addError(self, test, err=''):
        """Write a line saying after how long *test* hit an error."""
        # Use the same prefixed id that startTest() stored the start under.
        tid = self._full_id(test)
        elapsed = self._elapsed_seconds(tid)
        if elapsed is None:
            self._ops.output_msg(
                "error: %s failed without ever starting (%s)\n" % (tid, err))
            return
        self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
                             (tid, elapsed, err))
511
512
class PlainFormatter(TestsuiteEnabledTestResult):
    """Result receiver that prints a human-readable report to stdout.

    One line is printed per testsuite.  Unless *verbose* is set, the output
    of a testsuite is collected in self.test_output and only shown when
    something unexpected happened.
    """

    def __init__(self, verbose, immediate, statistics,
            totaltests=None):
        """
        :param verbose: write all test output straight to stdout.
        :param immediate: print unexpected results as soon as they happen
            instead of a one-character marker per test.
        :param statistics: mapping with the 'TESTS_*' counters; it is only
            read here.
        :param totaltests: expected number of testsuites, if known.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        # Collected output per testsuite name.
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        # Skipped testsuite names, grouped by skip reason.
        self.skips = {}
        self.index = 0
        # Name of the testsuite currently running, None before the first.
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a timedelta as e.g. "1h2m3s"; zero hours/minutes are omitted."""
        # Fold whole days into the hours so long runs are not truncated.
        total_seconds = delta.days * 86400 + delta.seconds
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track the nesting level; a top-level absolute count sets the total.

        :raises NotImplementedError: for subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Remember the first and the most recent timestamp."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the "[index(tests)/total at time, N errors] name" header."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            # The per-test markers and the verdict follow on the same line.
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print *output* or, when not verbose, add it to the current suite."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record the outcome of testsuite *name* and print its verdict.

        Any result other than "success" or "xfail" marks the suite as
        failed.
        """
        out = ""
        unexpected = False

        if not name in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok += 1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test):
        self.end_test(test.id(), "uxsuccess", True)

    def end_test(self, testname, result, unexpected, err=None):
        """Record the result of one test and print its progress marker.

        :param testname: id of the test.
        :param result: result word, e.g. "success" or "failure".
        :param unexpected: whether the result counts as unexpected.
        :param err: for unexpected results, an indexable whose item 1 is
            the error to show; presumably an exc_info-style tuple -- TODO
            confirm against subunit.RemoteError.
        """
        if not unexpected:
            # An expected result discards the output collected so far for
            # the current testsuite.
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if not self.name in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
               'error': 'E',
               'failure': 'F',
               'uxsuccess': 'U',
               'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write the summary file to *path* and print the final verdict.

        The file lists the failed testsuites with their collected output,
        followed by the skipped testsuites grouped by reason.
        """
        # The with block closes the file even when a write fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite] + "\n\n")

                f.write("\n")

            f.write("= Skipped tests =\n")
            for reason, names in self.skips.items():
                f.write(reason + "\n")
                for name in names:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if not self.immediate and not self.verbose:
            for suite in self.suitesfailed:
                print("=" * 78)
                print("FAIL: %s" % suite)
                if suite in self.test_output:
                    print(self.test_output[suite])
                print("")

        if (not self.suitesfailed and
            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
            not self.statistics['TESTS_UNEXPECTED_OK'] and
            not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Remember that testsuite *name* was skipped for *reason*."""
        # An explicit None would later break write_summary(), which
        # concatenates the reason with a string.
        if reason is None:
            reason = "UNKNOWN"
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites -= 1
718         if self.totalsuites:
719             self.totalsuites-=1