cde6febdafa34d0d91cb29662b6d4453a1563ad8
[samba.git] / python / samba / subunit / run.py
1 #!/usr/bin/python
2 #
3 # Simple subunit testrunner for python
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
5
6 # Cobbled together from testtools and subunit:
7 # Copyright (C) 2005-2011 Robert Collins <robertc@robertcollins.net>
8 # Copyright (c) 2008-2011 testtools developers.
9 #
10 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
11 #  license at the users choice. A copy of both licenses are available in the
12 #  project source as Apache-2.0 and BSD. You may not use this file except in
13 #  compliance with one of these two licences.
14 #
15 #  Unless required by applicable law or agreed to in writing, software
16 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
17 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
18 #  license you chose for the specific language governing permissions and
19 #  limitations under that license.
20 #
21
22 """Run a unittest testcase reporting results as Subunit.
23
24   $ python -m samba.subunit.run mylib.tests.test_suite
25 """
26
27 from iso8601.iso8601 import Utc
28
29 import datetime
30 import os
31 import sys
32 import traceback
33 import unittest
34
35
36 # Whether or not to hide layers of the stack trace that are
37 # unittest/testtools internal code.  Defaults to True since the
38 # system-under-test is rarely unittest or testtools.
39 HIDE_INTERNAL_STACK = True
40
41
42 def write_traceback(stream, err, test):
43     """Converts a sys.exc_info()-style tuple of values into a string.
44
45     Copied from Python 2.7's unittest.TestResult._exc_info_to_string.
46     """
47     def _is_relevant_tb_level(tb):
48         return '__unittest' in tb.tb_frame.f_globals
49
50     def _count_relevant_tb_levels(tb):
51         length = 0
52         while tb and not _is_relevant_tb_level(tb):
53             length += 1
54             tb = tb.tb_next
55         return length
56
57     exctype, value, tb = err
58     # Skip test runner traceback levels
59     if HIDE_INTERNAL_STACK:
60         while tb and _is_relevant_tb_level(tb):
61             tb = tb.tb_next
62
63     format_exception = traceback.format_exception
64
65     if (HIDE_INTERNAL_STACK and test.failureException
66         and isinstance(value, test.failureException)):
67         # Skip assert*() traceback levels
68         length = _count_relevant_tb_levels(tb)
69         msgLines = format_exception(exctype, value, tb, length)
70     else:
71         msgLines = format_exception(exctype, value, tb)
72     stream.writelines(msgLines)
73
74
75 class TestProtocolClient(unittest.TestResult):
76     """A TestResult which generates a subunit stream for a test run.
77
78     # Get a TestSuite or TestCase to run
79     suite = make_suite()
80     # Create a stream (any object with a 'write' method). This should accept
81     # bytes not strings: subunit is a byte orientated protocol.
82     stream = file('tests.log', 'wb')
83     # Create a subunit result object which will output to the stream
84     result = subunit.TestProtocolClient(stream)
85     # Optionally, to get timing data for performance analysis, wrap the
86     # serialiser with a timing decorator
87     result = subunit.test_results.AutoTimingTestResultDecorator(result)
88     # Run the test suite reporting to the subunit result object
89     suite.run(result)
90     # Close the stream.
91     stream.close()
92     """
93
94     def __init__(self, stream):
95         unittest.TestResult.__init__(self)
96         self._stream = stream
97
98     def addError(self, test, error=None):
99         """Report an error in test test.
100
101         :param error: Standard unittest positional argument form - an
102             exc_info tuple.
103         """
104         self._addOutcome("error", test, error=error)
105         if self.failfast:
106             self.stop()
107
108     def addExpectedFailure(self, test, error=None):
109         """Report an expected failure in test test.
110
111         :param error: Standard unittest positional argument form - an
112             exc_info tuple.
113         """
114         self._addOutcome("xfail", test, error=error)
115
116     def addFailure(self, test, error=None):
117         """Report a failure in test test.
118
119         :param error: Standard unittest positional argument form - an
120             exc_info tuple.
121         """
122         self._addOutcome("failure", test, error=error)
123         if self.failfast:
124             self.stop()
125
126     def _addOutcome(self, outcome, test, error=None, error_permitted=True):
127         """Report a failure in test test.
128
129         :param outcome: A string describing the outcome - used as the
130             event name in the subunit stream.
131         :param error: Standard unittest positional argument form - an
132             exc_info tuple.
133         :param error_permitted: If True then error must be supplied.
134             If False then error must not be supplied.
135         """
136         self._stream.write(("%s: " % outcome) + self._test_id(test))
137         if error_permitted:
138             if error is None:
139                 raise ValueError
140         else:
141             if error is not None:
142                 raise ValueError
143         if error is not None:
144             self._stream.write(" [\n")
145             write_traceback(self._stream, error, test)
146         else:
147             self._stream.write("\n")
148         if error is not None:
149             self._stream.write("]\n")
150
151     def addSkip(self, test, reason=None):
152         """Report a skipped test."""
153         if reason is None:
154             self._addOutcome("skip", test, error=None)
155         else:
156             self._stream.write("skip: %s [\n" % test.id())
157             self._stream.write("%s\n" % reason)
158             self._stream.write("]\n")
159
160     def addSuccess(self, test):
161         """Report a success in a test."""
162         self._addOutcome("successful", test, error_permitted=False)
163
164     def addUnexpectedSuccess(self, test):
165         """Report an unexpected success in test test.
166         """
167         self._addOutcome("uxsuccess", test, error_permitted=False)
168         if self.failfast:
169             self.stop()
170
171     def _test_id(self, test):
172         result = test.id()
173         if type(result) is not bytes:
174             result = result.encode('utf8')
175         return result
176
177     def startTest(self, test):
178         """Mark a test as starting its test run."""
179         super(TestProtocolClient, self).startTest(test)
180         self._stream.write("test: " + self._test_id(test) + "\n")
181         self._stream.flush()
182
183     def stopTest(self, test):
184         super(TestProtocolClient, self).stopTest(test)
185         self._stream.flush()
186
187     def time(self, a_datetime):
188         """Inform the client of the time.
189
190         ":param datetime: A datetime.datetime object.
191         """
192         time = a_datetime.astimezone(Utc())
193         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
194             time.year, time.month, time.day, time.hour, time.minute,
195             time.second, time.microsecond))
196
197
198 def _flatten_tests(suite_or_case, unpack_outer=False):
199     try:
200         tests = iter(suite_or_case)
201     except TypeError:
202         # Not iterable, assume it's a test case.
203         return [(suite_or_case.id(), suite_or_case)]
204     if (type(suite_or_case) in (unittest.TestSuite,) or
205         unpack_outer):
206         # Plain old test suite (or any others we may add).
207         result = []
208         for test in tests:
209             # Recurse to flatten.
210             result.extend(_flatten_tests(test))
211         return result
212     else:
213         # Find any old actual test and grab its id.
214         suite_id = None
215         tests = iterate_tests(suite_or_case)
216         for test in tests:
217             suite_id = test.id()
218             break
219         # If it has a sort_tests method, call that.
220         if getattr(suite_or_case, 'sort_tests', None) is not None:
221             suite_or_case.sort_tests()
222         return [(suite_id, suite_or_case)]
223
224
225 def sorted_tests(suite_or_case, unpack_outer=False):
226     """Sort suite_or_case while preserving non-vanilla TestSuites."""
227     tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer)
228     tests.sort()
229     return unittest.TestSuite([test for (sort_key, test) in tests])
230
231
232 def iterate_tests(test_suite_or_case):
233     """Iterate through all of the test cases in 'test_suite_or_case'."""
234     try:
235         suite = iter(test_suite_or_case)
236     except TypeError:
237         yield test_suite_or_case
238     else:
239         for test in suite:
240             for subtest in iterate_tests(test):
241                 yield subtest
242
243
244 defaultTestLoader = unittest.defaultTestLoader
245 defaultTestLoaderCls = unittest.TestLoader
246
247 if getattr(defaultTestLoader, 'discover', None) is None:
248     try:
249         import discover
250         defaultTestLoader = discover.DiscoveringTestLoader()
251         defaultTestLoaderCls = discover.DiscoveringTestLoader
252         have_discover = True
253     except ImportError:
254         have_discover = False
255 else:
256     have_discover = True
257
258
259 ####################
260 # Taken from python 2.7 and slightly modified for compatibility with
261 # older versions. Delete when 2.7 is the oldest supported version.
262 # Modifications:
263 #  - Use have_discover to raise an error if the user tries to use
264 #    discovery on an old version and doesn't have discover installed.
265 #  - If --catch is given check that installHandler is available, as
266 #    it won't be on old python versions.
267 #  - print calls have been been made single-source python3 compatibile.
268 #  - exception handling likewise.
269 #  - The default help has been changed to USAGE_AS_MAIN and USAGE_FROM_MODULE
270 #    removed.
271 #  - A tweak has been added to detect 'python -m *.run' and use a
272 #    better progName in that case.
273 #  - self.module is more comprehensively set to None when being invoked from
274 #    the commandline - __name__ is used as a sentinel value.
275 #  - --list has been added which can list tests (should be upstreamed).
276 #  - --load-list has been added which can reduce the tests used (should be
277 #    upstreamed).
278 #  - The limitation of using getopt is declared to the user.
279 #  - http://bugs.python.org/issue16709 is worked around, by sorting tests when
280 #    discover is used.
281
282 FAILFAST     = "  -f, --failfast   Stop on first failure\n"
283 CATCHBREAK   = "  -c, --catch      Catch control-C and display results\n"
284 BUFFEROUTPUT = "  -b, --buffer     Buffer stdout and stderr during test runs\n"
285
286 USAGE_AS_MAIN = """\
287 Usage: %(progName)s [options] [tests]
288
289 Options:
290   -h, --help       Show this message
291   -v, --verbose    Verbose output
292   -q, --quiet      Minimal output
293   -l, --list       List tests rather than executing them.
294   --load-list      Specifies a file containing test ids, only tests matching
295                    those ids are executed.
296 %(failfast)s%(catchbreak)s%(buffer)s
297 Examples:
298   %(progName)s test_module               - run tests from test_module
299   %(progName)s module.TestClass          - run tests from module.TestClass
300   %(progName)s module.Class.test_method  - run specified test method
301
302 All options must come before [tests].  [tests] can be a list of any number of
303 test modules, classes and test methods.
304
305 Alternative Usage: %(progName)s discover [options]
306
307 Options:
308   -v, --verbose    Verbose output
309 %(failfast)s%(catchbreak)s%(buffer)s  -s directory     Directory to start discovery ('.' default)
310   -p pattern       Pattern to match test files ('test*.py' default)
311   -t directory     Top level directory of project (default to
312                    start directory)
313   -l, --list       List tests rather than executing them.
314   --load-list      Specifies a file containing test ids, only tests matching
315                    those ids are executed.
316
317 For test discovery all test modules must be importable from the top
318 level directory of the project.
319 """
320
321
322 # NOT a TestResult, because we are implementing the interface, not inheriting
323 # it.
324 class TestResultDecorator(object):
325     """General pass-through decorator.
326
327     This provides a base that other TestResults can inherit from to
328     gain basic forwarding functionality. It also takes care of
329     handling the case where the target doesn't support newer methods
330     or features by degrading them.
331     """
332
333     def __init__(self, decorated):
334         """Create a TestResultDecorator forwarding to decorated."""
335         # Make every decorator degrade gracefully.
336         self.decorated = decorated
337
338     def startTest(self, test):
339         return self.decorated.startTest(test)
340
341     def startTestRun(self):
342         return self.decorated.startTestRun()
343
344     def stopTest(self, test):
345         return self.decorated.stopTest(test)
346
347     def stopTestRun(self):
348         return self.decorated.stopTestRun()
349
350     def addError(self, test, err=None):
351         return self.decorated.addError(test, err)
352
353     def addFailure(self, test, err=None):
354         return self.decorated.addFailure(test, err)
355
356     def addSuccess(self, test):
357         return self.decorated.addSuccess(test)
358
359     def addSkip(self, test, reason=None):
360         return self.decorated.addSkip(test, reason)
361
362     def addExpectedFailure(self, test, err=None):
363         return self.decorated.addExpectedFailure(test, err)
364
365     def addUnexpectedSuccess(self, test):
366         return self.decorated.addUnexpectedSuccess(test)
367
368     def _get_failfast(self):
369         return getattr(self.decorated, 'failfast', False)
370
371     def _set_failfast(self, value):
372         self.decorated.failfast = value
373     failfast = property(_get_failfast, _set_failfast)
374
375     def wasSuccessful(self):
376         return self.decorated.wasSuccessful()
377
378     @property
379     def shouldStop(self):
380         return self.decorated.shouldStop
381
382     def stop(self):
383         return self.decorated.stop()
384
385     @property
386     def testsRun(self):
387         return self.decorated.testsRun
388
389     def time(self, a_datetime):
390         return self.decorated.time(a_datetime)
391
392
393 class HookedTestResultDecorator(TestResultDecorator):
394     """A TestResult which calls a hook on every event."""
395
396     def __init__(self, decorated):
397         self.super = super(HookedTestResultDecorator, self)
398         self.super.__init__(decorated)
399
400     def startTest(self, test):
401         self._before_event()
402         return self.super.startTest(test)
403
404     def startTestRun(self):
405         self._before_event()
406         return self.super.startTestRun()
407
408     def stopTest(self, test):
409         self._before_event()
410         return self.super.stopTest(test)
411
412     def stopTestRun(self):
413         self._before_event()
414         return self.super.stopTestRun()
415
416     def addError(self, test, err=None):
417         self._before_event()
418         return self.super.addError(test, err)
419
420     def addFailure(self, test, err=None):
421         self._before_event()
422         return self.super.addFailure(test, err)
423
424     def addSuccess(self, test):
425         self._before_event()
426         return self.super.addSuccess(test)
427
428     def addSkip(self, test, reason=None):
429         self._before_event()
430         return self.super.addSkip(test, reason)
431
432     def addExpectedFailure(self, test, err=None):
433         self._before_event()
434         return self.super.addExpectedFailure(test, err)
435
436     def addUnexpectedSuccess(self, test):
437         self._before_event()
438         return self.super.addUnexpectedSuccess(test)
439
440     def wasSuccessful(self):
441         self._before_event()
442         return self.super.wasSuccessful()
443
444     @property
445     def shouldStop(self):
446         self._before_event()
447         return self.super.shouldStop
448
449     def stop(self):
450         self._before_event()
451         return self.super.stop()
452
453     def time(self, a_datetime):
454         self._before_event()
455         return self.super.time(a_datetime)
456
457
458 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
459     """Decorate a TestResult to add time events to a test run.
460
461     By default this will cause a time event before every test event,
462     but if explicit time data is being provided by the test run, then
463     this decorator will turn itself off to prevent causing confusion.
464     """
465
466     def __init__(self, decorated):
467         self._time = None
468         super(AutoTimingTestResultDecorator, self).__init__(decorated)
469
470     def _before_event(self):
471         time = self._time
472         if time is not None:
473             return
474         time = datetime.datetime.utcnow().replace(tzinfo=Utc())
475         self.decorated.time(time)
476
477     @property
478     def shouldStop(self):
479         return self.decorated.shouldStop
480
481     def time(self, a_datetime):
482         """Provide a timestamp for the current test activity.
483
484         :param a_datetime: If None, automatically add timestamps before every
485             event (this is the default behaviour if time() is not called at
486             all).  If not None, pass the provided time onto the decorated
487             result object and disable automatic timestamps.
488         """
489         self._time = a_datetime
490         return self.decorated.time(a_datetime)
491
492
493 class SubunitTestRunner(object):
494
495     def __init__(self, verbosity=None, failfast=None, buffer=None, stream=None):
496         """Create a SubunitTestRunner.
497
498         :param verbosity: Ignored.
499         :param failfast: Stop running tests at the first failure.
500         :param buffer: Ignored.
501         """
502         self.failfast = failfast
503         self.stream = stream or sys.stdout
504
505     def run(self, test):
506         "Run the given test case or test suite."
507         result = TestProtocolClient(self.stream)
508         result = AutoTimingTestResultDecorator(result)
509         if self.failfast is not None:
510             result.failfast = self.failfast
511         test(result)
512         return result
513
514
515 class TestProgram(object):
516     """A command-line program that runs a set of tests; this is primarily
517        for making test modules conveniently executable.
518     """
519     USAGE = USAGE_AS_MAIN
520
521     # defaults for testing
522     failfast = catchbreak = buffer = progName = None
523
524     def __init__(self, module=__name__, defaultTest=None, argv=None,
525                     testRunner=None, testLoader=defaultTestLoader,
526                     exit=True, verbosity=1, failfast=None, catchbreak=None,
527                     buffer=None, stdout=None):
528         if module == __name__:
529             self.module = None
530         elif isinstance(module, str):
531             self.module = __import__(module)
532             for part in module.split('.')[1:]:
533                 self.module = getattr(self.module, part)
534         else:
535             self.module = module
536         if argv is None:
537             argv = sys.argv
538         if stdout is None:
539             stdout = sys.stdout
540         if testRunner is None:
541             testRunner = SubunitTestRunner()
542
543         self.exit = exit
544         self.failfast = failfast
545         self.catchbreak = catchbreak
546         self.verbosity = verbosity
547         self.buffer = buffer
548         self.defaultTest = defaultTest
549         self.listtests = False
550         self.load_list = None
551         self.testRunner = testRunner
552         self.testLoader = testLoader
553         progName = argv[0]
554         if progName.endswith('%srun.py' % os.path.sep):
555             elements = progName.split(os.path.sep)
556             progName = '%s.run' % elements[-2]
557         else:
558             progName = os.path.basename(argv[0])
559         self.progName = progName
560         self.parseArgs(argv)
561         if self.load_list:
562             # TODO: preserve existing suites (like testresources does in
563             # OptimisingTestSuite.add, but with a standard protocol).
564             # This is needed because the load_tests hook allows arbitrary
565             # suites, even if that is rarely used.
566             source = open(self.load_list, 'rb')
567             try:
568                 lines = source.readlines()
569             finally:
570                 source.close()
571             test_ids = set(line.strip().decode('utf-8') for line in lines)
572             filtered = unittest.TestSuite()
573             for test in iterate_tests(self.test):
574                 if test.id() in test_ids:
575                     filtered.addTest(test)
576             self.test = filtered
577         if not self.listtests:
578             self.runTests()
579         else:
580             for test in iterate_tests(self.test):
581                 stdout.write('%s\n' % test.id())
582
583     def parseArgs(self, argv):
584         if len(argv) > 1 and argv[1].lower() == 'discover':
585             self._do_discovery(argv[2:])
586             return
587
588         import getopt
589         long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch', 'buffer',
590             'list', 'load-list=']
591         try:
592             options, args = getopt.getopt(argv[1:], 'hHvqfcbl', long_opts)
593             for opt, value in options:
594                 if opt in ('-h','-H','--help'):
595                     self.usageExit()
596                 if opt in ('-q','--quiet'):
597                     self.verbosity = 0
598                 if opt in ('-v','--verbose'):
599                     self.verbosity = 2
600                 if opt in ('-f','--failfast'):
601                     if self.failfast is None:
602                         self.failfast = True
603                     # Should this raise an exception if -f is not valid?
604                 if opt in ('-c','--catch'):
605                     if self.catchbreak is None:
606                         self.catchbreak = True
607                     # Should this raise an exception if -c is not valid?
608                 if opt in ('-b','--buffer'):
609                     if self.buffer is None:
610                         self.buffer = True
611                     # Should this raise an exception if -b is not valid?
612                 if opt in ('-l', '--list'):
613                     self.listtests = True
614                 if opt == '--load-list':
615                     self.load_list = value
616             if len(args) == 0 and self.defaultTest is None:
617                 # createTests will load tests from self.module
618                 self.testNames = None
619             elif len(args) > 0:
620                 self.testNames = args
621             else:
622                 self.testNames = (self.defaultTest,)
623             self.createTests()
624         except getopt.error:
625             self.usageExit(sys.exc_info()[1])
626
627     def createTests(self):
628         if self.testNames is None:
629             self.test = self.testLoader.loadTestsFromModule(self.module)
630         else:
631             self.test = self.testLoader.loadTestsFromNames(self.testNames,
632                                                            self.module)
633
634     def _do_discovery(self, argv, Loader=defaultTestLoaderCls):
635         # handle command line args for test discovery
636         if not have_discover:
637             raise AssertionError("Unable to use discovery, must use python 2.7 "
638                     "or greater, or install the discover package.")
639         self.progName = '%s discover' % self.progName
640         import optparse
641         parser = optparse.OptionParser()
642         parser.prog = self.progName
643         parser.add_option('-v', '--verbose', dest='verbose', default=False,
644                           help='Verbose output', action='store_true')
645         if self.failfast != False:
646             parser.add_option('-f', '--failfast', dest='failfast', default=False,
647                               help='Stop on first fail or error',
648                               action='store_true')
649         if self.catchbreak != False:
650             parser.add_option('-c', '--catch', dest='catchbreak', default=False,
651                               help='Catch ctrl-C and display results so far',
652                               action='store_true')
653         if self.buffer != False:
654             parser.add_option('-b', '--buffer', dest='buffer', default=False,
655                               help='Buffer stdout and stderr during tests',
656                               action='store_true')
657         parser.add_option('-s', '--start-directory', dest='start', default='.',
658                           help="Directory to start discovery ('.' default)")
659         parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
660                           help="Pattern to match tests ('test*.py' default)")
661         parser.add_option('-t', '--top-level-directory', dest='top', default=None,
662                           help='Top level directory of project (defaults to start directory)')
663         parser.add_option('-l', '--list', dest='listtests', default=False, action="store_true",
664                           help='List tests rather than running them.')
665         parser.add_option('--load-list', dest='load_list', default=None,
666                           help='Specify a filename containing the test ids to use.')
667
668         options, args = parser.parse_args(argv)
669         if len(args) > 3:
670             self.usageExit()
671
672         for name, value in zip(('start', 'pattern', 'top'), args):
673             setattr(options, name, value)
674
675         # only set options from the parsing here
676         # if they weren't set explicitly in the constructor
677         if self.failfast is None:
678             self.failfast = options.failfast
679         if self.catchbreak is None:
680             self.catchbreak = options.catchbreak
681         if self.buffer is None:
682             self.buffer = options.buffer
683         self.listtests = options.listtests
684         self.load_list = options.load_list
685
686         if options.verbose:
687             self.verbosity = 2
688
689         start_dir = options.start
690         pattern = options.pattern
691         top_level_dir = options.top
692
693         loader = Loader()
694         # See http://bugs.python.org/issue16709
695         # While sorting here is intrusive, its better than being random.
696         # Rules for the sort:
697         # - standard suites are flattened, and the resulting tests sorted by
698         #   id.
699         # - non-standard suites are preserved as-is, and sorted into position
700         #   by the first test found by iterating the suite.
701         # We do this by a DSU process: flatten and grab a key, sort, strip the
702         # keys.
703         loaded = loader.discover(start_dir, pattern, top_level_dir)
704         self.test = sorted_tests(loaded)
705
706     def runTests(self):
707         if (self.catchbreak
708             and getattr(unittest, 'installHandler', None) is not None):
709             unittest.installHandler()
710         self.result = self.testRunner.run(self.test)
711         if self.exit:
712             sys.exit(not self.result.wasSuccessful())
713
714     def usageExit(self, msg=None):
715         if msg:
716             print (msg)
717         usage = {'progName': self.progName, 'catchbreak': '', 'failfast': '',
718                  'buffer': ''}
719         if self.failfast != False:
720             usage['failfast'] = FAILFAST
721         if self.catchbreak != False:
722             usage['catchbreak'] = CATCHBREAK
723         if self.buffer != False:
724             usage['buffer'] = BUFFEROUTPUT
725         usage_text = self.USAGE % usage
726         usage_lines = usage_text.split('\n')
727         usage_lines.insert(2, "Run a test suite with a subunit reporter.")
728         usage_lines.insert(3, "")
729         print('\n'.join(usage_lines))
730         sys.exit(2)
731
732
733 if __name__ == '__main__':
734     TestProgram(module=None, argv=sys.argv, stdout=sys.stdout)