1 # Unix SMB/CIFS implementation.
3 # Copyright (C) Bjoern Baumbach <bb@sernet.de> 2018
6 # Copyright (C) Michael Adam 2012
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba.tests.samba_tool.base import SambaToolCmdTest
25 from samba import dsdb
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp
30 class ComputerCmdTestCase(SambaToolCmdTest):
31 """Tests for samba-tool computer subcommands"""
def setUp(self):
    """Create the computer accounts that the tests of this class use.

    Reads the DC_USERNAME, DC_PASSWORD and DC_SERVER environment
    variables to build the credentials option and the SamDB connection,
    creates four computers with ``_create_computer`` and verifies that
    each one can be found with the expected name, sAMAccountName and
    description.
    """
    super(ComputerCmdTestCase, self).setUp()
    self.creds = "-U%s%%%s" % (os.environ["DC_USERNAME"],
                               os.environ["DC_PASSWORD"])
    self.samdb = self.getSamDB("-H",
                               "ldap://%s" % os.environ["DC_SERVER"],
                               self.creds)
    # ips used to test --ip-address option
    self.ipv4 = '10.10.10.10'
    self.ipv6 = '2001:0db8:0a0b:12f0:0000:0000:0000:0001'
    # Fixed attributes per computer; names with and without the trailing
    # '$' are both covered, as are computers without IPs or SPNs.
    data = [
        {
            'name': 'testcomputer1',
            'ip_address_list': [self.ipv4]
        },
        {
            'name': 'testcomputer2',
            'ip_address_list': [self.ipv6],
            'service_principal_name_list': ['SPN0']
        },
        {
            'name': 'testcomputer3$',
            'ip_address_list': [self.ipv4, self.ipv6],
            'service_principal_name_list': ['SPN0', 'SPN1']
        },
        {
            'name': 'testcomputer4$',
        },
    ]
    self.computers = [self._randomComputer(base=item) for item in data]

    # setup the 4 computers and ensure they are correct
    for computer in self.computers:
        (result, out, err) = self._create_computer(computer)

        self.assertCmdSuccess(result, out, err)
        self.assertNotIn(
            "ERROR", err, "There shouldn't be any error message")
        self.assertIn("Computer '%s' created successfully" %
                      computer["name"], out)

        found = self._find_computer(computer["name"])

        self.assertIsNotNone(found)

        # The object name never carries the '$', the sAMAccountName
        # always does, whichever form was passed on the command line.
        expectedname = computer["name"].rstrip('$')
        expectedsamaccountname = computer["name"]
        if not computer["name"].endswith('$'):
            expectedsamaccountname = "%s$" % computer["name"]
        # assertEqual: the assertEquals alias no longer exists in
        # Python 3.12.
        self.assertEqual("%s" % found.get("name"), expectedname)
        self.assertEqual("%s" % found.get("sAMAccountName"),
                         expectedsamaccountname)
        self.assertEqual("%s" % found.get("description"),
                         computer["description"])
def tearDown(self):
    """Delete every test computer that still exists after a test ran."""
    super(ComputerCmdTestCase, self).tearDown()
    # clean up all the left over computers, just in case
    for computer in self.computers:
        # Some tests delete the computers themselves; only remove the
        # ones that can still be found.
        if self._find_computer(computer["name"]):
            (result, out, err) = self.runsubcmd("computer", "delete",
                                                "%s" % computer["name"])
            self.assertCmdSuccess(result, out, err,
                                  "Failed to delete computer '%s'" %
                                  computer["name"])
def test_newcomputer_with_service_principal_name(self):
    # Every entry of self.computers must carry exactly the SPNs listed in
    # its 'service_principal_name_list' (no list means no SPNs expected).
    for entry in self.computers:
        wanted_spns = entry.get('service_principal_name_list', [])
        matches = self._find_service_principal_name(entry['name'],
                                                    wanted_spns)
        self.assertTrue(matches)
def test_newcomputer_with_dns_records(self):

    # Each computer should have correct DNS record and ip address.
    for computer in self.computers:
        for ip_address in computer.get('ip_address_list', []):
            found = self._find_dns_record(computer['name'], ip_address)
            self.assertTrue(found,
                            "No DNS record %s found for computer '%s'" %
                            (ip_address, computer['name']))

    # try to delete all the computers we just created
    for computer in self.computers:
        (result, out, err) = self.runsubcmd("computer", "delete",
                                            "%s" % computer["name"])
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete computer '%s'" %
                              computer["name"])
        found = self._find_computer(computer["name"])
        self.assertIsNone(found,
                          "Deleted computer '%s' still exists" %
                          computer["name"])

    # all DNS records should be gone
    for computer in self.computers:
        for ip_address in computer.get('ip_address_list', []):
            found = self._find_dns_record(computer['name'], ip_address)
            self.assertFalse(found,
                             "DNS record %s of deleted computer '%s' "
                             "still exists" %
                             (ip_address, computer['name']))
