1 # Unix SMB/CIFS implementation.
3 # Copyright (C) Bjoern Baumbach <bb@sernet.de> 2018
6 # Copyright (C) Michael Adam 2012
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba.tests.samba_tool.base import SambaToolCmdTest
25 from samba import dsdb
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp
30 class ComputerCmdTestCase(SambaToolCmdTest):
31 """Tests for samba-tool computer subcommands"""
36 super(ComputerCmdTestCase, self).setUp()
37 self.creds = "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])
38 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], self.creds)
39 # ips used to test --ip-address option
40 self.ipv4 = '10.10.10.10'
41 self.ipv6 = '2001:0db8:0a0b:12f0:0000:0000:0000:0001'
44 'name': 'testcomputer1',
45 'ip_address_list': [self.ipv4]
48 'name': 'testcomputer2',
49 'ip_address_list': [self.ipv6],
50 'service_principal_name_list': ['SPN0']
53 'name': 'testcomputer3$',
54 'ip_address_list': [self.ipv4, self.ipv6],
55 'service_principal_name_list': ['SPN0', 'SPN1']
58 'name': 'testcomputer4$',
61 self.computers = [self._randomComputer(base=item) for item in data]
63 # setup the 4 computers and ensure they are correct
64 for computer in self.computers:
65 (result, out, err) = self._create_computer(computer)
67 self.assertCmdSuccess(result, out, err)
68 self.assertEquals(err, "", "There shouldn't be any error message")
69 self.assertIn("Computer '%s' created successfully" %
70 computer["name"], out)
72 found = self._find_computer(computer["name"])
74 self.assertIsNotNone(found)
76 expectedname = computer["name"].rstrip('$')
77 expectedsamaccountname = computer["name"]
78 if not computer["name"].endswith('$'):
79 expectedsamaccountname = "%s$" % computer["name"]
80 self.assertEquals("%s" % found.get("name"), expectedname)
81 self.assertEquals("%s" % found.get("sAMAccountName"),
82 expectedsamaccountname)
83 self.assertEquals("%s" % found.get("description"),
84 computer["description"])
88 super(ComputerCmdTestCase, self).tearDown()
89 # clean up all the left over computers, just in case
90 for computer in self.computers:
91 if self._find_computer(computer["name"]):
92 (result, out, err) = self.runsubcmd("computer", "delete",
93 "%s" % computer["name"])
94 self.assertCmdSuccess(result, out, err,
95 "Failed to delete computer '%s'" %
98 def test_newcomputer_with_service_principal_name(self):
99 # Each computer should have correct servicePrincipalName as provided.
100 for computer in self.computers:
101 expected_names = computer.get('service_principal_name_list', [])
102 found = self._find_service_principal_name(computer['name'], expected_names)
103 self.assertTrue(found)
105 def test_newcomputer_with_dns_records(self):
107 # Each computer should have correct DNS record and ip address.
108 for computer in self.computers:
109 for ip_address in computer.get('ip_address_list', []):
110 found = self._find_dns_record(computer['name'], ip_address)
111 self.assertTrue(found)
113 # try to delete all the computers we just created
114 for computer in self.computers:
115 (result, out, err) = self.runsubcmd("computer", "delete",
116 "%s" % computer["name"])
117 self.assertCmdSuccess(result, out, err,
118 "Failed to delete computer '%s'" %
120 found = self._find_computer(computer["name"])
121 self.assertIsNone(found,
122 "Deleted computer '%s' still exists" %
125 # all DNS records should be gone
126 for computer in self.computers:
127 for ip_address in computer.get('ip_address_list', []):
128 found = self._find_dns_record(computer['name'], ip_address)
129 self.assertFalse(found)
131 def test_newcomputer(self):
132 """This tests the "computer create" and "computer delete" commands"""
133 # try to create all the computers again, this should fail
134 for computer in self.computers:
135 (result, out, err) = self._create_computer(computer)
136 self.assertCmdFail(result, "Succeeded to create existing computer")
137 self.assertIn("already exists", err)
140 # try to delete all the computers we just created
141 for computer in self.computers:
142 (result, out, err) = self.runsubcmd("computer", "delete", "%s" %
144 self.assertCmdSuccess(result, out, err,
145 "Failed to delete computer '%s'" %
147 found = self._find_computer(computer["name"])
148 self.assertIsNone(found,
149 "Deleted computer '%s' still exists" %
152 # test creating computers
153 for computer in self.computers:
154 (result, out, err) = self.runsubcmd(
155 "computer", "create", "%s" % computer["name"],
156 "--description=%s" % computer["description"])
158 self.assertCmdSuccess(result, out, err)
159 self.assertEquals(err, "", "There shouldn't be any error message")
160 self.assertIn("Computer '%s' created successfully" %
161 computer["name"], out)
163 found = self._find_computer(computer["name"])
165 expectedname = computer["name"].rstrip('$')
166 expectedsamaccountname = computer["name"]
167 if not computer["name"].endswith('$'):
168 expectedsamaccountname = "%s$" % computer["name"]
169 self.assertEquals("%s" % found.get("name"), expectedname)
170 self.assertEquals("%s" % found.get("sAMAccountName"),
171 expectedsamaccountname)
172 self.assertEquals("%s" % found.get("description"),
173 computer["description"])
176 (result, out, err) = self.runsubcmd("computer", "list")
177 self.assertCmdSuccess(result, out, err, "Error running list")
179 search_filter = ("(sAMAccountType=%u)" %
180 dsdb.ATYPE_WORKSTATION_TRUST)
182 computerlist = self.samdb.search(base=self.samdb.domain_dn(),
183 scope=ldb.SCOPE_SUBTREE,
184 expression=search_filter,
185 attrs=["samaccountname"])
187 self.assertTrue(len(computerlist) > 0, "no computers found in samdb")
189 for computerobj in computerlist:
190 name = computerobj.get("samaccountname", idx=0)
191 found = self.assertMatch(out, name,
192 "computer '%s' not found" % name)
195 parentou = self._randomOU({"name": "parentOU"})
196 (result, out, err) = self._create_ou(parentou)
197 self.assertCmdSuccess(result, out, err)
199 for computer in self.computers:
200 olddn = self._find_computer(computer["name"]).get("dn")
202 (result, out, err) = self.runsubcmd("computer", "move",
203 "%s" % computer["name"],
204 "OU=%s" % parentou["name"])
205 self.assertCmdSuccess(result, out, err,
206 "Failed to move computer '%s'" %
208 self.assertEquals(err, "", "There shouldn't be any error message")
209 self.assertIn('Moved computer "%s"' % computer["name"], out)
211 found = self._find_computer(computer["name"])
212 self.assertNotEquals(found.get("dn"), olddn,
213 ("Moved computer '%s' still exists with the "
214 "same dn" % computer["name"]))
215 computername = computer["name"].rstrip('$')
216 newexpecteddn = ldb.Dn(self.samdb,
218 (computername, parentou["name"],
219 self.samdb.domain_dn()))
220 self.assertEquals(found.get("dn"), newexpecteddn,
221 "Moved computer '%s' does not exist" %
224 (result, out, err) = self.runsubcmd("computer", "move",
225 "%s" % computer["name"],
226 "%s" % olddn.parent())
227 self.assertCmdSuccess(result, out, err,
228 "Failed to move computer '%s'" %
231 (result, out, err) = self.runsubcmd("ou", "delete",
232 "OU=%s" % parentou["name"])
233 self.assertCmdSuccess(result, out, err,
234 "Failed to delete ou '%s'" % parentou["name"])
236 def _randomComputer(self, base={}):
237 """create a computer with random attribute values, you can specify base
241 "name": self.randomName(),
242 "description": self.randomName(count=100),
244 computer.update(base)
247 def _randomOU(self, base={}):
248 """create an ou with random attribute values, you can specify base
252 "name": self.randomName(),
253 "description": self.randomName(count=100),
258 def _create_computer(self, computer):
259 args = '{} {} --description={}'.format(
260 computer['name'], self.creds, computer["description"])
262 for ip_address in computer.get('ip_address_list', []):
263 args += ' --ip-address={}'.format(ip_address)
265 for service_principal_name in computer.get('service_principal_name_list', []):
266 args += ' --service-principal-name={}'.format(service_principal_name)
270 return self.runsubcmd('computer', 'create', *args)
272 def _create_ou(self, ou):
273 return self.runsubcmd("ou", "create", "OU=%s" % ou["name"],
274 "--description=%s" % ou["description"])
276 def _find_computer(self, name):
277 samaccountname = name
278 if not name.endswith('$'):
279 samaccountname = "%s$" % name
280 search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
281 (ldb.binary_encode(samaccountname),
282 "CN=Computer,CN=Schema,CN=Configuration",
283 self.samdb.domain_dn()))
284 computerlist = self.samdb.search(base=self.samdb.domain_dn(),
285 scope=ldb.SCOPE_SUBTREE,
286 expression=search_filter, attrs=[])
288 return computerlist[0]
292 def _find_dns_record(self, name, ip_address):
293 name = name.rstrip('$') # computername
294 records = self.samdb.search(
295 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
296 scope=ldb.SCOPE_SUBTREE,
297 expression="(&(objectClass=dnsNode)(name={}))".format(name),
298 attrs=['dnsRecord', 'dNSTombstoned'])
300 # unpack data and compare
301 for record in records:
302 if 'dNSTombstoned' in record and str(record['dNSTombstoned']) == 'TRUE':
303 # if a record is dNSTombstoned, ignore it.
305 for dns_record_bin in record['dnsRecord']:
306 dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
307 ip = str(dns_record_obj.data)
309 if str(ip) == str(ip_address):
314 def _find_service_principal_name(self, name, expected_service_principal_names):
315 """Find all servicePrincipalName values and compare with expected_service_principal_names"""
316 samaccountname = name.strip('$') + '$'
317 search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
318 (ldb.binary_encode(samaccountname),
319 "CN=Computer,CN=Schema,CN=Configuration",
320 self.samdb.domain_dn()))
321 computer_list = self.samdb.search(
322 base=self.samdb.domain_dn(),
323 scope=ldb.SCOPE_SUBTREE,
324 expression=search_filter,
325 attrs=['servicePrincipalName'])
327 for computer in computer_list:
328 for name in computer.get('servicePrincipalName', []):
330 return names == set(expected_service_principal_names)