3 '''automated testing library for testing Samba against windows'''
5 import pexpect, subprocess
6 import sys, os, time, re
9 '''testing of Samba against windows VMs'''
13 self.list_mode = False
15 os.putenv('PYTHONUNBUFFERED', '1')
17 def setvar(self, varname, value):
18 '''set a substitution variable'''
19 self.vars[varname] = value
21 def getvar(self, varname):
22 '''return a substitution variable'''
23 if not varname in self.vars:
25 return self.vars[varname]
27 def setwinvars(self, vm, prefix='WIN'):
28 '''setup WIN_XX vars based on a vm name'''
29 for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'REALM', 'DOMAIN', 'IP']:
30 vname = '%s_%s' % (vm, v)
31 if vname in self.vars:
32 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
34 self.vars.pop("%s_%s" % (prefix,v), None)
36 if self.getvar("WIN_REALM"):
37 self.setvar("WIN_REALM", self.getvar("WIN_REALM").upper())
38 self.setvar("WIN_LCREALM", self.getvar("WIN_REALM").lower())
39 dnsdomain = self.getvar("WIN_REALM")
40 self.setvar("WIN_BASEDN", "DC=" + dnsdomain.replace(".", ",DC="))
43 '''print some information'''
44 if not self.list_mode:
45 print(self.substitute(msg))
47 def load_config(self, fname):
48 '''load the config file'''
52 if len(line) == 0 or line[0] == '#':
54 colon = line.find(':')
56 raise RuntimeError("Invalid config line '%s'" % line)
57 varname = line[0:colon].strip()
58 value = line[colon+1:].strip()
59 self.setvar(varname, value)
61 def list_steps_mode(self):
62 '''put wintest in step listing mode'''
65 def set_skip(self, skiplist):
66 '''set a list of tests to skip'''
67 self.skiplist = skiplist.split(',')
69 def set_vms(self, vms):
70 '''set a list of VMs to test'''
72 self.vms = vms.split(',')
75 '''return True if we should skip a step'''
79 return step in self.skiplist
81 def substitute(self, text):
82 """Substitute strings of the form ${NAME} in text, replacing
83 with substitutions from vars.
85 if isinstance(text, list):
87 for i in range(len(ret)):
88 ret[i] = self.substitute(ret[i])
91 """We may have objects such as pexpect.EOF that are not strings"""
92 if not isinstance(text, str):
95 var_start = text.find("${")
98 var_end = text.find("}", var_start)
101 var_name = text[var_start+2:var_end]
102 if not var_name in self.vars:
103 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
104 text = text.replace("${%s}" % var_name, self.vars[var_name])
107 def have_var(self, varname):
108 '''see if a variable has been set'''
109 return varname in self.vars
111 def have_vm(self, vmname):
112 '''see if a VM should be used'''
113 if not self.have_var(vmname + '_VM'):
117 return vmname in self.vms
119 def putenv(self, key, value):
120 '''putenv with substitution'''
121 os.putenv(key, self.substitute(value))
123 def chdir(self, dir):
124 '''chdir with substitution'''
125 os.chdir(self.substitute(dir))
127 def del_files(self, dirs):
128 '''delete all files in the given directory'''
130 self.run_cmd("find %s -type f | xargs rm -f" % d)
132 def write_file(self, filename, text, mode='w'):
133 '''write to a file'''
134 f = open(self.substitute(filename), mode=mode)
135 f.write(self.substitute(text))
138 def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
140 cmd = self.substitute(cmd)
141 if isinstance(cmd, list):
142 self.info('$ ' + " ".join(cmd))
144 self.info('$ ' + cmd)
146 return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
147 if isinstance(cmd, list):
152 return subprocess.check_call(cmd, shell=shell, cwd=dir)
154 return subprocess.call(cmd, shell=shell, cwd=dir)
157 def run_child(self, cmd, dir="."):
158 '''create a child and return the Popen handle to it'''
160 cmd = self.substitute(cmd)
161 if isinstance(cmd, list):
162 self.info('$ ' + " ".join(cmd))
164 self.info('$ ' + cmd)
165 if isinstance(cmd, list):
170 ret = subprocess.Popen(cmd, shell=shell, stderr=subprocess.STDOUT)
174 def cmd_output(self, cmd):
175 '''return output from and command'''
176 cmd = self.substitute(cmd)
177 return self.run_cmd(cmd, output=True)
179 def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
181 '''check that command output contains the listed strings'''
183 if isinstance(contains, str):
184 contains = [contains]
186 out = self.cmd_output(cmd)
188 for c in self.substitute(contains):
190 m = re.search(c, out)
198 start = out.upper().find(c.upper())
205 raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
208 raise RuntimeError("Expected to see %s in %s" % (c, cmd))
209 if ordered and start != -1:
212 def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
213 ordered=False, regex=False, casefold=False):
214 '''retry a command a number of times'''
217 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
218 ordered=ordered, regex=regex, casefold=casefold)
223 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
224 raise RuntimeError("Failed to find %s" % contains)
226 def pexpect_spawn(self, cmd, timeout=60, crlf=True, casefold=True):
227 '''wrapper around pexpect spawn'''
228 cmd = self.substitute(cmd)
229 self.info("$ " + cmd)
230 ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
232 def sendline_sub(line):
233 line = self.substitute(line)
235 line = line.replace('\n', '\r\n') + '\r'
236 return ret.old_sendline(line)
238 def expect_sub(line, timeout=ret.timeout, casefold=casefold):
239 line = self.substitute(line)
241 if isinstance(line, list):
242 for i in range(len(line)):
243 if isinstance(line[i], str):
244 line[i] = '(?i)' + line[i]
245 elif isinstance(line, str):
247 return ret.old_expect(line, timeout=timeout)
249 ret.old_sendline = ret.sendline
250 ret.sendline = sendline_sub
251 ret.old_expect = ret.expect
252 ret.expect = expect_sub
256 def get_nameserver(self):
257 '''Get the current nameserver from /etc/resolv.conf'''
258 child = self.pexpect_spawn('cat /etc/resolv.conf', crlf=False)
259 i = child.expect(['Generated by wintest', 'nameserver'])
261 child.expect('your original resolv.conf')
262 child.expect('nameserver')
263 child.expect('\d+.\d+.\d+.\d+')
266 def vm_poweroff(self, vmname, checkfail=True):
268 self.setvar('VMNAME', vmname)
269 self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
271 def vm_reset(self, vmname):
273 self.setvar('VMNAME', vmname)
274 self.run_cmd("${VM_RESET}")
276 def vm_restore(self, vmname, snapshot):
278 self.setvar('VMNAME', vmname)
279 self.setvar('SNAPSHOT', snapshot)
280 self.run_cmd("${VM_RESTORE}")
282 def ping_wait(self, hostname):
283 '''wait for a hostname to come up on the network'''
284 hostname = self.substitute(hostname)
288 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
293 raise RuntimeError("Failed to ping %s" % hostname)
294 self.info("Host %s is up" % hostname)
296 def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
297 '''wait for a host to come up on the network'''
298 self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
299 retries=retries, delay=delay, wait_for_fail=wait_for_fail)
301 def run_net_time(self, child):
302 '''run net time on windows'''
303 child.sendline("net time \\\\${HOSTNAME} /set")
304 child.expect("Do you want to set the local computer")
306 child.expect("The command completed successfully")
308 def run_date_time(self, child, time_tuple=None):
309 '''run date and time on windows'''
310 if time_tuple is None:
311 time_tuple = time.localtime()
312 child.sendline("date")
313 child.expect("Enter the new date:")
314 i = child.expect(["dd-mm-yy", "mm-dd-yy"])
316 child.sendline(time.strftime("%d-%m-%y", time_tuple))
318 child.sendline(time.strftime("%m-%d-%y", time_tuple))
320 child.sendline("time")
321 child.expect("Enter the new time:")
322 child.sendline(time.strftime("%H:%M:%S", time_tuple))
325 def get_ipconfig(self, child):
326 '''get the IP configuration of the child'''
327 child.sendline("ipconfig /all")
328 child.expect('Ethernet adapter ')
329 child.expect("[\w\s]+")
330 self.setvar("WIN_NIC", child.after)
331 child.expect(['IPv4 Address', 'IP Address'])
332 child.expect('\d+.\d+.\d+.\d+')
333 self.setvar('WIN_IPV4_ADDRESS', child.after)
334 child.expect('Subnet Mask')
335 child.expect('\d+.\d+.\d+.\d+')
336 self.setvar('WIN_SUBNET_MASK', child.after)
337 child.expect('Default Gateway')
338 child.expect('\d+.\d+.\d+.\d+')
339 self.setvar('WIN_DEFAULT_GATEWAY', child.after)
342 def get_is_dc(self, child):
343 child.sendline("dcdiag")
344 i = child.expect(["is not a Directory Server", "is not recognized as an internal or external command", "Home Server = "])
349 child.sendline("net config Workstation")
350 child.expect("Workstation domain")
351 child.expect('[\S]+')
353 i = child.expect(["Workstation Domain DNS Name", "Logon domain"])
354 '''If we get the Logon domain first, we are not in an AD domain'''
357 if domain.upper() == self.getvar("WIN_DOMAIN").upper():
360 child.expect('[\S]+')
361 hostname = child.after
362 if hostname.upper() == self.getvar("WIN_HOSTNAME").upper():
365 def run_tlntadmn(self, child):
366 '''remove the annoying telnet restrictions'''
367 child.sendline('tlntadmn config maxconn=1024')
368 child.expect("The settings were successfully updated")
371 def disable_firewall(self, child):
372 '''remove the annoying firewall'''
373 child.sendline('netsh advfirewall set allprofiles state off')
374 i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
377 child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
378 i = child.expect(["Ok", "The following command was not found"])
380 self.info("Firewall disable failed - ignoring")
383 def set_dns(self, child):
384 child.sendline('netsh interface ip set dns "${WIN_NIC}" static ${INTERFACE_IP} primary')
385 i = child.expect(['C:', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
391 def set_ip(self, child):
392 """fix the IP address to the same value it had when we
393 connected, but don't use DHCP, and force the DNS server to our
394 DNS server. This allows DNS updates to run"""
395 self.get_ipconfig(child)
396 if self.getvar("WIN_IPV4_ADDRESS") != self.getvar("WIN_IP"):
397 raise RuntimeError("ipconfig address %s != nmblookup address %s" % (self.getvar("WIN_IPV4_ADDRESS"),
398 self.getvar("WIN_IP")))
399 child.sendline('netsh')
400 child.expect('netsh>')
401 child.sendline('offline')
402 child.expect('netsh>')
403 child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
404 child.expect('netsh>')
405 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
406 i = child.expect(['The syntax supplied for this command is not valid. Check help for the correct syntax', 'netsh>', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
408 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
409 child.expect('netsh>')
410 child.sendline('commit')
411 child.sendline('online')
412 child.sendline('exit')
414 child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
418 def resolve_ip(self, hostname, retries=60, delay=5):
419 '''resolve an IP given a hostname, assuming NBT'''
421 child = self.pexpect_spawn("bin/nmblookup %s" % hostname)
422 i = child.expect(['\d+.\d+.\d+.\d+', "Lookup failed"])
427 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
428 raise RuntimeError("Failed to resolve IP of %s" % hostname)
431 def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False,
432 disable_firewall=True, run_tlntadmn=True):
433 '''open a telnet connection to a windows server, return the pexpect child'''
436 if self.getvar('WIN_IP'):
437 ip = self.getvar('WIN_IP')
439 ip = self.resolve_ip(hostname)
440 self.setvar('WIN_IP', ip)
442 child = self.pexpect_spawn("telnet " + ip + " -l '" + username + "'")
443 i = child.expect(["Welcome to Microsoft Telnet Service",
444 "Denying new connections due to the limit on number of connections",
445 "No more connections are allowed to telnet server",
446 "Unable to connect to remote host",
448 "Connection refused",
454 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
456 child.expect("password:")
457 child.sendline(password)
458 i = child.expect(["C:",
459 "Denying new connections due to the limit on number of connections",
460 "No more connections are allowed to telnet server",
461 "Unable to connect to remote host",
463 "Connection refused",
469 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
473 if self.set_dns(child):
476 child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
480 self.run_date_time(child, None)
483 self.run_tlntadmn(child)
486 self.disable_firewall(child)
487 disable_firewall = False
490 if self.set_ip(child):
495 raise RuntimeError("Failed to connect with telnet")
497 def kinit(self, username, password):
498 '''use kinit to setup a credentials cache'''
499 self.run_cmd("kdestroy")
500 self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
501 username = self.substitute(username)
502 s = username.split('@')
505 username = '@'.join(s)
506 child = self.pexpect_spawn('kinit ' + username)
507 child.expect("Password")
508 child.sendline(password)
509 child.expect(pexpect.EOF)
511 if child.exitstatus != 0:
512 raise RuntimeError("kinit failed with status %d" % child.exitstatus)
514 def get_domains(self):
515 '''return a dictionary of DNS domains and IPs for named.conf'''
518 if v[-6:] == "_REALM":
520 if base + '_IP' in self.vars:
521 ret[self.vars[base + '_REALM']] = self.vars[base + '_IP']
524 def wait_reboot(self, retries=3):
525 '''wait for a VM to reboot'''
527 # first wait for it to shutdown
528 self.port_wait("${WIN_IP}", 139, wait_for_fail=True, delay=6)
530 # now wait for it to come back. If it fails to come back
531 # then try resetting it
534 self.port_wait("${WIN_IP}", 139)
538 self.vm_reset("${WIN_VM}")
539 self.info("retrying reboot (retries=%u)" % retries)
540 raise RuntimeError(self.substitute("VM ${WIN_VM} failed to reboot"))
543 '''return a dictionary of all the configured VM names'''
547 ret.append(self.vars[v])