2 # -*- coding: utf-8 -*-
4 # Tests various RID allocation scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 # export DC1=dc1_dns_name
27 # export DC2=dc2_dns_name
28 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
import os
import shutil
import tempfile

import ldb
from ldb import SCOPE_BASE

import drs_base
from samba.auth import system_session, admin_session
from samba.dbchecker import dbcheck
from samba.dcerpc import drsuapi, misc
from samba.dcerpc import security
from samba.drs_utils import drs_DsBind
from samba.ndr import ndr_pack
from samba.samdb import SamDB

class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Intended as a semi-black box test case for DsGetNCChanges
    implementation for extended operations. It should be testing
    how DsGetNCChanges handles different input params (mostly invalid).
    Final goal is to make DsGetNCChanges as binary compatible to
    Windows implementation as possible"""

    def setUp(self):
        super(DrsReplicaSyncTestCase, self).setUp()

    def tearDown(self):
        super(DrsReplicaSyncTestCase, self).tearDown()

    # ------------------------------------------------------------------
    # Helpers for the online (DsGetNCChanges) tests
    # ------------------------------------------------------------------

    def _collect_dc_info(self, samdb, dns_name):
        """Return a dict describing the DC behind *samdb*.

        Keys: dns_name, invocation_id, ntds_guid, server_dn,
        server_acct_dn (the serverReference of the server object) and
        rid_set_dn ("CN=RID Set" below the server account).
        """
        info = {"dns_name": dns_name,
                "invocation_id": samdb.get_invocation_id(),
                "ntds_guid": samdb.get_ntds_GUID(),
                "server_dn": samdb.get_serverName()}

        msgs = samdb.search(scope=ldb.SCOPE_BASE, base=info["server_dn"],
                            attrs=["serverReference"])
        info["server_acct_dn"] = ldb.Dn(samdb, msgs[0]["serverReference"][0].decode('utf8'))
        info["rid_set_dn"] = ldb.Dn(samdb, "CN=RID Set") + info["server_acct_dn"]
        return info

    def _determine_fSMORoleOwner(self, fsmo_obj_dn):
        """Returns (owner, not_owner) pair where:
        owner: info dict for the DC owning the FSMO role
        not_owner: info dict for the DC not owning the FSMO role

        Each dict has the keys produced by _collect_dc_info()."""
        # collect info to return later
        fsmo_info_1 = self._collect_dc_info(self.ldb_dc1, self.dnsname_dc1)
        fsmo_info_2 = self._collect_dc_info(self.ldb_dc2, self.dnsname_dc2)

        # determine the owner dc
        res = self.ldb_dc1.search(fsmo_obj_dn,
                                  scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
        # A unittest assertion rather than a bare assert, so the check
        # survives running under "python -O".
        self.assertEqual(len(res), 1,
                         "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn)
        fsmo_owner = res[0]["fSMORoleOwner"][0]
        if fsmo_owner == self.info_dc1["dsServiceName"][0]:
            return (fsmo_info_1, fsmo_info_2)
        return (fsmo_info_2, fsmo_info_1)

    def _rid_manager_fsmo(self):
        """Return (fsmo_dn, fsmo_owner, fsmo_not_owner) for the RID Manager role."""
        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
        return (fsmo_dn, fsmo_owner, fsmo_not_owner)

    def _check_exop_failed(self, ctr6, expected_failure):
        """Assert that *ctr6* reports *expected_failure* and carries no data."""
        self.assertEqual(ctr6.extended_ret, expected_failure)
        # NOTE(review): the object_count / first_object checks were already
        # disabled in the original; left disabled here.
        #self.assertEqual(ctr6.object_count, 0)
        #self.assertEqual(ctr6.first_object, None)
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes, [])
        self.assertEqual(ctr6.drs_error[0], 0)

    def _check_ridalloc_objects(self, ctr6, fsmo_dn, dest_dc, first_samdb=None):
        """Assert a successful RID_ALLOC reply with its three objects.

        The objects are expected in this order: the RID Manager object
        (*fsmo_dn*), the RID Set of *dest_dc*, the server account of
        *dest_dc*.  *first_samdb* is the connection used to parse the DN
        of the first object (defaults to self.ldb_dc1).
        """
        if first_samdb is None:
            first_samdb = self.ldb_dc1

        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
        self.assertEqual(ctr6.object_count, 3)
        self.assertNotEqual(ctr6.first_object, None)
        self.assertEqual(ldb.Dn(first_samdb, ctr6.first_object.object.identifier.dn), fsmo_dn)
        self.assertNotEqual(ctr6.first_object.next_object, None)
        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
        second_object = ctr6.first_object.next_object.object
        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), dest_dc["rid_set_dn"])
        third_object = ctr6.first_object.next_object.next_object.object
        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), dest_dc["server_acct_dn"])

    def _check_ridalloc_no_extra_data(self, ctr6):
        """Assert the reply is complete and reports no error.

        We don't check the linked_attributes_count as if the domain
        has an RODC, it can gain links on the server account object.
        """
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.drs_error[0], 0)

    # ------------------------------------------------------------------
    # Online tests
    # ------------------------------------------------------------------

    def test_InvalidDestDSA_ridalloc(self):
        """Test RID allocation with invalid destination DSA guid"""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)

        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))

    def test_do_ridalloc(self):
        """Test doing a RID allocation with a valid destination DSA guid"""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)

        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))

        self._check_ridalloc_objects(ctr, fsmo_dn, fsmo_not_owner)
        self._check_ridalloc_no_extra_data(ctr)

    def test_do_ridalloc_get_anc(self):
        """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
                               replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)

        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))

        self._check_ridalloc_objects(ctr, fsmo_dn, fsmo_not_owner)
        self._check_ridalloc_no_extra_data(ctr)

    def test_edit_rid_master(self):
        """Test doing a RID allocation after changing the RID master from the original one.
        This should set rIDNextRID to 0 on the new RID master."""
        # 1. a. Transfer role to non-RID master
        #    b. Check that it succeeds correctly
        #
        # 2. a. Call the RID alloc against the former master.
        #    b. Check that it succeeds.
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        # 1. Swap RID master role
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb_dc1, "")
        m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
                                                  "becomeRidMaster")

        # Work out which connection talks to the current RID master:
        # new_master is the DC that does NOT own the role yet, old_master
        # is the DC that owns it now.
        server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
        if server_dn == fsmo_owner['server_dn']:
            new_master, old_master = self.ldb_dc2, self.ldb_dc1
        else:
            new_master, old_master = self.ldb_dc1, self.ldb_dc2

        try:
            # new_master becomes the RID master
            new_master.modify(m)
        except ldb.LdbError as e1:
            (num, msg) = e1.args
            self.fail("Failed to reassign RID Master " + msg)

        try:
            # 2. Perform a RID alloc
            req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
                                   invocation_id=fsmo_not_owner["invocation_id"],
                                   nc_dn_str=fsmo_dn,
                                   exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)

            (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
            # 3. Make sure the allocation succeeds
            try:
                (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
            except RuntimeError as e:
                self.fail("RID allocation failed: " + str(e))

            self.assertEqual(level, 6, "Expected level 6 response!")
            self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
            self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))

            # The former master is now the destination of the allocation.
            self._check_ridalloc_objects(ctr, fsmo_dn, fsmo_owner,
                                         first_samdb=old_master)
        finally:
            # Swap the RID master back for other tests, even if an
            # assertion above failed.
            m = ldb.Message()
            m.dn = ldb.Dn(old_master, "")
            m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
            try:
                old_master.modify(m)
            except ldb.LdbError as e:
                (num, msg) = e.args
                self.fail("Failed to restore RID Master " + msg)

    # ------------------------------------------------------------------
    # Helpers for the offline (joined database) tests
    # ------------------------------------------------------------------

    def _test_join(self, server, netbios_name):
        """Join a new DC named *netbios_name* via *server*.

        Runs "samba-tool domain join ... dc" into a target directory
        below self.tempdir and returns that directory.
        """
        tmpdir = os.path.join(self.tempdir, "targetdir")
        creds = self.get_credentials()
        (result, out, err) = self.runsubcmd("domain", "join",
                                            creds.get_realm(),
                                            "dc", "-U%s%%%s" % (creds.get_username(),
                                                                creds.get_password()),
                                            '--targetdir=%s' % tmpdir,
                                            '--server=%s' % server,
                                            "--option=netbios name = %s" % netbios_name)
        self.assertCmdSuccess(result, out, err)
        return tmpdir

    def _test_force_demote(self, server, netbios_name):
        """Remove the DC *netbios_name* from the domain via *server*."""
        creds = self.get_credentials()
        (result, out, err) = self.runsubcmd("domain", "demote",
                                            "-U%s%%%s" % (creds.get_username(),
                                                          creds.get_password()),
                                            '--server=%s' % server,
                                            "--remove-other-dead-server=%s" % netbios_name)
        self.assertCmdSuccess(result, out, err)

    def _open_joined_samdb(self, targetdir, as_admin=False):
        """Open private/sam.ldb below *targetdir*; return (samdb, ldb_url).

        Uses a system session by default, or an admin session for the
        domain of self.ldb_dc1 when *as_admin* is true.
        """
        ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
        lp = self.get_loadparm()
        if as_admin:
            session = admin_session(lp, self.ldb_dc1.get_domain_sid())
        else:
            session = system_session(lp)
        samdb = SamDB(ldb_url, credentials=self.get_credentials(),
                      session_info=session, lp=lp)
        return (samdb, ldb_url)

    def _server_ref_dn(self, samdb):
        """Return the serverReference DN of *samdb*'s own server object."""
        # 1. Get server name
        res = samdb.search(base=ldb.Dn(samdb, samdb.get_serverName()),
                           scope=ldb.SCOPE_BASE, attrs=["serverReference"])
        # 2. Get server reference
        return ldb.Dn(samdb, res[0]['serverReference'][0].decode('utf8'))

    def _fetch_rid_set_references(self, samdb, server_ref_dn):
        """Return the *server_ref_dn* record, fetched with rIDSetReferences only."""
        res = samdb.search(base=server_ref_dn,
                           scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
        return res[0]

    def _claim_rid_manager_role(self, samdb, fsmo_dn):
        """Set fSMORoleOwner on *fsmo_dn* to *samdb*'s own dsServiceName.

        This is a plain attribute replace on the database; it does not
        go through "samba-tool fsmo seize".
        """
        m = ldb.Message()
        m.dn = fsmo_dn
        m["fSMORoleOwner"] = ldb.MessageElement(samdb.get_dsServiceName(),
                                                ldb.FLAG_MOD_REPLACE,
                                                "fSMORoleOwner")
        samdb.modify(m)

    def _seize_rid_role(self, ldb_url, targetdir):
        """Run "samba-tool fsmo seize --role rid --force" on the joined DB."""
        smbconf = os.path.join(targetdir, "etc/smb.conf")
        (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")

    def _read_rid_set(self, samdb, rid_set_dn):
        """Return the RID Set record with rIDNextRid and rIDAllocationPool."""
        res = samdb.search(base=rid_set_dn,
                           scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
                                                        'rIDAllocationPool'])
        return res[0]

    @staticmethod
    def _pool_last_rid(allocation_pool):
        """Return the upper 32 bits of a 64-bit rIDAllocationPool value."""
        return (0xFFFFFFFF00000000 & allocation_pool) >> 32

    def _add_user_with_rid(self, samdb, cn, rid):
        """Add user CN=*cn*,CN=Users with an explicitly chosen RID.

        The objectSid is supplied by the caller, which needs the
        "relax" control on the add.
        """
        m = ldb.Message()
        m.dn = ldb.Dn(samdb, "CN=%s,CN=Users" % cn)
        m.dn.add_base(samdb.get_default_basedn())
        m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
        sid = security.dom_sid("%s-%d" % (samdb.get_domain_sid(), rid))
        m['objectSid'] = ldb.MessageElement(ndr_pack(sid),
                                            ldb.FLAG_MOD_ADD,
                                            'objectSid')
        samdb.add(m, controls=["relax:0"])

    # ------------------------------------------------------------------
    # Offline tests
    # ------------------------------------------------------------------

    def test_offline_samba_tool_seized_ridalloc(self):
        """Perform a join against the non-RID manager and then seize the RID Manager role"""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
        try:
            # Connect to the database
            (new_ldb, ldb_url) = self._open_joined_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # Assert that no RID Set has been set
            self.assertNotIn("rIDSetReferences",
                             self._fetch_rid_set_references(new_ldb, server_ref_dn))

            self._seize_rid_role(ldb_url, targetdir)

            # 3. Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._fetch_rid_set_references(new_ldb, server_ref_dn))
        finally:
            shutil.rmtree(targetdir, ignore_errors=True)
            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")

    def test_offline_manual_seized_ridalloc_with_dbcheck(self):
        """Perform the same actions as test_offline_samba_tool_seized_ridalloc,
        but do not create the RID set. Confirm that dbcheck correctly creates
        the RID Set."""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
        try:
            # Connect to the database
            (new_ldb, ldb_url) = self._open_joined_samdb(targetdir)

            self._claim_rid_manager_role(new_ldb, fsmo_dn)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # Assert that no RID Set has been set
            self.assertNotIn("rIDSetReferences",
                             self._fetch_rid_set_references(new_ldb, server_ref_dn))

            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)

            self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1,
                             "Should have fixed one error (missing RID Set)")

            # 3. Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._fetch_rid_set_references(new_ldb, server_ref_dn))
        finally:
            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
            shutil.rmtree(targetdir, ignore_errors=True)

    def test_offline_manual_seized_ridalloc_add_user(self):
        """Perform the same actions as test_offline_samba_tool_seized_ridalloc,
        but do not create the RID set. Confirm that user-add correctly creates
        the RID Set."""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
        try:
            # Connect to the database
            (new_ldb, ldb_url) = self._open_joined_samdb(targetdir)

            self._claim_rid_manager_role(new_ldb, fsmo_dn)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # Assert that no RID Set has been set
            self.assertNotIn("rIDSetReferences",
                             self._fetch_rid_set_references(new_ldb, server_ref_dn))

            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # 3. Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._fetch_rid_set_references(new_ldb, server_ref_dn))
        finally:
            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
            shutil.rmtree(targetdir, ignore_errors=True)

    def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
        """Perform the same actions as test_offline_samba_tool_seized_ridalloc,
        but do not create the RID set. Confirm that user-add correctly creates
        the RID Set when the database is opened with an admin session."""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
        try:
            # Connect to the database
            (new_ldb, ldb_url) = self._open_joined_samdb(targetdir, as_admin=True)

            self._claim_rid_manager_role(new_ldb, fsmo_dn)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # Assert that no RID Set has been set
            self.assertNotIn("rIDSetReferences",
                             self._fetch_rid_set_references(new_ldb, server_ref_dn))

            # Create a user to allocate a RID Set for itself (the RID master)
            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # 3. Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._fetch_rid_set_references(new_ldb, server_ref_dn))
        finally:
            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
            shutil.rmtree(targetdir, ignore_errors=True)

    def test_join_time_ridalloc(self):
        """Perform a join against the RID manager and assert we have a RID Set"""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
        try:
            # Connect to the database
            (new_ldb, ldb_url) = self._open_joined_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # 3. Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._fetch_rid_set_references(new_ldb, server_ref_dn))
        finally:
            self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
            shutil.rmtree(targetdir, ignore_errors=True)

    def test_rid_set_dbcheck(self):
        """Perform a join against the RID manager and assert we have a RID Set.
        Using dbcheck, we assert that we can detect out of range users."""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
        try:
            # Connect to the database
            (new_ldb, ldb_url) = self._open_joined_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # 3. Assert we get the RID Set
            server_acct = self._fetch_rid_set_references(new_ldb, server_ref_dn)
            self.assertIn("rIDSetReferences", server_acct)
            rid_set_dn = ldb.Dn(new_ldb, server_acct["rIDSetReferences"][0].decode('utf8'))

            # 4. Add a new user (triggers RID set work)
            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # 5. Now fetch the RID SET
            rid_set = self._read_rid_set(new_ldb, rid_set_dn)
            next_pool = int(rid_set["rIDAllocationPool"][0])
            last_rid = self._pool_last_rid(next_pool)

            # 6. Add user above the ridNextRid and at mid-range.
            #
            # We can do this with safety because this is an offline DB that will be
            # destroyed.
            self._add_user_with_rid(new_ldb, "ridsettestuser1", last_rid - 10)

            # 7. Check the RID Set
            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)

            # Should have one error (wrong rIDNextRID)
            self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)

            # 8. Assert we didn't show any other errors
            # NOTE(review): this second, non-fixing dbcheck is constructed but
            # never run, so "no other errors" is not actually asserted.
            chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)

            rid_set = self._read_rid_set(new_ldb, rid_set_dn)
            last_allocated_rid = int(rid_set["rIDNextRid"][0])
            self.assertEqual(last_allocated_rid, last_rid - 10)

            # 9. Assert that the range wasn't thrown away
            next_pool = int(rid_set["rIDAllocationPool"][0])
            self.assertEqual(last_rid, self._pool_last_rid(next_pool),
                             "rid pool should not have changed")
        finally:
            self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
            shutil.rmtree(targetdir, ignore_errors=True)

    def test_rid_set_dbcheck_after_seize(self):
        """Perform a join against the RID manager and assert we have a RID Set.
        We seize the RID master role, then using dbcheck, we assert that we can
        detect out of range users (and then bump the RID set as required)."""
        (fsmo_dn, fsmo_owner, fsmo_not_owner) = self._rid_manager_fsmo()

        targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
        try:
            # Connect to the database
            (new_ldb, ldb_url) = self._open_joined_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # 3. Assert we get the RID Set
            server_acct = self._fetch_rid_set_references(new_ldb, server_ref_dn)
            self.assertIn("rIDSetReferences", server_acct)
            rid_set_dn = ldb.Dn(new_ldb, server_acct["rIDSetReferences"][0].decode('utf8'))

            # 4. Seize the RID Manager role
            self._seize_rid_role(ldb_url, targetdir)

            # 5. Add a new user (triggers RID set work)
            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # 6. Now fetch the RID SET
            rid_set = self._read_rid_set(new_ldb, rid_set_dn)
            next_pool = int(rid_set["rIDAllocationPool"][0])
            last_rid = self._pool_last_rid(next_pool)

            # 7. Add user above the ridNextRid and at almost the end of the range.
            self._add_user_with_rid(new_ldb, "ridsettestuser2", last_rid - 3)

            # 8. Add user above the ridNextRid and at the end of the range
            self._add_user_with_rid(new_ldb, "ridsettestuser3", last_rid)

            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)

            # Should have fixed two errors (wrong ridNextRid)
            self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)

            # 9. Assert we didn't show any other errors
            # NOTE(review): this second, non-fixing dbcheck is constructed but
            # never run, so "no other errors" is not actually asserted.
            chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)

            # 10. Add another user (checks RID rollover)
            # We have seized the role, so we can do that.
            new_ldb.newuser("ridalloctestuser3", "P@ssword!")

            rid_set = self._read_rid_set(new_ldb, rid_set_dn)
            next_pool = int(rid_set["rIDAllocationPool"][0])
            self.assertNotEqual(last_rid, self._pool_last_rid(next_pool),
                                "rid pool should have changed")
        finally:
            self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
            shutil.rmtree(targetdir, ignore_errors=True)