PEP8: fix E302: expected 2 blank lines, found 1
[samba.git] / python / samba / tests / samba_tool / computer.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Copyright (C) Bjoern Baumbach <bb@sernet.de> 2018
4 #
5 # based on group.py:
6 # Copyright (C) Michael Adam 2012
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 import os
23 import ldb
24 from samba.tests.samba_tool.base import SambaToolCmdTest
25 from samba import dsdb
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp
28
29
30 class ComputerCmdTestCase(SambaToolCmdTest):
31     """Tests for samba-tool computer subcommands"""
32     computers = []
33     samdb = None
34
35     def setUp(self):
36         super(ComputerCmdTestCase, self).setUp()
37         self.creds = "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])
38         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], self.creds)
39         # ips used to test --ip-address option
40         self.ipv4 = '10.10.10.10'
41         self.ipv6 = '2001:0db8:0a0b:12f0:0000:0000:0000:0001'
42         data = [
43             {
44                 'name': 'testcomputer1',
45                 'ip_address_list': [self.ipv4]
46             },
47             {
48                 'name': 'testcomputer2',
49                 'ip_address_list': [self.ipv6],
50                 'service_principal_name_list': ['SPN0']
51             },
52             {
53                 'name': 'testcomputer3$',
54                 'ip_address_list': [self.ipv4, self.ipv6],
55                 'service_principal_name_list': ['SPN0', 'SPN1']
56             },
57             {
58                 'name': 'testcomputer4$',
59             },
60         ]
61         self.computers = [self._randomComputer(base=item) for item in data]
62
63         # setup the 4 computers and ensure they are correct
64         for computer in self.computers:
65             (result, out, err) = self._create_computer(computer)
66
67             self.assertCmdSuccess(result, out, err)
68             self.assertEquals(err, "", "There shouldn't be any error message")
69             self.assertIn("Computer '%s' created successfully" %
70                           computer["name"], out)
71
72             found = self._find_computer(computer["name"])
73
74             self.assertIsNotNone(found)
75
76             expectedname = computer["name"].rstrip('$')
77             expectedsamaccountname = computer["name"]
78             if not computer["name"].endswith('$'):
79                 expectedsamaccountname = "%s$" % computer["name"]
80             self.assertEquals("%s" % found.get("name"), expectedname)
81             self.assertEquals("%s" % found.get("sAMAccountName"),
82                               expectedsamaccountname)
83             self.assertEquals("%s" % found.get("description"),
84                               computer["description"])
85
86
87     def tearDown(self):
88         super(ComputerCmdTestCase, self).tearDown()
89         # clean up all the left over computers, just in case
90         for computer in self.computers:
91             if self._find_computer(computer["name"]):
92                 (result, out, err) = self.runsubcmd("computer", "delete",
93                                                     "%s" % computer["name"])
94                 self.assertCmdSuccess(result, out, err,
95                                       "Failed to delete computer '%s'" %
96                                       computer["name"])
97
98     def test_newcomputer_with_service_principal_name(self):
99         # Each computer should have correct servicePrincipalName as provided.
100         for computer in self.computers:
101             expected_names = computer.get('service_principal_name_list', [])
102             found = self._find_service_principal_name(computer['name'], expected_names)
103             self.assertTrue(found)
104
105     def test_newcomputer_with_dns_records(self):
106
107         # Each computer should have correct DNS record and ip address.
108         for computer in self.computers:
109             for ip_address in computer.get('ip_address_list', []):
110                 found = self._find_dns_record(computer['name'], ip_address)
111                 self.assertTrue(found)
112
113         # try to delete all the computers we just created
114         for computer in self.computers:
115             (result, out, err) = self.runsubcmd("computer", "delete",
116                                                 "%s" % computer["name"])
117             self.assertCmdSuccess(result, out, err,
118                                   "Failed to delete computer '%s'" %
119                                   computer["name"])
120             found = self._find_computer(computer["name"])
121             self.assertIsNone(found,
122                               "Deleted computer '%s' still exists" %
123                               computer["name"])
124
125         # all DNS records should be gone
126         for computer in self.computers:
127             for ip_address in computer.get('ip_address_list', []):
128                 found = self._find_dns_record(computer['name'], ip_address)
129                 self.assertFalse(found)
130
131     def test_newcomputer(self):
132         """This tests the "computer create" and "computer delete" commands"""
133         # try to create all the computers again, this should fail
134         for computer in self.computers:
135             (result, out, err) = self._create_computer(computer)
136             self.assertCmdFail(result, "Succeeded to create existing computer")
137             self.assertIn("already exists", err)
138
139
140         # try to delete all the computers we just created
141         for computer in self.computers:
142             (result, out, err) = self.runsubcmd("computer", "delete", "%s" %
143                                                 computer["name"])
144             self.assertCmdSuccess(result, out, err,
145                                   "Failed to delete computer '%s'" %
146                                   computer["name"])
147             found = self._find_computer(computer["name"])
148             self.assertIsNone(found,
149                               "Deleted computer '%s' still exists" %
150                               computer["name"])
151
152         # test creating computers
153         for computer in self.computers:
154             (result, out, err) = self.runsubcmd(
155                 "computer", "create", "%s" % computer["name"],
156                 "--description=%s" % computer["description"])
157
158             self.assertCmdSuccess(result, out, err)
159             self.assertEquals(err, "", "There shouldn't be any error message")
160             self.assertIn("Computer '%s' created successfully" %
161                           computer["name"], out)
162
163             found = self._find_computer(computer["name"])
164
165             expectedname = computer["name"].rstrip('$')
166             expectedsamaccountname = computer["name"]
167             if not computer["name"].endswith('$'):
168                 expectedsamaccountname = "%s$" % computer["name"]
169             self.assertEquals("%s" % found.get("name"), expectedname)
170             self.assertEquals("%s" % found.get("sAMAccountName"),
171                               expectedsamaccountname)
172             self.assertEquals("%s" % found.get("description"),
173                               computer["description"])
174
175     def test_list(self):
176         (result, out, err) = self.runsubcmd("computer", "list")
177         self.assertCmdSuccess(result, out, err, "Error running list")
178
179         search_filter = ("(sAMAccountType=%u)" %
180                          dsdb.ATYPE_WORKSTATION_TRUST)
181
182         computerlist = self.samdb.search(base=self.samdb.domain_dn(),
183                                          scope=ldb.SCOPE_SUBTREE,
184                                          expression=search_filter,
185                                          attrs=["samaccountname"])
186
187         self.assertTrue(len(computerlist) > 0, "no computers found in samdb")
188
189         for computerobj in computerlist:
190             name = computerobj.get("samaccountname", idx=0)
191             found = self.assertMatch(out, name,
192                                      "computer '%s' not found" % name)
193
194     def test_move(self):
195         parentou = self._randomOU({"name": "parentOU"})
196         (result, out, err) = self._create_ou(parentou)
197         self.assertCmdSuccess(result, out, err)
198
199         for computer in self.computers:
200             olddn = self._find_computer(computer["name"]).get("dn")
201
202             (result, out, err) = self.runsubcmd("computer", "move",
203                                                 "%s" % computer["name"],
204                                                 "OU=%s" % parentou["name"])
205             self.assertCmdSuccess(result, out, err,
206                                   "Failed to move computer '%s'" %
207                                   computer["name"])
208             self.assertEquals(err, "", "There shouldn't be any error message")
209             self.assertIn('Moved computer "%s"' % computer["name"], out)
210
211             found = self._find_computer(computer["name"])
212             self.assertNotEquals(found.get("dn"), olddn,
213                                  ("Moved computer '%s' still exists with the "
214                                   "same dn" % computer["name"]))
215             computername = computer["name"].rstrip('$')
216             newexpecteddn = ldb.Dn(self.samdb,
217                                    "CN=%s,OU=%s,%s" %
218                                    (computername, parentou["name"],
219                                     self.samdb.domain_dn()))
220             self.assertEquals(found.get("dn"), newexpecteddn,
221                               "Moved computer '%s' does not exist" %
222                               computer["name"])
223
224             (result, out, err) = self.runsubcmd("computer", "move",
225                                                 "%s" % computer["name"],
226                                                 "%s" % olddn.parent())
227             self.assertCmdSuccess(result, out, err,
228                                   "Failed to move computer '%s'" %
229                                   computer["name"])
230
231         (result, out, err) = self.runsubcmd("ou", "delete",
232                                             "OU=%s" % parentou["name"])
233         self.assertCmdSuccess(result, out, err,
234                               "Failed to delete ou '%s'" % parentou["name"])
235
236     def _randomComputer(self, base={}):
237         """create a computer with random attribute values, you can specify base
238         attributes"""
239
240         computer = {
241             "name": self.randomName(),
242             "description": self.randomName(count=100),
243         }
244         computer.update(base)
245         return computer
246
247     def _randomOU(self, base={}):
248         """create an ou with random attribute values, you can specify base
249         attributes"""
250
251         ou = {
252             "name": self.randomName(),
253             "description": self.randomName(count=100),
254         }
255         ou.update(base)
256         return ou
257
258     def _create_computer(self, computer):
259         args = '{} {} --description={}'.format(
260             computer['name'], self.creds, computer["description"])
261
262         for ip_address in computer.get('ip_address_list', []):
263             args += ' --ip-address={}'.format(ip_address)
264
265         for service_principal_name in computer.get('service_principal_name_list', []):
266             args += ' --service-principal-name={}'.format(service_principal_name)
267
268         args = args.split()
269
270         return self.runsubcmd('computer', 'create', *args)
271
272     def _create_ou(self, ou):
273         return self.runsubcmd("ou", "create", "OU=%s" % ou["name"],
274                               "--description=%s" % ou["description"])
275
276     def _find_computer(self, name):
277         samaccountname = name
278         if not name.endswith('$'):
279             samaccountname = "%s$" % name
280         search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
281                          (ldb.binary_encode(samaccountname),
282                           "CN=Computer,CN=Schema,CN=Configuration",
283                           self.samdb.domain_dn()))
284         computerlist = self.samdb.search(base=self.samdb.domain_dn(),
285                                          scope=ldb.SCOPE_SUBTREE,
286                                          expression=search_filter, attrs=[])
287         if computerlist:
288             return computerlist[0]
289         else:
290             return None
291
292     def _find_dns_record(self, name, ip_address):
293         name = name.rstrip('$')  # computername
294         records = self.samdb.search(
295             base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
296             scope=ldb.SCOPE_SUBTREE,
297             expression="(&(objectClass=dnsNode)(name={}))".format(name),
298             attrs=['dnsRecord', 'dNSTombstoned'])
299
300         # unpack data and compare
301         for record in records:
302             if 'dNSTombstoned' in record and str(record['dNSTombstoned']) == 'TRUE':
303                 # if a record is dNSTombstoned, ignore it.
304                 continue
305             for dns_record_bin in record['dnsRecord']:
306                 dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
307                 ip = str(dns_record_obj.data)
308
309                 if str(ip) == str(ip_address):
310                     return True
311
312         return False
313
314     def _find_service_principal_name(self, name, expected_service_principal_names):
315         """Find all servicePrincipalName values and compare with expected_service_principal_names"""
316         samaccountname = name.strip('$') + '$'
317         search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
318                          (ldb.binary_encode(samaccountname),
319                           "CN=Computer,CN=Schema,CN=Configuration",
320                           self.samdb.domain_dn()))
321         computer_list = self.samdb.search(
322             base=self.samdb.domain_dn(),
323             scope=ldb.SCOPE_SUBTREE,
324             expression=search_filter,
325             attrs=['servicePrincipalName'])
326         names = set()
327         for computer in computer_list:
328             for name in computer.get('servicePrincipalName', []):
329                 names.add(name)
330         return names == set(expected_service_principal_names)