def test_newcomputer(self):
    """This tests the "computer create" and "computer delete" commands"""
    # try to create all the computers again, this should fail
    for computer in self.computers:
        (result, out, err) = self._create_computer(computer)
        self.assertCmdFail(result, "Succeeded to create existing computer")
        self.assertIn("already exists", err)

    # try to delete all the computers we just created
    for computer in self.computers:
        (result, out, err) = self.runsubcmd("computer", "delete", "%s" %
                                            computer["name"])
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete computer '%s'" %
                              computer["name"])
        found = self._find_computer(computer["name"])
        self.assertIsNone(found,
                          "Deleted computer '%s' still exists" %
                          computer["name"])

    # test creating computers
    for computer in self.computers:
        (result, out, err) = self.runsubcmd(
            "computer", "create", "%s" % computer["name"],
            "--description=%s" % computer["description"])

        self.assertCmdSuccess(result, out, err)
        # assertEqual: the assertEquals alias no longer exists in
        # Python 3.12.
        self.assertEqual(err, "", "There shouldn't be any error message")
        self.assertIn("Computer '%s' created successfully" %
                      computer["name"], out)

        found = self._find_computer(computer["name"])
        # Fail with a clear message instead of an AttributeError on
        # found.get() when the computer cannot be found.
        self.assertIsNotNone(found)

        # The object name never carries the '$', the sAMAccountName
        # always does.
        expectedname = computer["name"].rstrip('$')
        expectedsamaccountname = computer["name"]
        if not computer["name"].endswith('$'):
            expectedsamaccountname = "%s$" % computer["name"]
        self.assertEqual("%s" % found.get("name"), expectedname)
        self.assertEqual("%s" % found.get("sAMAccountName"),
                         expectedsamaccountname)
        self.assertEqual("%s" % found.get("description"),
                         computer["description"])
def test_list(self):
    # "computer list" must succeed and its output must mention every
    # workstation-trust account that an LDAP search finds.
    (result, out, err) = self.runsubcmd("computer", "list")
    self.assertCmdSuccess(result, out, err, "Error running list")

    search_filter = ("(sAMAccountType=%u)" %
                     dsdb.ATYPE_WORKSTATION_TRUST)

    computerlist = self.samdb.search(base=self.samdb.domain_dn(),
                                     scope=ldb.SCOPE_SUBTREE,
                                     expression=search_filter,
                                     attrs=["samaccountname"])

    self.assertTrue(len(computerlist) > 0, "no computers found in samdb")

    for computerobj in computerlist:
        name = computerobj.get("samaccountname", idx=0)
        self.assertMatch(out, str(name),
                         "computer '%s' not found" % name)
def test_move(self):
    # Move every computer into a freshly created OU, check its new DN,
    # move it back to where it was and finally delete the OU again.
    parentou = self._randomOU({"name": "parentOU"})
    (result, out, err) = self._create_ou(parentou)
    self.assertCmdSuccess(result, out, err)

    for computer in self.computers:
        olddn = self._find_computer(computer["name"]).get("dn")

        (result, out, err) = self.runsubcmd("computer", "move",
                                            "%s" % computer["name"],
                                            "OU=%s" % parentou["name"])
        self.assertCmdSuccess(result, out, err,
                              "Failed to move computer '%s'" %
                              computer["name"])
        # assertEqual/assertNotEqual: the assertEquals and
        # assertNotEquals aliases no longer exist in Python 3.12.
        self.assertEqual(err, "", "There shouldn't be any error message")
        self.assertIn('Moved computer "%s"' % computer["name"], out)

        found = self._find_computer(computer["name"])
        self.assertNotEqual(found.get("dn"), olddn,
                            ("Moved computer '%s' still exists with the "
                             "same dn" % computer["name"]))
        # The RDN uses the computer name without the trailing '$'.
        computername = computer["name"].rstrip('$')
        newexpecteddn = ldb.Dn(self.samdb,
                               "CN=%s,OU=%s,%s" %
                               (computername, parentou["name"],
                                self.samdb.domain_dn()))
        self.assertEqual(found.get("dn"), newexpecteddn,
                         "Moved computer '%s' does not exist" %
                         computer["name"])

        # Move the computer back so the OU is empty when it is deleted.
        (result, out, err) = self.runsubcmd("computer", "move",
                                            "%s" % computer["name"],
                                            "%s" % olddn.parent())
        self.assertCmdSuccess(result, out, err,
                              "Failed to move computer '%s'" %
                              computer["name"])

    (result, out, err) = self.runsubcmd("ou", "delete",
                                        "OU=%s" % parentou["name"])
    self.assertCmdSuccess(result, out, err,
                          "Failed to delete ou '%s'" % parentou["name"])
