Merge from Subversion r50.
authorMartin Pool <mbp@samba.org>
Fri, 4 Apr 2003 03:16:27 +0000 (03:16 +0000)
committerMartin Pool <mbp@samba.org>
Fri, 4 Apr 2003 03:16:27 +0000 (03:16 +0000)
source/stf/comfychair.py

index 8ff77269556d92f69ca53638013688d986410a84..522f9bedeba44058e57a6ca66b7d95ae04e35147 100644 (file)
@@ -31,14 +31,9 @@ For more information, see the file README.comfychair.
 To run a test suite based on ComfyChair, just run it as a program.
 """
 
-# TODO: Put everything into a temporary directory?
-
-# TODO: Have a means for tests to customize the display of their
-# failure messages.  In particular, if a shell command failed, then
-# give its stderr.
-
 import sys, re
 
+
 class TestCase:
     """A base class for tests.  This class defines required functions which
     can optionally be overridden by subclasses.  It also provides some
@@ -47,6 +42,43 @@ class TestCase:
     def __init__(self):
         self.test_log = ""
         self.background_pids = []
+        self._cleanups = []
+        self._enter_rundir()
+        self._save_environment()
+        self.add_cleanup(self.teardown)
+
+
+    # --------------------------------------------------
+    # Save and restore directory
+    def _enter_rundir(self):
+        import os
+        self.basedir = os.getcwd()
+        self.add_cleanup(self._restore_directory)
+        self.rundir = os.path.join(self.basedir,
+                                   'testtmp', 
+                                   self.__class__.__name__)
+        self.tmpdir = os.path.join(self.rundir, 'tmp')
+        os.system("rm -fr %s" % self.rundir)
+        os.makedirs(self.tmpdir)
+        os.system("mkdir -p %s" % self.rundir)
+        os.chdir(self.rundir)
+
+    def _restore_directory(self):
+        import os
+        os.chdir(self.basedir)
+
+    # --------------------------------------------------
+    # Save and restore environment
+    def _save_environment(self):
+        import os
+        self._saved_environ = os.environ.copy()
+        self.add_cleanup(self._restore_environment)
+
+    def _restore_environment(self):
+        import os
+        os.environ.clear()
+        os.environ.update(self._saved_environ)
+
     
     def setup(self):
         """Set up test fixture."""
@@ -60,6 +92,12 @@ class TestCase:
         """Run the test."""
         pass
 
+
+    def add_cleanup(self, c):
+        """Queue a cleanup to be run when the test is complete."""
+        self._cleanups.append(c)
+        
+
     def fail(self, reason = ""):
         """Say the test failed."""
         raise AssertionError(reason)
@@ -138,9 +176,14 @@ why."""
 
     def runcmd_background(self, cmd):
         import os
-        name = cmd[0]
         self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
-        pid = os.spawnvp(os.P_NOWAIT, name, cmd)
+        pid = os.fork()
+        if pid == 0:
+            # child
+            try:
+                os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
+            finally:
+                os._exit(127)
         self.test_log = self.test_log + "pid: %d\n" % pid
         return pid
 
@@ -148,44 +191,78 @@ why."""
     def runcmd(self, cmd, expectedResult = 0):
         """Run a command, fail if the command returns an unexpected exit
         code.  Return the output produced."""
-        rc, output = self.runcmd_unchecked(cmd)
+        rc, output, stderr = self.runcmd_unchecked(cmd)
         if rc != expectedResult:
-            raise AssertionError("command returned %d; expected %s: \"%s\"" %
-                                 (rc, expectedResult, cmd))
+            raise AssertionError("""command returned %d; expected %s: \"%s\"
+stdout:
+%s
+stderr:
+%s""" % (rc, expectedResult, cmd, output, stderr))
+
+        return output, stderr
+
+
+    def run_captured(self, cmd):
+        """Run a command, capturing stdout and stderr.
+
+        Based in part on popen2.py
+
+        Returns (waitstatus, stdout, stderr)."""
+        import os, types
+        pid = os.fork()
+        if pid == 0:
+            # child
+            try: 
+                pid = os.getpid()
+                openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
+
+                outfd = os.open('%d.out' % pid, openmode, 0666)
+                os.dup2(outfd, 1)
+                os.close(outfd)
+
+                errfd = os.open('%d.err' % pid, openmode, 0666)
+                os.dup2(errfd, 2)
+                os.close(errfd)
+
+                if isinstance(cmd, types.StringType):
+                    cmd = ['/bin/sh', '-c', cmd]
+
+                os.execvp(cmd[0], cmd)
+            finally:
+                os._exit(127)
+        else:
+            # parent
+            exited_pid, waitstatus = os.waitpid(pid, 0)
+            stdout = open('%d.out' % pid).read()
+            stderr = open('%d.err' % pid).read()
+            return waitstatus, stdout, stderr
 
-        return output
 
     def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
-        """Invoke a command; return (exitcode, stdout)"""
-        import os, popen2
-        pobj = popen2.Popen4(cmd)
-        output = pobj.fromchild.read()
-        waitstatus = pobj.wait()
+        """Invoke a command; return (exitcode, stdout, stderr)"""
+        import os
+        waitstatus, stdout, stderr = self.run_captured(cmd)
         assert not os.WIFSIGNALED(waitstatus), \
