3 '''automated testing library for testing Samba against windows'''
5 import pexpect, subprocess
6 import sys, os, time, re
9 '''testing of Samba against windows VMs'''
13 self.list_mode = False
14 os.putenv('PYTHONUNBUFFERED', '1')
def setvar(self, varname, value):
    '''Record a substitution variable.

    Stores ``value`` under ``varname`` in ``self.vars``, replacing
    whatever was previously stored under that name.
    '''
    self.vars.update({varname: value})
# Copy one VM's settings into the generic <prefix>_XX substitution variables.
#   vm:     name used as the prefix of the per-VM variables (<vm>_HOSTNAME, ...)
#   prefix: prefix of the generic variables to populate; defaults to 'WIN'
20 def setwinvars(self, vm, prefix='WIN'):
21 '''setup WIN_XX vars based on a vm name'''
22 for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN']:
23 vname = '%s_%s' % (vm, v)
# A setting that exists for this VM is copied across; it is read back
# through substitute() so that ${...} references inside it are expanded.
24 if vname in self.vars:
25 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
# NOTE(review): the branch header for the next line is not visible here;
# presumably it is the else-case that drops a stale <prefix>_XX -- confirm.
27 self.vars.pop("%s_%s" % (prefix,v), None)
# Body of the message-printing helper.
# NOTE(review): its def line is not visible here; presumably info(self, msg),
# given the self.info(...) callers and the use of `msg` below -- confirm.
30 '''print some information'''
# Nothing is printed while list_mode is set; otherwise msg is expanded with
# substitute() before being printed.
31 if not self.list_mode:
32 print(self.substitute(msg))
# Read "name: value" settings and store each one with setvar().
#   fname: config file to read (how it is opened and iterated is not visible
#          in this excerpt)
# Raises RuntimeError for a line that is rejected as invalid.
34 def load_config(self, fname):
35 '''load the config file'''
# Empty lines and lines starting with '#' are singled out here.
# NOTE(review): the statement under this test is not visible; presumably the
# line is skipped -- confirm.
39 if len(line) == 0 or line[0] == '#':
# Position of the first ':' in the line (-1 when there is none).
41 colon = line.find(':')
# NOTE(review): the condition guarding this raise is not visible; presumably
# it fires when no ':' was found -- confirm.
43 raise RuntimeError("Invalid config line '%s'" % line)
# Name is the text before the first colon, value everything after it; both
# have surrounding whitespace stripped.
44 varname = line[0:colon].strip()
45 value = line[colon+1:].strip()
46 self.setvar(varname, value)
# Switch the object into step-listing mode.
# NOTE(review): only the signature and docstring are visible here; the body
# (presumably setting self.list_mode, which info() consults) is not -- confirm.
48 def list_steps_mode(self):
49 '''put wintest in step listing mode'''
def set_skip(self, skiplist):
    '''Record which steps should be skipped.

    ``skiplist`` is a comma-separated string of step names. It is split
    on ',' and the resulting list is stored as ``self.skiplist``.
    '''
    names = skiplist.split(',')
    self.skiplist = names
