make perftest: for performance testing
[sfrench/samba-autobuild/.git] / selftest / subunithelper.py
index 6f1fdcee127284e057282ba0d1c9ec854e73bbf4..c17036defbab6a24925d753bfeae7485257da4b9 100644 (file)
 
 __all__ = ['parse_results']
 
+import datetime
 import re
 import sys
-import subunit
-import subunit.iso8601
-import testtools
-from testtools import content, content_type
+from samba import subunit
+from samba.subunit.run import TestProtocolClient
+from samba.subunit import iso8601
+import unittest
 
-VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error', 'uxsuccess', 'testsuite-uxsuccess']
+VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
+                     'knownfail', 'error', 'xfail', 'skip-testsuite',
+                     'testsuite-failure', 'testsuite-xfail',
+                     'testsuite-success', 'testsuite-error',
+                     'uxsuccess', 'testsuite-uxsuccess'])
 
-class TestsuiteEnabledTestResult(testtools.testresult.TestResult):
+class TestsuiteEnabledTestResult(unittest.TestResult):
 
     def start_testsuite(self, name):
         raise NotImplementedError(self.start_testsuite)
@@ -57,7 +62,7 @@ def parse_results(msg_ops, statistics, fh):
         elif command == "time":
             msg_ops.control_msg(l)
             try:
-                dt = subunit.iso8601.parse_date(arg.rstrip("\n"))
+                dt = iso8601.parse_date(arg.rstrip("\n"))
             except TypeError, e:
                 print "Unable to parse time line: %s" % arg.rstrip("\n")
             else:
@@ -120,7 +125,7 @@ def parse_results(msg_ops, statistics, fh):
                     msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
                 else:
                     statistics['TESTS_UNEXPECTED_OK']+=1
-                    msg_ops.addUnexpectedSuccess(test, remote_error)
+                    msg_ops.addUnexpectedSuccess(test)
                     exitcode = 1
             elif result in ("failure", "fail"):
                 try:
@@ -191,7 +196,17 @@ def parse_results(msg_ops, statistics, fh):
     return exitcode
 
 
-class SubunitOps(subunit.TestProtocolClient,TestsuiteEnabledTestResult):
+class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
+
+    def progress(self, count, whence):
+        if whence == subunit.PROGRESS_POP:
+            self._stream.write("progress: pop\n")
+        elif whence == subunit.PROGRESS_PUSH:
+            self._stream.write("progress: push\n")
+        elif whence == subunit.PROGRESS_SET:
+            self._stream.write("progress: %d\n" % count)
+        elif whence == subunit.PROGRESS_CUR:
+            raise NotImplementedError
 
     # The following are Samba extensions:
     def start_testsuite(self, name):
@@ -247,7 +262,7 @@ class ImmediateFail(Exception):
         super(ImmediateFail, self).__init__("test failed and fail_immediately set")
 
 
-class FilterOps(testtools.testresult.TestResult):
+class FilterOps(unittest.TestResult):
 
     def control_msg(self, msg):
         pass # We regenerate control messages, so ignore this
@@ -273,47 +288,40 @@ class FilterOps(testtools.testresult.TestResult):
         self._ops.startTest(test)
 
     def _add_prefix(self, test):
-        prefix = ""
-        suffix = ""
-        if self.prefix is not None:
-            prefix = self.prefix
-        if self.suffix is not None:
-            suffix = self.suffix
+        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
 
-        return subunit.RemotedTestCase(prefix + test.id() + suffix)
-
-    def addError(self, test, details=None):
+    def addError(self, test, err=None):
         test = self._add_prefix(test)
         self.error_added+=1
         self.total_error+=1
-        self._ops.addError(test, details)
+        self._ops.addError(test, err)
         self.output = None
         if self.fail_immediately:
             raise ImmediateFail()
 
-    def addSkip(self, test, details=None):
+    def addSkip(self, test, reason=None):
         self.seen_output = True
         test = self._add_prefix(test)
-        self._ops.addSkip(test, details)
+        self._ops.addSkip(test, reason)
         self.output = None
 
-    def addExpectedFailure(self, test, details=None):
+    def addExpectedFailure(self, test, err=None):
         test = self._add_prefix(test)
-        self._ops.addExpectedFailure(test, details)
+        self._ops.addExpectedFailure(test, err)
         self.output = None
 
-    def addUnexpectedSuccess(self, test, details=None):
+    def addUnexpectedSuccess(self, test):
         test = self._add_prefix(test)
         self.uxsuccess_added+=1
         self.total_uxsuccess+=1
-        self._ops.addUnexpectedSuccess(test, details)
+        self._ops.addUnexpectedSuccess(test)
         if self.output:
             self._ops.output_msg(self.output)
         self.output = None
         if self.fail_immediately:
             raise ImmediateFail()
 
-    def addFailure(self, test, details=None):
+    def addFailure(self, test, err=None):
         test = self._add_prefix(test)
         xfail_reason = find_in_list(self.expected_failures, test.id())
         if xfail_reason is None:
@@ -321,39 +329,30 @@ class FilterOps(testtools.testresult.TestResult):
         if xfail_reason is not None:
             self.xfail_added+=1
             self.total_xfail+=1
-            if details is not None:
-                details = subunit.RemoteError(unicode(details[1]) + xfail_reason.decode("utf-8"))
-            else:
-                details = subunit.RemoteError(xfail_reason.decode("utf-8"))
-            self._ops.addExpectedFailure(test, details)
+            self._ops.addExpectedFailure(test, err)
         else:
             self.fail_added+=1
             self.total_fail+=1
-            self._ops.addFailure(test, details)
+            self._ops.addFailure(test, err)
             if self.output:
                 self._ops.output_msg(self.output)
             if self.fail_immediately:
                 raise ImmediateFail()
         self.output = None
 
