-#!/usr/bin/python
+#!/usr/bin/python -u
# Bootstrap Samba and run a number of tests against it.
# Copyright (C) 2005-2012 Jelmer Vernooij <jelmer@samba.org>
# Copyright (C) 2007-2009 Stefan Metzmacher <metze@samba.org>
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import atexit
+from cStringIO import StringIO
import os
import sys
import signal
+import subprocess
+import subunit
+import traceback
import warnings
import optparse
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+
+from selftest import (
+ socket_wrapper,
+ subunithelper,
+ testlist,
+ )
+from selftest.client import write_clientconf
+from selftest.run import (
+ expand_command_list,
+ expand_command_run,
+ exported_envvars_str,
+ now,
+ run_testsuite_command,
+ )
+from selftest.target import (
+ EnvironmentManager,
+ NoneTarget,
+ UnsupportedEnvironment,
+ )
+
includes = ()
excludes = ()
def read_excludes(fn):
- excludes.extend(read_test_regexes(fn))
+ excludes.extend(testlist.read_test_regexes(fn))
def read_includes(fn):
- includes.extend(read_test_regexes(fn))
+ includes.extend(testlist.read_test_regexes(fn))
parser = optparse.OptionParser("TEST-REGEXES")
-parser.add_option("--target", type="choice", choices=["samba", "samba3"], default="samba", help="Samba version to target")
-parser.add_option("--quick", help="run quick overall test")
-parser.add_option("--verbose", help="be verbose")
-parser.add_option("--list", help="list available tests")
-parser.add_option("--socket-wrapper", help="enable socket wrapper")
+parser.add_option("--target", type="choice", choices=["samba", "samba3", "none"], default="samba", help="Samba version to target")
+parser.add_option("--quick", help="run quick overall test", action="store_true", default=False)
+parser.add_option("--list", help="list available tests", action="store_true", default=False)
+parser.add_option("--socket-wrapper", help="enable socket wrapper", action="store_true", default=False)
parser.add_option("--socket-wrapper-pcap", help="save traffic to pcap directories", type="str")
-parser.add_option("--socket-wrapper-keep-pcap", help="keep all pcap files, not just those for tests that failed")
-parser.add_option("--one", help="abort when the first test fails")
+parser.add_option("--socket-wrapper-keep-pcap", help="keep all pcap files, not just those for tests that failed", action="store_true", default=False)
+parser.add_option("--one", help="abort when the first test fails", action="store_true", default=False)
parser.add_option("--exclude", action="callback", help="Add file to exclude files", callback=read_excludes)
parser.add_option("--include", action="callback", help="Add file to include files", callback=read_includes)
-parser.add_option("--testenv", help="run a shell in the requested test environment")
-parser.add_option("--resetup-environment", help="Re-setup environment")
+parser.add_option("--testenv", help="run a shell in the requested test environment", action="store_true", default=False)
+parser.add_option("--resetup-environment", help="Re-setup environment", action="store_true", default=False)
parser.add_option("--binary-mapping", help="Map binaries to use", type=str)
parser.add_option("--load-list", help="Load list of tests to load from a file", type=str)
parser.add_option("--prefix", help="prefix to run tests in", type=str, default="./st")
parser.add_option("--srcdir", type=str, default=".", help="source directory")
parser.add_option("--bindir", type=str, default="./bin", help="binaries directory")
parser.add_option("--testlist", type=str, action="append", help="file to read available tests from")
-parser.add_option("--ldap", help="back samba onto specified ldap server", choices=["openldap", "fedora-ds"], type=str)
+parser.add_option("--ldap", help="back samba onto specified ldap server", choices=["openldap", "fedora-ds"], type="choice")
opts, args = parser.parse_args()
-def pipe_handler(sig):
- sys.stderr.write("Exiting early because of SIGPIPE.\n")
+subunit_ops = subunithelper.SubunitOps(sys.stdout)
+
+def handle_signal(sig, frame):
+ sys.stderr.write("Exiting early because of signal %s.\n" % sig)
sys.exit(1)
-signal.signal(signal.SIGPIPE, pipe_handler)
+for sig in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERm, signal.SIGPIPE):
+ signal.signal(sig, handle_signal)
def skip(name):
- return find_in_list(excludes, name)
+ return testlist.find_in_list(excludes, name)
def setup_pcap(name):
if (not opts.socket_wrapper_pcap or
- not os.environ.get("SOCKET_WRAPPER_PCAP_DIR"):
+ not os.environ.get("SOCKET_WRAPPER_PCAP_DIR")):
return
- fname = name
- fname =~ s%[^abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\-]%_%g;
+ fname = "".join([x for x in name if x.isalnum() or x == '-'])
- pcap_file = os.path.join(os.environ["SOCKET_WRAPPER_PCAP_DIR"], "%s.pcap" %
- fname)
+ pcap_file = os.path.join(
+ os.environ["SOCKET_WRAPPER_PCAP_DIR"], "%s.pcap" % fname)
socket_wrapper.setup_pcap(pcap_file)
return pcap_file
os.unlink(pcap_file)
-# expand strings from %ENV
-def expand_environment_strings(s):
- # we use a reverse sort so we do the longer ones first
- foreach my $k (sort { $b cmp $a } keys %ENV) {
- $s =~ s/\$$k/$ENV{$k}/g;
- }
- return $s;
-
+def run_testsuite(name, cmd, subunit_ops, env=None):
+ """Run a single testsuite.
-def run_testsuite(envname, name, cmd, i, totalsuites):
+ :param env: Environment to run in
+ :param name: Name of the testsuite
+ :param cmd: Name of the (fully expanded) command to run
+ :return: exitcode of the command
+ """
pcap_file = setup_pcap(name)
- Subunit::start_testsuite(name);
- Subunit::progress_push();
- Subunit::report_time(time());
- os.system(cmd)
- Subunit::report_time(time());
- Subunit::progress_pop();
-
- if ($? == -1) {
- Subunit::progress_pop();
- Subunit::end_testsuite(name, "error", "Unable to run $cmd: $!");
- exit(1);
- } elsif ($? & 127) {
- Subunit::end_testsuite(name, "error",
- sprintf("%s died with signal %d, %s coredump\n", $cmd, ($? & 127), ($? & 128) ? 'with' : 'without'));
- exit(1);
- }
-
- my $exitcode = $? >> 8;
-
- envlog = env_manager.getlog_env(envname)
- if envlog != "":
- sys.stdout.write("envlog: %s\n" % envlog)
-
- sys.stdout.write("command: %s\n" % cmd)
- sys.stdout.write("expanded command: %s\n" % expand_environment_strings(cmd))
-
- if exitcode == 0:
- Subunit::end_testsuite(name, "success")
- else:
- Subunit::end_testsuite(name, "failure", "Exit code was %d" % exitcode)
+ exitcode = run_testsuite_command(name, cmd, subunit_ops, env)
+ if exitcode is None:
+ sys.exit(1)
cleanup_pcap(pcap_file, exitcode)
return exitcode
+
if opts.list and opts.testenv:
sys.stderr.write("--list and --testenv are mutually exclusive\n")
sys.exit(1)
-# we want unbuffered output
-$| = 1;
-
tests = args
-# quick hack to disable rpc validation when using valgrind - its way too slow
+# quick hack to disable rpc validation when using valgrind - it is way too slow
if not os.environ.get("VALGRIND"):
- os.environ"VALIDATE"] = "validate"
- os.environ["MALLOC_CHECK_"] = 2
+ os.environ["VALIDATE"] = "validate"
+ os.environ["MALLOC_CHECK_"] = "2"
# make all our python scripts unbuffered
os.environ["PYTHONUNBUFFERED"] = "1"
-bindir_abs = os.path.abspath(bindir)
+bindir_abs = os.path.abspath(opts.bindir)
# Backwards compatibility:
if os.environ.get("TEST_LDAP") == "yes":
ldap = "openldap"
torture_maxtime = int(os.getenv("TORTURE_MAXTIME", "1200"))
-if ldap:
+if opts.ldap:
# LDAP is slow
torture_maxtime *= 2
-$prefix =~ s+//+/+;
-$prefix =~ s+/./+/+;
-$prefix =~ s+/$++;
-
-if prefix == "":
- raise Exception("using an empty prefix isn't allowed")
+prefix = os.path.normpath(opts.prefix)
# Ensure we have the test prefix around.
#
-# We need restrictive
-# permissions on this as some subdirectories in this tree will have
-# wider permissions (ie 0777) and this would allow other users on the
+# We need restrictive permissions on this as some subdirectories in this tree
+# will have wider permissions (ie 0777) and this would allow other users on the
# host to subvert the test process.
if not os.path.isdir(prefix):
os.mkdir(prefix, 0700)
os.chmod(prefix, 0700)
prefix_abs = os.path.abspath(prefix)
-tmpdir_abs = os.path.abspath(os.path.join(prefix, "tmp"))
+tmpdir_abs = os.path.abspath(os.path.join(prefix_abs, "tmp"))
if not os.path.isdir(tmpdir_abs):
os.mkdir(tmpdir_abs, 0777)
-srcdir_abs = os.path.abspath(srcdir)
+srcdir_abs = os.path.abspath(opts.srcdir)
-if prefix_abs == "":
- raise Exception("using an empty absolute prefix isn't allowed")
if prefix_abs == "/":
- raise Exception("using '/' as absolute prefix isn't allowed")
+ raise Exception("using '/' as absolute prefix is a bad idea")
os.environ["PREFIX"] = prefix
os.environ["KRB5CCNAME"] = os.path.join(prefix, "krb5ticket")
os.environ["PREFIX_ABS"] = prefix_abs
-os.environ["SRCDIR"] = srcdir
+os.environ["SRCDIR"] = opts.srcdir
os.environ["SRCDIR_ABS"] = srcdir_abs
os.environ["BINDIR"] = bindir_abs
if name in os.environ:
os.environ[name] = "%s:%s" % (newpath, os.environ[name])
else:
- .environ[name] = newpath
+ os.environ[name] = newpath
prefix_pathvar("PKG_CONFIG_PATH", os.path.join(bindir_abs, "pkgconfig"))
prefix_pathvar("PYTHONPATH", os.path.join(bindir_abs, "python"))
socket_wrapper_dir = socket_wrapper.setup_dir(os.path.join(prefix_abs, "w"), opts.socket_wrapper_pcap)
sys.stdout.write("SOCKET_WRAPPER_DIR=%s\n" % socket_wrapper_dir)
elif not opts.list:
- if sys.getuid() != 0:
+ if os.getuid() != 0:
warnings.warn("not using socket wrapper, but also not running as root. Will not be able to listen on proper ports")
testenv_default = "none"
if opts.binary_mapping:
binary_mapping = dict([l.split(":") for l in opts.binary_mapping.split(",")])
+ os.environ["BINARY_MAPPING"] = opts.binary_mapping
else:
binary_mapping = {}
-
-os.environ["BINARY_MAPPING"] = opts.binary_mapping
+ os.environ["BINARY_MAPPING"] = ""
# After this many seconds, the server will self-terminate. All tests
# must terminate in this time, and testenv will only stay alive this
# long
-server_maxtime = 7500
if os.environ.get("SMBD_MAXTIME", ""):
server_maxtime = int(os.environ["SMBD_MAXTIME"])
+else:
+ server_maxtime = 7500
+
+
+def has_socket_wrapper(bindir):
+ """Check if Samba has been built with socket wrapper support.
+ """
+ f = StringIO()
+ subprocess.check_call([os.path.join(bindir, "smbd"), "-b"], stdout=f)
+ for l in f.readlines():
+ if "SOCKET_WRAPPER" in l:
+ return True
+ return False
+
if not opts.list:
if opts.target == "samba":
- if opts.socket_wrapper and `$bindir/smbd -b | grep SOCKET_WRAPPER` eq "":
- die("You must include --enable-socket-wrapper when compiling Samba in order to execute 'make test'. Exiting....")
+ if opts.socket_wrapper and not has_socket_wrapper(opts.bindir):
+ sys.stderr.write("You must include --enable-socket-wrapper when compiling Samba in order to execute 'make test'. Exiting....\n")
+ sys.exit(1)
testenv_default = "dc"
- require target::Samba
- target = new Samba($bindir, binary_mapping, $ldap, $srcdir, $server_maxtime)
+ from selftest.target.samba import Samba
+ target = Samba(opts.bindir, binary_mapping, ldap, opts.srcdir, server_maxtime)
elif opts.target == "samba3":
- if opts.socket_wrapper and `$bindir/smbd -b | grep SOCKET_WRAPPER` eq "":
- die("You must include --enable-socket-wrapper when compiling Samba in order to execute 'make test'. Exiting....")
+ if opts.socket_wrapper and not has_socket_wrapper(opts.bindir):
+ sys.stderr.write("You must include --enable-socket-wrapper when compiling Samba in order to execute 'make test'. Exiting....\n")
+ sys.exit(1)
testenv_default = "member"
- require target::Samba3
- $target = new Samba3($bindir, binary_mapping, $srcdir_abs, $server_maxtime)
+ from selftest.target.samba3 import Samba3
+ target = Samba3(opts.bindir, binary_mapping, srcdir_abs, server_maxtime)
+ elif opts.target == "none":
+ testenv_default = "none"
+ target = NoneTarget()
+
+ env_manager = EnvironmentManager(target)
+ atexit.register(env_manager.teardown_all)
interfaces = ",".join([
"127.0.0.11/8",
conffile = os.path.join(clientdir, "client.conf")
os.environ["SMB_CONF_PATH"] = conffile
-def write_clientconf(conffile, clientdir, vars):
- if not os.path.isdir(clientdir):
- os.mkdir(clientdir, 0777)
-
- for n in ["private", "lockdir", "statedir", "cachedir"]:
- p = os.path.join(clientdir, n)
- shutil.rmtree(p)
- os.mkdir(p, 0777)
-
- # this is ugly, but the ncalrpcdir needs exactly 0755
- # otherwise tests fail.
- mask = os.umask(0022)
-
- for n in ["ncalrpcdir", "ncalrpcdir/np"]:
- p = os.path.join(clientdir, n)
- shutil.rmtree(p)
- os.mkdir(p, 0777)
- os.umask(mask)
-
- settings = {
- "netbios name": "client",
- "private dir": os.path.join(clientdir, "private"),
- "lock dir": os.path.join(clientdir, "lockdir"),
- "state directory": os.path.join(clientdir, "statedir"),
- "cache directory": os.path.join(clientdir, "cachedir"),
- "ncalrpc dir": os.path.join(clientdir, "ncalrpcdir"),
- "name resolve order": "file bcast",
- "panic action": os.path.join(RealBin, "gdb_backtrace \%d"),
- "max xmit": "32K",
- "notify:inotify": "false",
- "ldb:nosync": "true",
- "system:anonymous": "true",
- "client lanman auth": "Yes",
- "log level": "1",
- "torture:basedir": clientdir,
- #We don't want to pass our self-tests if the PAC code is wrong
- "gensec:require_pac": "true",
- "resolv:host file": os.path.join(prefix_abs, "dns_host_file"),
- #We don't want to run 'speed' tests for very long
- "torture:timelimit": "1",
- }
-
- if "DOMAIN" in vars:
- settings["workgroup"] = vars["DOMAIN"]
- if "REALM" in vars:
- settings["realm"] = vars["REALM"]
- if opts.socket_wrapper:
- settings["interfaces"] = interfaces
-
- f = open(conffile, 'w')
- try:
- f.write("[global]\n")
- for item in settings.iteritems():
- f.write("\t%s = %s\n" % item)
- finally:
- f.close()
-
todo = []
-if testlists == []:
+if not opts.testlist:
sys.stderr.write("No testlists specified\n")
sys.exit(1)
os.environ["SELFTEST_INTERFACES"] = interfaces
else:
os.environ["SELFTEST_INTERFACES"] = ""
-if opts.verbose:
- os.environ["SELFTEST_VERBOSE"] = "1"
-else:
- os.environ["SELFTEST_VERBOSE"] = ""
if opts.quick:
os.environ["SELFTEST_QUICK"] = "1"
else:
os.environ["SELFTEST_QUICK"] = ""
os.environ["SELFTEST_MAXTIME"] = str(torture_maxtime)
+
available = []
-for fn in testlists:
- for testsuite in read_testlist(fn):
- if not should_run_test(tests, testsuite):
+for fn in opts.testlist:
+ for testsuite in testlist.read_testlist_file(fn):
+ if not testlist.should_run_test(tests, testsuite):
continue
name = testsuite[0]
- if includes is not None and find_in_list(includes, name) is not None:
+ if (includes is not None and
+ testlist.find_in_list(includes, name) is not None):
continue
available.append(testsuite)
if opts.load_list:
- individual_tests = {}
- restricted = []
- f = open(opts.load_list, 'r')
- try:
- restricted_mgr = RestrictedTestManager(read_restricted_test_list(f))
- finally:
- f.close()
+ restricted_mgr = testlist.RestrictedTestManager.from_path(opts.load_list)
else:
restricted_mgr = None
- individual_tests = None
-
for testsuite in available:
name = testsuite[0]
match = None
if skipreason is not None:
if not opts.list:
- Subunit::skip_testsuite(name, skipreason)
+ subunit_ops.skip_testsuite(name, skipreason)
else:
todo.append(testsuite + (match,))
suitestotal = len(todo)
if not opts.list:
- Subunit::progress($suitestotal)
- Subunit::report_time(time())
-
-i = 0
-$| = 1;
+ subunit_ops.progress(suitestotal, subunit.PROGRESS_SET)
+ subunit_ops.time(now())
exported_envvars = [
# domain stuff
"VAMPIRE_DC_NETBIOSNAME",
"VAMPIRE_DC_NETBIOSALIAS",
+ # domain controller stuff for Vampired DC
+ "PROMOTED_DC_SERVER",
+ "PROMOTED_DC_SERVER_IP",
+ "PROMOTED_DC_NETBIOSNAME",
+ "PROMOTED_DC_NETBIOSALIAS",
+
# server stuff
"SERVER",
"SERVER_IP",
"LOCAL_PATH"
]
-def handle_sigdie(signame):
- env_manager.teardown_all()
- sys.stderr.write("Received signal %s" % signame)
- sys.exit(1)
-
-signal.signal(signal.SIGINT, handle_sigdie)
-signal.signal(signal.SIGQUIT, handle_sigdie)
-signal.signal(signal.SIGTERM, handle_sigdie)
-
-def exported_envvars_str(testenv_vars):
- out = ""
-
- for n in exported_envvars:
- if not n in testenv_vars:
- continue
- out += "%s=%s\n" % (n, testenv_vars[n])
-
- return out
-
-
def switch_env(name, prefix):
if ":" in name:
(envname, option) = name.split(":", 1)
for name in exported_envvars:
if name in testenv_vars:
os.environ[name] = testenv_vars[name]
- else:
+ elif name in os.environ:
del os.environ[name]
- return testenv_vars
+ return env
# This 'global' file needs to be empty when we start
-os.unlink(os.path.join(prefix_abs, "dns_host_file"))
+dns_host_file_path = os.path.join(prefix_abs, "dns_host_file")
+if os.path.exists(dns_host_file_path):
+ os.unlink(dns_host_file_path)
if opts.testenv:
testenv_name = os.environ.get("SELFTEST_TESTENV", testenv_default)
- testenv_vars = switch_env(testenv_name, prefix)
+ env = switch_env(testenv_name, prefix)
+ testenv_vars = env.get_vars()
os.environ["PIDDIR"] = testenv_vars["PIDDIR"]
os.environ["ENVNAME"] = testenv_name
- envvarstr = exported_envvars_str(testenv_vars)
+ envvarstr = exported_envvars_str(testenv_vars, exported_envvars)
term = os.environ.get("TERMINAL", "xterm -e")
- os.system("$term 'echo -e \"
-Welcome to the Samba4 Test environment '$testenv_name'
+ cmd = """'echo -e "
+Welcome to the Samba4 Test environment '%(testenv_name)'
This matches the client environment used in make test
server is pid `cat \$PIDDIR/samba.pid`
SMB_CONF_PATH=\$SMB_CONF_PATH
$envvarstr
-\" && LD_LIBRARY_PATH=$ENV{LD_LIBRARY_PATH} bash'");
+\" && LD_LIBRARY_PATH=%(LD_LIBRARY_PATH)s $(SHELL)'""" % {
+ "testenv_name": testenv_name,
+ "LD_LIBRARY_PATH": os.environ["LD_LIBRARY_PATH"]}
+ subprocess.call(term + ' ' + cmd, shell=True)
env_manager.teardown_env(testenv_name)
elif opts.list:
for (name, envname, cmd, supports_loadfile, supports_idlist, subtests) in todo:
- if not "$LISTOPT" in cmd:
+ cmd = expand_command_list(cmd)
+ if cmd is None:
warnings.warn("Unable to list tests in %s" % name)
continue
- cmd = cmd.replace("$LISTOPT", "--list")
-
- os.system(cmd)
-
- if ($? == -1) {
- die("Unable to run $cmd: $!");
- } elsif ($? & 127) {
- die(snprintf("%s died with signal %d, %s coredump\n", $cmd, ($? & 127), ($? & 128) ? 'with' : 'without'));
- }
+ exitcode = subprocess.call(cmd, shell=True)
- my $exitcode = $? >> 8;
if exitcode != 0:
sys.stderr.write("%s exited with exit code %s\n" % (cmd, exitcode))
sys.exit(1)
else:
for (name, envname, cmd, supports_loadfile, supports_idlist, subtests) in todo:
try:
- envvars = switch_env(envname, prefix)
- except Exception:
- Subunit::start_testsuite(name);
- Subunit::end_testsuite(name, "error",
- "unable to set up environment %s" % envname);
+ env = switch_env(envname, prefix)
+ except UnsupportedEnvironment:
+ subunit_ops.start_testsuite(name)
+ subunit_ops.end_testsuite(name, "skip",
+ "environment %s is unknown in this test backend - skipping" % envname)
continue
- if envvars is None:
- Subunit::start_testsuite(name);
- Subunit::end_testsuite(name, "skip",
- "environment is unknown in this test backend - skipping" % envname)
+ except Exception, e:
+ subunit_ops.start_testsuite(name)
+ traceback.print_exc()
+ subunit_ops.end_testsuite(name, "error",
+ "unable to set up environment %s: %s" % (envname, e))
continue
- # Generate a file with the individual tests to run, if the
- # test runner for this test suite supports it.
- if subtests is not None:
- if supports_loadfile:
- (fd, listid_file) = tempfile.mkstemp()
- # FIXME: Remove tempfile afterwards
- f = os.fdopen(fd)
- try:
- for test in subtests:
- f.write(test+"\n")
- finally:
- f.close()
- cmd = cmd.replace("$LOADLIST", "--load-list=%s" % listid_file)
- elif supports_idlist:
- cmd += " ".join(subtests)
-
- run_testsuite(envname, name, cmd, i, suitestotal)
-
- if opts.resetup_env:
+ cmd, tmpf = expand_command_run(cmd, supports_loadfile, supports_idlist,
+ subtests)
+
+ run_testsuite(name, cmd, subunit_ops, env=env)
+
+ if tmpf is not None:
+ os.remove(tmpf)
+
+ if opts.resetup_environment:
env_manager.teardown_env(envname)
+ env_manager.teardown_all()
sys.stdout.write("\n")
-env_manager.teardown_all()
-
# if there were any valgrind failures, show them
for fn in os.listdir(prefix):
if fn.startswith("valgrind.log"):