# Unix SMB/CIFS implementation.
#
# Copyright (C) Bjoern Baumbach <bb@sernet.de> 2018
#
# Copyright (C) Michael Adam 2012
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os

import ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
from samba import dsdb
from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import dnsp
class ComputerCmdTestCase(SambaToolCmdTest):
    """Tests for samba-tool computer subcommands.

    setUp() creates four computer accounts (with and without a trailing
    '$', with and without IP addresses and service principal names) and
    checks that they were stored correctly.  Each test then starts from
    that state, and tearDown() deletes whatever is still left over.

    The tests read DC_USERNAME, DC_PASSWORD and DC_SERVER from the
    environment.
    """

    # Class-level defaults; setUp() replaces both with per-test values.
    computers = []
    samdb = None

    def setUp(self):
        """Connect to the DC and create the computer accounts under test."""
        super(ComputerCmdTestCase, self).setUp()
        self.creds = "-U%s%%%s" % (os.environ["DC_USERNAME"],
                                   os.environ["DC_PASSWORD"])
        self.samdb = self.getSamDB("-H",
                                   "ldap://%s" % os.environ["DC_SERVER"],
                                   self.creds)
        # ips used to test --ip-address option
        self.ipv4 = '10.10.10.10'
        self.ipv6 = '2001:0db8:0a0b:12f0:0000:0000:0000:0001'
        data = [
            {
                'name': 'testcomputer1',
                'ip_address_list': [self.ipv4]
            },
            {
                'name': 'testcomputer2',
                'ip_address_list': [self.ipv6],
                'service_principal_name_list': ['SPN0']
            },
            {
                'name': 'testcomputer3$',
                'ip_address_list': [self.ipv4, self.ipv6],
                'service_principal_name_list': ['SPN0', 'SPN1']
            },
            {
                'name': 'testcomputer4$',
            },
        ]
        self.computers = [self._randomComputer(base=item) for item in data]

        # set up the 4 computers and ensure they are correct
        for computer in self.computers:
            (result, out, err) = self._create_computer(computer)
            self._assert_computer_created(computer, result, out, err)

    def tearDown(self):
        """Delete every computer created by setUp() that still exists."""
        super(ComputerCmdTestCase, self).tearDown()
        # clean up all the left over computers, just in case
        for computer in self.computers:
            if self._find_computer(computer["name"]):
                (result, out, err) = self.runsubcmd("computer", "delete",
                                                    "%s" % computer["name"])
                self.assertCmdSuccess(result, out, err,
                                      "Failed to delete computer '%s'" %
                                      computer["name"])

    def test_newcomputer_with_service_principal_name(self):
        """Created computers carry exactly the requested SPNs."""
        # Each computer should have correct servicePrincipalName as provided.
        for computer in self.computers:
            expected_names = computer.get('service_principal_name_list', [])
            found = self._find_service_principal_name(computer['name'],
                                                      expected_names)
            self.assertTrue(found)

    def test_newcomputer_with_dns_records(self):
        """DNS records exist after creation and vanish after deletion."""
        # Each computer should have correct DNS record and ip address.
        for computer in self.computers:
            for ip_address in computer.get('ip_address_list', []):
                found = self._find_dns_record(computer['name'], ip_address)
                self.assertTrue(found)

        # try to delete all the computers we just created
        for computer in self.computers:
            self._delete_computer_and_check(computer)

        # all DNS records should be gone
        for computer in self.computers:
            for ip_address in computer.get('ip_address_list', []):
                found = self._find_dns_record(computer['name'], ip_address)
                self.assertFalse(found)

    def test_newcomputer(self):
        """This tests the "computer create" and "computer delete" commands"""
        # try to create all the computers again, this should fail
        for computer in self.computers:
            (result, out, err) = self._create_computer(computer)
            self.assertCmdFail(result, "Succeeded to create existing computer")
            self.assertIn("already exists", err)

        # try to delete all the computers we just created
        for computer in self.computers:
            self._delete_computer_and_check(computer)

        # test creating computers
        for computer in self.computers:
            (result, out, err) = self.runsubcmd(
                "computer", "create", "%s" % computer["name"],
                "--description=%s" % computer["description"])
            self._assert_computer_created(computer, result, out, err)

    def test_list_computers(self):
        """Every workstation-trust account in samdb shows up in the list."""
        (result, out, err) = self.runsubcmd("computer", "list")
        self.assertCmdSuccess(result, out, err, "Error running list")

        search_filter = ("(sAMAccountType=%u)" %
                         dsdb.ATYPE_WORKSTATION_TRUST)

        computerlist = self.samdb.search(base=self.samdb.domain_dn(),
                                         scope=ldb.SCOPE_SUBTREE,
                                         expression=search_filter,
                                         attrs=["samaccountname"])

        self.assertTrue(len(computerlist) > 0, "no computers found in samdb")

        for computerobj in computerlist:
            name = computerobj.get("samaccountname", idx=0)
            self.assertMatch(out, name,
                             "computer '%s' not found" % name)

    def test_move_computers(self):
        """Move each computer into a new OU and back to its old parent."""
        parentou = self._randomOU({"name": "parentOU"})
        (result, out, err) = self._create_ou(parentou)
        self.assertCmdSuccess(result, out, err)

        for computer in self.computers:
            olddn = self._find_computer(computer["name"]).get("dn")

            (result, out, err) = self.runsubcmd("computer", "move",
                                                "%s" % computer["name"],
                                                "OU=%s" % parentou["name"])
            self.assertCmdSuccess(result, out, err,
                                  "Failed to move computer '%s'" %
                                  computer["name"])
            self.assertEqual(err, "", "There shouldn't be any error message")
            self.assertIn('Moved computer "%s"' % computer["name"], out)

            found = self._find_computer(computer["name"])
            self.assertNotEqual(found.get("dn"), olddn,
                                ("Moved computer '%s' still exists with the "
                                 "same dn" % computer["name"]))
            computername = computer["name"].rstrip('$')
            newexpecteddn = ldb.Dn(self.samdb,
                                   "CN=%s,OU=%s,%s" %
                                   (computername, parentou["name"],
                                    self.samdb.domain_dn()))
            self.assertEqual(found.get("dn"), newexpecteddn,
                             "Moved computer '%s' does not exist" %
                             computer["name"])

            # Move it back so the OU is empty again before it is deleted.
            (result, out, err) = self.runsubcmd("computer", "move",
                                                "%s" % computer["name"],
                                                "%s" % olddn.parent())
            self.assertCmdSuccess(result, out, err,
                                  "Failed to move computer '%s'" %
                                  computer["name"])

        (result, out, err) = self.runsubcmd("ou", "delete",
                                            "OU=%s" % parentou["name"])
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete ou '%s'" % parentou["name"])

    def _assert_computer_created(self, computer, result, out, err):
        """Check a "computer create" result and the stored account.

        :param computer: dict with at least "name" and "description"
        :param result: command result as returned by runsubcmd()
        :param out: captured stdout of the command
        :param err: captured stderr of the command
        """
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "There shouldn't be any error message")
        self.assertIn("Computer '%s' created successfully" %
                      computer["name"], out)

        found = self._find_computer(computer["name"])
        # Fail with a clear assertion rather than an AttributeError below.
        self.assertIsNotNone(found)

        # "name" never carries the trailing '$', sAMAccountName always does.
        expectedname = computer["name"].rstrip('$')
        expectedsamaccountname = computer["name"]
        if not computer["name"].endswith('$'):
            expectedsamaccountname = "%s$" % computer["name"]
        self.assertEqual("%s" % found.get("name"), expectedname)
        self.assertEqual("%s" % found.get("sAMAccountName"),
                         expectedsamaccountname)
        self.assertEqual("%s" % found.get("description"),
                         computer["description"])

    def _delete_computer_and_check(self, computer):
        """Run "computer delete" and assert the account is really gone."""
        (result, out, err) = self.runsubcmd("computer", "delete",
                                            "%s" % computer["name"])
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete computer '%s'" %
                              computer["name"])
        found = self._find_computer(computer["name"])
        self.assertIsNone(found,
                          "Deleted computer '%s' still exists" %
                          computer["name"])

    def _randomComputer(self, base=None):
        """create a computer with random attribute values, you can specify base
        attributes

        :param base: optional dict whose entries override the random ones
        :return: dict with at least "name" and "description"
        """
        computer = {
            "name": self.randomName(),
            "description": self.randomName(count=100),
        }
        if base:
            computer.update(base)
        return computer

    def _randomOU(self, base=None):
        """create an ou with random attribute values, you can specify base
        attributes

        :param base: optional dict whose entries override the random ones
        :return: dict with at least "name" and "description"
        """
        ou = {
            "name": self.randomName(),
            "description": self.randomName(count=100),
        }
        if base:
            ou.update(base)
        return ou

    def _create_computer(self, computer):
        """Run "computer create" for the given computer description.

        Passes the credentials, the description and one --ip-address /
        --service-principal-name option per list entry.

        :param computer: dict as produced by _randomComputer()
        :return: whatever runsubcmd() returns (result, out, err)
        """
        # Build the argument vector directly instead of joining and
        # re-splitting a string, so values containing whitespace survive.
        args = [
            computer['name'],
            self.creds,
            '--description={}'.format(computer["description"]),
        ]
        args.extend('--ip-address={}'.format(ip_address)
                    for ip_address in computer.get('ip_address_list', []))
        args.extend(
            '--service-principal-name={}'.format(service_principal_name)
            for service_principal_name
            in computer.get('service_principal_name_list', []))

        return self.runsubcmd('computer', 'create', *args)

    def _create_ou(self, ou):
        """Run "ou create" for the given ou dict (name and description)."""
        return self.runsubcmd("ou", "create", "OU=%s" % ou["name"],
                              "--description=%s" % ou["description"])

    def _computer_search_filter(self, samaccountname):
        """Return the LDAP filter matching one computer by sAMAccountName."""
        return ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
                (ldb.binary_encode(samaccountname),
                 "CN=Computer,CN=Schema,CN=Configuration",
                 self.samdb.domain_dn()))

    def _find_computer(self, name):
        """Look a computer up by name, with or without the trailing '$'.

        :return: the first matching search result, or None if there is none
        """
        samaccountname = name
        if not name.endswith('$'):
            samaccountname = "%s$" % name
        computerlist = self.samdb.search(
            base=self.samdb.domain_dn(),
            scope=ldb.SCOPE_SUBTREE,
            expression=self._computer_search_filter(samaccountname),
            attrs=[])
        if computerlist:
            return computerlist[0]
        return None

    def _find_dns_record(self, name, ip_address):
        """Tell whether a live DNS record maps the computer to ip_address.

        :param name: computer name, a trailing '$' is ignored
        :param ip_address: address to look for, compared as a string
        :return: True if a non-tombstoned dnsNode carries the address
        """
        computername = name.rstrip('$')
        records = self.samdb.search(
            base="DC=DomainDnsZones,{}".format(
                self.samdb.get_default_basedn()),
            scope=ldb.SCOPE_SUBTREE,
            # escape the name like the other filters in this class do
            expression="(&(objectClass=dnsNode)(name={}))".format(
                ldb.binary_encode(computername)),
            attrs=['dnsRecord', 'dNSTombstoned'])

        wanted = str(ip_address)
        # unpack data and compare
        for record in records:
            if ('dNSTombstoned' in record and
                    str(record['dNSTombstoned']) == 'TRUE'):
                # if a record is dNSTombstoned, ignore it.
                continue
            for dns_record_bin in record['dnsRecord']:
                dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord,
                                            dns_record_bin)
                if str(dns_record_obj.data) == wanted:
                    return True

        return False

    def _find_service_principal_name(self, name,
                                     expected_service_principal_names):
        """Find all servicePrincipalName values and compare with expected_service_principal_names

        :return: True if the stored set of SPNs equals the expected set
        """
        samaccountname = name.strip('$') + '$'
        computer_list = self.samdb.search(
            base=self.samdb.domain_dn(),
            scope=ldb.SCOPE_SUBTREE,
            expression=self._computer_search_filter(samaccountname),
            attrs=['servicePrincipalName'])

        names = set()
        for computer in computer_list:
            # distinct loop variable: do not clobber the "name" parameter
            for spn in computer.get('servicePrincipalName', []):
                names.add(str(spn))
        return names == set(expected_service_principal_names)