ctdb-daemon: Don't leak memory if not using recovery lock
[obnox/samba/samba-obnox.git] / selftest / subunithelper.py
index 6f1fdcee127284e057282ba0d1c9ec854e73bbf4..948256db9ad91a4814f71aa98f07f8d34d9f667e 100644 (file)
@@ -19,14 +19,14 @@ __all__ = ['parse_results']
 
 import re
 import sys
-import subunit
-import subunit.iso8601
-import testtools
-from testtools import content, content_type
+from samba import subunit
+from samba.subunit.run import TestProtocolClient
+from samba.subunit import iso8601
+import unittest
 
 VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error', 'uxsuccess', 'testsuite-uxsuccess']
 
-class TestsuiteEnabledTestResult(testtools.testresult.TestResult):
+class TestsuiteEnabledTestResult(unittest.TestResult):
 
     def start_testsuite(self, name):
         raise NotImplementedError(self.start_testsuite)
@@ -57,7 +57,7 @@ def parse_results(msg_ops, statistics, fh):
         elif command == "time":
             msg_ops.control_msg(l)
             try:
-                dt = subunit.iso8601.parse_date(arg.rstrip("\n"))
+                dt = iso8601.parse_date(arg.rstrip("\n"))
             except TypeError, e:
                 print "Unable to parse time line: %s" % arg.rstrip("\n")
             else:
@@ -120,7 +120,7 @@ def parse_results(msg_ops, statistics, fh):
                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
                 else:
                     statistics['TESTS_UNEXPECTED_OK']+=1
-                    msg_ops.addUnexpectedSuccess(test, remote_error)
+                    msg_ops.addUnexpectedSuccess(test)
                     exitcode = 1
             elif result in ("failure", "fail"):
                 try:
@@ -191,7 +191,17 @@ def parse_results(msg_ops, statistics, fh):
     return exitcode
 
 
-class SubunitOps(subunit.TestProtocolClient,TestsuiteEnabledTestResult):
+class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
+
+    def progress(self, count, whence):
+        if whence == subunit.PROGRESS_POP:
+            self._stream.write("progress: pop\n")
+        elif whence == subunit.PROGRESS_PUSH:
+            self._stream.write("progress: push\n")
+        elif whence == subunit.PROGRESS_SET:
+            self._stream.write("progress: %d\n" % count)
+        elif whence == subunit.PROGRESS_CUR:
+            raise NotImplementedError
 
     # The following are Samba extensions:
     def start_testsuite(self, name):
@@ -247,7 +257,7 @@ class ImmediateFail(Exception):
         super(ImmediateFail, self).__init__("test failed and fail_immediately set")
 
 
-class FilterOps(testtools.testresult.TestResult):
+class FilterOps(unittest.TestResult):
 
     def control_msg(self, msg):
         pass # We regenerate control messages, so ignore this
@@ -282,38 +292,38 @@ class FilterOps(testtools.testresult.TestResult):
 
         return subunit.RemotedTestCase(prefix + test.id() + suffix)
 
-    def addError(self, test, details=None):
+    def addError(self, test, err=None):
         test = self._add_prefix(test)
         self.error_added+=1
         self.total_error+=1
-        self._ops.addError(test, details)
+        self._ops.addError(test, err)
         self.output = None
         if self.fail_immediately:
             raise ImmediateFail()
 
-    def addSkip(self, test, details=None):
+    def addSkip(self, test, reason=None):
         self.seen_output = True
         test = self._add_prefix(test)
-        self._ops.addSkip(test, details)
+        self._ops.addSkip(test, reason)
         self.output = None
 
-    def addExpectedFailure(self, test, details=None):
+    def addExpectedFailure(self, test, err=None):
         test = self._add_prefix(test)
-        self._ops.addExpectedFailure(test, details)
+        self._ops.addExpectedFailure(test, err)
         self.output = None
 
-    def addUnexpectedSuccess(self, test, details=None):
+    def addUnexpectedSuccess(self, test):
         test = self._add_prefix(test)
         self.uxsuccess_added+=1
         self.total_uxsuccess+=1
-        self._ops.addUnexpectedSuccess(test, details)
+        self._ops.addUnexpectedSuccess(test)
         if self.output:
             self._ops.output_msg(self.output)
         self.output = None
         if self.fail_immediately:
             raise ImmediateFail()
 
-    def addFailure(self, test, details=None):
+    def addFailure(self, test, err=None):
         test = self._add_prefix(test)
         xfail_reason = find_in_list(self.expected_failures, test.id())
         if xfail_reason is None:
