27312d20ab13fe38a88ab0f34037e43cf6cf58e7
[kai/samba.git] / wintest / wintest.py
1 #!/usr/bin/env python
2
3 '''automated testing library for testing Samba against windows'''
4
5 import pexpect, subprocess
6 import sys, os, time, re
7
8 class wintest():
9     '''testing of Samba against windows VMs'''
10
11     def __init__(self):
12         self.vars = {}
13         self.list_mode = False
14         os.putenv('PYTHONUNBUFFERED', '1')
15
16     def setvar(self, varname, value):
17         '''set a substitution variable'''
18         self.vars[varname] = value
19
20     def setwinvars(self, vm, prefix='WIN'):
21         '''setup WIN_XX vars based on a vm name'''
22         for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN']:
23             vname = '%s_%s' % (vm, v)
24             if vname in self.vars:
25                 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
26             else:
27                 self.vars.pop("%s_%s" % (prefix,v), None)
28
29     def info(self, msg):
30         '''print some information'''
31         if not self.list_mode:
32             print(self.substitute(msg))
33
34     def load_config(self, fname):
35         '''load the config file'''
36         f = open(fname)
37         for line in f:
38             line = line.strip()
39             if len(line) == 0 or line[0] == '#':
40                 continue
41             colon = line.find(':')
42             if colon == -1:
43                 raise RuntimeError("Invalid config line '%s'" % line)
44             varname = line[0:colon].strip()
45             value   = line[colon+1:].strip()
46             self.setvar(varname, value)
47
48     def list_steps_mode(self):
49         '''put wintest in step listing mode'''
50         self.list_mode = True
51
52     def set_skip(self, skiplist):
53         '''set a list of tests to skip'''
54         self.skiplist = skiplist.split(',')
55
56     def skip(self, step):
57         '''return True if we should skip a step'''
58         if self.list_mode:
59             print("\t%s" % step)
60             return True
61         return step in self.skiplist
62
63     def substitute(self, text):
64         """Substitute strings of the form ${NAME} in text, replacing
65         with substitutions from vars.
66         """
67         if isinstance(text, list):
68             ret = text[:]
69             for i in range(len(ret)):
70                 ret[i] = self.substitute(ret[i])
71             return ret
72
73         """We may have objects such as pexpect.EOF that are not strings"""
74         if not isinstance(text, str):
75             return text
76         while True:
77             var_start = text.find("${")
78             if var_start == -1:
79                 return text
80             var_end = text.find("}", var_start)
81             if var_end == -1:
82                 return text
83             var_name = text[var_start+2:var_end]
84             if not var_name in self.vars:
85                 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
86             text = text.replace("${%s}" % var_name, self.vars[var_name])
87         return text
88
89     def have_var(self, varname):
90         '''see if a variable has been set'''
91         return varname in self.vars
92
93
94     def putenv(self, key, value):
95         '''putenv with substitution'''
96         os.putenv(key, self.substitute(value))
97
98     def chdir(self, dir):
99         '''chdir with substitution'''
100         os.chdir(self.substitute(dir))
101
102     def del_files(self, dirs):
103         '''delete all files in the given directory'''
104         for d in dirs:
105             self.run_cmd("find %s -type f | xargs rm -f" % d)
106
107     def write_file(self, filename, text, mode='w'):
108         '''write to a file'''
109         f = open(self.substitute(filename), mode=mode)
110         f.write(self.substitute(text))
111         f.close()
112
113     def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
114         cmd = self.substitute(cmd)
115         if isinstance(cmd, list):
116             self.info('$ ' + " ".join(cmd))
117         else:
118             self.info('$ ' + cmd)
119         if output:
120             return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
121         if isinstance(cmd, list):
122             shell=False
123         else:
124             shell=True
125         if checkfail:
126             return subprocess.check_call(cmd, shell=shell, cwd=dir)
127         else:
128             return subprocess.call(cmd, shell=shell, cwd=dir)
129
130
131     def cmd_output(self, cmd):
132         '''return output from and command'''
133         cmd = self.substitute(cmd)
134         return self.run_cmd(cmd, output=True)
135
136     def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
137                      casefold=False):
138         '''check that command output contains the listed strings'''
139
140         if isinstance(contains, str):
141             contains = [contains]
142
143         out = self.cmd_output(cmd)
144         self.info(out)
145         for c in self.substitute(contains):
146             if regex:
147                 m = re.search(c, out)
148                 if m is None:
149                     start = -1
150                     end = -1
151                 else:
152                     start = m.start()
153                     end = m.end()
154             elif casefold:
155                 start = out.upper().find(c.upper())
156                 end = start + len(c)
157             else:
158                 start = out.find(c)
159                 end = start + len(c)
160             if nomatch:
161                 if start != -1:
162                     raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
163             else:
164                 if start == -1:
165                     raise RuntimeError("Expected to see %s in %s" % (c, cmd))
166             if ordered and start != -1:
167                 out = out[end:]
168
169     def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
170                   ordered=False, regex=False, casefold=False):
171         '''retry a command a number of times'''
172         while retries > 0:
173             try:
174                 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
175                                   ordered=ordered, regex=regex, casefold=casefold)
176                 return
177             except:
178                 time.sleep(delay)
179                 retries = retries - 1
180         raise RuntimeError("Failed to find %s" % contains)
181
182     def pexpect_spawn(self, cmd, timeout=60):
183         '''wrapper around pexpect spawn'''
184         cmd = self.substitute(cmd)
185         self.info("$ " + cmd)
186         ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
187
188         def sendline_sub(line):
189             line = self.substitute(line).replace('\n', '\r\n')
190             return ret.old_sendline(line + '\r')
191
192         def expect_sub(line, timeout=ret.timeout):
193             line = self.substitute(line)
194             return ret.old_expect(line, timeout=timeout)
195
196         ret.old_sendline = ret.sendline
197         ret.sendline = sendline_sub
198         ret.old_expect = ret.expect
199         ret.expect = expect_sub
200
201         return ret
202
203     def vm_poweroff(self, vmname, checkfail=True):
204         '''power off a VM'''
205         self.setvar('VMNAME', vmname)
206         self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
207
208     def vm_restore(self, vmname, snapshot):
209         '''restore a VM'''
210         self.setvar('VMNAME', vmname)
211         self.setvar('SNAPSHOT', snapshot)
212         self.run_cmd("${VM_RESTORE}")
213
214     def ping_wait(self, hostname):
215         '''wait for a hostname to come up on the network'''
216         hostname = self.substitute(hostname)
217         loops=10
218         while loops > 0:
219             try:
220                 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
221                 break
222             except:
223                 loops = loops - 1
224         if loops == 0:
225             raise RuntimeError("Failed to ping %s" % hostname)
226         self.info("Host %s is up" % hostname)
227
228     def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
229         '''wait for a host to come up on the network'''
230         self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
231                        retries=retries, delay=delay, wait_for_fail=wait_for_fail)
232
233     def run_net_time(self, child):
234         '''run net time on windows'''
235         child.sendline("net time \\\\${HOSTNAME} /set")
236         child.expect("Do you want to set the local computer")
237         child.sendline("Y")
238         child.expect("The command completed successfully")
239
240     def run_date_time(self, child, time_tuple=None):
241         '''run date and time on windows'''
242         if time_tuple is None:
243             time_tuple = time.localtime()
244         child.sendline("date")
245         child.expect("Enter the new date:")
246         i = child.expect(["dd-mm-yy", "mm-dd-yy"])
247         if i == 0:
248             child.sendline(time.strftime("%d-%m-%y", time_tuple))
249         else:
250             child.sendline(time.strftime("%m-%d-%y", time_tuple))
251         child.expect("C:")
252         child.sendline("time")
253         child.expect("Enter the new time:")
254         child.sendline(time.strftime("%H:%M:%S", time_tuple))
255         child.expect("C:")
256
257     def get_ipconfig(self, child):
258         '''get the IP configuration of the child'''
259         child.sendline("ipconfig /all")
260         child.expect('Ethernet adapter ')
261         child.expect("[\w\s]+")
262         self.setvar("WIN_NIC", child.after)
263         child.expect(['DHCP Enabled', 'Dhcp Enabled'])
264         i = child.expect(['Yes', 'No'])
265         if i == 0:
266             self.setvar("WIN_DHCP", True)
267         else:
268             self.setvar("WIN_DHCP", False)
269         child.expect(['IPv4 Address', 'IP Address'])
270         child.expect('\d+.\d+.\d+.\d+')
271         self.setvar('WIN_IPV4_ADDRESS', child.after)
272         child.expect('Subnet Mask')
273         child.expect('\d+.\d+.\d+.\d+')
274         self.setvar('WIN_SUBNET_MASK', child.after)
275         child.expect('Default Gateway')
276         child.expect('\d+.\d+.\d+.\d+')
277         self.setvar('WIN_DEFAULT_GATEWAY', child.after)
278         child.expect("C:")
279
280     def run_tlntadmn(self, child):
281         '''remove the annoying telnet restrictions'''
282         child.sendline('tlntadmn config maxconn=1024')
283         child.expect("The settings were successfully updated")
284         child.expect("C:")
285
286     def disable_firewall(self, child):
287         '''remove the annoying firewall'''
288         child.sendline('netsh advfirewall set allprofiles state off')
289         i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
290         child.expect("C:")
291         if i == 1:
292             child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
293             child.expect("Ok")
294             child.expect("C:")
295  
296     def set_dns(self, child):
297         child.sendline('netsh interface ip set dns "${WIN_NIC}" static ${DNSSERVER} primary')
298         i = child.expect(['C:', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
299         if i > 0:
300             return True
301         else:
302             return False
303
304     def set_ip(self, child):
305         '''fix the IP address to the same value it had when we
306         connected, but don't use DHCP, and force the DNS server to our
307         DNS server.  This allows DNS updates to run'''
308         self.get_ipconfig(child)
309         if self.vars['WIN_DHCP'] is False:
310             return False
311         child.sendline('netsh')
312         child.expect('netsh>')
313         child.sendline('offline')
314         child.expect('netsh>')
315         child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
316         child.expect('netsh>')
317         child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
318         i = child.expect(['The syntax supplied for this command is not valid. Check help for the correct syntax', 'netsh>', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
319         if i == 0:
320             child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
321             child.expect('netsh>')
322         child.sendline('commit')
323         child.sendline('online')
324         child.sendline('exit')
325
326         child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
327         return True
328         
    def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False, disable_firewall=True, run_tlntadmn=True):
        '''open a telnet connection to a windows server, return the pexpect child

        hostname, username, password -- where and as whom to log in
        retries, delay   -- a failed connection or login is retried up to
                            retries times, with delay seconds of sleep
                            after each failure
        set_time         -- set date and time on the host (run_date_time)
        set_ip           -- make the host's address static (set_ip)
        disable_firewall -- switch the firewall off (disable_firewall)
        run_tlntadmn     -- raise the telnet connection limit (run_tlntadmn)

        Each requested setup step is run once, on the first connection
        that gets as far as a prompt.  Raises RuntimeError when the
        retries are used up.'''
        # set by a successful set_ip(); acted on by the next connection
        set_route = False
        set_dns = False
        while retries > 0:
            child = self.pexpect_spawn("telnet " + hostname + " -l '" + username + "'")
            # index 0 is the greeting; every other entry is a failure
            i = child.expect(["Welcome to Microsoft Telnet Service",
                              "Denying new connections due to the limit on number of connections",
                              "No more connections are allowed to telnet server",
                              "Unable to connect to remote host",
                              "No route to host",
                              "Connection refused",
                              pexpect.EOF])
            if i != 0:
                child.close()
                time.sleep(delay)
                retries -= 1
                continue
            child.expect("password:")
            child.sendline(password)
            # index 0 is the command prompt, i.e. the login worked
            i = child.expect(["C:",
                              "Denying new connections due to the limit on number of connections",
                              "No more connections are allowed to telnet server",
                              "Unable to connect to remote host",
                              "No route to host",
                              "Connection refused",
                              pexpect.EOF])
            if i != 0:
                child.close()
                time.sleep(delay)
                retries -= 1
                continue
            if set_dns:
                set_dns = False
                # set_dns() returns True when the prompt did not come
                # back; then connect again.
                # NOTE(review): this path neither closes child nor counts
                # as a retry - confirm that this is intended
                if self.set_dns(child):
                    continue;
            if set_route:
                child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
                child.expect("C:")
                set_route = False
            if set_time:
                self.run_date_time(child, None)
                set_time = False
            if run_tlntadmn:
                self.run_tlntadmn(child)
                run_tlntadmn = False
            if disable_firewall:
                self.disable_firewall(child)
                disable_firewall = False
            if set_ip:
                set_ip = False
                if self.set_ip(child):
                    # the interface was reconfigured: have the next
                    # connection add the default route and set the DNS
                    # server
                    set_route = True
                    set_dns = True
                # connect again whether or not set_ip() changed anything
                # (presumably because this session does not survive the
                # netsh commands - TODO confirm); child is not closed and
                # retries is not decremented here
                continue
            return child
        raise RuntimeError("Failed to connect with telnet")
386
387     def kinit(self, username, password):
388         '''use kinit to setup a credentials cache'''
389         self.run_cmd("kdestroy")
390         self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
391         username = self.substitute(username)
392         s = username.split('@')
393         if len(s) > 0:
394             s[1] = s[1].upper()
395         username = '@'.join(s)
396         child = self.pexpect_spawn('kinit -V ' + username)
397         child.expect("Password for")
398         child.sendline(password)
399         child.expect("Authenticated to Kerberos")