X-Git-Url: http://git.samba.org/samba.git/?p=kai%2Fsamba.git;a=blobdiff_plain;f=wintest%2Fwintest.py;h=b8e6ea2dd19d69aa69e04402f86ecf1af52c0262;hp=d7ca5fe06753e393acfaf2dc07156ed6dc526603;hb=fd38dabdcc3848a7006cb1bb2a8cf56a24ec5d66;hpb=f6adad4d25b884ebdeccdf153d6dbbd016f5d65b diff --git a/wintest/wintest.py b/wintest/wintest.py index d7ca5fe0675..b8e6ea2dd19 100644 --- a/wintest/wintest.py +++ b/wintest/wintest.py @@ -11,6 +11,7 @@ class wintest(): def __init__(self): self.vars = {} self.list_mode = False + self.vms = None os.putenv('PYTHONUNBUFFERED', '1') def setvar(self, varname, value): @@ -19,17 +20,25 @@ class wintest(): def getvar(self, varname): '''return a substitution variable''' + if not varname in self.vars: + return None return self.vars[varname] def setwinvars(self, vm, prefix='WIN'): '''setup WIN_XX vars based on a vm name''' - for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN']: + for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'REALM', 'DOMAIN', 'IP']: vname = '%s_%s' % (vm, v) if vname in self.vars: self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname)) else: self.vars.pop("%s_%s" % (prefix,v), None) + if self.getvar("WIN_REALM"): + self.setvar("WIN_REALM", self.getvar("WIN_REALM").upper()) + self.setvar("WIN_LCREALM", self.getvar("WIN_REALM").lower()) + dnsdomain = self.getvar("WIN_REALM") + self.setvar("WIN_BASEDN", "DC=" + dnsdomain.replace(".", ",DC=")) + def info(self, msg): '''print some information''' if not self.list_mode: @@ -57,6 +66,11 @@ class wintest(): '''set a list of tests to skip''' self.skiplist = skiplist.split(',') + def set_vms(self, vms): + '''set a list of VMs to test''' + if vms is not None: + self.vms = vms.split(',') + def skip(self, step): '''return True if we should skip a step''' if self.list_mode: @@ -94,6 +108,13 @@ class wintest(): '''see if a variable has been set''' return varname in self.vars + def have_vm(self, vmname): + '''see if a VM should be used''' + if not self.have_var(vmname + '_VM'): + return False + if self.vms is None: + return True + return vmname in self.vms def putenv(self, key, value): '''putenv with substitution''' @@ -115,6 +136,7 @@ class wintest(): f.close() def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True): + '''run a command''' cmd = self.substitute(cmd) if isinstance(cmd, list): self.info('$ ' + " ".join(cmd)) @@ -131,7 +153,9 @@ class wintest(): else: return subprocess.call(cmd, shell=shell, cwd=dir) + def run_child(self, cmd, dir="."): + '''create a child and return the Popen handle to it''' cwd = os.getcwd() cmd = self.substitute(cmd) if isinstance(cmd, list): @@ -143,7 +167,7 @@ class wintest(): else: shell=True os.chdir(dir) - ret = subprocess.Popen(cmd, shell=shell) + ret = subprocess.Popen(cmd, shell=shell, stderr=subprocess.STDOUT) os.chdir(cwd) return ret @@ -153,7 +177,7 @@ class wintest(): return self.run_cmd(cmd, output=True) def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False, - casefold=False): + casefold=True): '''check that command output contains the listed strings''' if isinstance(contains, str): @@ -163,6 +187,9 @@ class wintest(): self.info(out) for c in self.substitute(contains): if regex: + if casefold: + c = c.upper() + out = out.upper() m = re.search(c, out) if m is None: start = -1 @@ -186,7 +213,7 @@ class wintest(): out = out[end:] def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False, - ordered=False, regex=False, casefold=False): + ordered=False, regex=False, casefold=True): '''retry a command a number of times''' while retries > 0: try: @@ -195,21 +222,31 @@ class wintest(): return except: time.sleep(delay) - retries = retries - 1 + retries -= 1 + self.info("retrying (retries=%u delay=%u)" % (retries, delay)) raise RuntimeError("Failed to find %s" % contains) - def pexpect_spawn(self, cmd, timeout=60): + def pexpect_spawn(self, cmd, timeout=60, crlf=True, casefold=True): '''wrapper around pexpect spawn''' cmd = self.substitute(cmd) self.info("$ " + cmd) ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout) def sendline_sub(line): - line = self.substitute(line).replace('\n', '\r\n') - return ret.old_sendline(line + '\r') + line = self.substitute(line) + if crlf: + line = line.replace('\n', '\r\n') + '\r' + return ret.old_sendline(line) - def expect_sub(line, timeout=ret.timeout): + def expect_sub(line, timeout=ret.timeout, casefold=casefold): line = self.substitute(line) + if casefold: + if isinstance(line, list): + for i in range(len(line)): + if isinstance(line[i], str): + line[i] = '(?i)' + line[i] + elif isinstance(line, str): + line = '(?i)' + line return ret.old_expect(line, timeout=timeout) ret.old_sendline = ret.sendline @@ -221,8 +258,11 @@ class wintest(): def get_nameserver(self): '''Get the current nameserver from /etc/resolv.conf''' - child = self.pexpect_spawn('cat /etc/resolv.conf') - child.expect('nameserver') + child = self.pexpect_spawn('cat /etc/resolv.conf', crlf=False) + i = child.expect(['Generated by wintest', 'nameserver']) + if i == 0: + child.expect('your original resolv.conf') + child.expect('nameserver') child.expect('\d+.\d+.\d+.\d+') return child.after @@ -231,6 +271,11 @@ class wintest(): self.setvar('VMNAME', vmname) self.run_cmd("${VM_POWEROFF}", checkfail=checkfail) + def vm_reset(self, vmname): + '''reset a VM''' + self.setvar('VMNAME', vmname) + self.run_cmd("${VM_RESET}") + def vm_restore(self, vmname, snapshot): '''restore a VM''' self.setvar('VMNAME', vmname) @@ -297,6 +342,33 @@ class wintest(): self.setvar('WIN_DEFAULT_GATEWAY', child.after) child.expect("C:") + def get_is_dc(self, child): + '''check if a windows machine is a domain controller''' + child.sendline("dcdiag") + i = child.expect(["is not a Directory Server", + "is not recognized as an internal or external command", + "Home Server = ", + "passed test Replications"]) + if i == 0: + return False + if i == 1 or i == 3: + child.expect("C:") + child.sendline("net config Workstation") + child.expect("Workstation domain") + child.expect('[\S]+') + domain = child.after + i = child.expect(["Workstation Domain DNS Name", "Logon domain"]) + '''If we get the Logon domain first, we are not in an AD domain''' + if i == 1: + return False + if domain.upper() == self.getvar("WIN_DOMAIN").upper(): + return True + + child.expect('[\S]+') + hostname = child.after + if hostname.upper() == self.getvar("WIN_HOSTNAME").upper(): + return True + def run_tlntadmn(self, child): '''remove the annoying telnet restrictions''' child.sendline('tlntadmn config maxconn=1024') @@ -310,7 +382,9 @@ class wintest(): child.expect("C:") if i == 1: child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL') - child.expect("Ok") + i = child.expect(["Ok", "The following command was not found"]) + if i != 0: + self.info("Firewall disable failed - ignoring") child.expect("C:") def set_dns(self, child): @@ -326,6 +400,9 @@ class wintest(): connected, but don't use DHCP, and force the DNS server to our DNS server. This allows DNS updates to run""" self.get_ipconfig(child) + if self.getvar("WIN_IPV4_ADDRESS") != self.getvar("WIN_IP"): + raise RuntimeError("ipconfig address %s != nmblookup address %s" % (self.getvar("WIN_IPV4_ADDRESS"), + self.getvar("WIN_IP"))) child.sendline('netsh') child.expect('netsh>') child.sendline('offline') @@ -343,13 +420,33 @@ class wintest(): child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5) return True - - def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False, disable_firewall=True, run_tlntadmn=True): + + + def resolve_ip(self, hostname, retries=60, delay=5): + '''resolve an IP given a hostname, assuming NBT''' + while retries > 0: + child = self.pexpect_spawn("bin/nmblookup %s" % hostname) + i = child.expect(['\d+.\d+.\d+.\d+', "Lookup failed"]) + if i == 0: + return child.after + retries -= 1 + time.sleep(delay) + self.info("retrying (retries=%u delay=%u)" % (retries, delay)) + raise RuntimeError("Failed to resolve IP of %s" % hostname) + + + def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False, + disable_firewall=True, run_tlntadmn=True): '''open a telnet connection to a windows server, return the pexpect child''' set_route = False set_dns = False + if self.getvar('WIN_IP'): + ip = self.getvar('WIN_IP') + else: + ip = self.resolve_ip(hostname) + self.setvar('WIN_IP', ip) while retries > 0: - child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'") + child = self.pexpect_spawn("telnet " + ip + " -l '" + username + "'") i = child.expect(["Welcome to Microsoft Telnet Service", "Denying new connections due to the limit on number of connections", "No more connections are allowed to telnet server", @@ -361,6 +458,7 @@ class wintest(): child.close() time.sleep(delay) retries -= 1 + self.info("retrying (retries=%u delay=%u)" % (retries, delay)) continue child.expect("password:") child.sendline(password) @@ -375,6 +473,7 @@ class wintest(): child.close() time.sleep(delay) retries -= 1 + self.info("retrying (retries=%u delay=%u)" % (retries, delay)) continue if set_dns: set_dns = False @@ -411,7 +510,46 @@ class wintest(): if len(s) > 0: s[1] = s[1].upper() username = '@'.join(s) - child = self.pexpect_spawn('kinit -V ' + username) - child.expect("Password for") + child = self.pexpect_spawn('kinit ' + username) + child.expect("Password") child.sendline(password) - child.expect("Authenticated to Kerberos") + child.expect(pexpect.EOF) + child.close() + if child.exitstatus != 0: + raise RuntimeError("kinit failed with status %d" % child.exitstatus) + + def get_domains(self): + '''return a dictionary of DNS domains and IPs for named.conf''' + ret = {} + for v in self.vars: + if v[-6:] == "_REALM": + base = v[:-6] + if base + '_IP' in self.vars: + ret[self.vars[base + '_REALM']] = self.vars[base + '_IP'] + return ret + + def wait_reboot(self, retries=3): + '''wait for a VM to reboot''' + + # first wait for it to shutdown + self.port_wait("${WIN_IP}", 139, wait_for_fail=True, delay=6) + + # now wait for it to come back. If it fails to come back + # then try resetting it + while retries > 0: + try: + self.port_wait("${WIN_IP}", 139) + return + except: + retries -= 1 + self.vm_reset("${WIN_VM}") + self.info("retrying reboot (retries=%u)" % retries) + raise RuntimeError(self.substitute("VM ${WIN_VM} failed to reboot")) + + def get_vms(self): + '''return a dictionary of all the configured VM names''' + ret = [] + for v in self.vars: + if v[-3:] == "_VM": + ret.append(self.vars[v]) + return ret