PEP8: fix E128: continuation line under-indented for visual indent
[samba.git] / python / samba / tests / samba_tool / computer.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Copyright (C) Bjoern Baumbach <bb@sernet.de> 2018
4 #
5 # based on group.py:
6 # Copyright (C) Michael Adam 2012
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 import os
23 import ldb
24 from samba.tests.samba_tool.base import SambaToolCmdTest
25 from samba import dsdb
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp
28
29 class ComputerCmdTestCase(SambaToolCmdTest):
30     """Tests for samba-tool computer subcommands"""
31     computers = []
32     samdb = None
33
34     def setUp(self):
35         super(ComputerCmdTestCase, self).setUp()
36         self.creds = "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])
37         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], self.creds)
38         # ips used to test --ip-address option
39         self.ipv4 = '10.10.10.10'
40         self.ipv6 = '2001:0db8:0a0b:12f0:0000:0000:0000:0001'
41         data = [
42             {
43                 'name': 'testcomputer1',
44                 'ip_address_list': [self.ipv4]
45             },
46             {
47                 'name': 'testcomputer2',
48                 'ip_address_list': [self.ipv6],
49                 'service_principal_name_list': ['SPN0']
50             },
51             {
52                 'name': 'testcomputer3$',
53                 'ip_address_list': [self.ipv4, self.ipv6],
54                 'service_principal_name_list': ['SPN0', 'SPN1']
55             },
56             {
57                 'name': 'testcomputer4$',
58             },
59         ]
60         self.computers = [self._randomComputer(base=item) for item in data]
61
62         # setup the 4 computers and ensure they are correct
63         for computer in self.computers:
64             (result, out, err) = self._create_computer(computer)
65
66             self.assertCmdSuccess(result, out, err)
67             self.assertEquals(err, "", "There shouldn't be any error message")
68             self.assertIn("Computer '%s' created successfully" %
69                           computer["name"], out)
70
71             found = self._find_computer(computer["name"])
72
73             self.assertIsNotNone(found)
74
75             expectedname = computer["name"].rstrip('$')
76             expectedsamaccountname = computer["name"]
77             if not computer["name"].endswith('$'):
78                 expectedsamaccountname = "%s$" % computer["name"]
79             self.assertEquals("%s" % found.get("name"), expectedname)
80             self.assertEquals("%s" % found.get("sAMAccountName"),
81                               expectedsamaccountname)
82             self.assertEquals("%s" % found.get("description"),
83                               computer["description"])
84
85
86     def tearDown(self):
87         super(ComputerCmdTestCase, self).tearDown()
88         # clean up all the left over computers, just in case
89         for computer in self.computers:
90             if self._find_computer(computer["name"]):
91                 (result, out, err) = self.runsubcmd("computer", "delete",
92                                                     "%s" % computer["name"])
93                 self.assertCmdSuccess(result, out, err,
94                                       "Failed to delete computer '%s'" %
95                                       computer["name"])
96
97     def test_newcomputer_with_service_principal_name(self):
98         # Each computer should have correct servicePrincipalName as provided.
99         for computer in self.computers:
100             expected_names = computer.get('service_principal_name_list', [])
101             found = self._find_service_principal_name(computer['name'], expected_names)
102             self.assertTrue(found)
103
104     def test_newcomputer_with_dns_records(self):
105
106         # Each computer should have correct DNS record and ip address.
107         for computer in self.computers:
108             for ip_address in computer.get('ip_address_list', []):
109                 found = self._find_dns_record(computer['name'], ip_address)
110                 self.assertTrue(found)
111
112         # try to delete all the computers we just created
113         for computer in self.computers:
114             (result, out, err) = self.runsubcmd("computer", "delete",
115                                                 "%s" % computer["name"])
116             self.assertCmdSuccess(result, out, err,
117                                   "Failed to delete computer '%s'" %
118                                   computer["name"])
119             found = self._find_computer(computer["name"])
120             self.assertIsNone(found,
121                               "Deleted computer '%s' still exists" %
122                               computer["name"])
123
124         # all DNS records should be gone
125         for computer in self.computers:
126             for ip_address in computer.get('ip_address_list', []):
127                 found = self._find_dns_record(computer['name'], ip_address)
128                 self.assertFalse(found)
129
130     def test_newcomputer(self):
131         """This tests the "computer create" and "computer delete" commands"""
132         # try to create all the computers again, this should fail
133         for computer in self.computers:
134             (result, out, err) = self._create_computer(computer)
135             self.assertCmdFail(result, "Succeeded to create existing computer")
136             self.assertIn("already exists", err)
137
138
139         # try to delete all the computers we just created
140         for computer in self.computers:
141             (result, out, err) = self.runsubcmd("computer", "delete", "%s" %
142                                                 computer["name"])
143             self.assertCmdSuccess(result, out, err,
144                                   "Failed to delete computer '%s'" %
145                                   computer["name"])
146             found = self._find_computer(computer["name"])
147             self.assertIsNone(found,
148                               "Deleted computer '%s' still exists" %
149                               computer["name"])
150
151         # test creating computers
152         for computer in self.computers:
153             (result, out, err) = self.runsubcmd(
154                 "computer", "create", "%s" % computer["name"],
155                 "--description=%s" % computer["description"])
156
157             self.assertCmdSuccess(result, out, err)
158             self.assertEquals(err, "", "There shouldn't be any error message")
159             self.assertIn("Computer '%s' created successfully" %
160                           computer["name"], out)
161
162             found = self._find_computer(computer["name"])
163
164             expectedname = computer["name"].rstrip('$')
165             expectedsamaccountname = computer["name"]
166             if not computer["name"].endswith('$'):
167                 expectedsamaccountname = "%s$" % computer["name"]
168             self.assertEquals("%s" % found.get("name"), expectedname)
169             self.assertEquals("%s" % found.get("sAMAccountName"),
170                               expectedsamaccountname)
171             self.assertEquals("%s" % found.get("description"),
172                               computer["description"])
173
174     def test_list(self):
175         (result, out, err) = self.runsubcmd("computer", "list")
176         self.assertCmdSuccess(result, out, err, "Error running list")
177
178         search_filter = ("(sAMAccountType=%u)" %
179                          dsdb.ATYPE_WORKSTATION_TRUST)
180
181         computerlist = self.samdb.search(base=self.samdb.domain_dn(),
182                                          scope=ldb.SCOPE_SUBTREE,
183                                          expression=search_filter,
184                                          attrs=["samaccountname"])
185
186         self.assertTrue(len(computerlist) > 0, "no computers found in samdb")
187
188         for computerobj in computerlist:
189             name = computerobj.get("samaccountname", idx=0)
190             found = self.assertMatch(out, name,
191                                      "computer '%s' not found" % name)
192
193     def test_move(self):
194         parentou = self._randomOU({"name": "parentOU"})
195         (result, out, err) = self._create_ou(parentou)
196         self.assertCmdSuccess(result, out, err)
197
198         for computer in self.computers:
199             olddn = self._find_computer(computer["name"]).get("dn")
200
201             (result, out, err) = self.runsubcmd("computer", "move",
202                                                 "%s" % computer["name"],
203                                                 "OU=%s" % parentou["name"])
204             self.assertCmdSuccess(result, out, err,
205                                   "Failed to move computer '%s'" %
206                                   computer["name"])
207             self.assertEquals(err, "", "There shouldn't be any error message")
208             self.assertIn('Moved computer "%s"' % computer["name"], out)
209
210             found = self._find_computer(computer["name"])
211             self.assertNotEquals(found.get("dn"), olddn,
212                                  ("Moved computer '%s' still exists with the "
213                                   "same dn" % computer["name"]))
214             computername = computer["name"].rstrip('$')
215             newexpecteddn = ldb.Dn(self.samdb,
216                                    "CN=%s,OU=%s,%s" %
217                                    (computername, parentou["name"],
218                                     self.samdb.domain_dn()))
219             self.assertEquals(found.get("dn"), newexpecteddn,
220                               "Moved computer '%s' does not exist" %
221                               computer["name"])
222
223             (result, out, err) = self.runsubcmd("computer", "move",
224                                                 "%s" % computer["name"],
225                                                 "%s" % olddn.parent())
226             self.assertCmdSuccess(result, out, err,
227                                   "Failed to move computer '%s'" %
228                                   computer["name"])
229
230         (result, out, err) = self.runsubcmd("ou", "delete",
231                                             "OU=%s" % parentou["name"])
232         self.assertCmdSuccess(result, out, err,
233                               "Failed to delete ou '%s'" % parentou["name"])
234
235     def _randomComputer(self, base={}):
236         """create a computer with random attribute values, you can specify base
237         attributes"""
238
239         computer = {
240             "name": self.randomName(),
241             "description": self.randomName(count=100),
242         }
243         computer.update(base)
244         return computer
245
246     def _randomOU(self, base={}):
247         """create an ou with random attribute values, you can specify base
248         attributes"""
249
250         ou = {
251             "name": self.randomName(),
252             "description": self.randomName(count=100),
253         }
254         ou.update(base)
255         return ou
256
257     def _create_computer(self, computer):
258         args = '{} {} --description={}'.format(
259             computer['name'], self.creds, computer["description"])
260
261         for ip_address in computer.get('ip_address_list', []):
262             args += ' --ip-address={}'.format(ip_address)
263
264         for service_principal_name in computer.get('service_principal_name_list', []):
265             args += ' --service-principal-name={}'.format(service_principal_name)
266
267         args = args.split()
268
269         return self.runsubcmd('computer', 'create', *args)
270
271     def _create_ou(self, ou):
272         return self.runsubcmd("ou", "create", "OU=%s" % ou["name"],
273                               "--description=%s" % ou["description"])
274
275     def _find_computer(self, name):
276         samaccountname = name
277         if not name.endswith('$'):
278             samaccountname = "%s$" % name
279         search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
280                          (ldb.binary_encode(samaccountname),
281                           "CN=Computer,CN=Schema,CN=Configuration",
282                           self.samdb.domain_dn()))
283         computerlist = self.samdb.search(base=self.samdb.domain_dn(),
284                                          scope=ldb.SCOPE_SUBTREE,
285                                          expression=search_filter, attrs=[])
286         if computerlist:
287             return computerlist[0]
288         else:
289             return None
290
291     def _find_dns_record(self, name, ip_address):
292         name = name.rstrip('$')  # computername
293         records = self.samdb.search(
294             base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
295             scope=ldb.SCOPE_SUBTREE,
296             expression="(&(objectClass=dnsNode)(name={}))".format(name),
297             attrs=['dnsRecord', 'dNSTombstoned'])
298
299         # unpack data and compare
300         for record in records:
301             if 'dNSTombstoned' in record and str(record['dNSTombstoned']) == 'TRUE':
302                 # if a record is dNSTombstoned, ignore it.
303                 continue
304             for dns_record_bin in record['dnsRecord']:
305                 dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
306                 ip = str(dns_record_obj.data)
307
308                 if str(ip) == str(ip_address):
309                     return True
310
311         return False
312
313     def _find_service_principal_name(self, name, expected_service_principal_names):
314         """Find all servicePrincipalName values and compare with expected_service_principal_names"""
315         samaccountname = name.strip('$') + '$'
316         search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
317                          (ldb.binary_encode(samaccountname),
318                           "CN=Computer,CN=Schema,CN=Configuration",
319                           self.samdb.domain_dn()))
320         computer_list = self.samdb.search(
321             base=self.samdb.domain_dn(),
322             scope=ldb.SCOPE_SUBTREE,
323             expression=search_filter,
324             attrs=['servicePrincipalName'])
325         names = set()
326         for computer in computer_list:
327             for name in computer.get('servicePrincipalName', []):
328                 names.add(name)
329         return names == set(expected_service_principal_names)