wintest Allow substitute to cope with objects like pexpect.EOF
[kai/samba-autobuild/.git] / wintest / wintest.py
1 #!/usr/bin/env python
2
3 '''automated testing library for testing Samba against windows'''
4
5 import pexpect, subprocess
6 import sys, os, time, re
7
8 class wintest():
9     '''testing of Samba against windows VMs'''
10
11     def __init__(self):
12         self.vars = {}
13         self.list_mode = False
14         os.putenv('PYTHONUNBUFFERED', '1')
15
16     def setvar(self, varname, value):
17         '''set a substitution variable'''
18         self.vars[varname] = value
19
20     def setwinvars(self, vm, prefix='WIN'):
21         '''setup WIN_XX vars based on a vm name'''
22         for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN']:
23             vname = '%s_%s' % (vm, v)
24             if vname in self.vars:
25                 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
26             else:
27                 self.vars.pop("%s_%s" % (prefix,v), None)
28
29     def info(self, msg):
30         '''print some information'''
31         if not self.list_mode:
32             print(self.substitute(msg))
33
34     def load_config(self, fname):
35         '''load the config file'''
36         f = open(fname)
37         for line in f:
38             line = line.strip()
39             if len(line) == 0 or line[0] == '#':
40                 continue
41             colon = line.find(':')
42             if colon == -1:
43                 raise RuntimeError("Invalid config line '%s'" % line)
44             varname = line[0:colon].strip()
45             value   = line[colon+1:].strip()
46             self.setvar(varname, value)
47
48     def list_steps_mode(self):
49         '''put wintest in step listing mode'''
50         self.list_mode = True
51
52     def set_skip(self, skiplist):
53         '''set a list of tests to skip'''
54         self.skiplist = skiplist.split(',')
55
56     def skip(self, step):
57         '''return True if we should skip a step'''
58         if self.list_mode:
59             print("\t%s" % step)
60             return True
61         return step in self.skiplist
62
63     def substitute(self, text):
64         """Substitute strings of the form ${NAME} in text, replacing
65         with substitutions from vars.
66         """
67         if isinstance(text, list):
68             ret = text[:]
69             for i in range(len(ret)):
70                 ret[i] = self.substitute(ret[i])
71             return ret
72
73         """We may have objects such as pexpect.EOF that are not strings"""
74         if not isinstance(text, str):
75             return text
76         while True:
77             var_start = text.find("${")
78             if var_start == -1:
79                 return text
80             var_end = text.find("}", var_start)
81             if var_end == -1:
82                 return text
83             var_name = text[var_start+2:var_end]
84             if not var_name in self.vars:
85                 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
86             text = text.replace("${%s}" % var_name, self.vars[var_name])
87         return text
88
89     def have_var(self, varname):
90         '''see if a variable has been set'''
91         return varname in self.vars
92
93
94     def putenv(self, key, value):
95         '''putenv with substitution'''
96         os.putenv(key, self.substitute(value))
97
98     def chdir(self, dir):
99         '''chdir with substitution'''
100         os.chdir(self.substitute(dir))
101
102     def del_files(self, dirs):
103         '''delete all files in the given directory'''
104         for d in dirs:
105             self.run_cmd("find %s -type f | xargs rm -f" % d)
106
107     def write_file(self, filename, text, mode='w'):
108         '''write to a file'''
109         f = open(self.substitute(filename), mode=mode)
110         f.write(self.substitute(text))
111         f.close()
112
113     def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
114         cmd = self.substitute(cmd)
115         if isinstance(cmd, list):
116             self.info('$ ' + " ".join(cmd))
117         else:
118             self.info('$ ' + cmd)
119         if output:
120             return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
121         if isinstance(cmd, list):
122             shell=False
123         else:
124             shell=True
125         if checkfail:
126             return subprocess.check_call(cmd, shell=shell, cwd=dir)
127         else:
128             return subprocess.call(cmd, shell=shell, cwd=dir)
129
130
131     def cmd_output(self, cmd):
132         '''return output from and command'''
133         cmd = self.substitute(cmd)
134         return self.run_cmd(cmd, output=True)
135
136     def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
137                      casefold=False):
138         '''check that command output contains the listed strings'''
139
140         if isinstance(contains, str):
141             contains = [contains]
142
143         out = self.cmd_output(cmd)
144         self.info(out)
145         for c in self.substitute(contains):
146             if regex:
147                 m = re.search(c, out)
148                 if m is None:
149                     start = -1
150                     end = -1
151                 else:
152                     start = m.start()
153                     end = m.end()
154             elif casefold:
155                 start = out.upper().find(c.upper())
156                 end = start + len(c)
157             else:
158                 start = out.find(c)
159                 end = start + len(c)
160             if nomatch:
161                 if start != -1:
162                     raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
163             else:
164                 if start == -1:
165                     raise RuntimeError("Expected to see %s in %s" % (c, cmd))
166             if ordered and start != -1:
167                 out = out[end:]
168
169     def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
170                   ordered=False, regex=False, casefold=False):
171         '''retry a command a number of times'''
172         while retries > 0:
173             try:
174                 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
175                                   ordered=ordered, regex=regex, casefold=casefold)
176                 return
177             except:
178                 time.sleep(delay)
179                 retries = retries - 1
180         raise RuntimeError("Failed to find %s" % contains)
181
182     def pexpect_spawn(self, cmd, timeout=60):
183         '''wrapper around pexpect spawn'''
184         cmd = self.substitute(cmd)
185         self.info("$ " + cmd)
186         ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
187
188         def sendline_sub(line):
189             line = self.substitute(line).replace('\n', '\r\n')
190             return ret.old_sendline(line + '\r')
191
192         def expect_sub(line, timeout=ret.timeout):
193             line = self.substitute(line)
194             return ret.old_expect(line, timeout=timeout)
195
196         ret.old_sendline = ret.sendline
197         ret.sendline = sendline_sub
198         ret.old_expect = ret.expect
199         ret.expect = expect_sub
200
201         return ret
202
203     def vm_poweroff(self, vmname, checkfail=True):
204         '''power off a VM'''
205         self.setvar('VMNAME', vmname)
206         self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
207
208     def vm_restore(self, vmname, snapshot):
209         '''restore a VM'''
210         self.setvar('VMNAME', vmname)
211         self.setvar('SNAPSHOT', snapshot)
212         self.run_cmd("${VM_RESTORE}")
213
214     def ping_wait(self, hostname):
215         '''wait for a hostname to come up on the network'''
216         hostname = self.substitute(hostname)
217         loops=10
218         while loops > 0:
219             try:
220                 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
221                 break
222             except:
223                 loops = loops - 1
224         if loops == 0:
225             raise RuntimeError("Failed to ping %s" % hostname)
226         self.info("Host %s is up" % hostname)
227
228     def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
229         '''wait for a host to come up on the network'''
230         self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
231                        retries=retries, delay=delay, wait_for_fail=wait_for_fail)
232
233     def run_net_time(self, child):
234         '''run net time on windows'''
235         child.sendline("net time \\\\${HOSTNAME} /set")
236         child.expect("Do you want to set the local computer")
237         child.sendline("Y")
238         child.expect("The command completed successfully")
239
240     def run_date_time(self, child, time_tuple=None):
241         '''run date and time on windows'''
242         if time_tuple is None:
243             time_tuple = time.localtime()
244         child.sendline("date")
245         child.expect("Enter the new date:")
246         i = child.expect(["dd-mm-yy", "mm-dd-yy"])
247         if i == 0:
248             child.sendline(time.strftime("%d-%m-%y", time_tuple))
249         else:
250             child.sendline(time.strftime("%m-%d-%y", time_tuple))
251         child.expect("C:")
252         child.sendline("time")
253         child.expect("Enter the new time:")
254         child.sendline(time.strftime("%H:%M:%S", time_tuple))
255         child.expect("C:")
256
257     def get_ipconfig(self, child):
258         '''get the IP configuration of the child'''
259         child.sendline("ipconfig")
260         child.expect('Ethernet adapter ')
261         child.expect("[\w\s]+")
262         self.setvar("WIN_NIC", child.after)
263         child.expect(['IPv4 Address', 'IP Address'])
264         child.expect('\d+.\d+.\d+.\d+')
265         self.setvar('WIN_IPV4_ADDRESS', child.after)
266         child.expect('Subnet Mask')
267         child.expect('\d+.\d+.\d+.\d+')
268         self.setvar('WIN_SUBNET_MASK', child.after)
269         child.expect('Default Gateway')
270         child.expect('\d+.\d+.\d+.\d+')
271         self.setvar('WIN_DEFAULT_GATEWAY', child.after)
272
273     def disable_firewall(self, child):
274         '''remove the annoying firewall'''
275         child.sendline('netsh advfirewall set allprofiles state off')
276         i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
277         child.expect("C:")
278         if i == 1:
279             child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
280             child.expect("Ok")
281             child.expect("C:")
282  
283     def set_ip(self, child):
284         '''fix the IP address to the same value it had when we
285         connected, but don't use DHCP, and force the DNS server to our
286         DNS server.  This allows DNS updates to run'''
287         self.get_ipconfig(child)
288         child.sendline('netsh')
289         child.sendline('offline')
290         child.sendline('interface ip set dns "${WIN_NIC}" static ${DNSSERVER} primary')
291         child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
292         i = child.expect(["The syntax supplied for this command is not valid. Check help for the correct syntax", pexpect.EOF, pexpect.TIMEOUT], timeout=5)
293         if i == 0:
294             child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
295         child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
296         child.sendline('online')
297         child.sendline('commit')
298         child.sendline('exit')
299
300         child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
301
    def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False, disable_firewall=True):
        '''open a telnet connection to a windows server, return the pexpect child

        hostname, username - used to build the telnet command line
        password           - sent at the password prompt
        retries, delay     - number of connection attempts and the sleep
                             in seconds after each failed one
        set_time           - set the date and time with run_date_time
        set_ip             - make the IP configuration static with set_ip,
                             then connect again
        disable_firewall   - turn the firewall off after logging in

        Raises RuntimeError when no connection could be made within
        retries attempts.
        '''
        # only set after set_ip() has run: the next login adds a default route
        set_route = False
        while retries > 0:
            child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'")
            # index 0 is the welcome banner, every other entry is a failure
            i = child.expect(["Welcome to Microsoft Telnet Service",
                              "Denying new connections due to the limit on number of connections",
                              "No more connections are allowed to telnet server",
                              "Unable to connect to remote host",
                              "No route to host",
                              "Connection refused"])
            if i != 0:
                # could not connect: drop this child and try again later
                child.close()
                time.sleep(delay)
                retries -= 1
                continue
            child.expect("password:")
            child.sendline(password)
            child.expect("C:")
            if set_route:
                child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
                child.expect("C:")
            if set_time:
                self.run_date_time(child, None)
            if disable_firewall:
                self.disable_firewall(child)
            if set_ip:
                self.set_ip(child)
                # connect again with a new session; clearing the flags
                # means set_ip and set_time run only once
                # NOTE(review): the old child is not closed here and
                # retries is not decremented - confirm this is intended
                set_ip = False
                set_time = False
                set_route = True
                continue
            return child
        raise RuntimeError("Failed to connect with telnet")
336
337     def kinit(self, username, password):
338         '''use kinit to setup a credentials cache'''
339         self.run_cmd("kdestroy")
340         self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
341         username = self.substitute(username)
342         s = username.split('@')
343         if len(s) > 0:
344             s[1] = s[1].upper()
345         username = '@'.join(s)
346         child = self.pexpect_spawn('kinit -V ' + username)
347         child.expect("Password for")
348         child.sendline(password)
349         child.expect("Authenticated to Kerberos")