r23798: updated old Temple Place FSF addresses to new URL
[samba.git] / source3 / stf / comfychair.py
1 #! /usr/bin/env python
2
3 # Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
4 # Copyright (C) 2003 by Tim Potter <tpot@samba.org>
5
6 # This program is free software; you can redistribute it and/or
7 # modify it under the terms of the GNU General Public License as
8 # published by the Free Software Foundation; either version 3 of the
9 # License, or (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, see <http://www.gnu.org/licenses/>.
18
19 """comfychair: a Python-based instrument of software torture.
20
21 Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
22 Copyright (C) 2003 by Tim Potter <tpot@samba.org>
23
24 This is a test framework designed for testing programs written in
25 Python, or (through a fork/exec interface) any other language.
26
27 For more information, see the file README.comfychair.
28
29 To run a test suite based on ComfyChair, just run it as a program.
30 """
31
32 import sys, re
33
34
35 class TestCase:
36     """A base class for tests.  This class defines required functions which
37     can optionally be overridden by subclasses.  It also provides some
38     utility functions for"""
39
40     def __init__(self):
41         self.test_log = ""
42         self.background_pids = []
43         self._cleanups = []
44         self._enter_rundir()
45         self._save_environment()
46         self.add_cleanup(self.teardown)
47
48
49     # --------------------------------------------------
50     # Save and restore directory
51     def _enter_rundir(self):
52         import os
53         self.basedir = os.getcwd()
54         self.add_cleanup(self._restore_directory)
55         self.rundir = os.path.join(self.basedir,
56                                    'testtmp', 
57                                    self.__class__.__name__)
58         self.tmpdir = os.path.join(self.rundir, 'tmp')
59         os.system("rm -fr %s" % self.rundir)
60         os.makedirs(self.tmpdir)
61         os.system("mkdir -p %s" % self.rundir)
62         os.chdir(self.rundir)
63
64     def _restore_directory(self):
65         import os
66         os.chdir(self.basedir)
67
68     # --------------------------------------------------
69     # Save and restore environment
70     def _save_environment(self):
71         import os
72         self._saved_environ = os.environ.copy()
73         self.add_cleanup(self._restore_environment)
74
75     def _restore_environment(self):
76         import os
77         os.environ.clear()
78         os.environ.update(self._saved_environ)
79
80     
81     def setup(self):
82         """Set up test fixture."""
83         pass
84
85     def teardown(self):
86         """Tear down test fixture."""
87         pass
88
89     def runtest(self):
90         """Run the test."""
91         pass
92
93
94     def add_cleanup(self, c):
95         """Queue a cleanup to be run when the test is complete."""
96         self._cleanups.append(c)
97         
98
99     def fail(self, reason = ""):
100         """Say the test failed."""
101         raise AssertionError(reason)
102
103
104     #############################################################
105     # Requisition methods
106
107     def require(self, predicate, message):
108         """Check a predicate for running this test.
109
110 If the predicate value is not true, the test is skipped with a message explaining
111 why."""
112         if not predicate:
113             raise NotRunError, message
114
115     def require_root(self):
116         """Skip this test unless run by root."""
117         import os
118         self.require(os.getuid() == 0,
119                      "must be root to run this test")
120
121     #############################################################
122     # Assertion methods
123
124     def assert_(self, expr, reason = ""):
125         if not expr:
126             raise AssertionError(reason)
127
128     def assert_equal(self, a, b):
129         if not a == b:
130             raise AssertionError("assertEquals failed: %s" % `(a, b)`)
131             
132     def assert_notequal(self, a, b):
133         if a == b:
134             raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
135
136     def assert_re_match(self, pattern, s):
137         """Assert that a string matches a particular pattern
138
139         Inputs:
140           pattern      string: regular expression
141           s            string: to be matched
142
143         Raises:
144           AssertionError if not matched
145           """
146         if not re.match(pattern, s):
147             raise AssertionError("string does not match regexp\n"
148                                  "    string: %s\n"
149                                  "    re: %s" % (`s`, `pattern`))
150
151     def assert_re_search(self, pattern, s):
152         """Assert that a string *contains* a particular pattern
153
154         Inputs:
155           pattern      string: regular expression
156           s            string: to be searched
157
158         Raises:
159           AssertionError if not matched
160           """
161         if not re.search(pattern, s):
162             raise AssertionError("string does not contain regexp\n"
163                                  "    string: %s\n"
164                                  "    re: %s" % (`s`, `pattern`))
165
166
167     def assert_no_file(self, filename):
168         import os.path
169         assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
170
171
172     #############################################################
173     # Methods for running programs
174
175     def runcmd_background(self, cmd):
176         import os
177         self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
178         pid = os.fork()
179         if pid == 0:
180             # child
181             try:
182                 os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
183             finally:
184                 os._exit(127)
185         self.test_log = self.test_log + "pid: %d\n" % pid
186         return pid
187
188
189     def runcmd(self, cmd, expectedResult = 0):
190         """Run a command, fail if the command returns an unexpected exit
191         code.  Return the output produced."""
192         rc, output, stderr = self.runcmd_unchecked(cmd)
193         if rc != expectedResult:
194             raise AssertionError("""command returned %d; expected %s: \"%s\"
195 stdout:
196 %s
197 stderr:
198 %s""" % (rc, expectedResult, cmd, output, stderr))
199
200         return output, stderr
201
202
203     def run_captured(self, cmd):
204         """Run a command, capturing stdout and stderr.
205
206         Based in part on popen2.py
207
208         Returns (waitstatus, stdout, stderr)."""
209         import os, types
210         pid = os.fork()
211         if pid == 0:
212             # child
213             try: 
214                 pid = os.getpid()
215                 openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
216
217                 outfd = os.open('%d.out' % pid, openmode, 0666)
218                 os.dup2(outfd, 1)
219                 os.close(outfd)
220
221                 errfd = os.open('%d.err' % pid, openmode, 0666)
222                 os.dup2(errfd, 2)
223                 os.close(errfd)
224
225                 if isinstance(cmd, types.StringType):
226                     cmd = ['/bin/sh', '-c', cmd]
227
228                 os.execvp(cmd[0], cmd)
229             finally:
230                 os._exit(127)
231         else:
232             # parent
233             exited_pid, waitstatus = os.waitpid(pid, 0)
234             stdout = open('%d.out' % pid).read()
235             stderr = open('%d.err' % pid).read()
236             return waitstatus, stdout, stderr
237
238
239     def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
240         """Invoke a command; return (exitcode, stdout, stderr)"""
241         import os
242         waitstatus, stdout, stderr = self.run_captured(cmd)
243         assert not os.WIFSIGNALED(waitstatus), \
244                ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
245         rc = os.WEXITSTATUS(waitstatus)
246         self.test_log = self.test_log + ("""Run command: %s
247 Wait status: %#x (exit code %d, signal %d)
248 stdout:
249 %s
250 stderr:
251 %s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
252          stdout, stderr))
253         if skip_on_noexec and rc == 127:
254             # Either we could not execute the command or the command
255             # returned exit code 127.  According to system(3) we can't
256             # tell the difference.
257             raise NotRunError, "could not execute %s" % `cmd`
258         return rc, stdout, stderr
259     
260
261     def explain_failure(self, exc_info = None):
262         print "test_log:"
263         print self.test_log
264
265
266     def log(self, msg):
267         """Log a message to the test log.  This message is displayed if
268         the test fails, or when the runtests function is invoked with
269         the verbose option."""
270         self.test_log = self.test_log + msg + "\n"
271
272
273 class NotRunError(Exception):
274     """Raised if a test must be skipped because of missing resources"""
275     def __init__(self, value = None):
276         self.value = value
277
278
279 def _report_error(case, debugger):
280     """Ask the test case to explain failure, and optionally run a debugger
281
282     Input:
283       case         TestCase instance
284       debugger     if true, a debugger function to be applied to the traceback
285 """
286     import sys
287     ex = sys.exc_info()
288     print "-----------------------------------------------------------------"
289     if ex:
290         import traceback
291         traceback.print_exc(file=sys.stdout)
292     case.explain_failure()
293     print "-----------------------------------------------------------------"
294
295     if debugger:
296         tb = ex[2]
297         debugger(tb)
298
299
300 def runtests(test_list, verbose = 0, debugger = None):
301     """Run a series of tests.
302
303     Inputs:
304       test_list    sequence of TestCase classes
305       verbose      print more information as testing proceeds
306       debugger     debugger object to be applied to errors
307
308     Returns:
309       unix return code: 0 for success, 1 for failures, 2 for test failure
310     """
311     import traceback
312     ret = 0
313     for test_class in test_list:
314         print "%-30s" % _test_name(test_class),
315         # flush now so that long running tests are easier to follow
316         sys.stdout.flush()
317
318         obj = None
319         try:
320             try: # run test and show result
321                 obj = test_class()
322                 obj.setup()
323                 obj.runtest()
324                 print "OK"
325             except KeyboardInterrupt:
326                 print "INTERRUPT"
327                 _report_error(obj, debugger)
328                 ret = 2
329                 break
330             except NotRunError, msg:
331                 print "NOTRUN, %s" % msg.value
332             except:
333                 print "FAIL"
334                 _report_error(obj, debugger)
335                 ret = 1
336         finally:
337             while obj and obj._cleanups:
338                 try:
339                     apply(obj._cleanups.pop())
340                 except KeyboardInterrupt:
341                     print "interrupted during teardown"
342                     _report_error(obj, debugger)
343                     ret = 2
344                     break
345                 except:
346                     print "error during teardown"
347                     _report_error(obj, debugger)
348                     ret = 1
349         # Display log file if we're verbose
350         if ret == 0 and verbose:
351             obj.explain_failure()
352             
353     return ret
354
355
356 def _test_name(test_class):
357     """Return a human-readable name for a test class.
358     """
359     try:
360         return test_class.__name__
361     except:
362         return `test_class`
363
364
365 def print_help():
366     """Help for people running tests"""
367     import sys
368     print """%s: software test suite based on ComfyChair
369
370 usage:
371     To run all tests, just run this program.  To run particular tests,
372     list them on the command line.
373
374 options:
375     --help              show usage message
376     --list              list available tests
377     --verbose, -v       show more information while running tests
378     --post-mortem, -p   enter Python debugger on error
379 """ % sys.argv[0]
380
381
382 def print_list(test_list):
383     """Show list of available tests"""
384     for test_class in test_list:
385         print "    %s" % _test_name(test_class)
386
387
388 def main(tests, extra_tests=[]):
389     """Main entry point for test suites based on ComfyChair.
390
391     inputs:
392       tests       Sequence of TestCase subclasses to be run by default.
393       extra_tests Sequence of TestCase subclasses that are available but
394                   not run by default.
395
396 Test suites should contain this boilerplate:
397
398     if __name__ == '__main__':
399         comfychair.main(tests)
400
401 This function handles standard options such as --help and --list, and
402 by default runs all tests in the suggested order.
403
404 Calls sys.exit() on completion.
405 """
406     from sys import argv
407     import getopt, sys
408
409     opt_verbose = 0
410     debugger = None
411
412     opts, args = getopt.getopt(argv[1:], 'pv',
413                                ['help', 'list', 'verbose', 'post-mortem'])
414     for opt, opt_arg in opts:
415         if opt == '--help':
416             print_help()
417             return
418         elif opt == '--list':
419             print_list(tests + extra_tests)
420             return
421         elif opt == '--verbose' or opt == '-v':
422             opt_verbose = 1
423         elif opt == '--post-mortem' or opt == '-p':
424             import pdb
425             debugger = pdb.post_mortem
426
427     if args:
428         all_tests = tests + extra_tests
429         by_name = {}
430         for t in all_tests:
431             by_name[_test_name(t)] = t
432         which_tests = []
433         for name in args:
434             which_tests.append(by_name[name])
435     else:
436         which_tests = tests
437
438     sys.exit(runtests(which_tests, verbose=opt_verbose,
439                       debugger=debugger))
440
441
442 if __name__ == '__main__':
443     print __doc__