# Body of the step-skipping test.
# NOTE(review): the def line and the statements between the docstring and the
# return are not visible here; the parameter is evidently named `step`.
57 '''return True if we should skip a step'''
# True when step is one of the entries of self.skiplist (the list stored by
# set_skip()).
61 return step in self.skiplist
# Expand ${NAME} references using the values held in self.vars.
#   text: a string, or a list whose items are each expanded by a recursive
#         call to substitute()
# For a string, the code locates "${", then the next "}" after it, and takes
# the text in between as the variable name; every occurrence of that
# ${NAME} is then replaced in a single str.replace() call.
# Raises RuntimeError("Unknown substitution variable ...") when the name is
# not a key of self.vars.
# NOTE(review): the docstring terminator, the enclosing loop, and the return
# statements are not visible in this excerpt -- confirm against the full file.
63 def substitute(self, text):
64 """Substitute strings of the form ${NAME} in text, replacing
65 with substitutions from vars.
67 if isinstance(text, list):
69 for i in range(len(ret)):
70 ret[i] = self.substitute(ret[i])
74 var_start = text.find("${")
77 var_end = text.find("}", var_start)
80 var_name = text[var_start+2:var_end]
81 if not var_name in self.vars:
82 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
83 text = text.replace("${%s}" % var_name, self.vars[var_name])
def have_var(self, varname):
    '''Report whether a substitution variable is defined.

    Returns True when ``varname`` is a key of ``self.vars`` and False
    otherwise.
    '''
    known = self.vars
    return varname in known
def putenv(self, key, value):
    '''Set an environment variable after expanding ${NAME} references.

    ``value`` is passed through self.substitute() and the result is
    stored under ``key`` in the process environment.

    The value is assigned through ``os.environ`` rather than passed to
    os.putenv() directly. Assigning to os.environ still calls putenv(),
    so child processes see the variable exactly as before, but it also
    keeps os.environ in sync, so later os.environ / os.getenv() reads in
    this process see it too. A bare os.putenv() call does not update
    os.environ.

    Raises whatever self.substitute() raises for an unknown variable.
    '''
    os.environ[key] = self.substitute(value)
# Body of the chdir helper.
# NOTE(review): its def line is not visible here; the parameter is evidently
# named `dir`.
96 '''chdir with substitution'''
# Expand ${NAME} references in dir, then change the working directory of the
# current process to the result.
97 os.chdir(self.substitute(dir))
# Delete the regular files found under the given directories.
#   dirs: the directories to clean.
# NOTE(review): the loop header that binds `d` is not visible here;
# presumably it iterates over dirs -- confirm.
99 def del_files(self, dirs):
100 '''delete all files in the given directory'''
# `find -type f` selects regular files only and pipes them to `rm -f`.
# The path is interpolated into the command line without quoting.
102 self.run_cmd("find %s -type f | xargs rm -f" % d)
# Write text to a file, expanding ${NAME} references in both the file name
# and the text.
#   filename: path to write (expanded with substitute())
#   text:     content to write (expanded with substitute())
#   mode:     passed to open(); defaults to 'w'
104 def write_file(self, filename, text, mode='w'):
105 '''write to a file'''
106 f = open(self.substitute(filename), mode=mode)
107 f.write(self.substitute(text))
# NOTE(review): no close() of f is visible in this excerpt -- confirm that the
# handle is closed.
# Run an external command after ${NAME} expansion, echoing it through info().
#   cmd:       command as a string or as a list of arguments
#   dir:       working directory for the child process (passed as cwd)
#   output:    presumably selects the capturing branch below -- confirm
#   checkfail: presumably selects check_call over call -- confirm
# NOTE(review): several branch headers and the assignment of `shell` are not
# visible in this excerpt.
110 def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
111 cmd = self.substitute(cmd)
# Echo the command, joining list arguments with spaces.
112 if isinstance(cmd, list):
113 self.info('$ ' + " ".join(cmd))
115 self.info('$ ' + cmd)
# Capturing form: runs through the shell with stderr merged into stdout and
# returns what was captured on stdout.
117 return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
118 if isinstance(cmd, list):
# check_call raises CalledProcessError on a non-zero exit status, whereas
# call simply returns the exit status.
123 return subprocess.check_call(cmd, shell=shell, cwd=dir)
125 return subprocess.call(cmd, shell=shell, cwd=dir)
def cmd_output(self, cmd):
    '''Run a command and hand back what it printed.

    ``cmd`` is expanded with self.substitute() and then executed through
    self.run_cmd() with ``output=True``; whatever run_cmd() returns is
    returned unchanged.
    '''
    expanded = self.substitute(cmd)
    return self.run_cmd(expanded, output=True)
# Run a command once and check its output against a list of expected items.
#   cmd:      command to run (via cmd_output())
#   contains: one string or a list of strings; a lone string is wrapped in a
#             list, and the items are expanded with substitute() before use
#   nomatch, ordered, regex: alter how items are matched; see the notes below
# Raises RuntimeError when an item is seen but should not be, or is expected
# but not seen.
# NOTE(review): the rest of the signature and most branch headers are not
# visible in this excerpt, so the exact conditions below are hedged.
133 def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
135 '''check that command output contains the listed strings'''
137 if isinstance(contains, str):
138 contains = [contains]
140 out = self.cmd_output(cmd)
142 for c in self.substitute(contains):
# Regular-expression search of the output (presumably when regex is set).
144 m = re.search(c, out)
# Plain substring search made case-insensitive by upper-casing both sides.
152 start = out.upper().find(c.upper())
# Presumably raised when nomatch is set and the item was found -- confirm.
159 raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
# Presumably raised when the item was required but not found -- confirm.
162 raise RuntimeError("Expected to see %s in %s" % (c, cmd))
# NOTE(review): the body of this test is not visible; presumably it advances
# past the match so later items must appear after it -- confirm.
163 if ordered and start != -1:
# Repeat a cmd_contains() check until it passes or the retries run out.
#   cmd, contains, ordered, regex, casefold: forwarded to cmd_contains()
#   wait_for_fail: forwarded to cmd_contains() as its nomatch argument
#   retries:       number of attempts allowed (default 30)
#   delay:         default 2; its use is not visible in this excerpt
# Raises RuntimeError("Failed to find ...") once the attempts are exhausted.
166 def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
167 ordered=False, regex=False, casefold=False):
168 '''retry a command a number of times'''
171 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
172 ordered=ordered, regex=regex, casefold=casefold)
# One attempt used up.
# NOTE(review): the surrounding loop and the handling of a failed attempt are
# not visible here -- confirm.
176 retries = retries - 1
177 raise RuntimeError("Failed to find %s" % contains)
# Spawn a command under pexpect and give the child ${NAME}-aware sendline()
# and expect() methods.
#   cmd:     command line; expanded with substitute() and echoed via info()
#   timeout: passed to pexpect.spawn (default 60)
# The child's session is logged to sys.stdout.
179 def pexpect_spawn(self, cmd, timeout=60):
180 '''wrapper around pexpect spawn'''
181 cmd = self.substitute(cmd)
182 self.info("$ " + cmd)
183 ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
# Replacement sendline: expands ${NAME}, turns each '\n' into '\r\n', appends
# a trailing '\r', and forwards to the child's original sendline.
185 def sendline_sub(line):
186 line = self.substitute(line).replace('\n', '\r\n')
187 return ret.old_sendline(line + '\r')
# Replacement expect: expands ${NAME} in the pattern(s) before forwarding to
# the child's original expect. The default timeout is the value ret.timeout
# had when this function was defined.
189 def expect_sub(line, timeout=ret.timeout):
190 line = self.substitute(line)
191 return ret.old_expect(line, timeout=timeout)
# Keep the original bound methods under old_* names, then install the
# wrappers in their place on this child object.
193 ret.old_sendline = ret.sendline
194 ret.sendline = sendline_sub
195 ret.old_expect = ret.expect
196 ret.expect = expect_sub
# NOTE(review): the statement returning the child is not visible in this
# excerpt -- confirm.
# Power off a VM by running the command held in the VM_POWEROFF variable.
#   vmname:    stored in the VMNAME variable before the command is expanded
#              (presumably so the command template can reference ${VMNAME})
#   checkfail: forwarded to run_cmd()
# NOTE(review): a line directly after the def is not visible in this excerpt.
200 def vm_poweroff(self, vmname, checkfail=True):
202 self.setvar('VMNAME', vmname)
203 self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
# Restore a VM snapshot by running the command held in the VM_RESTORE variable.
#   vmname:   stored in the VMNAME variable before the command is expanded
#   snapshot: stored in the SNAPSHOT variable before the command is expanded
# (presumably the command template references ${VMNAME} and ${SNAPSHOT})
# NOTE(review): a line directly after the def is not visible in this excerpt.
205 def vm_restore(self, vmname, snapshot):
207 self.setvar('VMNAME', vmname)
208 self.setvar('SNAPSHOT', snapshot)
209 self.run_cmd("${VM_RESTORE}")
# Wait until a host answers ping.
#   hostname: host to probe; ${NAME} references are expanded first
# Raises RuntimeError("Failed to ping ...") on failure; on success a
# "Host ... is up" message is emitted through info().
211 def ping_wait(self, hostname):
212 '''wait for a hostname to come up on the network'''
213 hostname = self.substitute(hostname)
# A single probe: ping is invoked with "-c 1 -w 10".
# NOTE(review): the retry loop and error handling around this call are not
# visible in this excerpt.
217 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
222 raise RuntimeError("Failed to ping %s" % hostname)
223 self.info("Host %s is up" % hostname)
def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
    '''Poll a TCP port with nc until the probe reports success.

    Builds an ``nc -v -z -w 1 <hostname> <port>`` command and passes it
    to self.retry_cmd(), which checks the output for 'succeeded'.
    ``retries``, ``delay`` and ``wait_for_fail`` are forwarded unchanged.
    ``port`` is formatted with %u, so it must be a number.
    '''
    probe = "nc -v -z -w 1 %s %u" % (hostname, port)
    expected = ['succeeded']
    self.retry_cmd(probe, expected,
                   retries=retries, delay=delay, wait_for_fail=wait_for_fail)
# Run `net time \\<host> /set` in the Windows session behind child and wait
# for it to report success.
#   child: session object providing sendline() and expect()
230 def run_net_time(self, child):
231 '''run net time on windows'''
# The four backslashes in the Python literal become two in the command sent.
# ${HOSTNAME} is left for child.sendline to expand (presumably the
# substituting wrapper installed by pexpect_spawn -- confirm).
232 child.sendline("net time \\\\${HOSTNAME} /set")
233 child.expect("Do you want to set the local computer")
# NOTE(review): the reply to the confirmation prompt is not visible in this
# excerpt.
235 child.expect("The command completed successfully")
# Set the date and time in the Windows session behind child using the
# interactive `date` and `time` commands.
#   child:      session object providing sendline() and expect()
#   time_tuple: time to set, in the form accepted by time.strftime();
#               defaults to the current local time
237 def run_date_time(self, child, time_tuple=None):
238 '''run date and time on windows'''
239 if time_tuple is None:
240 time_tuple = time.localtime()
241 child.sendline("date")
242 child.expect("Enter the new date:")
# The prompt reveals the day/month order the host expects; i is the index of
# the alternative that matched (0 for dd-mm-yy, 1 for mm-dd-yy).
243 i = child.expect(["dd-mm-yy", "mm-dd-yy"])
# NOTE(review): the branch headers choosing between the next two sends are
# not visible; presumably they are keyed on i -- confirm.
245 child.sendline(time.strftime("%d-%m-%y", time_tuple))
247 child.sendline(time.strftime("%m-%d-%y", time_tuple))
249 child.sendline("time")
250 child.expect("Enter the new time:")
251 child.sendline(time.strftime("%H:%M:%S", time_tuple))
def get_ipconfig(self, child):
    '''Read the Windows host's IP configuration from ``ipconfig``.

    Sends ``ipconfig`` to ``child`` and steps through its output with
    child.expect(), storing the matched text in these variables:

      WIN_NIC              text matched right after 'Ethernet adapter '
      WIN_IPV4_ADDRESS     first dotted quad after 'IPv4 Address' or
                           'IP Address'
      WIN_SUBNET_MASK      first dotted quad after 'Subnet Mask'
      WIN_DEFAULT_GATEWAY  first dotted quad after 'Default Gateway'

    ``child`` must provide sendline(), expect() and an ``after``
    attribute holding the text of the last match (presumably a pexpect
    child -- confirm). Any exception raised by child.expect() propagates.
    '''
    # Raw strings keep the regex escapes (\d, \w, \s) intact instead of
    # relying on Python passing unknown string escapes through. The dots are
    # escaped so only a literal '.' may separate the octets; an unescaped
    # '.' would match any character.
    dotted_quad = r'\d+\.\d+\.\d+\.\d+'

    child.sendline("ipconfig")
    child.expect('Ethernet adapter ')
    child.expect(r"[\w\s]+")
    self.setvar("WIN_NIC", child.after)

    child.expect(['IPv4 Address', 'IP Address'])
    child.expect(dotted_quad)
    self.setvar('WIN_IPV4_ADDRESS', child.after)

    # The remaining fields follow the same "label, then address" shape.
    for label, varname in (('Subnet Mask', 'WIN_SUBNET_MASK'),
                           ('Default Gateway', 'WIN_DEFAULT_GATEWAY')):
        child.expect(label)
        child.expect(dotted_quad)
        self.setvar(varname, child.after)
# Turn off the Windows firewall in the session behind child using netsh.
#   child: session object providing sendline() and expect()
270 def disable_firewall(self, child):
271 '''remove the annoying firewall'''
# Try the `netsh advfirewall` syntax first; i is the index of the reply that
# matched (0 for "Ok", 1 when the command was not found).
272 child.sendline('netsh advfirewall set allprofiles state off')
273 i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
# The `netsh firewall` form of the same request.
# NOTE(review): the guard for this send is not visible; presumably it is used
# only when advfirewall was not found -- confirm.
276 child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
# Re-apply the host's current IP settings as a static configuration and point
# its DNS at ${DNSSERVER}, by driving an interactive netsh session.
#   child: session object providing sendline() and expect()
280 def set_ip(self, child):
281 '''fix the IP address to the same value it had when we
282 connected, but don't use DHCP, and force the DNS server to our
283 DNS server. This allows DNS updates to run'''
# Capture the current NIC name, address, mask and gateway into the WIN_*
# variables referenced by the commands below.
284 self.get_ipconfig(child)
285 child.sendline('netsh')
286 child.sendline('offline')
287 child.sendline('interface ip set dns "${WIN_NIC}" static ${DNSSERVER} primary')
288 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
# Wait up to 5 seconds for a syntax-error reply; EOF and TIMEOUT are listed as
# acceptable outcomes so they return an index instead of raising.
289 i = child.expect(["The syntax supplied for this command is not valid. Check help for the correct syntax", pexpect.EOF, pexpect.TIMEOUT], timeout=5)
# Same address command without store=persistent, followed by an explicit
# persistent default route.
# NOTE(review): the guard for these two sends is not visible; presumably they
# are the fallback when the syntax error (i == 0) was seen -- confirm.
291 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
292 child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
293 child.sendline('online')
294 child.sendline('commit')
295 child.sendline('exit')
# Give the session up to 5 seconds to end or go quiet; either outcome is
# accepted.
297 child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
# Open a telnet session to a Windows host, log in, and optionally prepare the
# host (time, IP, firewall). Per the docstring, the pexpect child is returned.
#   hostname, username, password: login details; username is wrapped in
#                                 single quotes on the telnet command line
#   retries, delay:               retry budget (their use is not visible here)
#   set_time, set_ip, disable_firewall: presumably gate the optional steps
#                                 below -- confirm
# Raises RuntimeError("Failed to connect with telnet") on failure.
# NOTE(review): the retry loop, most branch headers and the return statement
# are not visible in this excerpt.
299 def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False, disable_firewall=True):
300 '''open a telnet connection to a windows server, return the pexpect child'''
303 child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'")
# i is the index of whichever banner or error message came back; index 0 is
# the telnet service welcome, the others are connection failures.
304 i = child.expect(["Welcome to Microsoft Telnet Service",
305 "Denying new connections due to the limit on number of connections",
306 "No more connections are allowed to telnet server",
307 "Unable to connect to remote host",
309 "Connection refused"])
315 child.expect("password:")
316 child.sendline(password)
319 child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
# Passing None makes run_date_time() use the current local time.
322 self.run_date_time(child, None)
324 self.disable_firewall(child)
332 raise RuntimeError("Failed to connect with telnet")
# Obtain Kerberos credentials for a user with kinit, using
# ${PREFIX}/ccache.test as the credentials cache.
#   username: principal name; ${NAME} references are expanded first
#   password: sent in reply to the "Password for" prompt
334 def kinit(self, username, password):
335 '''use kinit to setup a credentials cache'''
# Run kdestroy first, then point KRB5CCNAME at the test cache.
336 self.run_cmd("kdestroy")
337 self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
338 username = self.substitute(username)
# The name is split on '@' and re-joined below.
# NOTE(review): the lines that modify the pieces in between are not visible in
# this excerpt.
339 s = username.split('@')
342 username = '@'.join(s)
# kinit is run with -V, and the "Authenticated to Kerberos" message is
# awaited as the success signal.
343 child = self.pexpect_spawn('kinit -V ' + username)
344 child.expect("Password for")
345 child.sendline(password)
346 child.expect("Authenticated to Kerberos")