# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from __future__ import print_function
__all__ = ['parse_results']
import datetime
import os
from samba import subunit
from samba.subunit.run import TestProtocolClient
-from samba.subunit import iso8601
import unittest
-from samba.compat import binary_type
+try:
+ from dateutil.parser import isoparse as iso_parse_date
+except ImportError:
+ try:
+ from iso8601 import parse_date as iso_parse_date;
+ except ImportError:
+ print('Install either python-dateutil >= 2.7.1 or python-iso8601')
VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
exitcode = 0
open_tests = {}
- while fh:
- l = fh.readline()
- if l == "":
- break
+ for l in fh:
parts = l.split(None, 1)
if not len(parts) == 2 or not l.startswith(parts[0]):
msg_ops.output_msg(l)
elif command == "time":
msg_ops.control_msg(l)
try:
- dt = iso8601.parse_date(arg.rstrip("\n"))
+ dt = iso_parse_date(arg.rstrip("\n"))
except TypeError as e:
print("Unable to parse time line: %s" % arg.rstrip("\n"))
else:
elif command in VALID_RESULTS:
msg_ops.control_msg(l)
result = command
- grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
+ grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
(testname, hasreason) = (grp.group(1), grp.group(2))
if hasreason:
reason = ""
# reason may be specified in next lines
terminated = False
- while fh:
- l = fh.readline()
- if l == "":
- break
+ for l in fh:
msg_ops.control_msg(l)
- if l[-2:] == "]\n":
- reason += l[:-2]
+ if l == "]\n":
terminated = True
break
else:
reason += l
- if isinstance(reason, binary_type):
+ if isinstance(reason, bytes):
remote_error = subunit.RemoteError(reason.decode("utf-8"))
else:
remote_error = subunit.RemoteError(reason)
if not terminated:
statistics['TESTS_ERROR'] += 1
- msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
+ msg_ops.addError(subunit.RemotedTestCase(testname),
+ subunit.RemoteError(u"result (%s) reason (%s) interrupted" % (result, reason)))
return 1
else:
reason = None
def read_test_regexes(*names):
- ret = {}
+ ret = []
files = []
for name in names:
# if we are given a directory, we read all the files it contains
files.append(name)
for filename in files:
- f = open(filename, 'r')
- try:
+ with open(filename, 'r') as f:
for l in f:
l = l.strip()
if l == "" or l[0] == "#":
continue
if "#" in l:
(regex, reason) = l.split("#", 1)
- ret[regex.strip()] = reason.strip()
+ ret.append(re.compile(regex.strip()))
else:
- ret[l] = None
- finally:
- f.close()
+ ret.append(re.compile(l))
+
return ret
def find_in_list(regexes, fullname):
- for regex, reason in regexes.items():
- if re.match(regex, fullname):
- if reason is None:
- return ""
- return reason
- return None
+ for regex in regexes:
+ if regex.match(fullname):
+ return True
+ return False
class ImmediateFail(Exception):
self.error_added += 1
self.total_error += 1
self._ops.addError(test, err)
+ self._ops.writeOutcome(test)
self.output = None
if self.fail_immediately:
raise ImmediateFail()
self.seen_output = True
test = self._add_prefix(test)
self._ops.addSkip(test, reason)
+ self._ops.writeOutcome(test)
self.output = None
def addExpectedFailure(self, test, err=None):
test = self._add_prefix(test)
self._ops.addExpectedFailure(test, err)
+ self._ops.writeOutcome(test)
self.output = None
def addUnexpectedSuccess(self, test):
self.uxsuccess_added += 1
self.total_uxsuccess += 1
self._ops.addUnexpectedSuccess(test)
+ self._ops.writeOutcome(test)
if self.output:
self._ops.output_msg(self.output)
self.output = None
def addFailure(self, test, err=None):
test = self._add_prefix(test)
- xfail_reason = find_in_list(self.expected_failures, test.id())
- if xfail_reason is None:
- xfail_reason = find_in_list(self.flapping, test.id())
- if xfail_reason is not None:
+ xfail = find_in_list(self.expected_failures, test.id())
+ if not xfail:
+ xfail = find_in_list(self.flapping, test.id())
+ if xfail:
self.xfail_added += 1
self.total_xfail += 1
self._ops.addExpectedFailure(test, err)
+ self._ops.writeOutcome(test)
else:
self.fail_added += 1
self.total_fail += 1
self._ops.addFailure(test, err)
+ self._ops.writeOutcome(test)
if self.output:
self._ops.output_msg(self.output)
if self.fail_immediately:
def addSuccess(self, test):
test = self._add_prefix(test)
- xfail_reason = find_in_list(self.expected_failures, test.id())
- if xfail_reason is not None:
+ xfail = find_in_list(self.expected_failures, test.id())
+ if xfail:
self.uxsuccess_added += 1
self.total_uxsuccess += 1
self._ops.addUnexpectedSuccess(test)
+ self._ops.writeOutcome(test)
if self.output:
self._ops.output_msg(self.output)
if self.fail_immediately:
raise ImmediateFail()
else:
self._ops.addSuccess(test)
+ self._ops.writeOutcome(test)
self.output = None
def skip_testsuite(self, name, reason=None):
if expected_failures is not None:
self.expected_failures = expected_failures
else:
- self.expected_failures = {}
+ self.expected_failures = []
if flapping is not None:
self.flapping = flapping
else:
- self.flapping = {}
+ self.flapping = []
self.strip_ok_output = strip_ok_output
self.xfail_added = 0
self.fail_added = 0