__all__ = ['parse_results']
+import datetime
import re
import sys
+import os
from samba import subunit
from samba.subunit.run import TestProtocolClient
-from subunit import iso8601
+from samba.subunit import iso8601
import unittest
-VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error', 'uxsuccess', 'testsuite-uxsuccess']
+VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
+ 'knownfail', 'error', 'xfail', 'skip-testsuite',
+ 'testsuite-failure', 'testsuite-xfail',
+ 'testsuite-success', 'testsuite-error',
+ 'uxsuccess', 'testsuite-uxsuccess'])
class TestsuiteEnabledTestResult(unittest.TestResult):
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
statistics['TESTS_UNEXPECTED_OK']+=1
- msg_ops.addUnexpectedSuccess(test, remote_error)
+ msg_ops.addUnexpectedSuccess(test)
exitcode = 1
elif result in ("failure", "fail"):
try:
class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
def progress(self, count, whence):
- pass
+ if whence == subunit.PROGRESS_POP:
+ self._stream.write("progress: pop\n")
+ elif whence == subunit.PROGRESS_PUSH:
+ self._stream.write("progress: push\n")
+ elif whence == subunit.PROGRESS_SET:
+ self._stream.write("progress: %d\n" % count)
+ elif whence == subunit.PROGRESS_CUR:
+ raise NotImplementedError
# The following are Samba extensions:
def start_testsuite(self, name):
self._stream.write(msg)
-def read_test_regexes(name):
+def read_test_regexes(*names):
ret = {}
- f = open(name, 'r')
- try:
- for l in f:
- l = l.strip()
- if l == "" or l[0] == "#":
- continue
- if "#" in l:
- (regex, reason) = l.split("#", 1)
- ret[regex.strip()] = reason.strip()
- else:
- ret[l] = None
- finally:
- f.close()
+ files = []
+ for name in names:
+ # if we are given a directory, we read all the files it contains
+ # (except the ones that end with "~").
+ if os.path.isdir(name):
+ files.extend([os.path.join(name, x)
+ for x in os.listdir(name)
+ if x[-1] != '~'])
+ else:
+ files.append(name)
+
+ for filename in files:
+ f = open(filename, 'r')
+ try:
+ for l in f:
+ l = l.strip()
+ if l == "" or l[0] == "#":
+ continue
+ if "#" in l:
+ (regex, reason) = l.split("#", 1)
+ ret[regex.strip()] = reason.strip()
+ else:
+ ret[l] = None
+ finally:
+ f.close()
return ret
self._ops.startTest(test)
def _add_prefix(self, test):
- prefix = ""
- suffix = ""
- if self.prefix is not None:
- prefix = self.prefix
- if self.suffix is not None:
- suffix = self.suffix
-
- return subunit.RemotedTestCase(prefix + test.id() + suffix)
+ return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
def addError(self, test, err=None):
test = self._add_prefix(test)
self._ops.addExpectedFailure(test, err)
self.output = None
- def addUnexpectedSuccess(self, test, err=None):
+ def addUnexpectedSuccess(self, test):
test = self._add_prefix(test)
self.uxsuccess_added+=1
self.total_uxsuccess+=1
- self._ops.addUnexpectedSuccess(test, err)
+ self._ops.addUnexpectedSuccess(test)
if self.output:
self._ops.output_msg(self.output)
self.output = None
if xfail_reason is not None:
self.uxsuccess_added += 1
self.total_uxsuccess += 1
- self._ops.addUnexpectedSuccess(test, subunit.RemoteError(xfail_reason))
+ self._ops.addUnexpectedSuccess(test)
if self.output:
self._ops.output_msg(self.output)
if self.fail_immediately:
self.fail_immediately = fail_immediately
+class PerfFilterOps(unittest.TestResult):
+
+ def progress(self, delta, whence):
+ pass
+
+ def output_msg(self, msg):
+ pass
+
+ def control_msg(self, msg):
+ pass
+
+ def skip_testsuite(self, name, reason=None):
+ self._ops.skip_testsuite(name, reason)
+
+ def start_testsuite(self, name):
+ self.suite_has_time = False
+
+ def end_testsuite(self, name, result, reason=None):
+ pass
+
+ def _add_prefix(self, test):
+ return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
+
+ def time(self, time):
+ self.latest_time = time
+ #self._ops.output_msg("found time %s\n" % time)
+ self.suite_has_time = True
+
+ def get_time(self):
+ if self.suite_has_time:
+ return self.latest_time
+ return datetime.datetime.utcnow()
+
+ def startTest(self, test):
+ self.seen_output = True
+ test = self._add_prefix(test)
+ self.starts[test.id()] = self.get_time()
+
+ def addSuccess(self, test):
+ test = self._add_prefix(test)
+ tid = test.id()
+ if tid not in self.starts:
+ self._ops.addError(test, "%s succeeded without ever starting!" % tid)
+ delta = self.get_time() - self.starts[tid]
+ self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
+
+ def addFailure(self, test, err=''):
+ tid = test.id()
+ delta = self.get_time() - self.starts[tid]
+ self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
+ (tid, delta.total_seconds(), err))
+
+ def addError(self, test, err=''):
+ tid = test.id()
+ delta = self.get_time() - self.starts[tid]
+ self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
+ (tid, delta.total_seconds(), err))
+
+ def __init__(self, out, prefix='', suffix=''):
+ self._ops = out
+ self.prefix = prefix or ''
+ self.suffix = suffix or ''
+ self.starts = {}
+ self.seen_output = False
+ self.suite_has_time = False
+
+
class PlainFormatter(TestsuiteEnabledTestResult):
def __init__(self, verbose, immediate, statistics,
self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
if err is not None:
- self.test_output[self.name] += "REASON: %s\n" % err.encode("utf-8").strip()
+ self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
if self.immediate and not self.verbose:
sys.stdout.write(self.test_output[self.name])