subunit: Update to latest upstream version.
[samba.git] / lib / subunit / python / subunit / tests / test_test_results.py
index 94d22748e80889376bf639d6f361492d38138f03..236dfa22e51f0b15474ff5aae236b7feef86c48a 100644 (file)
 #  limitations under that license.
 #
 
+import csv
 import datetime
+import sys
 import unittest
 
 from testtools import TestCase
+from testtools.compat import StringIO
+from testtools.content import (
+    text_content,
+    TracebackContent,
+    )
 from testtools.testresult.doubles import ExtendedTestResult
 
 import subunit
 import subunit.iso8601 as iso8601
 import subunit.test_results
 
+import testtools
+
 
 class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
 
@@ -192,12 +201,55 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
 
 class TestTagCollapsingDecorator(TestCase):
 
-    def test_tags_forwarded_outside_of_tests(self):
+    def test_tags_collapsed_outside_of_tests(self):
         result = ExtendedTestResult()
         tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
-        tag_collapser.tags(set(['a', 'b']), set())
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set())
+        tag_collapser.startTest(self)
         self.assertEquals(
-            [('tags', set(['a', 'b']), set([]))], result._events)
+            [('tags', set(['a', 'b']), set([])),
+             ('startTest', self),
+             ], result._events)
+
+    def test_tags_collapsed_outside_of_tests_are_flushed(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.startTestRun()
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set())
+        tag_collapser.startTest(self)
+        tag_collapser.addSuccess(self)
+        tag_collapser.stopTest(self)
+        tag_collapser.stopTestRun()
+        self.assertEquals(
+            [('startTestRun',),
+             ('tags', set(['a', 'b']), set([])),
+             ('startTest', self),
+             ('addSuccess', self),
+             ('stopTest', self),
+             ('stopTestRun',),
+             ], result._events)
+
+    def test_tags_forwarded_after_tests(self):
+        test = subunit.RemotedTestCase('foo')
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.startTestRun()
+        tag_collapser.startTest(test)
+        tag_collapser.addSuccess(test)
+        tag_collapser.stopTest(test)
+        tag_collapser.tags(set(['a']), set(['b']))
+        tag_collapser.stopTestRun()
+        self.assertEqual(
+            [('startTestRun',),
+             ('startTest', test),
+             ('addSuccess', test),
+             ('stopTest', test),
+             ('tags', set(['a']), set(['b'])),
+             ('stopTestRun',),
+             ],
+            result._events)
 
     def test_tags_collapsed_inside_of_tests(self):
         result = ExtendedTestResult()
@@ -229,6 +281,25 @@ class TestTagCollapsingDecorator(TestCase):
              ('stopTest', test)],
             result._events)
 
+    def test_tags_sent_before_result(self):
+        # Because addSuccess and friends tend to send subunit output
+        # immediately, and because 'tags:' before a result line means
+        # something different to 'tags:' after a result line, we need to be
+        # sure that tags are emitted before 'addSuccess' (or whatever).
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        test = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(test)
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.addSuccess(test)
+        tag_collapser.stopTest(test)
+        self.assertEquals(
+            [('startTest', test),
+             ('tags', set(['a']), set()),
+             ('addSuccess', test),
+             ('stopTest', test)],
+            result._events)
+
 
 class TestTimeCollapsingDecorator(TestCase):
 
@@ -294,6 +365,201 @@ class TestTimeCollapsingDecorator(TestCase):
              ('stopTest', foo)], result._events)
 
 