@@ -321,39 +331,30 @@ class FilterOps(testtools.testresult.TestResult):
         if xfail_reason is not None:
             self.xfail_added+=1
             self.total_xfail+=1
-            if details is not None:
-                details = subunit.RemoteError(unicode(details[1]) + xfail_reason.decode("utf-8"))
-            else:
-                details = subunit.RemoteError(xfail_reason.decode("utf-8"))
-            self._ops.addExpectedFailure(test, details)
+            self._ops.addExpectedFailure(test, err)
         else:
             self.fail_added+=1
             self.total_fail+=1
-            self._ops.addFailure(test, details)
+            self._ops.addFailure(test, err)
             if self.output:
                 self._ops.output_msg(self.output)
             if self.fail_immediately:
                 raise ImmediateFail()
         self.output = None
 
-    def addSuccess(self, test, details=None):
+    def addSuccess(self, test):
         test = self._add_prefix(test)
         xfail_reason = find_in_list(self.expected_failures, test.id())
         if xfail_reason is not None:
             self.uxsuccess_added += 1
             self.total_uxsuccess += 1
-            if details is None:
-                details = {}
-            details['reason'] = content.Content(
-                content_type.ContentType("text", "plain",
-                    {"charset": "utf8"}), lambda: xfail_reason)
-            self._ops.addUnexpectedSuccess(test, details)
+            self._ops.addUnexpectedSuccess(test)
             if self.output:
                 self._ops.output_msg(self.output)
             if self.fail_immediately:
                 raise ImmediateFail()
         else:
-            self._ops.addSuccess(test, details)
+            self._ops.addSuccess(test)
         self.output = None
 
     def skip_testsuite(self, name, reason=None):
@@ -485,11 +486,17 @@ class PlainFormatter(TestsuiteEnabledTestResult):
         if not self.verbose:
             self.test_output[name] = ""
 
-        out = "[%d" % self.index
+        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
+                       self.statistics['TESTS_EXPECTED_FAIL'] +
+                       self.statistics['TESTS_ERROR'] +
+                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
+                       self.statistics['TESTS_UNEXPECTED_OK'])
+
+        out = "[%d(%d)" % (self.index, total_tests)
         if self.totalsuites is not None:
             out += "/%d" % self.totalsuites
         if self.start_time is not None:
-            out += " in " + self._format_time(self.last_time - self.start_time)
+            out += " at " + self._format_time(self.last_time - self.start_time)
         if self.suitesfailed:
             out += ", %d errors" % (len(self.suitesfailed),)
         out += "] %s" % name
@@ -541,22 +548,22 @@ class PlainFormatter(TestsuiteEnabledTestResult):
     def addSuccess(self, test):
         self.end_test(test.id(), "success", False)
 
-    def addError(self, test, details=None):
-        self.end_test(test.id(), "error", True, details)
+    def addError(self, test, err=None):
+        self.end_test(test.id(), "error", True, err)
 
-    def addFailure(self, test, details=None):
-        self.end_test(test.id(), "failure", True, details)
+    def addFailure(self, test, err=None):
+        self.end_test(test.id(), "failure", True, err)
 
-    def addSkip(self, test, details=None):
-        self.end_test(test.id(), "skip", False, details)
+    def addSkip(self, test, reason=None):
+        self.end_test(test.id(), "skip", False, reason)
 
-    def addExpectedFailure(self, test, details=None):
-        self.end_test(test.id(), "xfail", False, details)
+    def addExpectedFailure(self, test, err=None):
+        self.end_test(test.id(), "xfail", False, err)
 
-    def addUnexpectedSuccess(self, test, details=None):
-        self.end_test(test.id(), "uxsuccess", True, details)
+    def addUnexpectedSuccess(self, test):
+        self.end_test(test.id(), "uxsuccess", True)
 
-    def end_test(self, testname, result, unexpected, details=None):
+    def end_test(self, testname, result, unexpected, err=None):
         if not unexpected:
             self.test_output[self.name] = ""
             if not self.immediate:
@@ -571,8 +578,8 @@ class PlainFormatter(TestsuiteEnabledTestResult):
             self.test_output[self.name] = ""
 
         self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
-        if details is not None:
-            self.test_output[self.name] += "REASON: %s\n" % (unicode(details[1]).encode("utf-8").strip(),)
+        if err is not None:
+            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
 
         if self.immediate and not self.verbose:
             sys.stdout.write(self.test_output[self.name])