python selftest: enable samba.tests.s3param to run with python3
[samba.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 from __future__ import print_function
19 __all__ = ['parse_results']
20
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
29
# Result keywords that parse_results() accepts at the start of a line.
# Per-test outcomes come first, testsuite-level outcomes second.
VALID_RESULTS = set((
    'success', 'successful',
    'failure', 'fail',
    'skip',
    'knownfail', 'xfail',
    'error',
    'uxsuccess',
    'skip-testsuite',
    'testsuite-success',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-error',
    'testsuite-uxsuccess',
))
35
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A unittest.TestResult that additionally declares start_testsuite().

    The implementation here only raises NotImplementedError; subclasses
    in this module provide their own start_testsuite().
    """

    def start_testsuite(self, name):
        """Called when the testsuite *name* starts; not implemented here."""
        unimplemented = self.start_testsuite
        raise NotImplementedError(unimplemented)
40
41
def parse_results(msg_ops, statistics, fh):
    """Parse a Samba-flavoured subunit stream and forward its events.

    Each line read from *fh* is either a recognised directive
    (``test:``, ``time:``, one of VALID_RESULTS, ``testsuite:``,
    ``progress:``) or plain output.  Directives are translated into
    method calls on *msg_ops*; everything else is handed to
    ``msg_ops.output_msg()`` unchanged.

    :param msg_ops: Receiver of the parsed events.  The methods called on
        it are control_msg, output_msg, startTest, time, addSuccess,
        addExpectedFailure, addUnexpectedSuccess, addFailure, addSkip,
        addError, skip_testsuite, start_testsuite, end_testsuite and
        progress.
    :param statistics: Mapping of counters that is updated in place.  The
        keys incremented are 'TESTS_ERROR', 'TESTS_EXPECTED_OK',
        'TESTS_EXPECTED_FAIL', 'TESTS_UNEXPECTED_OK',
        'TESTS_UNEXPECTED_FAIL' and 'TESTS_SKIP'; they must already exist.
    :param fh: Object whose readline() returns "" at end of input.
    :return: 1 if anything unexpected was seen (failure, error, unexpected
        success, a failing testsuite result, a reason block cut short by
        end of input, or a test that was started but never finished),
        otherwise 0.
    """
    exitcode = 0
    # Tests announced with "test:" that have not received a result yet,
    # keyed by test name.
    open_tests = {}
    # "<testname>[ [][ multipart]\n"; group 2 is set when a "[" opens a
    # multi-line reason block.
    result_line_re = re.compile(r"(.*?)( \[)?([ \t]*)( multipart)?\n")

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        # A directive is "<word> <argument>" starting in column 0;
        # anything else (including indented text) is plain output.
        if not len(parts) == 2 or not l.startswith(parts[0]):
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # The pattern needs a terminating newline.  A final line that
            # was cut off before its newline would otherwise not match
            # and crash on grp.group() below.
            if arg.endswith("\n"):
                grp = result_line_re.match(arg)
            else:
                grp = result_line_re.match(arg + "\n")
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                reason = ""
                # reason may be specified in next lines
                terminated = False
                while fh:
                    l = fh.readline()
                    if l == "":
                        break
                    msg_ops.control_msg(l)
                    if l[-2:] == "]\n":
                        reason += l[:-2]
                        terminated = True
                        break
                    else:
                        reason += l

                # Only byte strings need decoding; text (Python 3 str or
                # Python 2 unicode) is passed through as is.
                if isinstance(reason, bytes):
                    remote_error = subunit.RemoteError(reason.decode("utf-8"))
                else:
                    remote_error = subunit.RemoteError(reason)

                if not terminated:
                    # End of input was reached inside the reason block.
                    statistics['TESTS_ERROR']+=1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")
            if result in ("success", "successful"):
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    statistics['TESTS_ERROR']+=1
                    exitcode = 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
                else:
                    statistics['TESTS_EXPECTED_OK']+=1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    statistics['TESTS_ERROR']+=1
                    exitcode = 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
                else:
                    statistics['TESTS_EXPECTED_FAIL']+=1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("uxsuccess", ):
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    statistics['TESTS_ERROR']+=1
                    exitcode = 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
                else:
                    statistics['TESTS_UNEXPECTED_OK']+=1
                    msg_ops.addUnexpectedSuccess(test)
                    exitcode = 1
            elif result in ("failure", "fail"):
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    statistics['TESTS_ERROR']+=1
                    exitcode = 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
                else:
                    statistics['TESTS_UNEXPECTED_FAIL']+=1
                    exitcode = 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP']+=1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR']+=1
                exitcode = 1
                # Errors may also be reported for tests that were never
                # announced.
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
                exitcode = 1
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-uxsuccess":
                msg_ops.end_testsuite(testname, "uxsuccess", reason)
                exitcode = 1
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
                exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                    result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Anything still open at end of input never reported a result.  The
    # stored value already is the test object created for "test:", so it
    # is reported directly, as in the "Test already running" case above.
    while open_tests:
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR']+=1
        exitcode = 1

    return exitcode
200
201
class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
    """Emit progress and Samba testsuite directives as subunit text.

    All output goes through ``self._stream.write()``; ``_stream`` is not
    set in this class (presumably provided by TestProtocolClient --
    confirm against samba.subunit.run).
    """

    def progress(self, count, whence):
        """Write a ``progress:`` line for the given *whence* constant.

        PROGRESS_CUR is not supported and raises NotImplementedError;
        any other unknown *whence* writes nothing.
        """
        if whence == subunit.PROGRESS_POP:
            line = "progress: pop\n"
        elif whence == subunit.PROGRESS_PUSH:
            line = "progress: push\n"
        elif whence == subunit.PROGRESS_SET:
            line = "progress: %d\n" % count
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError
        else:
            return
        self._stream.write(line)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write the ``testsuite:`` line announcing *name*."""
        line = "testsuite: %s\n" % name
        self._stream.write(line)

    def skip_testsuite(self, name, reason=None):
        """Write a ``skip-testsuite:`` line, with a reason block if given."""
        if not reason:
            line = "skip-testsuite: %s\n" % name
        else:
            line = "skip-testsuite: %s [\n%s\n]\n" % (name, reason)
        self._stream.write(line)

    def end_testsuite(self, name, result, reason=None):
        """Write a ``testsuite-<result>:`` line, with a reason block if given."""
        if not reason:
            line = "testsuite-%s: %s\n" % (result, name)
        else:
            line = "testsuite-%s: %s [\n%s\n]\n" % (result, name, reason)
        self._stream.write(line)

    def output_msg(self, msg):
        """Pass *msg* through to the stream unchanged."""
        self._stream.write(msg)
232
233
def read_test_regexes(*names):
    """Load ``regex # reason`` entries from files and/or directories.

    :param names: Paths of files to read.  For a directory, every entry
        in it is read except those whose name ends with "~".
    :return: Dict mapping each regex (stripped) to its reason (the text
        after the first "#", stripped), or to None when the line has no
        "#".  Blank lines and lines starting with "#" are ignored.
    """
    paths = []
    for name in names:
        if not os.path.isdir(name):
            paths.append(name)
            continue
        # if we are given a directory, we read all the files it contains
        # (except the ones that end with "~").
        for entry in os.listdir(name):
            if entry[-1] != '~':
                paths.append(os.path.join(name, entry))

    regexes = {}
    for path in paths:
        with open(path, 'r') as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if line == "" or line[0] == "#":
                    continue
                pattern, hash_found, reason = line.partition("#")
                if hash_found:
                    regexes[pattern.strip()] = reason.strip()
                else:
                    regexes[line] = None
    return regexes
262
263
def find_in_list(regexes, fullname):
    """Return the reason attached to the first regex matching *fullname*.

    :param regexes: Dict mapping regular expressions to a reason string
        or None (the shape returned by read_test_regexes()).
    :param fullname: Test name to check; each regex is applied with
        re.match(), i.e. anchored at the start of the name.
    :return: The reason of the first matching entry ("" when that entry
        has no reason), or None when nothing matches.
    """
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None
271
272
class ImmediateFail(Exception):
    """Signal that processing must stop right away.

    Takes no arguments; the exception message is fixed.
    """

    def __init__(self):
        message = "test failed and fail_immediately set"
        super(ImmediateFail, self).__init__(message)
278
279
class FilterOps(unittest.TestResult):
    """Filter test events before passing them on to another receiver.

    Every event is forwarded to ``out`` after the test name has been
    wrapped in *prefix*/*suffix*.  Failures whose name matches an entry
    in *expected_failures* or *flapping* are forwarded as expected
    failures; successes matching *expected_failures* are forwarded as
    unexpected successes.  With *strip_ok_output* set, plain output seen
    during a test is buffered and only forwarded when the test (or its
    testsuite) ends unexpectedly.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: Receiver that the filtered events are forwarded to.
        :param prefix: Text put in front of every test id (None: nothing).
        :param suffix: Text appended to every test id (None: nothing).
        :param expected_failures: Dict of regex -> reason, as consumed by
            find_in_list(); None means no expected failures.
        :param strip_ok_output: Buffer test output instead of printing it.
        :param fail_immediately: Raise ImmediateFail on the first
            unexpected result.
        :param flapping: Dict of regex -> reason for tests whose failures
            are tolerated; None means none.
        """
        self._ops = out
        self.seen_output = False
        # None: output goes straight to stdout; a string: output is
        # being buffered for the current test.
        self.output = None
        self.prefix = prefix
        self.suffix = suffix
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # *_added counters are reset per testsuite by start_testsuite();
        # total_* counters accumulate over the whole run.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.error_added = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer *msg* while a buffer is active, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output+=msg

    def startTest(self, test):
        """Forward the start of *test*; start buffering if configured."""
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(test)

    def _add_prefix(self, test):
        """Return a new test object whose id carries prefix and suffix."""
        # prefix/suffix default to None in __init__; treat that as "no
        # decoration" instead of failing on None + str.
        prefix = self.prefix if self.prefix is not None else ""
        suffix = self.suffix if self.suffix is not None else ""
        return subunit.RemotedTestCase(prefix + test.id() + suffix)

    def addError(self, test, err=None):
        """Count and forward an error; any buffered output is dropped."""
        test = self._add_prefix(test)
        self.error_added+=1
        self.total_error+=1
        self._ops.addError(test, err)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        """Forward a skip; any buffered output is dropped."""
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        """Forward an expected failure; any buffered output is dropped."""
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test):
        """Count and forward an unexpected success plus buffered output."""
        test = self._add_prefix(test)
        self.uxsuccess_added+=1
        self.total_uxsuccess+=1
        self._ops.addUnexpectedSuccess(test)
        if self.output:
            self._ops.output_msg(self.output)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, downgraded to an expected failure if listed.

        The prefixed test id is looked up in expected_failures first and
        in flapping second; a match in either turns the failure into an
        expected failure.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added+=1
            self.total_xfail+=1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added+=1
            self.total_fail+=1
            self._ops.addFailure(test, err)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success if it was listed.

        Only expected_failures is consulted here; flapping tests that
        pass are forwarded as plain successes.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the testsuite start and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the end of a testsuite with a possibly adjusted result.

        A reported failure becomes "xfail" when the suite only had
        expected failures.  Conversely the result is forced to
        "uxsuccess", "failure" or "error" (later ones win) when the
        corresponding per-suite counter is non-zero, and the count is
        appended to *reason*.
        """
        only_expected = (self.xfail_added > 0 and
                         not (self.fail_added > 0 or
                              self.error_added > 0 or
                              self.uxsuccess_added > 0))

        if only_expected and result in ("fail", "failure"):
            result = "xfail"

        # Order matters: each step sees the result chosen by the
        # previous one.
        escalations = (
            (self.uxsuccess_added, "uxsuccess", "uxsuccess"),
            (self.fail_added, "failure", "failures"),
            (self.error_added, "error", "errors"),
        )
        for count, forced_result, label in escalations:
            if count > 0 and result != forced_result:
                result = forced_result
                if reason is None:
                    reason = "Subunit/Filter Reason"
                reason += "\n %s[%d]" % (label, count)

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
446
447
class PerfFilterOps(unittest.TestResult):
    """Report how long each test took instead of forwarding its result.

    Start times are recorded per prefixed test id in startTest(); the
    elapsed time is written through ``out.output_msg()`` when the test
    succeeds, fails or errors.
    """

    def __init__(self, out, prefix='', suffix=''):
        """
        :param out: Receiver of the generated timing lines.
        :param prefix: Text put in front of every test id (None: nothing).
        :param suffix: Text appended to every test id (None: nothing).
        """
        self._ops = out
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        # prefixed test id -> time at which the test started
        self.starts = {}
        self.seen_output = False
        self.suite_has_time = False

    def progress(self, delta, whence):
        pass

    def output_msg(self, msg):
        pass

    def control_msg(self, msg):
        pass

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forget any timestamp seen in the previous testsuite."""
        self.suite_has_time = False

    def end_testsuite(self, name, result, reason=None):
        pass

    def _add_prefix(self, test):
        """Return a new test object whose id carries prefix and suffix."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def time(self, time):
        """Remember the latest timestamp found in the stream."""
        self.latest_time = time
        #self._ops.output_msg("found time %s\n" % time)
        self.suite_has_time = True

    def get_time(self):
        """Return the latest stream timestamp of this suite, else now (UTC)."""
        if self.suite_has_time:
            return self.latest_time
        return datetime.datetime.utcnow()

    def startTest(self, test):
        """Record the start time of *test* under its prefixed id."""
        self.seen_output = True
        test = self._add_prefix(test)
        self.starts[test.id()] = self.get_time()

    def addSuccess(self, test):
        """Write an ``elapsed-time:`` line for a test that passed.

        A success for a test without a recorded start is forwarded as an
        error instead.
        """
        test = self._add_prefix(test)
        tid = test.id()
        if tid not in self.starts:
            # Errors are passed around as subunit.RemoteError values
            # elsewhere in this module, so do the same here.
            self._ops.addError(test, subunit.RemoteError(
                u"%s succeeded without ever starting!" % tid))
            # Without a start time there is nothing to measure.
            return
        delta = self.get_time() - self.starts[tid]
        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))

    def addFailure(self, test, err=''):
        """Write a ``failure:`` line with the time the test ran for."""
        self._report_unsuccessful("failure", test, err)

    def addError(self, test, err=''):
        """Write an ``error:`` line with the time the test ran for."""
        self._report_unsuccessful("error", test, err)

    def _report_unsuccessful(self, kind, test, err):
        """Write the timing line shared by addFailure() and addError()."""
        # startTest() stores start times under the prefixed id, so the
        # lookup has to use the same prefix and suffix.
        tid = self.prefix + test.id() + self.suffix
        started = self.starts.get(tid)
        if started is None:
            # Results can arrive for tests that were never started.
            self._ops.output_msg("%s: %s failed without ever starting (%s)\n" %
                                 (kind, tid, err))
            return
        delta = self.get_time() - started
        self._ops.output_msg("%s: %s failed after %f seconds (%s)\n" %
                             (kind, tid, delta.total_seconds(), err))
512         self.suite_has_time = False
513
514
class PlainFormatter(TestsuiteEnabledTestResult):
    """Print test progress to stdout and collect a final summary.

    Output of a testsuite is kept in ``test_output`` (unless *verbose*)
    and shown only for suites that end unexpectedly.
    """

    def __init__(self, verbose, immediate, statistics,
            totaltests=None):
        """
        :param verbose: Write all test output straight to stdout.
        :param immediate: Print unexpected results as soon as they are
            seen instead of one status character per test.
        :param statistics: Mapping with the 'TESTS_*' counters; only
            read here.
        :param totaltests: Number of testsuites expected, if known.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        # testsuite name -> output collected for that suite
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        # skip reason -> names of the testsuites skipped for it
        self.skips = {}
        self.index = 0
        # name of the testsuite currently running
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a timedelta as e.g. "1h2m3s" (zero hours/minutes omitted)."""
        # delta.seconds alone drops whole days, so fold them in.
        total_seconds = delta.days * 86400 + delta.seconds
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track progress nesting; a top-level SET gives the suite total."""
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Record a timestamp; the first one seen becomes the start time."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the "[index(tests)/total at time, N errors] name" header."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            # Per-test status characters follow on the same line.
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print *output* or, when not verbose, store it for the suite."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record the outcome of a testsuite and print its status.

        Results other than "success" and "xfail" mark the suite as
        failed.
        """
        out = ""
        unexpected = False

        if not name in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok+=1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test):
        self.end_test(test.id(), "uxsuccess", True)

    def end_test(self, testname, result, unexpected, err=None):
        """Common handler for all per-test outcomes.

        :param testname: Id of the test that finished.
        :param result: Outcome name ("success", "failure", ...).
        :param unexpected: Whether the outcome counts as unexpected.
        :param err: Optional error value; its second element (``err[1]``)
            is shown as the reason.
        """
        if not unexpected:
            # An expected outcome discards the output collected so far
            # for the current suite.
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if not self.name in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
               'error': 'E',
               'failure': 'F',
               'uxsuccess': 'U',
               'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write the summary file to *path* and print the final verdict.

        The file lists the failed testsuites with their stored output,
        followed by the skipped testsuites grouped by reason.
        """
        # "with" closes the file even if one of the writes fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite]+"\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason in self.skips.keys():
                f.write(reason + "\n")
                for name in self.skips[reason]:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if (not self.suitesfailed and
            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
            not self.statistics['TESTS_UNEXPECTED_OK'] and
            not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Remember that testsuite *name* was skipped for *reason*."""
        # Callers may pass reason=None explicitly; write_summary()
        # concatenates the reason with a string, so normalise it here.
        if reason is None:
            reason = "UNKNOWN"
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites-=1
720         if self.totalsuites:
721             self.totalsuites-=1