import atexit
from cStringIO import StringIO
-import datetime
-import iso8601
import os
import sys
import signal
-import shutil
import subprocess
import subunit
import traceback
subunithelper,
testlist,
)
+from selftest.client import write_clientconf
from selftest.run import (
- expand_environment_strings,
expand_command_list,
expand_command_run,
+ exported_envvars_str,
+ now,
+ run_testsuite_command,
)
from selftest.target import (
EnvironmentManager,
includes = ()
excludes = ()
-def now():
- return datetime.datetime.utcnow().replace(tzinfo=iso8601.iso8601.Utc())
-
def read_excludes(fn):
excludes.extend(testlist.read_test_regexes(fn))
os.unlink(pcap_file)
-def run_testsuite(envname, name, cmd):
+def run_testsuite(name, cmd, subunit_ops, env=None):
"""Run a single testsuite.
- :param envname: Name of the environment to ru nin
+ :param env: Environment to run in
:param name: Name of the testsuite
:param cmd: Name of the (fully expanded) command to run
:return: exitcode of the command
"""
pcap_file = setup_pcap(name)
- subunit_ops.start_testsuite(name)
- subunit_ops.progress(None, subunit.PROGRESS_PUSH)
- subunit_ops.time(now())
- try:
- exitcode = subprocess.call(cmd, shell=True)
- except Exception, e:
- subunit_ops.time(now())
- subunit_ops.progress(None, subunit.PROGRESS_POP)
- subunit_ops.end_testsuite(name, "error", "Unable to run %r: %s" % (cmd, e))
+ exitcode = run_testsuite_command(name, cmd, subunit_ops, env)
+ if exitcode is None:
sys.exit(1)
- subunit_ops.time(now())
- subunit_ops.progress(None, subunit.PROGRESS_POP)
-
- envlog = env_manager.getlog_env(envname)
- if envlog != "":
- sys.stdout.write("envlog: %s\n" % envlog)
-
- sys.stdout.write("command: %s\n" % cmd)
- sys.stdout.write("expanded command: %s\n" % expand_environment_strings(cmd, os.environ))
-
- if exitcode == 0:
- subunit_ops.end_testsuite(name, "success")
- else:
- subunit_ops.end_testsuite(name, "failure", "Exit code was %d" % exitcode)
-
cleanup_pcap(pcap_file, exitcode)
if not opts.socket_wrapper_keep_pcap and pcap_file is not None:
return exitcode
+
if opts.list and opts.testenv:
sys.stderr.write("--list and --testenv are mutually exclusive\n")
sys.exit(1)
prefix = os.path.normpath(opts.prefix)
-if prefix == "":
- raise Exception("using an empty prefix isn't allowed")
-
# Ensure we have the test prefix around.
#
# We need restrictive permissions on this as some subdirectories in this tree
srcdir_abs = os.path.abspath(opts.srcdir)
-if prefix_abs == "":
- raise Exception("using an empty absolute prefix isn't allowed")
if prefix_abs == "/":
- raise Exception("using '/' as absolute prefix isn't allowed")
+ raise Exception("using '/' as absolute prefix is a bad idea")
os.environ["PREFIX"] = prefix
os.environ["KRB5CCNAME"] = os.path.join(prefix, "krb5ticket")
# must terminate in this time, and testenv will only stay alive this
# long
-server_maxtime = 7500
if os.environ.get("SMBD_MAXTIME", ""):
server_maxtime = int(os.environ["SMBD_MAXTIME"])
+else:
+ server_maxtime = 7500
def has_socket_wrapper(bindir):
conffile = os.path.join(clientdir, "client.conf")
os.environ["SMB_CONF_PATH"] = conffile
-def write_clientconf(conffile, clientdir, vars):
- if not os.path.isdir(clientdir):
- os.mkdir(clientdir, 0777)
-
- for n in ["private", "lockdir", "statedir", "cachedir"]:
- p = os.path.join(clientdir, n)
- if os.path.isdir(p):
- shutil.rmtree(p)
- os.mkdir(p, 0777)
-
- # this is ugly, but the ncalrpcdir needs exactly 0755
- # otherwise tests fail.
- mask = os.umask(0022)
-
- for n in ["ncalrpcdir", "ncalrpcdir/np"]:
- p = os.path.join(clientdir, n)
- if os.path.isdir(p):
- shutil.rmtree(p)
- os.mkdir(p, 0777)
- os.umask(mask)
-
- settings = {
- "netbios name": "client",
- "private dir": os.path.join(clientdir, "private"),
- "lock dir": os.path.join(clientdir, "lockdir"),
- "state directory": os.path.join(clientdir, "statedir"),
- "cache directory": os.path.join(clientdir, "cachedir"),
- "ncalrpc dir": os.path.join(clientdir, "ncalrpcdir"),
- "name resolve order": "file bcast",
- "panic action": os.path.join(os.path.dirname(__file__), "gdb_backtrace \%d"),
- "max xmit": "32K",
- "notify:inotify": "false",
- "ldb:nosync": "true",
- "system:anonymous": "true",
- "client lanman auth": "Yes",
- "log level": "1",
- "torture:basedir": clientdir,
- # We don't want to pass our self-tests if the PAC code is wrong
- "gensec:require_pac": "true",
- "resolv:host file": os.path.join(prefix_abs, "dns_host_file"),
- # We don't want to run 'speed' tests for very long
- "torture:timelimit": "1",
- }
-
- if "DOMAIN" in vars:
- settings["workgroup"] = vars["DOMAIN"]
- if "REALM" in vars:
- settings["realm"] = vars["REALM"]
- if opts.socket_wrapper:
- settings["interfaces"] = interfaces
-
- f = open(conffile, 'w')
- try:
- f.write("[global]\n")
- for item in settings.iteritems():
- f.write("\t%s = %s\n" % item)
- finally:
- f.close()
-
todo = []
if not opts.testlist:
os.environ["SELFTEST_MAXTIME"] = str(torture_maxtime)
-def open_file_or_pipe(path, mode):
- if path.endswith("|"):
- return os.popen(path[:-1], mode)
- return open(path, mode)
-
available = []
for fn in opts.testlist:
- inf = open_file_or_pipe(fn, 'r')
- try:
- for testsuite in testlist.read_testlist(inf, sys.stdout):
- if not testlist.should_run_test(tests, testsuite):
- continue
- name = testsuite[0]
- if includes is not None and testlist.find_in_list(includes, name) is not None:
- continue
- available.append(testsuite)
- finally:
- inf.close()
+ for testsuite in testlist.read_testlist_file(fn):
+ if not testlist.should_run_test(tests, testsuite):
+ continue
+ name = testsuite[0]
+ if (includes is not None and
+ testlist.find_in_list(includes, name) is not None):
+ continue
+ available.append(testsuite)
if opts.load_list:
restricted_mgr = testlist.RestrictedTestManager.from_path(opts.load_list)
else:
restricted_mgr = None
-
for testsuite in available:
name = testsuite[0]
skipreason = skip(name)
"VAMPIRE_DC_NETBIOSNAME",
"VAMPIRE_DC_NETBIOSALIAS",
+ # domain controller stuff for Vampired DC
+ "PROMOTED_DC_SERVER",
+ "PROMOTED_DC_SERVER_IP",
+ "PROMOTED_DC_NETBIOSNAME",
+ "PROMOTED_DC_NETBIOSALIAS",
+
# server stuff
"SERVER",
"SERVER_IP",
"LOCAL_PATH"
]
-def exported_envvars_str(testenv_vars):
- out = ""
-
- for n in exported_envvars:
- if not n in testenv_vars:
- continue
- out += "%s=%s\n" % (n, testenv_vars[n])
-
- return out
-
-
def switch_env(name, prefix):
if ":" in name:
(envname, option) = name.split(":", 1)
elif name in os.environ:
del os.environ[name]
- return testenv_vars
+ return env
# This 'global' file needs to be empty when we start
dns_host_file_path = os.path.join(prefix_abs, "dns_host_file")
if opts.testenv:
testenv_name = os.environ.get("SELFTEST_TESTENV", testenv_default)
- testenv_vars = switch_env(testenv_name, prefix)
+ env = switch_env(testenv_name, prefix)
+ testenv_vars = env.get_vars()
os.environ["PIDDIR"] = testenv_vars["PIDDIR"]
os.environ["ENVNAME"] = testenv_name
- envvarstr = exported_envvars_str(testenv_vars)
+ envvarstr = exported_envvars_str(testenv_vars, exported_envvars)
term = os.environ.get("TERMINAL", "xterm -e")
cmd = """'echo -e "
else:
for (name, envname, cmd, supports_loadfile, supports_idlist, subtests) in todo:
try:
- envvars = switch_env(envname, prefix)
+ env = switch_env(envname, prefix)
except UnsupportedEnvironment:
subunit_ops.start_testsuite(name)
subunit_ops.end_testsuite(name, "skip",
cmd, tmpf = expand_command_run(cmd, supports_loadfile, supports_idlist,
subtests)
- run_testsuite(envname, name, cmd)
+ run_testsuite(name, cmd, subunit_ops, env=env)
if tmpf is not None:
os.remove(tmpf)
if opts.resetup_environment:
env_manager.teardown_env(envname)
+ env_manager.teardown_all()
sys.stdout.write("\n")
-if not opts.list:
- env_manager.teardown_all()
-
# if there were any valgrind failures, show them
for fn in os.listdir(prefix):
if fn.startswith("valgrind.log"):