3 '''automated testing library for testing Samba against windows'''
5 import pexpect, subprocess
6 import sys, os, time, re
9 '''testing of Samba against windows VMs'''
13 self.list_mode = False
14 os.putenv('PYTHONUNBUFFERED', '1')
def setvar(self, varname, value):
    """Record ``value`` under ``varname`` in the substitution table.

    A value already stored under the same name is overwritten.
    """
    table = self.vars
    table[varname] = value
# For each field name in the list below, look up the per-VM variable
# "<vm>_<FIELD>" and, when it is set, publish its value as
# "<prefix>_<FIELD>" (prefix defaults to 'WIN').
# The value is read through substitute("${<vm>_<FIELD>}") rather than taken
# directly from self.vars.
# NOTE(review): the branch header guarding the final pop() is not visible in
# this excerpt; presumably it is the "not set" case, which would clear a stale
# "<prefix>_<FIELD>" left over from a previous VM -- confirm.
20 def setwinvars(self, vm, prefix='WIN'):
21 '''setup WIN_XX vars based on a vm name'''
22 for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN']:
23 vname = '%s_%s' % (vm, v)
24 if vname in self.vars:
25 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
# pop(..., None) never raises, even when the prefixed variable was never set.
27 self.vars.pop("%s_%s" % (prefix,v), None)
30 '''print some information'''
31 if not self.list_mode:
32 print(self.substitute(msg))
# Read "name: value" settings from a config file and store each one with
# setvar(). A line that cannot be split raises RuntimeError("Invalid config
# line ...").
# NOTE(review): the statements that open fname, iterate over it and bind
# `line` are not visible in this excerpt, nor is the condition guarding the
# raise (presumably "no colon found") -- confirm against the full file.
34 def load_config(self, fname):
35 '''load the config file'''
# Test for empty lines and lines whose first character is '#'.
39 if len(line) == 0 or line[0] == '#':
# Only the first ':' is located, so the value part may itself contain colons.
41 colon = line.find(':')
43 raise RuntimeError("Invalid config line '%s'" % line)
# Whitespace around both the name and the value is stripped before storing.
44 varname = line[0:colon].strip()
45 value = line[colon+1:].strip()
46 self.setvar(varname, value)
48 def list_steps_mode(self):
49 '''put wintest in step listing mode'''
def set_skip(self, skiplist):
    """Remember which steps to skip.

    ``skiplist`` is a single comma-separated string; it is split on ','
    (without trimming whitespace) and the resulting list is stored in
    ``self.skiplist``.
    """
    steps = skiplist.split(',')
    self.skiplist = steps
