r39: * importing .cvsignore files
[samba.git] / source3 / stf / comfychair.py
1 #! /usr/bin/env python
2
3 # Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
4 # Copyright (C) 2003 by Tim Potter <tpot@samba.org>
5
6 # This program is free software; you can redistribute it and/or
7 # modify it under the terms of the GNU General Public License as
8 # published by the Free Software Foundation; either version 2 of the
9 # License, or (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 # USA
20
21 """comfychair: a Python-based instrument of software torture.
22
23 Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
24 Copyright (C) 2003 by Tim Potter <tpot@samba.org>
25
26 This is a test framework designed for testing programs written in
27 Python, or (through a fork/exec interface) any other language.
28
29 For more information, see the file README.comfychair.
30
31 To run a test suite based on ComfyChair, just run it as a program.
32 """
33
34 import sys, re
35
36
37 class TestCase:
38     """A base class for tests.  This class defines required functions which
39     can optionally be overridden by subclasses.  It also provides some
40     utility functions for"""
41
42     def __init__(self):
43         self.test_log = ""
44         self.background_pids = []
45         self._cleanups = []
46         self._enter_rundir()
47         self._save_environment()
48         self.add_cleanup(self.teardown)
49
50
51     # --------------------------------------------------
52     # Save and restore directory
53     def _enter_rundir(self):
54         import os
55         self.basedir = os.getcwd()
56         self.add_cleanup(self._restore_directory)
57         self.rundir = os.path.join(self.basedir,
58                                    'testtmp', 
59                                    self.__class__.__name__)
60         self.tmpdir = os.path.join(self.rundir, 'tmp')
61         os.system("rm -fr %s" % self.rundir)
62         os.makedirs(self.tmpdir)
63         os.system("mkdir -p %s" % self.rundir)
64         os.chdir(self.rundir)
65
66     def _restore_directory(self):
67         import os
68         os.chdir(self.basedir)
69
70     # --------------------------------------------------
71     # Save and restore environment
72     def _save_environment(self):
73         import os
74         self._saved_environ = os.environ.copy()
75         self.add_cleanup(self._restore_environment)
76
77     def _restore_environment(self):
78         import os
79         os.environ.clear()
80         os.environ.update(self._saved_environ)
81
82     
83     def setup(self):
84         """Set up test fixture."""
85         pass
86
87     def teardown(self):
88         """Tear down test fixture."""
89         pass
90
91     def runtest(self):
92         """Run the test."""
93         pass
94
95
96     def add_cleanup(self, c):
97         """Queue a cleanup to be run when the test is complete."""
98         self._cleanups.append(c)
99         
100
101     def fail(self, reason = ""):
102         """Say the test failed."""
103         raise AssertionError(reason)
104
105
106     #############################################################
107     # Requisition methods
108
109     def require(self, predicate, message):
110         """Check a predicate for running this test.
111
112 If the predicate value is not true, the test is skipped with a message explaining
113 why."""
114         if not predicate:
115             raise NotRunError, message
116
117     def require_root(self):
118         """Skip this test unless run by root."""
119         import os
120         self.require(os.getuid() == 0,
121                      "must be root to run this test")
122
123     #############################################################
124     # Assertion methods
125
126     def assert_(self, expr, reason = ""):
127         if not expr:
128             raise AssertionError(reason)
129
130     def assert_equal(self, a, b):
131         if not a == b:
132             raise AssertionError("assertEquals failed: %s" % `(a, b)`)
133             
134     def assert_notequal(self, a, b):
135         if a == b:
136             raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
137
138     def assert_re_match(self, pattern, s):
139         """Assert that a string matches a particular pattern
140
141         Inputs:
142           pattern      string: regular expression
143           s            string: to be matched
144
145         Raises:
146           AssertionError if not matched
147           """
148         if not re.match(pattern, s):
149             raise AssertionError("string does not match regexp\n"
150                                  "    string: %s\n"
151                                  "    re: %s" % (`s`, `pattern`))
152
153     def assert_re_search(self, pattern, s):
154         """Assert that a string *contains* a particular pattern
155
156         Inputs:
157           pattern      string: regular expression
158           s            string: to be searched
159
160         Raises:
161           AssertionError if not matched
162           """
163         if not re.search(pattern, s):
164             raise AssertionError("string does not contain regexp\n"
165                                  "    string: %s\n"
166                                  "    re: %s" % (`s`, `pattern`))
167
168
169     def assert_no_file(self, filename):
170         import os.path
171         assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
172
173
174     #############################################################
175     # Methods for running programs
176
177     def runcmd_background(self, cmd):
178         import os
179         self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
180         pid = os.fork()
181         if pid == 0:
182             # child
183             try:
184                 os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
185             finally:
186                 os._exit(127)
187         self.test_log = self.test_log + "pid: %d\n" % pid
188         return pid
189
190
191     def runcmd(self, cmd, expectedResult = 0):
192         """Run a command, fail if the command returns an unexpected exit
193         code.  Return the output produced."""
194         rc, output, stderr = self.runcmd_unchecked(cmd)
195         if rc != expectedResult:
196             raise AssertionError("""command returned %d; expected %s: \"%s\"
197 stdout:
198 %s
199 stderr:
200 %s""" % (rc, expectedResult, cmd, output, stderr))
201
202         return output, stderr
203
204
205     def run_captured(self, cmd):
206         """Run a command, capturing stdout and stderr.
207
208         Based in part on popen2.py
209
210         Returns (waitstatus, stdout, stderr)."""
211         import os, types
212         pid = os.fork()
213         if pid == 0:
214             # child
215             try: 
216                 pid = os.getpid()
217                 openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
218
219                 outfd = os.open('%d.out' % pid, openmode, 0666)
220                 os.dup2(outfd, 1)
221                 os.close(outfd)
222
223                 errfd = os.open('%d.err' % pid, openmode, 0666)
224                 os.dup2(errfd, 2)
225                 os.close(errfd)
226
227                 if isinstance(cmd, types.StringType):
228                     cmd = ['/bin/sh', '-c', cmd]
229
230                 os.execvp(cmd[0], cmd)
231             finally:
232                 os._exit(127)
233         else:
234             # parent
235             exited_pid, waitstatus = os.waitpid(pid, 0)
236             stdout = open('%d.out' % pid).read()
237             stderr = open('%d.err' % pid).read()
238             return waitstatus, stdout, stderr
239
240
241     def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
242         """Invoke a command; return (exitcode, stdout, stderr)"""
243         import os
244         waitstatus, stdout, stderr = self.run_captured(cmd)
245         assert not os.WIFSIGNALED(waitstatus), \
246                ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
247         rc = os.WEXITSTATUS(waitstatus)
248         self.test_log = self.test_log + ("""Run command: %s
249 Wait status: %#x (exit code %d, signal %d)
250 stdout:
251 %s
252 stderr:
253 %s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
254          stdout, stderr))
255         if skip_on_noexec and rc == 127:
256             # Either we could not execute the command or the command
257             # returned exit code 127.  According to system(3) we can't
258             # tell the difference.
259             raise NotRunError, "could not execute %s" % `cmd`
260         return rc, stdout, stderr
261     
262
263     def explain_failure(self, exc_info = None):
264         print "test_log:"
265         print self.test_log
266
267
268     def log(self, msg):
269         """Log a message to the test log.  This message is displayed if
270         the test fails, or when the runtests function is invoked with
271         the verbose option."""
272         self.test_log = self.test_log + msg + "\n"
273
274
275 class NotRunError(Exception):
276     """Raised if a test must be skipped because of missing resources"""
277     def __init__(self, value = None):
278         self.value = value
279
280
281 def _report_error(case, debugger):
282     """Ask the test case to explain failure, and optionally run a debugger
283
284     Input:
285       case         TestCase instance
286       debugger     if true, a debugger function to be applied to the traceback
287 """
288     import sys
289     ex = sys.exc_info()
290     print "-----------------------------------------------------------------"
291     if ex:
292         import traceback
293         traceback.print_exc(file=sys.stdout)
294     case.explain_failure()
295     print "-----------------------------------------------------------------"
296
297     if debugger:
298         tb = ex[2]
299         debugger(tb)
300
301
302 def runtests(test_list, verbose = 0, debugger = None):
303     """Run a series of tests.
304
305     Inputs:
306       test_list    sequence of TestCase classes
307       verbose      print more information as testing proceeds
308       debugger     debugger object to be applied to errors
309
310     Returns:
311       unix return code: 0 for success, 1 for failures, 2 for test failure
312     """
313     import traceback
314     ret = 0
315     for test_class in test_list:
316         print "%-30s" % _test_name(test_class),
317         # flush now so that long running tests are easier to follow
318         sys.stdout.flush()
319
320         obj = None
321         try:
322             try: # run test and show result
323                 obj = test_class()
324                 obj.setup()
325                 obj.runtest()
326                 print "OK"
327             except KeyboardInterrupt:
328                 print "INTERRUPT"
329                 _report_error(obj, debugger)
330                 ret = 2
331                 break
332             except NotRunError, msg:
333                 print "NOTRUN, %s" % msg.value
334             except:
335                 print "FAIL"
336                 _report_error(obj, debugger)
337                 ret = 1
338         finally:
339             while obj and obj._cleanups:
340                 try:
341                     apply(obj._cleanups.pop())
342                 except KeyboardInterrupt:
343                     print "interrupted during teardown"
344                     _report_error(obj, debugger)
345                     ret = 2
346                     break
347                 except:
348                     print "error during teardown"
349                     _report_error(obj, debugger)
350                     ret = 1
351         # Display log file if we're verbose
352         if ret == 0 and verbose:
353             obj.explain_failure()
354             
355     return ret
356
357
358 def _test_name(test_class):
359     """Return a human-readable name for a test class.
360     """
361     try:
362         return test_class.__name__
363     except:
364         return `test_class`
365
366
367 def print_help():
368     """Help for people running tests"""
369     import sys
370     print """%s: software test suite based on ComfyChair
371
372 usage:
373     To run all tests, just run this program.  To run particular tests,
374     list them on the command line.
375
376 options:
377     --help              show usage message
378     --list              list available tests
379     --verbose, -v       show more information while running tests
380     --post-mortem, -p   enter Python debugger on error
381 """ % sys.argv[0]
382
383
384 def print_list(test_list):
385     """Show list of available tests"""
386     for test_class in test_list:
387         print "    %s" % _test_name(test_class)
388
389
390 def main(tests, extra_tests=[]):
391     """Main entry point for test suites based on ComfyChair.
392
393     inputs:
394       tests       Sequence of TestCase subclasses to be run by default.
395       extra_tests Sequence of TestCase subclasses that are available but
396                   not run by default.
397
398 Test suites should contain this boilerplate:
399
400     if __name__ == '__main__':
401         comfychair.main(tests)
402
403 This function handles standard options such as --help and --list, and
404 by default runs all tests in the suggested order.
405
406 Calls sys.exit() on completion.
407 """
408     from sys import argv
409     import getopt, sys
410
411     opt_verbose = 0
412     debugger = None
413
414     opts, args = getopt.getopt(argv[1:], 'pv',
415                                ['help', 'list', 'verbose', 'post-mortem'])
416     for opt, opt_arg in opts:
417         if opt == '--help':
418             print_help()
419             return
420         elif opt == '--list':
421             print_list(tests + extra_tests)
422             return
423         elif opt == '--verbose' or opt == '-v':
424             opt_verbose = 1
425         elif opt == '--post-mortem' or opt == '-p':
426             import pdb
427             debugger = pdb.post_mortem
428
429     if args:
430         all_tests = tests + extra_tests
431         by_name = {}
432         for t in all_tests:
433             by_name[_test_name(t)] = t
434         which_tests = []
435         for name in args:
436             which_tests.append(by_name[name])
437     else:
438         which_tests = tests
439
440     sys.exit(runtests(which_tests, verbose=opt_verbose,
441                       debugger=debugger))
442
443
444 if __name__ == '__main__':
445     print __doc__