-               ("%s terminated with signal %d", cmd, os.WTERMSIG(waitstatus))
+               ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
         rc = os.WEXITSTATUS(waitstatus)
         self.test_log = self.test_log + ("""Run command: %s
 Wait status: %#x (exit code %d, signal %d)
-Output:
+stdout:
+%s
+stderr:
 %s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
-         output))
+         stdout, stderr))
         if skip_on_noexec and rc == 127:
             # Either we could not execute the command or the command
             # returned exit code 127.  According to system(3) we can't
             # tell the difference.
             raise NotRunError, "could not execute %s" % `cmd`
-        return rc, output
+        return rc, stdout, stderr
+    
 
     def explain_failure(self, exc_info = None):
-        import traceback
-        # Move along, nothing to see here
-        if not exc_info and self.test_log == "":
-            return
-        print "-----------------------------------------------------------------"
-        if exc_info:
-            traceback.print_exc(file=sys.stdout)
+        print "test_log:"
         print self.test_log
-        print "-----------------------------------------------------------------"
 
 
     def log(self, msg):
@@ -201,14 +278,34 @@ class NotRunError(Exception):
         self.value = value
 
 
-def runtests(test_list, verbose = 0):
-    """Run a series of tests.
+def _report_error(case, debugger):
+    """Ask the test case to explain failure, and optionally run a debugger
 
-    Eventually, this routine will also examine sys.argv[] to handle
-    extra options.
+    Input:
+      case         TestCase instance
+      debugger     if true, a debugger function to be applied to the traceback
+"""
+    import sys
+    ex = sys.exc_info()
+    print "-----------------------------------------------------------------"
+    if ex:
+        import traceback
+        traceback.print_exc(file=sys.stdout)
+    case.explain_failure()
+    print "-----------------------------------------------------------------"
+
+    if debugger:
+        tb = ex[2]
+        debugger(tb)
+
+
+def runtests(test_list, verbose = 0, debugger = None):
+    """Run a series of tests.
 
     Inputs:
-      test_list    sequence of callable test objects
+      test_list    sequence of TestCase classes
+      verbose      print more information as testing proceeds
+      debugger     debugger object to be applied to errors
 
     Returns:
       unix return code: 0 for success, 1 for failures, 2 for test failure
@@ -220,37 +317,37 @@ def runtests(test_list, verbose = 0):
         # flush now so that long running tests are easier to follow
         sys.stdout.flush()
 
+        obj = None
         try:
             try: # run test and show result
                 obj = test_class()
-                if hasattr(obj, "setup"):
-                    obj.setup()
+                obj.setup()
                 obj.runtest()
                 print "OK"
             except KeyboardInterrupt:
                 print "INTERRUPT"
-                obj.explain_failure(sys.exc_info())
+                _report_error(obj, debugger)
                 ret = 2
                 break
             except NotRunError, msg:
                 print "NOTRUN, %s" % msg.value
             except:
                 print "FAIL"
-                obj.explain_failure(sys.exc_info())
+                _report_error(obj, debugger)
                 ret = 1
         finally:
-            try:
-                if hasattr(obj, "teardown"):
-                    obj.teardown()
-            except KeyboardInterrupt:
-                print "interrupted during teardown"
-                obj.explain_failure(sys.exc_info())
-                ret = 2
-                break
-            except:
-                print "error during teardown"
-                obj.explain_failure(sys.exc_info())
-                ret = 1
+            while obj and obj._cleanups:
+                try:
+                    apply(obj._cleanups.pop())
+                except KeyboardInterrupt:
+                    print "interrupted during teardown"
+                    _report_error(obj, debugger)
+                    ret = 2
+                    break
+                except:
+                    print "error during teardown"
+                    _report_error(obj, debugger)
+                    ret = 1
         # Display log file if we're verbose
         if ret == 0 and verbose:
             obj.explain_failure()
@@ -277,9 +374,10 @@ usage:
     list them on the command line.
 
 options:
-    --help           show usage message
-    --list           list available tests
-    --verbose        show more information while running tests
+    --help              show usage message
+    --list              list available tests
+    --verbose, -v       show more information while running tests
+    --post-mortem, -p   enter Python debugger on error
 """ % sys.argv[0]
 
 
@@ -289,9 +387,14 @@ def print_list(test_list):
         print "    %s" % _test_name(test_class)
 
 
-def main(tests):
+def main(tests, extra_tests=[]):
     """Main entry point for test suites based on ComfyChair.
 
+    inputs:
+      tests       Sequence of TestCase subclasses to be run by default.
+      extra_tests Sequence of TestCase subclasses that are available but
+                  not run by default.
+
 Test suites should contain this boilerplate:
 
     if __name__ == '__main__':
@@ -305,28 +408,37 @@ Calls sys.exit() on completion.
     from sys import argv
     import getopt, sys
 
-    verbose = 0
-
-    opts, args = getopt.getopt(argv[1:], '', ['help', 'list', 'verbose'])
-    if ('--help', '') in opts:
-        print_help()
-        return
-    elif ('--list', '') in opts:
-        print_list(tests)
-        return 
+    opt_verbose = 0
+    debugger = None
 
-    if ('--verbose', '') in opts:
-        verbose = 1
+    opts, args = getopt.getopt(argv[1:], 'pv',
+                               ['help', 'list', 'verbose', 'post-mortem'])
+    for opt, opt_arg in opts:
+        if opt == '--help':
+            print_help()
+            return
+        elif opt == '--list':
+            print_list(tests + extra_tests)
+            return
+        elif opt == '--verbose' or opt == '-v':
+            opt_verbose = 1
+        elif opt == '--post-mortem' or opt == '-p':
+            import pdb
+            debugger = pdb.post_mortem
 
     if args:
+        all_tests = tests + extra_tests
         by_name = {}
-        for t in tests:
+        for t in all_tests:
             by_name[_test_name(t)] = t
-        which_tests = [by_name[name] for name in args]
+        which_tests = []
+        for name in args:
+            which_tests.append(by_name[name])
     else:
         which_tests = tests
 
-    sys.exit(runtests(which_tests, verbose))
+    sys.exit(runtests(which_tests, verbose=opt_verbose,
+                      debugger=debugger))
 
 
 if __name__ == '__main__':