3 '''automated testing library for testing Samba against windows'''
5 import pexpect, subprocess
6 import sys, os, time, re
9 '''testing of Samba against windows VMs'''
13 self.list_mode = False
14 os.putenv('PYTHONUNBUFFERED', '1')
16 def setvar(self, varname, value):
17 '''set a substitution variable'''
18 self.vars[varname] = value
20 def getvar(self, varname):
21 '''return a substitution variable'''
22 return self.vars[varname]
24 def setwinvars(self, vm, prefix='WIN'):
25 '''setup WIN_XX vars based on a vm name'''
26 for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN']:
27 vname = '%s_%s' % (vm, v)
28 if vname in self.vars:
29 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
31 self.vars.pop("%s_%s" % (prefix,v), None)
34 '''print some information'''
35 if not self.list_mode:
36 print(self.substitute(msg))
38 def load_config(self, fname):
39 '''load the config file'''
43 if len(line) == 0 or line[0] == '#':
45 colon = line.find(':')
47 raise RuntimeError("Invalid config line '%s'" % line)
48 varname = line[0:colon].strip()
49 value = line[colon+1:].strip()
50 self.setvar(varname, value)
52 def list_steps_mode(self):
53 '''put wintest in step listing mode'''
56 def set_skip(self, skiplist):
57 '''set a list of tests to skip'''
58 self.skiplist = skiplist.split(',')
61 '''return True if we should skip a step'''
65 return step in self.skiplist
67 def substitute(self, text):
68 """Substitute strings of the form ${NAME} in text, replacing
69 with substitutions from vars.
71 if isinstance(text, list):
73 for i in range(len(ret)):
74 ret[i] = self.substitute(ret[i])
77 """We may have objects such as pexpect.EOF that are not strings"""
78 if not isinstance(text, str):
81 var_start = text.find("${")
84 var_end = text.find("}", var_start)
87 var_name = text[var_start+2:var_end]
88 if not var_name in self.vars:
89 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
90 text = text.replace("${%s}" % var_name, self.vars[var_name])
93 def have_var(self, varname):
94 '''see if a variable has been set'''
95 return varname in self.vars
98 def putenv(self, key, value):
99 '''putenv with substitution'''
100 os.putenv(key, self.substitute(value))
102 def chdir(self, dir):
103 '''chdir with substitution'''
104 os.chdir(self.substitute(dir))
106 def del_files(self, dirs):
107 '''delete all files in the given directory'''
109 self.run_cmd("find %s -type f | xargs rm -f" % d)
111 def write_file(self, filename, text, mode='w'):
112 '''write to a file'''
113 f = open(self.substitute(filename), mode=mode)
114 f.write(self.substitute(text))
117 def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
118 cmd = self.substitute(cmd)
119 if isinstance(cmd, list):
120 self.info('$ ' + " ".join(cmd))
122 self.info('$ ' + cmd)
124 return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
125 if isinstance(cmd, list):
130 return subprocess.check_call(cmd, shell=shell, cwd=dir)
132 return subprocess.call(cmd, shell=shell, cwd=dir)
134 def run_child(self, cmd, dir="."):
136 cmd = self.substitute(cmd)
137 if isinstance(cmd, list):
138 self.info('$ ' + " ".join(cmd))
140 self.info('$ ' + cmd)
141 if isinstance(cmd, list):
146 ret = subprocess.Popen(cmd, shell=shell)
150 def cmd_output(self, cmd):
151 '''return output from and command'''
152 cmd = self.substitute(cmd)
153 return self.run_cmd(cmd, output=True)
155 def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
157 '''check that command output contains the listed strings'''
159 if isinstance(contains, str):
160 contains = [contains]
162 out = self.cmd_output(cmd)
164 for c in self.substitute(contains):
166 m = re.search(c, out)
174 start = out.upper().find(c.upper())
181 raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
184 raise RuntimeError("Expected to see %s in %s" % (c, cmd))
185 if ordered and start != -1:
188 def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
189 ordered=False, regex=False, casefold=False):
190 '''retry a command a number of times'''
193 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
194 ordered=ordered, regex=regex, casefold=casefold)
198 retries = retries - 1
199 raise RuntimeError("Failed to find %s" % contains)
201 def pexpect_spawn(self, cmd, timeout=60):
202 '''wrapper around pexpect spawn'''
203 cmd = self.substitute(cmd)
204 self.info("$ " + cmd)
205 ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
207 def sendline_sub(line):
208 line = self.substitute(line).replace('\n', '\r\n')
209 return ret.old_sendline(line + '\r')
211 def expect_sub(line, timeout=ret.timeout):
212 line = self.substitute(line)
213 return ret.old_expect(line, timeout=timeout)
215 ret.old_sendline = ret.sendline
216 ret.sendline = sendline_sub
217 ret.old_expect = ret.expect
218 ret.expect = expect_sub
222 def get_nameserver(self):
223 '''Get the current nameserver from /etc/resolv.conf'''
224 child = self.pexpect_spawn('cat /etc/resolv.conf')
225 child.expect('nameserver')
226 child.expect('\d+.\d+.\d+.\d+')
229 def vm_poweroff(self, vmname, checkfail=True):
231 self.setvar('VMNAME', vmname)
232 self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
234 def vm_restore(self, vmname, snapshot):
236 self.setvar('VMNAME', vmname)
237 self.setvar('SNAPSHOT', snapshot)
238 self.run_cmd("${VM_RESTORE}")
240 def ping_wait(self, hostname):
241 '''wait for a hostname to come up on the network'''
242 hostname = self.substitute(hostname)
246 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
251 raise RuntimeError("Failed to ping %s" % hostname)
252 self.info("Host %s is up" % hostname)
254 def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
255 '''wait for a host to come up on the network'''
256 self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
257 retries=retries, delay=delay, wait_for_fail=wait_for_fail)
259 def run_net_time(self, child):
260 '''run net time on windows'''
261 child.sendline("net time \\\\${HOSTNAME} /set")
262 child.expect("Do you want to set the local computer")
264 child.expect("The command completed successfully")
266 def run_date_time(self, child, time_tuple=None):
267 '''run date and time on windows'''
268 if time_tuple is None:
269 time_tuple = time.localtime()
270 child.sendline("date")
271 child.expect("Enter the new date:")
272 i = child.expect(["dd-mm-yy", "mm-dd-yy"])
274 child.sendline(time.strftime("%d-%m-%y", time_tuple))
276 child.sendline(time.strftime("%m-%d-%y", time_tuple))
278 child.sendline("time")
279 child.expect("Enter the new time:")
280 child.sendline(time.strftime("%H:%M:%S", time_tuple))
283 def get_ipconfig(self, child):
284 '''get the IP configuration of the child'''
285 child.sendline("ipconfig /all")
286 child.expect('Ethernet adapter ')
287 child.expect("[\w\s]+")
288 self.setvar("WIN_NIC", child.after)
289 child.expect(['IPv4 Address', 'IP Address'])
290 child.expect('\d+.\d+.\d+.\d+')
291 self.setvar('WIN_IPV4_ADDRESS', child.after)
292 child.expect('Subnet Mask')
293 child.expect('\d+.\d+.\d+.\d+')
294 self.setvar('WIN_SUBNET_MASK', child.after)
295 child.expect('Default Gateway')
296 child.expect('\d+.\d+.\d+.\d+')
297 self.setvar('WIN_DEFAULT_GATEWAY', child.after)
300 def run_tlntadmn(self, child):
301 '''remove the annoying telnet restrictions'''
302 child.sendline('tlntadmn config maxconn=1024')
303 child.expect("The settings were successfully updated")
306 def disable_firewall(self, child):
307 '''remove the annoying firewall'''
308 child.sendline('netsh advfirewall set allprofiles state off')
309 i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
312 child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
316 def set_dns(self, child):
317 child.sendline('netsh interface ip set dns "${WIN_NIC}" static ${INTERFACE_IP} primary')
318 i = child.expect(['C:', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
324 def set_ip(self, child):
325 """fix the IP address to the same value it had when we
326 connected, but don't use DHCP, and force the DNS server to our
327 DNS server. This allows DNS updates to run"""
328 self.get_ipconfig(child)
329 child.sendline('netsh')
330 child.expect('netsh>')
331 child.sendline('offline')
332 child.expect('netsh>')
333 child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
334 child.expect('netsh>')
335 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
336 i = child.expect(['The syntax supplied for this command is not valid. Check help for the correct syntax', 'netsh>', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
338 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
339 child.expect('netsh>')
340 child.sendline('commit')
341 child.sendline('online')
342 child.sendline('exit')
344 child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
347 def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False, disable_firewall=True, run_tlntadmn=True):
348 '''open a telnet connection to a windows server, return the pexpect child'''
352 child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'")
353 i = child.expect(["Welcome to Microsoft Telnet Service",
354 "Denying new connections due to the limit on number of connections",
355 "No more connections are allowed to telnet server",
356 "Unable to connect to remote host",
358 "Connection refused",
365 child.expect("password:")
366 child.sendline(password)
367 i = child.expect(["C:",
368 "Denying new connections due to the limit on number of connections",
369 "No more connections are allowed to telnet server",
370 "Unable to connect to remote host",
372 "Connection refused",
381 if self.set_dns(child):
384 child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
388 self.run_date_time(child, None)
391 self.run_tlntadmn(child)
394 self.disable_firewall(child)
395 disable_firewall = False
398 if self.set_ip(child):
403 raise RuntimeError("Failed to connect with telnet")
405 def kinit(self, username, password):
406 '''use kinit to setup a credentials cache'''
407 self.run_cmd("kdestroy")
408 self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
409 username = self.substitute(username)
410 s = username.split('@')
413 username = '@'.join(s)
414 child = self.pexpect_spawn('kinit -V ' + username)
415 child.expect("Password for")
416 child.sendline(password)
417 child.expect("Authenticated to Kerberos")