-    def addSuccess(self, test, details=None):
+    def addSuccess(self, test):
         test = self._add_prefix(test)
         xfail_reason = find_in_list(self.expected_failures, test.id())
         if xfail_reason is not None:
             self.uxsuccess_added += 1
             self.total_uxsuccess += 1
-            if details is None:
-                details = {}
-            details['reason'] = content.Content(
-                content_type.ContentType("text", "plain",
-                    {"charset": "utf8"}), lambda: xfail_reason)
-            self._ops.addUnexpectedSuccess(test, details)
+            self._ops.addUnexpectedSuccess(test)
             if self.output:
                 self._ops.output_msg(self.output)
             if self.fail_immediately:
                 raise ImmediateFail()
         else:
-            self._ops.addSuccess(test, details)
+            self._ops.addSuccess(test)
         self.output = None
 
     def skip_testsuite(self, name, reason=None):
@@ -431,6 +430,73 @@ class FilterOps(testtools.testresult.TestResult):
         self.fail_immediately = fail_immediately
 
 
+class PerfFilterOps(unittest.TestResult):
+
+    def progress(self, delta, whence):
+        pass
+
+    def output_msg(self, msg):
+        pass
+
+    def control_msg(self, msg):
+        pass
+
+    def skip_testsuite(self, name, reason=None):
+        self._ops.skip_testsuite(name, reason)
+
+    def start_testsuite(self, name):
+        self.suite_has_time = False
+
+    def end_testsuite(self, name, result, reason=None):
+        pass
+
+    def _add_prefix(self, test):
+        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
+
+    def time(self, time):
+        self.latest_time = time
+        #self._ops.output_msg("found time %s\n" % time)
+        self.suite_has_time = True
+
+    def get_time(self):
+        if self.suite_has_time:
+            return self.latest_time
+        return datetime.datetime.utcnow()
+
+    def startTest(self, test):
+        self.seen_output = True
+        test = self._add_prefix(test)
+        self.starts[test.id()] = self.get_time()
+
+    def addSuccess(self, test):
+        test = self._add_prefix(test)
+        tid = test.id()
+        if tid not in self.starts:
+            self._ops.addError(test, "%s succeeded without ever starting!" % tid)
+        delta = self.get_time() - self.starts[tid]
+        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
+
+    def addFailure(self, test, err=''):
+        tid = test.id()
+        delta = self.get_time() - self.starts[tid]
+        self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
+                             (tid, delta.total_seconds(), err))
+
+    def addError(self, test, err=''):
+        tid = test.id()
+        delta = self.get_time() - self.starts[tid]
+        self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
+                             (tid, delta.total_seconds(), err))
+
+    def __init__(self, out, prefix='', suffix=''):
+        self._ops = out
+        self.prefix = prefix or ''
+        self.suffix = suffix or ''
+        self.starts = {}
+        self.seen_output = False
+        self.suite_has_time = False
+
+
 class PlainFormatter(TestsuiteEnabledTestResult):
 
     def __init__(self, verbose, immediate, statistics,
@@ -485,11 +551,17 @@ class PlainFormatter(TestsuiteEnabledTestResult):
         if not self.verbose:
             self.test_output[name] = ""
 
-        out = "[%d" % self.index
+        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
+                       self.statistics['TESTS_EXPECTED_FAIL'] +
+                       self.statistics['TESTS_ERROR'] +
+                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
+                       self.statistics['TESTS_UNEXPECTED_OK'])
+
+        out = "[%d(%d)" % (self.index, total_tests)
         if self.totalsuites is not None:
             out += "/%d" % self.totalsuites
         if self.start_time is not None:
-            out += " in " + self._format_time(self.last_time - self.start_time)
+            out += " at " + self._format_time(self.last_time - self.start_time)
         if self.suitesfailed:
             out += ", %d errors" % (len(self.suitesfailed),)
         out += "] %s" % name
@@ -541,22 +613,22 @@ class PlainFormatter(TestsuiteEnabledTestResult):
     def addSuccess(self, test):
         self.end_test(test.id(), "success", False)
 
-    def addError(self, test, details=None):
-        self.end_test(test.id(), "error", True, details)
+    def addError(self, test, err=None):
+        self.end_test(test.id(), "error", True, err)
 
-    def addFailure(self, test, details=None):
-        self.end_test(test.id(), "failure", True, details)
+    def addFailure(self, test, err=None):
+        self.end_test(test.id(), "failure", True, err)
 
-    def addSkip(self, test, details=None):
-        self.end_test(test.id(), "skip", False, details)
+    def addSkip(self, test, reason=None):
+        self.end_test(test.id(), "skip", False, reason)
 
-    def addExpectedFailure(self, test, details=None):
-        self.end_test(test.id(), "xfail", False, details)
+    def addExpectedFailure(self, test, err=None):
+        self.end_test(test.id(), "xfail", False, err)
 
-    def addUnexpectedSuccess(self, test, details=None):
-        self.end_test(test.id(), "uxsuccess", True, details)
+    def addUnexpectedSuccess(self, test):
+        self.end_test(test.id(), "uxsuccess", True)
 
-    def end_test(self, testname, result, unexpected, details=None):
+    def end_test(self, testname, result, unexpected, err=None):
         if not unexpected:
             self.test_output[self.name] = ""
             if not self.immediate:
@@ -571,8 +643,8 @@ class PlainFormatter(TestsuiteEnabledTestResult):
             self.test_output[self.name] = ""
 
         self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
-        if details is not None:
-            self.test_output[self.name] += "REASON: %s\n" % (unicode(details[1]).encode("utf-8").strip(),)
+        if err is not None:
+            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
 
         if self.immediate and not self.verbose:
             sys.stdout.write(self.test_output[self.name])