57 '''return True if we should skip a step'''
61 return step in self.skiplist
# Expand ${NAME} references in `text` using self.vars.
#  - A list argument is handled element by element, through recursive calls.
#  - There is an explicit isinstance(text, str) test for non-string arguments
#    such as pexpect.EOF (what is returned for them is not visible here).
#  - The variable name is the text between "${" and the next "}".
#  - A name missing from self.vars raises
#    RuntimeError("Unknown substitution variable ${NAME}").
#  - str.replace() is used, so every occurrence of that ${NAME} in the text is
#    replaced in one go.
# NOTE(review): str.replace() requires the stored value to be a string, but
# get_ipconfig() stores the booleans True/False in WIN_DHCP; expanding
# ${WIN_DHCP} would raise TypeError -- confirm nothing does that.
# NOTE(review): the loop and return statements are not visible in this
# excerpt, so termination and the exact return values cannot be verified here.
63 def substitute(self, text):
64 """Substitute strings of the form ${NAME} in text, replacing
65 with substitutions from vars.
67 if isinstance(text, list):
69 for i in range(len(ret)):
70 ret[i] = self.substitute(ret[i])
73 """We may have objects such as pexpect.EOF that are not strings"""
74 if not isinstance(text, str):
77 var_start = text.find("${")
80 var_end = text.find("}", var_start)
83 var_name = text[var_start+2:var_end]
84 if not var_name in self.vars:
85 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
86 text = text.replace("${%s}" % var_name, self.vars[var_name])
def have_var(self, varname):
    """Return True when ``varname`` is present in the substitution table."""
    known = self.vars
    return varname in known
def putenv(self, key, value):
    """Set environment variable ``key`` to ``value`` after ${VAR} expansion.

    The assignment goes through ``os.environ`` rather than ``os.putenv()``.
    Assigning to ``os.environ`` still exports the variable to the process
    environment (so child processes inherit it), and it also keeps
    ``os.environ`` itself in sync. A bare ``os.putenv()`` call leaves
    ``os.environ`` stale, which hides the value from any Python code in
    this process that reads the environment.
    """
    os.environ[key] = self.substitute(value)
99 '''chdir with substitution'''
100 os.chdir(self.substitute(dir))
# Remove regular files below the given directories by running
# "find <dir> -type f | xargs rm -f" through run_cmd().
# NOTE(review): the loop header that binds `d` is not visible in this excerpt;
# presumably it iterates over `dirs`.
# NOTE(review): the directory is interpolated into the command string
# unquoted, and find's output is split by xargs on whitespace, so paths
# containing spaces would be mishandled ("find ... -print0 | xargs -0" or
# "find ... -delete" avoids that).
102 def del_files(self, dirs):
103 '''delete all files in the given directory'''
105 self.run_cmd("find %s -type f | xargs rm -f" % d)
# Write `text` to `filename`; both are passed through substitute() first, so
# they may contain ${VAR} references. `mode` is handed straight to open()
# (default 'w', i.e. truncate and write).
# NOTE(review): no close() is visible in this excerpt -- confirm the handle is
# closed; a "with open(...)" block would guarantee it even when write() fails.
107 def write_file(self, filename, text, mode='w'):
108 '''write to a file'''
109 f = open(self.substitute(filename), mode=mode)
110 f.write(self.substitute(text))
# Run a command after ${VAR} substitution, echoing it via info() first
# (list commands are joined with spaces for display).
# Three return paths are visible:
#   - the captured output from Popen(...).communicate()[0], with stderr merged
#     into stdout;
#   - subprocess.check_call(), which raises CalledProcessError on a non-zero
#     exit status;
#   - subprocess.call(), which returns the exit status.
# NOTE(review): the conditions choosing between these paths (presumably the
# `output` and `checkfail` flags) and the assignment of `shell` are not
# visible in this excerpt; `show` is not referenced in any visible line.
113 def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
114 cmd = self.substitute(cmd)
115 if isinstance(cmd, list):
116 self.info('$ ' + " ".join(cmd))
118 self.info('$ ' + cmd)
# NOTE(review): cmd is wrapped in a one-element list while shell=True is
# forced; if cmd is itself a list this produces a nested list -- confirm this
# path is only used with string commands.
120 return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
121 if isinstance(cmd, list):
126 return subprocess.check_call(cmd, shell=shell, cwd=dir)
128 return subprocess.call(cmd, shell=shell, cwd=dir)
# Start a command with subprocess.Popen after ${VAR} substitution, echoing it
# via info() first (list commands are joined with spaces for display).
# NOTE(review): in the visible lines `dir` is never used -- Popen() is called
# without cwd=. Lines are missing from this excerpt (the `shell` assignment,
# any chdir, and the return), so confirm how `dir` is applied in the full file.
130 def run_child(self, cmd, dir="."):
132 cmd = self.substitute(cmd)
133 if isinstance(cmd, list):
134 self.info('$ ' + " ".join(cmd))
136 self.info('$ ' + cmd)
137 if isinstance(cmd, list):
142 ret = subprocess.Popen(cmd, shell=shell)
def cmd_output(self, cmd):
    """Return the output from a command.

    ``cmd`` is expanded with substitute() and handed to run_cmd() with
    ``output=True``; whatever run_cmd() returns is passed back unchanged.
    """
    return self.run_cmd(self.substitute(cmd), output=True)
# Run `cmd` via cmd_output() and check its output against the expected
# strings in `contains`.
#  - A single string is accepted and wrapped into a one-element list.
#  - Each expected string is passed through substitute() before matching.
#  - Two matching methods are visible: re.search() on the output, and a
#    case-insensitive substring search that upper-cases both sides.
#  - RuntimeError is raised with "Expected to not see ..." or
#    "Expected to see ...".
# NOTE(review): the remainder of the signature and the conditions selecting
# the matching method and the error branch (presumably `regex`, `casefold`,
# `nomatch`) are not visible in this excerpt, nor is the body of the final
# `ordered` branch -- confirm against the full file.
151 def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
153 '''check that command output contains the listed strings'''
155 if isinstance(contains, str):
156 contains = [contains]
158 out = self.cmd_output(cmd)
160 for c in self.substitute(contains):
162 m = re.search(c, out)
170 start = out.upper().find(c.upper())
177 raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
180 raise RuntimeError("Expected to see %s in %s" % (c, cmd))
181 if ordered and start != -1:
# Call cmd_contains() with the given command and expectations, decrementing
# `retries` between attempts; `wait_for_fail` is forwarded as cmd_contains's
# `nomatch` argument, and `ordered`, `regex` and `casefold` are passed through.
# Ends with RuntimeError("Failed to find ...").
# NOTE(review): the loop header, the exception handling around cmd_contains()
# and the use of `delay` (presumably a sleep between attempts) are not visible
# in this excerpt.
184 def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
185 ordered=False, regex=False, casefold=False):
186 '''retry a command a number of times'''
189 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
190 ordered=ordered, regex=regex, casefold=casefold)
194 retries = retries - 1
195 raise RuntimeError("Failed to find %s" % contains)
# Spawn `cmd` under pexpect (after ${VAR} substitution, echoed via info()) and
# patch the resulting child so that its sendline() and expect() also apply
# substitution to their argument. The unpatched methods stay reachable as
# child.old_sendline / child.old_expect.
# NOTE(review): the final return of `ret` is not visible in this excerpt.
# NOTE(review): logfile=sys.stdout -- under Python 3 pexpect.spawn writes
# bytes to its logfile unless an encoding is given, which a text-mode
# sys.stdout would reject; confirm the targeted Python/pexpect versions.
197 def pexpect_spawn(self, cmd, timeout=60):
198 '''wrapper around pexpect spawn'''
199 cmd = self.substitute(cmd)
200 self.info("$ " + cmd)
201 ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
# Replacement sendline: embedded newlines become CR LF and a trailing CR is
# appended before delegating to the original sendline.
203 def sendline_sub(line):
204 line = self.substitute(line).replace('\n', '\r\n')
205 return ret.old_sendline(line + '\r')
# Replacement expect: the default timeout is captured from ret.timeout once,
# when this function is defined, not re-read on each call.
207 def expect_sub(line, timeout=ret.timeout):
208 line = self.substitute(line)
209 return ret.old_expect(line, timeout=timeout)
# Save the bound originals first, then install the substituting wrappers.
211 ret.old_sendline = ret.sendline
212 ret.sendline = sendline_sub
213 ret.old_expect = ret.expect
214 ret.expect = expect_sub
# Spawn "cat /etc/resolv.conf", wait for the word "nameserver", then match the
# dotted-quad address that follows it.
# NOTE(review): the return statement is not visible in this excerpt
# (presumably the matched text, child.after).
# NOTE(review): in '\d+.\d+.\d+.\d+' the dots are unescaped, so each matches
# any character, and the pattern is not a raw string, so '\d' is an invalid
# string escape on recent Python versions; r'\d+\.\d+\.\d+\.\d+' states the
# intent.
218 def get_nameserver(self):
219 '''Get the current nameserver from /etc/resolv.conf'''
220 child = self.pexpect_spawn('cat /etc/resolv.conf')
221 child.expect('nameserver')
222 child.expect('\d+.\d+.\d+.\d+')
# Store the VM name in the VMNAME substitution variable, then run the command
# held in ${VM_POWEROFF} through run_cmd(). `checkfail` is forwarded to
# run_cmd() unchanged.
# substitute() raises RuntimeError if VM_POWEROFF has not been set.
225 def vm_poweroff(self, vmname, checkfail=True):
227 self.setvar('VMNAME', vmname)
228 self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
# Store the VM name and snapshot name in the VMNAME and SNAPSHOT substitution
# variables, then run the command held in ${VM_RESTORE} through run_cmd() with
# its default arguments.
# substitute() raises RuntimeError if VM_RESTORE has not been set.
230 def vm_restore(self, vmname, snapshot):
232 self.setvar('VMNAME', vmname)
233 self.setvar('SNAPSHOT', snapshot)
234 self.run_cmd("${VM_RESTORE}")
# Ping `hostname` (after ${VAR} substitution) with "ping -c 1 -w 10" via
# run_cmd(). Reports "Host ... is up" through info() on success and raises
# RuntimeError("Failed to ping ...") otherwise.
# NOTE(review): the retry loop and the exception handling around run_cmd() are
# not visible in this excerpt, so the number of attempts cannot be stated here.
236 def ping_wait(self, hostname):
237 '''wait for a hostname to come up on the network'''
238 hostname = self.substitute(hostname)
242 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
247 raise RuntimeError("Failed to ping %s" % hostname)
248 self.info("Host %s is up" % hostname)
def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
    """Wait for a TCP port on a host by probing it with ``nc``.

    The probe command is handed to retry_cmd(), which looks for the word
    'succeeded' in its output. ``retries``, ``delay`` and ``wait_for_fail``
    are forwarded to retry_cmd() unchanged.
    """
    probe = "nc -v -z -w 1 %s %u" % (hostname, port)
    self.retry_cmd(probe, ['succeeded'],
                   retries=retries, delay=delay, wait_for_fail=wait_for_fail)
# Send "net time \\${HOSTNAME} /set" to the Windows session (the Python
# literal "\\\\" is two backslashes), wait for the confirmation prompt, and
# then wait for "The command completed successfully".
# NOTE(review): ${HOSTNAME} is only expanded if `child` came from
# pexpect_spawn(), whose patched sendline() applies substitute() -- confirm.
# NOTE(review): the reply to the confirmation prompt is not visible in this
# excerpt.
255 def run_net_time(self, child):
256 '''run net time on windows'''
257 child.sendline("net time \\\\${HOSTNAME} /set")
258 child.expect("Do you want to set the local computer")
260 child.expect("The command completed successfully")
# Set the date and time on the Windows side through the interactive "date" and
# "time" commands. `time_tuple` defaults to the local time when the method is
# called.
# The date prompt announces either "dd-mm-yy" or "mm-dd-yy"; a matching
# strftime format exists for each, and the time is always sent as HH:MM:SS.
# NOTE(review): the if/else that picks the date format from `i`, and whatever
# follows the final sendline, are not visible in this excerpt.
262 def run_date_time(self, child, time_tuple=None):
263 '''run date and time on windows'''
264 if time_tuple is None:
265 time_tuple = time.localtime()
266 child.sendline("date")
267 child.expect("Enter the new date:")
# expect() with a list returns the index of the alternative that matched.
268 i = child.expect(["dd-mm-yy", "mm-dd-yy"])
270 child.sendline(time.strftime("%d-%m-%y", time_tuple))
272 child.sendline(time.strftime("%m-%d-%y", time_tuple))
274 child.sendline("time")
275 child.expect("Enter the new time:")
276 child.sendline(time.strftime("%H:%M:%S", time_tuple))
# Run "ipconfig /all" in the Windows session and record what it reports as
# substitution variables: WIN_NIC, WIN_DHCP, WIN_IPV4_ADDRESS,
# WIN_SUBNET_MASK and WIN_DEFAULT_GATEWAY. Each value is whatever the
# preceding expect() matched (child.after).
# NOTE(review): WIN_DHCP is stored as a bool, unlike the other variables;
# substitute() uses str.replace(), so expanding ${WIN_DHCP} in a string would
# raise TypeError. set_ip() reads it directly from self.vars instead.
# NOTE(review): the if/else selecting True/False from `i`, and any lines after
# the gateway is stored, are not visible in this excerpt.
# NOTE(review): in '\d+.\d+.\d+.\d+' the dots are unescaped (match any
# character) and the pattern is not a raw string; r'\d+\.\d+\.\d+\.\d+' states
# the intent. "[\w\s]+" likewise relies on non-raw '\w' / '\s' escapes.
279 def get_ipconfig(self, child):
280 '''get the IP configuration of the child'''
281 child.sendline("ipconfig /all")
282 child.expect('Ethernet adapter ')
283 child.expect("[\w\s]+")
284 self.setvar("WIN_NIC", child.after)
# Two spellings of the label are accepted.
285 child.expect(['DHCP Enabled', 'Dhcp Enabled'])
286 i = child.expect(['Yes', 'No'])
288 self.setvar("WIN_DHCP", True)
290 self.setvar("WIN_DHCP", False)
# Either label is accepted for the address line.
291 child.expect(['IPv4 Address', 'IP Address'])
292 child.expect('\d+.\d+.\d+.\d+')
293 self.setvar('WIN_IPV4_ADDRESS', child.after)
294 child.expect('Subnet Mask')
295 child.expect('\d+.\d+.\d+.\d+')
296 self.setvar('WIN_SUBNET_MASK', child.after)
297 child.expect('Default Gateway')
298 child.expect('\d+.\d+.\d+.\d+')
299 self.setvar('WIN_DEFAULT_GATEWAY', child.after)
# Raise the telnet server's connection limit by sending
# "tlntadmn config maxconn=1024" and waiting for the success message.
# NOTE(review): lines may be missing after the final expect() in this excerpt
# (e.g. waiting for the next prompt) -- confirm against the full file.
302 def run_tlntadmn(self, child):
303 '''remove the annoying telnet restrictions'''
304 child.sendline('tlntadmn config maxconn=1024')
305 child.expect("The settings were successfully updated")
# Turn the Windows firewall off. "netsh advfirewall set allprofiles state off"
# is sent first, and the reply is matched against "Ok" or the "command was not
# found" message. A second command using the "netsh firewall set opmode"
# syntax is also present.
# NOTE(review): the condition on `i` that triggers the second command
# (presumably the "not found" reply) and the expects that follow it are not
# visible in this excerpt.
308 def disable_firewall(self, child):
309 '''remove the annoying firewall'''
310 child.sendline('netsh advfirewall set allprofiles state off')
311 i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
314 child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
# Point the Windows NIC named ${WIN_NIC} at ${INTERFACE_IP} as its static
# primary DNS server via netsh, then wait up to 5 seconds for a "C:" prompt,
# EOF or a timeout.
# NOTE(review): both variables must already be set when the command is
# expanded; WIN_NIC is set by get_ipconfig(). How `i` is turned into the
# return value is not visible in this excerpt (open_telnet() tests the result
# for truthiness).
318 def set_dns(self, child):
319 child.sendline('netsh interface ip set dns "${WIN_NIC}" static ${INTERFACE_IP} primary')
320 i = child.expect(['C:', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
# Re-apply the address, mask and gateway reported by get_ipconfig() as a
# static configuration, driving an interactive "netsh" session in offline
# mode: add a persistent default route, set the static address, then send
# commit / online / exit.
# NOTE(review): what happens when WIN_DHCP is already False, the condition on
# `i` guarding the second "set address" command, and the return value are not
# visible in this excerpt (open_telnet() tests the result for truthiness).
326 def set_ip(self, child):
327 '''fix the IP address to the same value it had when we
328 connected, but don't use DHCP, and force the DNS server to our
329 DNS server. This allows DNS updates to run'''
# Refreshes WIN_NIC, WIN_DHCP and the address variables used below.
330 self.get_ipconfig(child)
# Read straight from self.vars (not via substitute) because the value is a bool.
331 if self.vars['WIN_DHCP'] is False:
333 child.sendline('netsh')
334 child.expect('netsh>')
335 child.sendline('offline')
336 child.expect('netsh>')
337 child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
338 child.expect('netsh>')
# First attempt includes "store=persistent"; the expect below can match a
# syntax-error reply, the prompt, EOF or a timeout.
339 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
340 i = child.expect(['The syntax supplied for this command is not valid. Check help for the correct syntax', 'netsh>', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
# Same command without "store=persistent".
342 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
343 child.expect('netsh>')
344 child.sendline('commit')
345 child.sendline('online')
346 child.sendline('exit')
# EOF and TIMEOUT are both listed as acceptable outcomes here, so this expect
# does not raise on either.
348 child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
# Log in to a Windows telnet server and return the pexpect child (per the
# docstring). Visible flow: spawn "telnet <hostname> -l '<username>'", match
# the banner or one of several refusal messages, answer the "password:"
# prompt, then match the "C:" prompt or the same refusal messages. After
# login, optional setup steps call set_dns(), run_date_time(), run_tlntadmn(),
# disable_firewall() and set_ip(). Ends with
# RuntimeError("Failed to connect with telnet").
# The boolean parameters set_ip / disable_firewall / run_tlntadmn share names
# with methods; the methods are always reached through `self.`, so the locals
# do not shadow them.
# NOTE(review): the retry loop, the handling of the refusal indices in `i`,
# the conditions guarding each setup step, the use of `retries` / `delay` /
# `set_time`, and the return are not visible in this excerpt.
# NOTE(review): hostname and username are concatenated into the command
# string as-is; a username containing a single quote would break the quoting.
351 def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False, disable_firewall=True, run_tlntadmn=True):
352 '''open a telnet connection to a windows server, return the pexpect child'''
356 child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'")
357 i = child.expect(["Welcome to Microsoft Telnet Service",
358 "Denying new connections due to the limit on number of connections",
359 "No more connections are allowed to telnet server",
360 "Unable to connect to remote host",
362 "Connection refused",
369 child.expect("password:")
370 child.sendline(password)
371 i = child.expect(["C:",
372 "Denying new connections due to the limit on number of connections",
373 "No more connections are allowed to telnet server",
374 "Unable to connect to remote host",
376 "Connection refused",
385 if self.set_dns(child):
388 child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
392 self.run_date_time(child, None)
395 self.run_tlntadmn(child)
398 self.disable_firewall(child)
# The local flag is cleared once the firewall step has run -- presumably so a
# later pass through the (not visible) retry loop does not repeat it.
399 disable_firewall = False
402 if self.set_ip(child):
407 raise RuntimeError("Failed to connect with telnet")
# Obtain Kerberos credentials for `username`: run "kdestroy", point KRB5CCNAME
# at ${PREFIX}/ccache.test, spawn "kinit -V <username>", answer the
# "Password for" prompt, and wait for "Authenticated to Kerberos".
# NOTE(review): the username is split at '@' and re-joined; the lines that
# modify the parts in between are not visible in this excerpt (presumably
# normalising the realm) -- confirm.
# NOTE(review): run_cmd("kdestroy") uses the default checkfail=True; confirm
# that kdestroy exiting non-zero when no cache exists is acceptable here.
409 def kinit(self, username, password):
410 '''use kinit to setup a credentials cache'''
411 self.run_cmd("kdestroy")
412 self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
413 username = self.substitute(username)
414 s = username.split('@')
417 username = '@'.join(s)
418 child = self.pexpect_spawn('kinit -V ' + username)
419 child.expect("Password for")
# The patched sendline() from pexpect_spawn() applies substitute(), so
# `password` may contain ${VAR} references.
420 child.sendline(password)
421 child.expect("Authenticated to Kerberos")