#! /usr/bin/env python
# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
+# Copyright (C) 2003 by Tim Potter <tpot@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; either version 2 of the
+# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
-# USA
+# along with this program; if not, see <http://www.gnu.org/licenses/>.
"""comfychair: a Python-based instrument of software torture.
Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
+Copyright (C) 2003 by Tim Potter <tpot@samba.org>
This is a test framework designed for testing programs written in
-Python, or (through a fork/exec interface) any other language. It is
-similar in design to the very nice 'svntest' system used by
-Subversion, but has no Subversion-specific features.
+Python, or (through a fork/exec interface) any other language.
-It is somewhat similar to PyUnit, except:
+For more information, see the file README.comfychair.
- - it allows capture of detailed log messages from a test, to be
- optionally displayed if the test fails.
-
- - it allows execution of a specified subset of tests
-
- - it avoids Java idioms that are not so useful in Python
-
-WRITING TESTS:
-
- Each test case is a callable object, typically a function. Its
- documentation string describes the test, and the first line of the
- docstring should be a brief name.
-
- The test should return 0 for pass, or non-zero for failure.
- Alternatively they may raise an exception.
-
- Tests may import this "comfychair" module to get some useful
- utilities, but that is not strictly required.
-
+To run a test suite based on ComfyChair, just run it as a program.
"""
-# TODO: Put everything into a temporary directory?
-
-# TODO: Have a means for tests to customize the display of their
-# failure messages. In particular, if a shell command failed, then
-# give its stderr.
-
import sys, re
+
class TestCase:
"""A base class for tests. This class defines required functions which
can optionally be overridden by subclasses. It also provides some
def __init__(self):
self.test_log = ""
self.background_pids = []
+ self._cleanups = []
+ self._enter_rundir()
+ self._save_environment()
+ self.add_cleanup(self.teardown)
+
+
+ # --------------------------------------------------
+ # Save and restore directory
+ def _enter_rundir(self):
+ import os
+ self.basedir = os.getcwd()
+ self.add_cleanup(self._restore_directory)
+ self.rundir = os.path.join(self.basedir,
+ 'testtmp',
+ self.__class__.__name__)
+ self.tmpdir = os.path.join(self.rundir, 'tmp')
+ os.system("rm -fr %s" % self.rundir)
+ os.makedirs(self.tmpdir)
+ os.system("mkdir -p %s" % self.rundir)
+ os.chdir(self.rundir)
+
+ def _restore_directory(self):
+ import os
+ os.chdir(self.basedir)
+
+ # --------------------------------------------------
+ # Save and restore environment
+ def _save_environment(self):
+ import os
+ self._saved_environ = os.environ.copy()
+ self.add_cleanup(self._restore_environment)
+
+ def _restore_environment(self):
+ import os
+ os.environ.clear()
+ os.environ.update(self._saved_environ)
+
- def setUp(self):
+ def setup(self):
"""Set up test fixture."""
pass
- def tearDown(self):
+ def teardown(self):
"""Tear down test fixture."""
pass
- def runTest(self):
+ def runtest(self):
"""Run the test."""
pass
+
+ def add_cleanup(self, c):
+ """Queue a cleanup to be run when the test is complete."""
+ self._cleanups.append(c)
+
+
def fail(self, reason = ""):
"""Say the test failed."""
raise AssertionError(reason)
+
+ #############################################################
+ # Requisition methods
+
+ def require(self, predicate, message):
+ """Check a predicate for running this test.
+
+If the predicate value is not true, the test is skipped with a message explaining
+why."""
+ if not predicate:
+ raise NotRunError, message
+
+ def require_root(self):
+ """Skip this test unless run by root."""
+ import os
+ self.require(os.getuid() == 0,
+ "must be root to run this test")
+
+ #############################################################
+ # Assertion methods
+
def assert_(self, expr, reason = ""):
if not expr:
raise AssertionError(reason)
+ def assert_equal(self, a, b):
+ if not a == b:
+ raise AssertionError("assertEquals failed: %s" % `(a, b)`)
+
+ def assert_notequal(self, a, b):
+ if a == b:
+ raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
+
def assert_re_match(self, pattern, s):
"""Assert that a string matches a particular pattern
AssertionError if not matched
"""
if not re.match(pattern, s):
- raise AssertionError("string %s does not match regexp %s" % (`s`, `pattern`))
+ raise AssertionError("string does not match regexp\n"
+ " string: %s\n"
+ " re: %s" % (`s`, `pattern`))
- def assert_regexp(self, pattern, s):
+ def assert_re_search(self, pattern, s):
"""Assert that a string *contains* a particular pattern
Inputs:
AssertionError if not matched
"""
if not re.search(pattern, s):
- raise AssertionError("string %s does not contain regexp %s" % (`s`, `pattern`))
+ raise AssertionError("string does not contain regexp\n"
+ " string: %s\n"
+ " re: %s" % (`s`, `pattern`))
def assert_no_file(self, filename):
assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
- def runCmdNoWait(self, cmd):
+ #############################################################
+ # Methods for running programs
+
+ def runcmd_background(self, cmd):
import os
- name = cmd[0]
self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
- pid = os.spawnvp(os.P_NOWAIT, name, cmd)
+ pid = os.fork()
+ if pid == 0:
+ # child
+ try:
+ os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
+ finally:
+ os._exit(127)
self.test_log = self.test_log + "pid: %d\n" % pid
return pid
- def runCmd(self, cmd, expectedResult = 0):
+ def runcmd(self, cmd, expectedResult = 0):
"""Run a command, fail if the command returns an unexpected exit
code. Return the output produced."""
- rc, output = self.runCmdUnchecked(cmd)
+ rc, output, stderr = self.runcmd_unchecked(cmd)
if rc != expectedResult:
- raise AssertionError("command returned %d; expected %s: \"%s\"" %
- (rc, expectedResult, cmd))
+ raise AssertionError("""command returned %d; expected %s: \"%s\"
+stdout:
+%s
+stderr:
+%s""" % (rc, expectedResult, cmd, output, stderr))
- return output
+ return output, stderr
- def runCmdUnchecked(self, cmd, skip_on_noexec = 0):
- """Invoke a command; return (exitcode, stdout)"""
- import os, popen2
- pobj = popen2.Popen4(cmd)
- output = pobj.fromchild.read()
- waitstatus = pobj.wait()
+
+ def run_captured(self, cmd):
+ """Run a command, capturing stdout and stderr.
+
+ Based in part on popen2.py
+
+ Returns (waitstatus, stdout, stderr)."""
+ import os, types
+ pid = os.fork()
+ if pid == 0:
+ # child
+ try:
+ pid = os.getpid()
+ openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
+
+ outfd = os.open('%d.out' % pid, openmode, 0666)
+ os.dup2(outfd, 1)
+ os.close(outfd)
+
+ errfd = os.open('%d.err' % pid, openmode, 0666)
+ os.dup2(errfd, 2)
+ os.close(errfd)
+
+ if isinstance(cmd, types.StringType):
+ cmd = ['/bin/sh', '-c', cmd]
+
+ os.execvp(cmd[0], cmd)
+ finally:
+ os._exit(127)
+ else:
+ # parent
+ exited_pid, waitstatus = os.waitpid(pid, 0)
+ stdout = open('%d.out' % pid).read()
+ stderr = open('%d.err' % pid).read()
+ return waitstatus, stdout, stderr
+
+
+ def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
+ """Invoke a command; return (exitcode, stdout, stderr)"""
+ import os
+ waitstatus, stdout, stderr = self.run_captured(cmd)
assert not os.WIFSIGNALED(waitstatus), \
- ("%s terminated with signal %d", cmd, os.WTERMSIG(waitstatus))
+ ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
rc = os.WEXITSTATUS(waitstatus)
self.test_log = self.test_log + ("""Run command: %s
-Wait status: %#x
-Output:
-%s""" % (cmd, waitstatus, output))
+Wait status: %#x (exit code %d, signal %d)
+stdout:
+%s
+stderr:
+%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
+ stdout, stderr))
if skip_on_noexec and rc == 127:
# Either we could not execute the command or the command
# returned exit code 127. According to system(3) we can't
# tell the difference.
- raise NotRunError, "could not execute %s" % cmd
- return rc, output
+ raise NotRunError, "could not execute %s" % `cmd`
+ return rc, stdout, stderr
+
- def explainFailure(self, exc_info = None):
- import traceback
- # Move along, nothing to see here
- if not exc_info and self.test_log == "":
- return
- print "-----------------------------------------------------------------"
- if exc_info:
- traceback.print_exc(file=sys.stdout)
+ def explain_failure(self, exc_info = None):
+ print "test_log:"
print self.test_log
- print "-----------------------------------------------------------------"
- def require(self, predicate, message):
- """Check a predicate for running this test.
-
-If the predicate value is not true, the test is skipped with a message explaining
-why."""
- if not predicate:
- raise NotRunError, message
-
- def require_root(self):
- """Skip this test unless run by root."""
- import os
- self.require(os.getuid() == 0,
- "must be root to run this test")
def log(self, msg):
"""Log a message to the test log. This message is displayed if
the verbose option."""
self.test_log = self.test_log + msg + "\n"
+
class NotRunError(Exception):
+ """Raised if a test must be skipped because of missing resources"""
def __init__(self, value = None):
self.value = value
-def test_name(test):
- """Return a human-readable name for a test.
- Inputs:
- test some kind of callable test object
+def _report_error(case, debugger):
+ """Ask the test case to explain failure, and optionally run a debugger
- Returns:
- name string: a short printable name
- """
- try:
- return test.__name__
- except:
- return `test`
+ Input:
+ case TestCase instance
+ debugger if true, a debugger function to be applied to the traceback
+"""
+ import sys
+ ex = sys.exc_info()
+ print "-----------------------------------------------------------------"
+ if ex:
+ import traceback
+ traceback.print_exc(file=sys.stdout)
+ case.explain_failure()
+ print "-----------------------------------------------------------------"
-def runtests(test_list, verbose = 0):
- """Run a series of tests.
+ if debugger:
+ tb = ex[2]
+ debugger(tb)
- Eventually, this routine will also examine sys.argv[] to handle
- extra options.
+
+def runtests(test_list, verbose = 0, debugger = None):
+ """Run a series of tests.
Inputs:
- test_list sequence of callable test objects
+ test_list sequence of TestCase classes
+ verbose print more information as testing proceeds
+ debugger debugger object to be applied to errors
Returns:
unix return code: 0 for success, 1 for failures, 2 for test failure
"""
import traceback
ret = 0
- for test in test_list:
- print "%-60s" % test_name(test),
+ for test_class in test_list:
+ print "%-30s" % _test_name(test_class),
# flush now so that long running tests are easier to follow
sys.stdout.flush()
+ obj = None
try:
try: # run test and show result
- obj = test()
- if hasattr(obj, "setUp"):
- obj.setUp()
- obj.runTest()
+ obj = test_class()
+ obj.setup()
+ obj.runtest()
print "OK"
except KeyboardInterrupt:
print "INTERRUPT"
- obj.explainFailure(sys.exc_info())
+ _report_error(obj, debugger)
ret = 2
break
except NotRunError, msg:
print "NOTRUN, %s" % msg.value
except:
print "FAIL"
- obj.explainFailure(sys.exc_info())
+ _report_error(obj, debugger)
ret = 1
finally:
- try:
- if hasattr(obj, "tearDown"):
- obj.tearDown()
- except KeyboardInterrupt:
- print "interrupted during tearDown"
- obj.explainFailure(sys.exc_info())
- ret = 2
- break
- except:
- print "error during tearDown"
- obj.explainFailure(sys.exc_info())
- ret = 1
+ while obj and obj._cleanups:
+ try:
+ apply(obj._cleanups.pop())
+ except KeyboardInterrupt:
+ print "interrupted during teardown"
+ _report_error(obj, debugger)
+ ret = 2
+ break
+ except:
+ print "error during teardown"
+ _report_error(obj, debugger)
+ ret = 1
# Display log file if we're verbose
if ret == 0 and verbose:
- obj.explainFailure()
+ obj.explain_failure()
return ret
+
+def _test_name(test_class):
+ """Return a human-readable name for a test class.
+ """
+ try:
+ return test_class.__name__
+ except:
+ return `test_class`
+
+
+def print_help():
+ """Help for people running tests"""
+ import sys
+ print """%s: software test suite based on ComfyChair
+
+usage:
+ To run all tests, just run this program. To run particular tests,
+ list them on the command line.
+
+options:
+ --help show usage message
+ --list list available tests
+ --verbose, -v show more information while running tests
+ --post-mortem, -p enter Python debugger on error
+""" % sys.argv[0]
+
+
+def print_list(test_list):
+ """Show list of available tests"""
+ for test_class in test_list:
+ print " %s" % _test_name(test_class)
+
+
+def main(tests, extra_tests=[]):
+ """Main entry point for test suites based on ComfyChair.
+
+ inputs:
+ tests Sequence of TestCase subclasses to be run by default.
+ extra_tests Sequence of TestCase subclasses that are available but
+ not run by default.
+
+Test suites should contain this boilerplate:
+
+ if __name__ == '__main__':
+ comfychair.main(tests)
+
+This function handles standard options such as --help and --list, and
+by default runs all tests in the suggested order.
+
+Calls sys.exit() on completion.
+"""
+ from sys import argv
+ import getopt, sys
+
+ opt_verbose = 0
+ debugger = None
+
+ opts, args = getopt.getopt(argv[1:], 'pv',
+ ['help', 'list', 'verbose', 'post-mortem'])
+ for opt, opt_arg in opts:
+ if opt == '--help':
+ print_help()
+ return
+ elif opt == '--list':
+ print_list(tests + extra_tests)
+ return
+ elif opt == '--verbose' or opt == '-v':
+ opt_verbose = 1
+ elif opt == '--post-mortem' or opt == '-p':
+ import pdb
+ debugger = pdb.post_mortem
+
+ if args:
+ all_tests = tests + extra_tests
+ by_name = {}
+ for t in all_tests:
+ by_name[_test_name(t)] = t
+ which_tests = []
+ for name in args:
+ which_tests.append(by_name[name])
+ else:
+ which_tests = tests
+
+ sys.exit(runtests(which_tests, verbose=opt_verbose,
+ debugger=debugger))
+
+
if __name__ == '__main__':
print __doc__