3 '''automated testing library for testing Samba against windows'''
5 import pexpect, subprocess
6 import sys, os, time, re
9 '''testing of Samba against windows VMs'''
13 self.list_mode = False
14 os.putenv('PYTHONUNBUFFERED', '1')
16 def setvar(self, varname, value):
17 '''set a substitution variable'''
18 self.vars[varname] = value
20 def getvar(self, varname):
21 '''return a substitution variable'''
22 if not varname in self.vars:
24 return self.vars[varname]
26 def setwinvars(self, vm, prefix='WIN'):
27 '''setup WIN_XX vars based on a vm name'''
28 for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN', 'IP']:
29 vname = '%s_%s' % (vm, v)
30 if vname in self.vars:
31 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
33 self.vars.pop("%s_%s" % (prefix,v), None)
36 '''print some information'''
37 if not self.list_mode:
38 print(self.substitute(msg))
40 def load_config(self, fname):
41 '''load the config file'''
45 if len(line) == 0 or line[0] == '#':
47 colon = line.find(':')
49 raise RuntimeError("Invalid config line '%s'" % line)
50 varname = line[0:colon].strip()
51 value = line[colon+1:].strip()
52 self.setvar(varname, value)
54 def list_steps_mode(self):
55 '''put wintest in step listing mode'''
58 def set_skip(self, skiplist):
59 '''set a list of tests to skip'''
60 self.skiplist = skiplist.split(',')
63 '''return True if we should skip a step'''
67 return step in self.skiplist
69 def substitute(self, text):
70 """Substitute strings of the form ${NAME} in text, replacing
71 with substitutions from vars.
73 if isinstance(text, list):
75 for i in range(len(ret)):
76 ret[i] = self.substitute(ret[i])
79 """We may have objects such as pexpect.EOF that are not strings"""
80 if not isinstance(text, str):
83 var_start = text.find("${")
86 var_end = text.find("}", var_start)
89 var_name = text[var_start+2:var_end]
90 if not var_name in self.vars:
91 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
92 text = text.replace("${%s}" % var_name, self.vars[var_name])
95 def have_var(self, varname):
96 '''see if a variable has been set'''
97 return varname in self.vars
100 def putenv(self, key, value):
101 '''putenv with substitution'''
102 os.putenv(key, self.substitute(value))
104 def chdir(self, dir):
105 '''chdir with substitution'''
106 os.chdir(self.substitute(dir))
108 def del_files(self, dirs):
109 '''delete all files in the given directory'''
111 self.run_cmd("find %s -type f | xargs rm -f" % d)
113 def write_file(self, filename, text, mode='w'):
114 '''write to a file'''
115 f = open(self.substitute(filename), mode=mode)
116 f.write(self.substitute(text))
119 def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
120 cmd = self.substitute(cmd)
121 if isinstance(cmd, list):
122 self.info('$ ' + " ".join(cmd))
124 self.info('$ ' + cmd)
126 return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
127 if isinstance(cmd, list):
132 return subprocess.check_call(cmd, shell=shell, cwd=dir)
134 return subprocess.call(cmd, shell=shell, cwd=dir)
136 def run_child(self, cmd, dir="."):
138 cmd = self.substitute(cmd)
139 if isinstance(cmd, list):
140 self.info('$ ' + " ".join(cmd))
142 self.info('$ ' + cmd)
143 if isinstance(cmd, list):
148 ret = subprocess.Popen(cmd, shell=shell)
152 def cmd_output(self, cmd):
153 '''return output from and command'''
154 cmd = self.substitute(cmd)
155 return self.run_cmd(cmd, output=True)
157 def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
159 '''check that command output contains the listed strings'''
161 if isinstance(contains, str):
162 contains = [contains]
164 out = self.cmd_output(cmd)
166 for c in self.substitute(contains):
168 m = re.search(c, out)
176 start = out.upper().find(c.upper())
183 raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
186 raise RuntimeError("Expected to see %s in %s" % (c, cmd))
187 if ordered and start != -1:
190 def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
191 ordered=False, regex=False, casefold=False):
192 '''retry a command a number of times'''
195 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
196 ordered=ordered, regex=regex, casefold=casefold)
200 retries = retries - 1
201 raise RuntimeError("Failed to find %s" % contains)
203 def pexpect_spawn(self, cmd, timeout=60):
204 '''wrapper around pexpect spawn'''
205 cmd = self.substitute(cmd)
206 self.info("$ " + cmd)
207 ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
209 def sendline_sub(line):
210 line = self.substitute(line).replace('\n', '\r\n')
211 return ret.old_sendline(line + '\r')
213 def expect_sub(line, timeout=ret.timeout):
214 line = self.substitute(line)
215 return ret.old_expect(line, timeout=timeout)
217 ret.old_sendline = ret.sendline
218 ret.sendline = sendline_sub
219 ret.old_expect = ret.expect
220 ret.expect = expect_sub
224 def get_nameserver(self):
225 '''Get the current nameserver from /etc/resolv.conf'''
226 child = self.pexpect_spawn('cat /etc/resolv.conf')
227 child.expect('nameserver')
228 child.expect('\d+.\d+.\d+.\d+')
231 def vm_poweroff(self, vmname, checkfail=True):
233 self.setvar('VMNAME', vmname)
234 self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
236 def vm_restore(self, vmname, snapshot):
238 self.setvar('VMNAME', vmname)
239 self.setvar('SNAPSHOT', snapshot)
240 self.run_cmd("${VM_RESTORE}")
242 def ping_wait(self, hostname):
243 '''wait for a hostname to come up on the network'''
244 hostname = self.substitute(hostname)
248 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
253 raise RuntimeError("Failed to ping %s" % hostname)
254 self.info("Host %s is up" % hostname)
256 def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
257 '''wait for a host to come up on the network'''
258 self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
259 retries=retries, delay=delay, wait_for_fail=wait_for_fail)
261 def run_net_time(self, child):
262 '''run net time on windows'''
263 child.sendline("net time \\\\${HOSTNAME} /set")
264 child.expect("Do you want to set the local computer")
266 child.expect("The command completed successfully")
268 def run_date_time(self, child, time_tuple=None):
269 '''run date and time on windows'''
270 if time_tuple is None:
271 time_tuple = time.localtime()
272 child.sendline("date")
273 child.expect("Enter the new date:")
274 i = child.expect(["dd-mm-yy", "mm-dd-yy"])
276 child.sendline(time.strftime("%d-%m-%y", time_tuple))
278 child.sendline(time.strftime("%m-%d-%y", time_tuple))
280 child.sendline("time")
281 child.expect("Enter the new time:")
282 child.sendline(time.strftime("%H:%M:%S", time_tuple))
285 def get_ipconfig(self, child):
286 '''get the IP configuration of the child'''
287 child.sendline("ipconfig /all")
288 child.expect('Ethernet adapter ')
289 child.expect("[\w\s]+")
290 self.setvar("WIN_NIC", child.after)
291 child.expect(['IPv4 Address', 'IP Address'])
292 child.expect('\d+.\d+.\d+.\d+')
293 self.setvar('WIN_IPV4_ADDRESS', child.after)
294 child.expect('Subnet Mask')
295 child.expect('\d+.\d+.\d+.\d+')
296 self.setvar('WIN_SUBNET_MASK', child.after)
297 child.expect('Default Gateway')
298 child.expect('\d+.\d+.\d+.\d+')
299 self.setvar('WIN_DEFAULT_GATEWAY', child.after)
302 def run_tlntadmn(self, child):
303 '''remove the annoying telnet restrictions'''
304 child.sendline('tlntadmn config maxconn=1024')
305 child.expect("The settings were successfully updated")
308 def disable_firewall(self, child):
309 '''remove the annoying firewall'''
310 child.sendline('netsh advfirewall set allprofiles state off')
311 i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
314 child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
318 def set_dns(self, child):
319 child.sendline('netsh interface ip set dns "${WIN_NIC}" static ${INTERFACE_IP} primary')
320 i = child.expect(['C:', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
326 def set_ip(self, child):
327 """fix the IP address to the same value it had when we
328 connected, but don't use DHCP, and force the DNS server to our
329 DNS server. This allows DNS updates to run"""
330 self.get_ipconfig(child)
331 child.sendline('netsh')
332 child.expect('netsh>')
333 child.sendline('offline')
334 child.expect('netsh>')
335 child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
336 child.expect('netsh>')
337 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
338 i = child.expect(['The syntax supplied for this command is not valid. Check help for the correct syntax', 'netsh>', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
340 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
341 child.expect('netsh>')
342 child.sendline('commit')
343 child.sendline('online')
344 child.sendline('exit')
346 child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
350 def resolve_ip(self, hostname, retries=60, delay=5):
351 '''resolve an IP given a hostname, assuming NBT'''
353 child = self.pexpect_spawn("bin/nmblookup %s" % hostname)
354 i = child.expect(['\d+.\d+.\d+.\d+', "Lookup failed"])
359 raise RuntimeError("Failed to resolve IP of %s" % hostname)
362 def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False,
363 disable_firewall=True, run_tlntadmn=True):
364 '''open a telnet connection to a windows server, return the pexpect child'''
367 if self.getvar('WIN_IP'):
368 ip = self.getvar('WIN_IP')
370 ip = self.resolve_ip(hostname)
371 self.setvar('WIN_IP', ip)
373 child = self.pexpect_spawn("telnet " + ip + " -l '" + username + "'")
374 i = child.expect(["Welcome to Microsoft Telnet Service",
375 "Denying new connections due to the limit on number of connections",
376 "No more connections are allowed to telnet server",
377 "Unable to connect to remote host",
379 "Connection refused",
386 child.expect("password:")
387 child.sendline(password)
388 i = child.expect(["C:",
389 "Denying new connections due to the limit on number of connections",
390 "No more connections are allowed to telnet server",
391 "Unable to connect to remote host",
393 "Connection refused",
402 if self.set_dns(child):
405 child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
409 self.run_date_time(child, None)
412 self.run_tlntadmn(child)
415 self.disable_firewall(child)
416 disable_firewall = False
419 if self.set_ip(child):
424 raise RuntimeError("Failed to connect with telnet")
426 def kinit(self, username, password):
427 '''use kinit to setup a credentials cache'''
428 self.run_cmd("kdestroy")
429 self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
430 username = self.substitute(username)
431 s = username.split('@')
434 username = '@'.join(s)
435 child = self.pexpect_spawn('kinit -V ' + username)
436 child.expect("Password for")
437 child.sendline(password)
438 child.expect("Authenticated to Kerberos")
441 def get_domains(self):
442 '''return a dictionary of DNS domains and IPs for named.conf'''
445 if v[-6:] == "_REALM":
447 if base + '_IP' in self.vars:
448 ret[self.vars[base + '_REALM']] = self.vars[base + '_IP']