+class TestByTestResultTests(testtools.TestCase):
+
+    def setUp(self):
+        super(TestByTestResultTests, self).setUp()
+        self.log = []
+        self.result = subunit.test_results.TestByTestResult(self.on_test)
+        if sys.version_info >= (3, 0):
+            self.result._now = iter(range(5)).__next__
+        else:
+            self.result._now = iter(range(5)).next
+
+    def assertCalled(self, **kwargs):
+        defaults = {
+            'test': self,
+            'tags': set(),
+            'details': None,
+            'start_time': 0,
+            'stop_time': 1,
+            }
+        defaults.update(kwargs)
+        self.assertEqual([defaults], self.log)
+
+    def on_test(self, **kwargs):
+        self.log.append(kwargs)
+
+    def test_no_tests_nothing_reported(self):
+        self.result.startTestRun()
+        self.result.stopTestRun()
+        self.assertEqual([], self.log)
+
+    def test_add_success(self):
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertCalled(status='success')
+
+    def test_add_success_details(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addSuccess(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', details=details)
+
+    def test_tags(self):
+        if not getattr(self.result, 'tags', None):
+            self.skipTest("No tags in testtools")
+        self.result.tags(['foo'], [])
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', tags=set(['foo']))
+
+    def test_add_error(self):
+        self.result.startTest(self)
+        try:
+            1/0
+        except ZeroDivisionError:
+            error = sys.exc_info()
+        self.result.addError(self, error)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='error',
+            details={'traceback': TracebackContent(error, self)})
+
+    def test_add_error_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addError(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='error', details=details)
+
+    def test_add_failure(self):
+        self.result.startTest(self)
+        try:
+            self.fail("intentional failure")
+        except self.failureException:
+            failure = sys.exc_info()
+        self.result.addFailure(self, failure)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='failure',
+            details={'traceback': TracebackContent(failure, self)})
+
+    def test_add_failure_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addFailure(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='failure', details=details)
+
+    def test_add_xfail(self):
+        self.result.startTest(self)
+        try:
+            1/0
+        except ZeroDivisionError:
+            error = sys.exc_info()
+        self.result.addExpectedFailure(self, error)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='xfail',
+            details={'traceback': TracebackContent(error, self)})
+
+    def test_add_xfail_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addExpectedFailure(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='xfail', details=details)
+
+    def test_add_unexpected_success(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addUnexpectedSuccess(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', details=details)
+
+    def test_add_skip_reason(self):
+        self.result.startTest(self)
+        reason = self.getUniqueString()
+        self.result.addSkip(self, reason)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='skip', details={'reason': text_content(reason)})
+
+    def test_add_skip_details(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addSkip(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='skip', details=details)
+
+    def test_twice(self):
+        self.result.startTest(self)
+        self.result.addSuccess(self, details={'foo': 'bar'})
+        self.result.stopTest(self)
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertEqual(
+            [{'test': self,
+              'status': 'success',
+              'start_time': 0,
+              'stop_time': 1,
+              'tags': set(),
+              'details': {'foo': 'bar'}},
+             {'test': self,
+              'status': 'success',
+              'start_time': 2,
+              'stop_time': 3,
+              'tags': set(),
+              'details': None},
+             ],
+            self.log)
+
+
+class TestCsvResult(testtools.TestCase):
+
+    def parse_stream(self, stream):
+        stream.seek(0)
+        reader = csv.reader(stream)
+        return list(reader)
+
+    def test_csv_output(self):
+        stream = StringIO()
+        result = subunit.test_results.CsvResult(stream)
+        if sys.version_info >= (3, 0):
+            result._now = iter(range(5)).__next__
+        else:
+            result._now = iter(range(5)).next
+        result.startTestRun()
+        result.startTest(self)
+        result.addSuccess(self)
+        result.stopTest(self)
+        result.stopTestRun()
+        self.assertEqual(
+            [['test', 'status', 'start_time', 'stop_time'],
+             [self.id(), 'success', '0', '1'],
+             ],
+            self.parse_stream(stream))
+
+    def test_just_header_when_no_tests(self):
+        stream = StringIO()
+        result = subunit.test_results.CsvResult(stream)
+        result.startTestRun()
+        result.stopTestRun()
+        self.assertEqual(
+            [['test', 'status', 'start_time', 'stop_time']],
+            self.parse_stream(stream))
+
+    def test_no_output_before_events(self):
+        stream = StringIO()
+        subunit.test_results.CsvResult(stream)
+        self.assertEqual([], self.parse_stream(stream))
+
+
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
     result = loader.loadTestsFromName(__name__)