def _randomComputer(self, base=None):
    """Build a computer description with random attribute values.

    :param base: optional dict of attributes that override or extend the
        random "name" and "description" values.
    :return: a new dict; ``base`` itself is never modified.
    """
    computer = {
        "name": self.randomName(),
        "description": self.randomName(count=100),
    }
    # None (the default) and an empty dict both mean "nothing to override".
    if base:
        computer.update(base)
    return computer
def _randomOU(self, base=None):
    """Build an OU description with random attribute values.

    :param base: optional dict of attributes that override or extend the
        random "name" and "description" values.
    :return: a new dict; ``base`` itself is never modified.
    """
    ou = {
        "name": self.randomName(),
        "description": self.randomName(count=100),
    }
    # None (the default) and an empty dict both mean "nothing to override".
    if base:
        ou.update(base)
    return ou
def _create_computer(self, computer):
    """Run ``computer create`` for one computer description.

    :param computer: dict with "name" and "description" and, optionally,
        "ip_address_list" and "service_principal_name_list".
    :return: the value returned by ``runsubcmd``; callers unpack it as
        ``(result, out, err)``.
    """
    # Build a real argument list: each entry reaches the command as
    # exactly one argument, so nothing is split on whitespace or spread
    # out character by character when unpacked with ``*``.
    args = [
        computer['name'],
        self.creds,
        '--description={0}'.format(computer["description"]),
    ]
    args.extend('--ip-address={0}'.format(ip_address)
                for ip_address in computer.get('ip_address_list', []))
    args.extend(
        '--service-principal-name={0}'.format(service_principal_name)
        for service_principal_name
        in computer.get('service_principal_name_list', []))

    return self.runsubcmd('computer', 'create', *args)
def _create_ou(self, ou):
    """Run ``ou create`` for the OU described by ``ou``.

    ou["name"] supplies the OU's RDN value and ou["description"] the
    --description option; the result of ``runsubcmd`` is returned as is.
    """
    ou_rdn = "OU=%s" % ou["name"]
    description_option = "--description=%s" % ou["description"]
    return self.runsubcmd("ou", "create", ou_rdn, description_option)
def _find_computer(self, name):
    """Look a computer account up by name.

    :param name: computer name, with or without the trailing '$'.
    :return: the first search result whose sAMAccountName matches, or
        None when there is no such computer.
    """
    # The sAMAccountName of a computer always ends in '$'.
    samaccountname = name if name.endswith('$') else "%s$" % name
    search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
                     (ldb.binary_encode(samaccountname),
                      "CN=Computer,CN=Schema,CN=Configuration",
                      self.samdb.domain_dn()))
    computerlist = self.samdb.search(base=self.samdb.domain_dn(),
                                     scope=ldb.SCOPE_SUBTREE,
                                     expression=search_filter, attrs=[])
    # Callers check the result for None / falsiness, so an empty result
    # set must yield None rather than raise IndexError.
    if not computerlist:
        return None
    return computerlist[0]
def _find_dns_record(self, name, ip_address):
    """Tell whether a non-tombstoned DNS record of ``name`` holds ``ip_address``.

    :param name: computer name, with or without the trailing '$'.
    :param ip_address: address to look for; compared as a string with
        the string form of each record's data.
    :return: True if a matching record is found, False otherwise.
    """
    name = name.rstrip('$')  # computername
    records = self.samdb.search(
        base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
        scope=ldb.SCOPE_SUBTREE,
        # Escape the name, as _find_computer does for its filter value.
        expression="(&(objectClass=dnsNode)(name={0}))".format(
            ldb.binary_encode(name)),
        attrs=['dnsRecord', 'dNSTombstoned'])

    # unpack data and compare
    for record in records:
        if 'dNSTombstoned' in record and str(record['dNSTombstoned']) == 'TRUE':
            # if a record is dNSTombstoned, ignore it.
            continue
        for dns_record_bin in record['dnsRecord']:
            dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
            if str(dns_record_obj.data) == str(ip_address):
                return True

    return False
def _find_service_principal_name(self, name, expected_service_principal_names):
    """Compare a computer's servicePrincipalName values with the expected ones.

    :param name: computer name, with or without '$'.
    :param expected_service_principal_names: iterable of the SPN strings
        the computer is expected to have.
    :return: True if the set of SPNs found equals the set of expected
        SPNs (order and duplicates are ignored), False otherwise.
    """
    samaccountname = name.strip('$') + '$'
    search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
                     (ldb.binary_encode(samaccountname),
                      "CN=Computer,CN=Schema,CN=Configuration",
                      self.samdb.domain_dn()))
    computer_list = self.samdb.search(
        base=self.samdb.domain_dn(),
        scope=ldb.SCOPE_SUBTREE,
        expression=search_filter,
        attrs=['servicePrincipalName'])
    # Collect the values as plain strings; a dedicated loop variable
    # keeps the ``name`` parameter from being overwritten.
    names = set()
    for computer in computer_list:
        names.update(str(spn)
                     for spn in computer.get('servicePrincipalName', []))
    return names == set(expected_service_principal_names)