python/samba/netcmd: changes for samab.tests.samba_tool.computer
[samba.git] / python / samba / tests / samba_tool / computer.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Copyright (C) Bjoern Baumbach <bb@sernet.de> 2018
4 #
5 # based on group.py:
6 # Copyright (C) Michael Adam 2012
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 import os
23 import ldb
24 from samba.tests.samba_tool.base import SambaToolCmdTest
25 from samba import dsdb
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp
28
29
30 class ComputerCmdTestCase(SambaToolCmdTest):
31     """Tests for samba-tool computer subcommands"""
32     computers = []
33     samdb = None
34
35     def setUp(self):
36         super(ComputerCmdTestCase, self).setUp()
37         self.creds = "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])
38         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], self.creds)
39         # ips used to test --ip-address option
40         self.ipv4 = '10.10.10.10'
41         self.ipv6 = '2001:0db8:0a0b:12f0:0000:0000:0000:0001'
42         data = [
43             {
44                 'name': 'testcomputer1',
45                 'ip_address_list': [self.ipv4]
46             },
47             {
48                 'name': 'testcomputer2',
49                 'ip_address_list': [self.ipv6],
50                 'service_principal_name_list': ['SPN0']
51             },
52             {
53                 'name': 'testcomputer3$',
54                 'ip_address_list': [self.ipv4, self.ipv6],
55                 'service_principal_name_list': ['SPN0', 'SPN1']
56             },
57             {
58                 'name': 'testcomputer4$',
59             },
60         ]
61         self.computers = [self._randomComputer(base=item) for item in data]
62
63         # setup the 4 computers and ensure they are correct
64         for computer in self.computers:
65             (result, out, err) = self._create_computer(computer)
66
67             self.assertCmdSuccess(result, out, err)
68             self.assertEquals(err, "", "There shouldn't be any error message")
69             self.assertIn("Computer '%s' created successfully" %
70                           computer["name"], out)
71
72             found = self._find_computer(computer["name"])
73
74             self.assertIsNotNone(found)
75
76             expectedname = computer["name"].rstrip('$')
77             expectedsamaccountname = computer["name"]
78             if not computer["name"].endswith('$'):
79                 expectedsamaccountname = "%s$" % computer["name"]
80             self.assertEquals("%s" % found.get("name"), expectedname)
81             self.assertEquals("%s" % found.get("sAMAccountName"),
82                               expectedsamaccountname)
83             self.assertEquals("%s" % found.get("description"),
84                               computer["description"])
85
86     def tearDown(self):
87         super(ComputerCmdTestCase, self).tearDown()
88         # clean up all the left over computers, just in case
89         for computer in self.computers:
90             if self._find_computer(computer["name"]):
91                 (result, out, err) = self.runsubcmd("computer", "delete",
92                                                     "%s" % computer["name"])
93                 self.assertCmdSuccess(result, out, err,
94                                       "Failed to delete computer '%s'" %
95                                       computer["name"])
96
97     def test_newcomputer_with_service_principal_name(self):
98         # Each computer should have correct servicePrincipalName as provided.
99         for computer in self.computers:
100             expected_names = computer.get('service_principal_name_list', [])
101             found = self._find_service_principal_name(computer['name'], expected_names)
102             self.assertTrue(found)
103
104     def test_newcomputer_with_dns_records(self):
105
106         # Each computer should have correct DNS record and ip address.
107         for computer in self.computers:
108             for ip_address in computer.get('ip_address_list', []):
109                 found = self._find_dns_record(computer['name'], ip_address)
110                 self.assertTrue(found)
111
112         # try to delete all the computers we just created
113         for computer in self.computers:
114             (result, out, err) = self.runsubcmd("computer", "delete",
115                                                 "%s" % computer["name"])
116             self.assertCmdSuccess(result, out, err,
117                                   "Failed to delete computer '%s'" %
118                                   computer["name"])
119             found = self._find_computer(computer["name"])
120             self.assertIsNone(found,
121                               "Deleted computer '%s' still exists" %
122                               computer["name"])
123
124         # all DNS records should be gone
125         for computer in self.computers:
126             for ip_address in computer.get('ip_address_list', []):
127                 found = self._find_dns_record(computer['name'], ip_address)
128                 self.assertFalse(found)
129
130     def test_newcomputer(self):
131         """This tests the "computer create" and "computer delete" commands"""
132         # try to create all the computers again, this should fail
133         for computer in self.computers:
134             (result, out, err) = self._create_computer(computer)
135             self.assertCmdFail(result, "Succeeded to create existing computer")
136             self.assertIn("already exists", err)
137
138         # try to delete all the computers we just created
139         for computer in self.computers:
140             (result, out, err) = self.runsubcmd("computer", "delete", "%s" %
141                                                 computer["name"])
142             self.assertCmdSuccess(result, out, err,
143                                   "Failed to delete computer '%s'" %
144                                   computer["name"])
145             found = self._find_computer(computer["name"])
146             self.assertIsNone(found,
147                               "Deleted computer '%s' still exists" %
148                               computer["name"])
149
150         # test creating computers
151         for computer in self.computers:
152             (result, out, err) = self.runsubcmd(
153                 "computer", "create", "%s" % computer["name"],
154                 "--description=%s" % computer["description"])
155
156             self.assertCmdSuccess(result, out, err)
157             self.assertEquals(err, "", "There shouldn't be any error message")
158             self.assertIn("Computer '%s' created successfully" %
159                           computer["name"], out)
160
161             found = self._find_computer(computer["name"])
162
163             expectedname = computer["name"].rstrip('$')
164             expectedsamaccountname = computer["name"]
165             if not computer["name"].endswith('$'):
166                 expectedsamaccountname = "%s$" % computer["name"]
167             self.assertEquals("%s" % found.get("name"), expectedname)
168             self.assertEquals("%s" % found.get("sAMAccountName"),
169                               expectedsamaccountname)
170             self.assertEquals("%s" % found.get("description"),
171                               computer["description"])
172
173     def test_list(self):
174         (result, out, err) = self.runsubcmd("computer", "list")
175         self.assertCmdSuccess(result, out, err, "Error running list")
176
177         search_filter = ("(sAMAccountType=%u)" %
178                          dsdb.ATYPE_WORKSTATION_TRUST)
179
180         computerlist = self.samdb.search(base=self.samdb.domain_dn(),
181                                          scope=ldb.SCOPE_SUBTREE,
182                                          expression=search_filter,
183                                          attrs=["samaccountname"])
184
185         self.assertTrue(len(computerlist) > 0, "no computers found in samdb")
186
187         for computerobj in computerlist:
188             name = computerobj.get("samaccountname", idx=0)
189             found = self.assertMatch(out, str(name),
190                                      "computer '%s' not found" % name)
191
192     def test_move(self):
193         parentou = self._randomOU({"name": "parentOU"})
194         (result, out, err) = self._create_ou(parentou)
195         self.assertCmdSuccess(result, out, err)
196
197         for computer in self.computers:
198             olddn = self._find_computer(computer["name"]).get("dn")
199
200             (result, out, err) = self.runsubcmd("computer", "move",
201                                                 "%s" % computer["name"],
202                                                 "OU=%s" % parentou["name"])
203             self.assertCmdSuccess(result, out, err,
204                                   "Failed to move computer '%s'" %
205                                   computer["name"])
206             self.assertEquals(err, "", "There shouldn't be any error message")
207             self.assertIn('Moved computer "%s"' % computer["name"], out)
208
209             found = self._find_computer(computer["name"])
210             self.assertNotEquals(found.get("dn"), olddn,
211                                  ("Moved computer '%s' still exists with the "
212                                   "same dn" % computer["name"]))
213             computername = computer["name"].rstrip('$')
214             newexpecteddn = ldb.Dn(self.samdb,
215                                    "CN=%s,OU=%s,%s" %
216                                    (computername, parentou["name"],
217                                     self.samdb.domain_dn()))
218             self.assertEquals(found.get("dn"), newexpecteddn,
219                               "Moved computer '%s' does not exist" %
220                               computer["name"])
221
222             (result, out, err) = self.runsubcmd("computer", "move",
223                                                 "%s" % computer["name"],
224                                                 "%s" % olddn.parent())
225             self.assertCmdSuccess(result, out, err,
226                                   "Failed to move computer '%s'" %
227                                   computer["name"])
228
229         (result, out, err) = self.runsubcmd("ou", "delete",
230                                             "OU=%s" % parentou["name"])
231         self.assertCmdSuccess(result, out, err,
232                               "Failed to delete ou '%s'" % parentou["name"])
233
234     def _randomComputer(self, base={}):
235         """create a computer with random attribute values, you can specify base
236         attributes"""
237
238         computer = {
239             "name": self.randomName(),
240             "description": self.randomName(count=100),
241         }
242         computer.update(base)
243         return computer
244
245     def _randomOU(self, base={}):
246         """create an ou with random attribute values, you can specify base
247         attributes"""
248
249         ou = {
250             "name": self.randomName(),
251             "description": self.randomName(count=100),
252         }
253         ou.update(base)
254         return ou
255
256     def _create_computer(self, computer):
257         args = '{} {} --description={}'.format(
258             computer['name'], self.creds, computer["description"])
259
260         for ip_address in computer.get('ip_address_list', []):
261             args += ' --ip-address={}'.format(ip_address)
262
263         for service_principal_name in computer.get('service_principal_name_list', []):
264             args += ' --service-principal-name={}'.format(service_principal_name)
265
266         args = args.split()
267
268         return self.runsubcmd('computer', 'create', *args)
269
270     def _create_ou(self, ou):
271         return self.runsubcmd("ou", "create", "OU=%s" % ou["name"],
272                               "--description=%s" % ou["description"])
273
274     def _find_computer(self, name):
275         samaccountname = name
276         if not name.endswith('$'):
277             samaccountname = "%s$" % name
278         search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
279                          (ldb.binary_encode(samaccountname),
280                           "CN=Computer,CN=Schema,CN=Configuration",
281                           self.samdb.domain_dn()))
282         computerlist = self.samdb.search(base=self.samdb.domain_dn(),
283                                          scope=ldb.SCOPE_SUBTREE,
284                                          expression=search_filter, attrs=[])
285         if computerlist:
286             return computerlist[0]
287         else:
288             return None
289
290     def _find_dns_record(self, name, ip_address):
291         name = name.rstrip('$')  # computername
292         records = self.samdb.search(
293             base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
294             scope=ldb.SCOPE_SUBTREE,
295             expression="(&(objectClass=dnsNode)(name={}))".format(name),
296             attrs=['dnsRecord', 'dNSTombstoned'])
297
298         # unpack data and compare
299         for record in records:
300             if 'dNSTombstoned' in record and str(record['dNSTombstoned']) == 'TRUE':
301                 # if a record is dNSTombstoned, ignore it.
302                 continue
303             for dns_record_bin in record['dnsRecord']:
304                 dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
305                 ip = str(dns_record_obj.data)
306
307                 if str(ip) == str(ip_address):
308                     return True
309
310         return False
311
312     def _find_service_principal_name(self, name, expected_service_principal_names):
313         """Find all servicePrincipalName values and compare with expected_service_principal_names"""
314         samaccountname = name.strip('$') + '$'
315         search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
316                          (ldb.binary_encode(samaccountname),
317                           "CN=Computer,CN=Schema,CN=Configuration",
318                           self.samdb.domain_dn()))
319         computer_list = self.samdb.search(
320             base=self.samdb.domain_dn(),
321             scope=ldb.SCOPE_SUBTREE,
322             expression=search_filter,
323             attrs=['servicePrincipalName'])
324         names = set()
325         for computer in computer_list:
326             for name in computer.get('servicePrincipalName', []):
327                 names.add(str(name))
328         return names == set(expected_service_principal_names)