# -*- coding: utf-8 -*-
#
# Tests various RID allocation scenarios
#
# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
# Copyright (C) Catalyst IT Ltd. 2016
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
# Usage:
#  export DC1=dc1_dns_name
#  export DC2=dc2_dns_name
#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
import os
import shutil
import tempfile

import ldb
from ldb import SCOPE_BASE

from samba.auth import system_session, admin_session
from samba.dbchecker import dbcheck
from samba.dcerpc import drsuapi, misc
from samba.dcerpc import security
from samba.drs_utils import drs_DsBind
from samba.ndr import ndr_pack
from samba.samdb import SamDB

import drs_base


class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Intended as a semi-black box test case for DsGetNCChanges
    implementation for extended operations. It should be testing
    how DsGetNCChanges handles different input params (mostly invalid).
    Final goal is to make DsGetNCChanges as binary compatible to
    Windows implementation as possible.

    The tests here exercise RID allocation: the FSMO_RID_ALLOC extended
    operation against live DCs, and RID Set creation/repair on freshly
    joined offline databases.

    Attributes and helpers such as ``ldb_dc1``, ``ldb_dc2``, ``info_dc1``,
    ``_exop_req8``, ``_ds_bind``, ``runsubcmd`` and ``assertCmdSuccess`` are
    not defined in this class; they are presumably provided by the
    ``drs_base.DrsBaseTestCase`` hierarchy.
    """

    # Relative DN of the RID Manager object inside the domain partition.
    _RID_MANAGER_RDN = "CN=RID Manager$,CN=System,"

    def setUp(self):
        super(DrsReplicaSyncTestCase, self).setUp()

    def tearDown(self):
        super(DrsReplicaSyncTestCase, self).tearDown()

    # ------------------------------------------------------------------
    # Helpers: live-DC side
    # ------------------------------------------------------------------

    def _rid_manager_dn(self):
        """Return the DN of the domain's RID Manager object (via ldb_dc1)."""
        return ldb.Dn(self.ldb_dc1,
                      self._RID_MANAGER_RDN + self.ldb_dc1.domain_dn())

    def _determine_fSMORoleOwner(self, fsmo_obj_dn):
        """Returns (owner, not_owner) pair where:
             owner: info dict for the DC owning the FSMO role
             not_owner: info dict for the DC not owning the FSMO role

        Each dict has the keys "dns_name", "invocation_id", "ntds_guid",
        "server_dn", "server_acct_dn" and "rid_set_dn".
        """
        def collect(samdb, dns_name):
            # Gather everything the tests later need to know about one DC.
            info = {"dns_name": dns_name,
                    "invocation_id": samdb.get_invocation_id(),
                    "ntds_guid": samdb.get_ntds_GUID(),
                    "server_dn": samdb.get_serverName()}
            msgs = samdb.search(scope=ldb.SCOPE_BASE,
                                base=info["server_dn"],
                                attrs=["serverReference"])
            info["server_acct_dn"] = ldb.Dn(
                samdb, msgs[0]["serverReference"][0].decode('utf8'))
            # The RID Set lives directly below the DC's machine account.
            info["rid_set_dn"] = (ldb.Dn(samdb, "CN=RID Set") +
                                  info["server_acct_dn"])
            return info

        fsmo_info_1 = collect(self.ldb_dc1, self.dnsname_dc1)
        fsmo_info_2 = collect(self.ldb_dc2, self.dnsname_dc2)

        # determine the owner dc
        res = self.ldb_dc1.search(fsmo_obj_dn,
                                  scope=ldb.SCOPE_BASE,
                                  attrs=["fSMORoleOwner"])
        # A unittest assertion (rather than a bare `assert`) so the check
        # survives `python -O`.
        self.assertEqual(len(res), 1,
                         "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn)
        fsmo_owner = res[0]["fSMORoleOwner"][0]
        if fsmo_owner == self.info_dc1["dsServiceName"][0]:
            return (fsmo_info_1, fsmo_info_2)
        return (fsmo_info_2, fsmo_info_1)

    def _check_exop_failed(self, ctr6, expected_failure):
        """Assert that a level-6 reply reports `expected_failure` and
        carries no further replication data."""
        self.assertEqual(ctr6.extended_ret, expected_failure)
        # The object_count / first_object checks were already disabled in
        # the original test and are left disabled here:
        # self.assertEqual(ctr6.object_count, 0)
        # self.assertEqual(ctr6.first_object, None)
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes, [])
        self.assertEqual(ctr6.drs_error[0], 0)

    def _rid_alloc_request(self, dest_dsa, source_dc, fsmo_dn, replica_flags=None):
        """Build a FSMO_RID_ALLOC request for `dest_dsa` and send it to
        `source_dc`. Returns the (level, ctr) pair from DsGetNCChanges."""
        req_args = {"dest_dsa": dest_dsa,
                    "invocation_id": source_dc["invocation_id"],
                    # NOTE(review): this argument was lost from the damaged
                    # source; restored per the upstream _exop_req8() helper
                    # signature - confirm against drs_base.
                    "nc_dn_str": fsmo_dn,
                    "exop": drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC}
        if replica_flags is not None:
            req_args["replica_flags"] = replica_flags
        req8 = self._exop_req8(**req_args)

        (drs, drs_handle) = self._ds_bind(source_dc["dns_name"])
        return drs.DsGetNCChanges(drs_handle, 8, req8)

    def _check_reply_source(self, level, ctr, source_dc):
        """Assert a level-6 reply that identifies `source_dc` as sender."""
        self.assertEqual(level, 6, "Expected level 6 response!")
        self.assertEqual(ctr.source_dsa_guid,
                         misc.GUID(source_dc["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id,
                         misc.GUID(source_dc["invocation_id"]))

    def _check_ridalloc_objects(self, ctr6, fsmo_dn, dest_dc):
        """Assert a successful RID allocation reply: exactly three objects,
        in order the RID Manager, the destination DC's RID Set and the
        destination DC's server account."""
        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
        self.assertEqual(ctr6.object_count, 3)

        first = ctr6.first_object
        self.assertIsNotNone(first)
        self.assertEqual(ldb.Dn(self.ldb_dc1, first.object.identifier.dn),
                         fsmo_dn)

        second = first.next_object
        self.assertIsNotNone(second)
        third = second.next_object
        self.assertIsNotNone(third)

        self.assertEqual(ldb.Dn(self.ldb_dc1, second.object.identifier.dn),
                         dest_dc["rid_set_dn"])
        self.assertEqual(ldb.Dn(self.ldb_dc1, third.object.identifier.dn),
                         dest_dc["server_acct_dn"])

    def _do_ridalloc(self, replica_flags=None):
        """Request a RID allocation for the non-owner DC from the RID
        master and verify the complete reply."""
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        (level, ctr) = self._rid_alloc_request(fsmo_not_owner["ntds_guid"],
                                               fsmo_owner, fsmo_dn,
                                               replica_flags=replica_flags)
        self._check_reply_source(level, ctr, fsmo_owner)

        # The level-6 container is used directly; this binding was one of
        # the statements missing from the damaged source.
        ctr6 = ctr
        self._check_ridalloc_objects(ctr6, fsmo_dn, fsmo_not_owner)
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.drs_error[0], 0)
        # We don't check the linked_attributes_count as if the domain
        # has an RODC, it can gain links on the server account object

    def _become_rid_master(self, samdb, failure_prefix):
        """Ask the DC behind `samdb` to take the RID master role by
        writing becomeRidMaster on its rootDSE; fail the test with
        `failure_prefix` + the LDB message on error."""
        m = ldb.Message()
        m.dn = ldb.Dn(samdb, "")
        m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
                                                  "becomeRidMaster")
        try:
            samdb.modify(m)
        except ldb.LdbError as e:
            (_, msg) = e.args
            self.fail(failure_prefix + msg)

    # ------------------------------------------------------------------
    # Helpers: joined offline database side
    # ------------------------------------------------------------------

    def _test_join(self, server, netbios_name):
        """Join the domain as a DC into a scratch directory via
        `samba-tool domain join` and return that directory."""
        tmpdir = os.path.join(self.tempdir, "targetdir")
        creds = self.get_credentials()
        (result, out, err) = self.runsubcmd("domain", "join",
                                            # NOTE(review): the domain
                                            # argument was lost from the
                                            # damaged source; restored per
                                            # samba-tool usage - confirm.
                                            creds.get_realm(),
                                            "dc", "-U%s%%%s" % (creds.get_username(),
                                                                creds.get_password()),
                                            '--targetdir=%s' % tmpdir,
                                            '--server=%s' % server,
                                            "--option=netbios name = %s" % netbios_name)
        self.assertCmdSuccess(result, out, err)
        return tmpdir

    def _test_force_demote(self, server, netbios_name):
        """Remove the DC `netbios_name` from the domain by asking `server`
        to clean it up as a dead server."""
        creds = self.get_credentials()
        (result, out, err) = self.runsubcmd("domain", "demote",
                                            "-U%s%%%s" % (creds.get_username(),
                                                          creds.get_password()),
                                            '--server=%s' % server,
                                            "--remove-other-dead-server=%s" % netbios_name)
        self.assertCmdSuccess(result, out, err)

    def _cleanup_join(self, server, netbios_name, targetdir):
        """Undo `_test_join`: demote the joined DC and delete its files.
        The directory is removed even when the demote fails."""
        try:
            self._test_force_demote(server, netbios_name)
        finally:
            shutil.rmtree(targetdir, ignore_errors=True)

    @staticmethod
    def _offline_db_url(targetdir):
        """URL of the sam.ldb created by a join into `targetdir`."""
        return "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")

    def _open_offline_samdb(self, targetdir, as_admin=False):
        """Open the joined database directly, with a system session or
        (when `as_admin` is true) a domain-admin session."""
        lp = self.get_loadparm()
        if as_admin:
            session = admin_session(lp, self.ldb_dc1.get_domain_sid())
        else:
            session = system_session(lp)
        return SamDB(self._offline_db_url(targetdir),
                     credentials=self.get_credentials(),
                     session_info=session, lp=lp)

    def _server_ref_dn(self, samdb):
        """Return the DN stored in serverReference on samdb's own server
        object (the DC's machine account)."""
        res = samdb.search(base=ldb.Dn(samdb, samdb.get_serverName()),
                           scope=ldb.SCOPE_BASE, attrs=["serverReference"])
        return ldb.Dn(samdb, res[0]['serverReference'][0].decode('utf8'))

    def _rid_set_refs(self, samdb, server_ref_dn):
        """Return the server-account message restricted to
        rIDSetReferences (the attribute is absent when no RID Set
        exists)."""
        res = samdb.search(base=server_ref_dn,
                           scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
        return res[0]

    def _require_rid_set_dn(self, samdb, server_ref_dn):
        """Assert that a RID Set is referenced and return its DN."""
        msg = self._rid_set_refs(samdb, server_ref_dn)
        self.assertIn("rIDSetReferences", msg)
        return ldb.Dn(samdb, msg["rIDSetReferences"][0].decode('utf8'))

    def _fetch_rid_set(self, samdb, rid_set_dn):
        """Return the RID Set message (rIDNextRid, rIDAllocationPool)."""
        res = samdb.search(base=rid_set_dn,
                           scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
                                                        'rIDAllocationPool'])
        return res[0]

    @staticmethod
    def _pool_last_rid(rid_set):
        """Return the upper 32 bits of rIDAllocationPool, which these
        tests use as the last RID of the allocated pool."""
        pool = int(rid_set["rIDAllocationPool"][0])
        return (0xFFFFFFFF00000000 & pool) >> 32

    def _seize_rid_role_with_samba_tool(self, targetdir):
        """Run `samba-tool fsmo seize --role rid --force` against the
        offline database and require a clean, silent success."""
        smbconf = os.path.join(targetdir, "etc/smb.conf")
        (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid",
                                            "-H", self._offline_db_url(targetdir),
                                            "-s", smbconf, "--force")
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")

    def _seize_rid_role_manually(self, samdb, fsmo_dn):
        """Point fSMORoleOwner at samdb's own NTDS settings object without
        doing any of the follow-up work samba-tool would do."""
        m = ldb.Message()
        # NOTE(review): the target DN assignment was lost from the damaged
        # source; the RID Manager object is the holder of this role's
        # fSMORoleOwner - confirm.
        m.dn = fsmo_dn
        m["fSMORoleOwner"] = ldb.MessageElement(samdb.get_dsServiceName(),
                                                ldb.FLAG_MOD_REPLACE,
                                                "fSMORoleOwner")
        samdb.modify(m)

    def _add_user_with_rid(self, samdb, cn, rid):
        """Add user CN=<cn>,CN=Users with an explicitly chosen RID.

        Forcing objectSid needs the "relax" control. We can do this with
        safety because this is an offline DB that will be destroyed.
        """
        m = ldb.Message()
        m.dn = ldb.Dn(samdb, "CN=%s,CN=Users" % cn)
        m.dn.add_base(samdb.get_default_basedn())
        m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD,
                                              'objectClass')
        sid = security.dom_sid("%s-%d" % (samdb.get_domain_sid(), rid))
        m['objectSid'] = ldb.MessageElement(ndr_pack(sid), ldb.FLAG_MOD_ADD,
                                            'objectSid')
        samdb.add(m, controls=["relax:0"])

    def _join_and_seize_manually(self, netbios_name, as_admin=False):
        """Join against the non-RID-master, seize the RID role by hand and
        check that no RID Set exists yet.

        Returns (server_dns_name, targetdir, samdb, server_ref_dn); the
        caller is responsible for `_cleanup_join`.
        """
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
        server = fsmo_not_owner['dns_name']

        targetdir = self._test_join(server, netbios_name)
        try:
            samdb = self._open_offline_samdb(targetdir, as_admin=as_admin)
            self._seize_rid_role_manually(samdb, fsmo_dn)

            server_ref_dn = self._server_ref_dn(samdb)
            # Assert that no RID Set has been set
            self.assertNotIn("rIDSetReferences",
                             self._rid_set_refs(samdb, server_ref_dn))
        except Exception:
            # The caller never receives targetdir, so clean up here.
            self._cleanup_join(server, netbios_name, targetdir)
            raise
        return (server, targetdir, samdb, server_ref_dn)

    # ------------------------------------------------------------------
    # Tests: FSMO_RID_ALLOC extended operation
    # ------------------------------------------------------------------

    def test_InvalidDestDSA_ridalloc(self):
        """Test RID allocation with invalid destination DSA guid"""
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        (level, ctr) = self._rid_alloc_request(
            "9c637462-5b8c-4467-aef2-bdb1f57bc4ef", fsmo_owner, fsmo_dn)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
        self._check_reply_source(level, ctr, fsmo_owner)

    def test_do_ridalloc(self):
        """Test doing a RID allocation with a valid destination DSA guid"""
        self._do_ridalloc()

    def test_do_ridalloc_get_anc(self):
        """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
        self._do_ridalloc(replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)

    def test_edit_rid_master(self):
        """Test doing a RID allocation after changing the RID master from the original one.
           This should set rIDNextRID to 0 on the new RID master."""
        # 1. a. Transfer role to non-RID master
        #    b. Check that it succeeds correctly
        #
        # 2. a. Call the RID alloc against the former master.
        #    b. Check that it succeeds.
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        # Work out which connection talks to the current RID master:
        # `new_master` is the DC that does NOT own the role yet,
        # `old_master` is the current owner.
        server_dn = str(ldb.Dn(self.ldb_dc1,
                               self.ldb_dc1.get_dsServiceName()).parent())
        if server_dn == fsmo_owner['server_dn']:
            new_master, old_master = self.ldb_dc2, self.ldb_dc1
        else:
            new_master, old_master = self.ldb_dc1, self.ldb_dc2

        # 1. Swap RID master role
        self._become_rid_master(new_master, "Failed to reassign RID Master ")

        try:
            # 2. Perform a RID alloc on behalf of the former master
            try:
                (level, ctr) = self._rid_alloc_request(fsmo_owner["ntds_guid"],
                                                       fsmo_not_owner, fsmo_dn)
            except RuntimeError as e:
                # 3. Make sure the allocation succeeds
                self.fail("RID allocation failed: " + str(e))

            self._check_reply_source(level, ctr, fsmo_not_owner)
            self._check_ridalloc_objects(ctr, fsmo_dn, fsmo_owner)
        finally:
            # Swap the RID master back for other tests
            self._become_rid_master(old_master, "Failed to restore RID Master ")

    # ------------------------------------------------------------------
    # Tests: RID Set handling on a joined offline database
    # ------------------------------------------------------------------

    def test_offline_samba_tool_seized_ridalloc(self):
        """Perform a join against the non-RID manager and then seize the RID Manager role"""
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
        server = fsmo_not_owner['dns_name']

        targetdir = self._test_join(server, "RIDALLOCTEST1")
        try:
            new_ldb = self._open_offline_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # Assert that no RID Set has been set
            self.assertNotIn("rIDSetReferences",
                             self._rid_set_refs(new_ldb, server_ref_dn))

            self._seize_rid_role_with_samba_tool(targetdir)

            # Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._rid_set_refs(new_ldb, server_ref_dn))
        finally:
            self._cleanup_join(server, "RIDALLOCTEST1", targetdir)

    def test_offline_manual_seized_ridalloc_with_dbcheck(self):
        """Perform the same actions as test_offline_samba_tool_seized_ridalloc,
        but do not create the RID set. Confirm that dbcheck correctly creates
        the RID Set."""
        (server, targetdir, new_ldb, server_ref_dn) = \
            self._join_and_seize_manually("RIDALLOCTEST2")
        try:
            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
            self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1,
                             "Should have fixed one error (missing RID Set)")

            # Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._rid_set_refs(new_ldb, server_ref_dn))
        finally:
            self._cleanup_join(server, "RIDALLOCTEST2", targetdir)

    def test_offline_manual_seized_ridalloc_add_user(self):
        """Perform the same actions as test_offline_samba_tool_seized_ridalloc,
        but do not create the RID set. Confirm that user-add correctly creates
        the RID Set."""
        (server, targetdir, new_ldb, server_ref_dn) = \
            self._join_and_seize_manually("RIDALLOCTEST3")
        try:
            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._rid_set_refs(new_ldb, server_ref_dn))
        finally:
            self._cleanup_join(server, "RIDALLOCTEST3", targetdir)

    def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
        """Perform the same actions as test_offline_samba_tool_seized_ridalloc,
        but do not create the RID set. Confirm that user-add (run with an
        admin session instead of the system session) correctly creates
        the RID Set."""
        (server, targetdir, new_ldb, server_ref_dn) = \
            self._join_and_seize_manually("RIDALLOCTEST4", as_admin=True)
        try:
            # Create a user to allocate a RID Set for itself (the RID master)
            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._rid_set_refs(new_ldb, server_ref_dn))
        finally:
            self._cleanup_join(server, "RIDALLOCTEST4", targetdir)

    def test_join_time_ridalloc(self):
        """Perform a join against the RID manager and assert we have a RID Set"""
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
        server = fsmo_owner['dns_name']

        targetdir = self._test_join(server, "RIDALLOCTEST5")
        try:
            new_ldb = self._open_offline_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)

            # Assert we get the RID Set
            self.assertIn("rIDSetReferences",
                          self._rid_set_refs(new_ldb, server_ref_dn))
        finally:
            self._cleanup_join(server, "RIDALLOCTEST5", targetdir)

    def test_rid_set_dbcheck(self):
        """Perform a join against the RID manager and assert we have a RID Set.
        Using dbcheck, we assert that we can detect out of range users."""
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
        server = fsmo_owner['dns_name']

        targetdir = self._test_join(server, "RIDALLOCTEST6")
        try:
            new_ldb = self._open_offline_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)
            rid_set_dn = self._require_rid_set_dn(new_ldb, server_ref_dn)

            # Add a new user (triggers RID set work)
            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # Now fetch the RID SET
            last_rid = self._pool_last_rid(
                self._fetch_rid_set(new_ldb, rid_set_dn))

            # Add user above the ridNextRid and at mid-range.
            self._add_user_with_rid(new_ldb, "ridsettestuser1", last_rid - 10)

            # Check the RID Set
            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)

            # Should have one error (wrong rIDNextRID)
            self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)

            # NOTE(review): a second, non-fixing checker is constructed here
            # but check_database() is never called on it, so nothing is
            # asserted about remaining errors - TODO confirm whether a
            # zero-error re-check was intended.
            chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)

            rid_set = self._fetch_rid_set(new_ldb, rid_set_dn)
            last_allocated_rid = int(rid_set["rIDNextRid"][0])
            self.assertEqual(last_allocated_rid, last_rid - 10)

            # Assert that the range wasn't thrown away
            self.assertEqual(last_rid, self._pool_last_rid(rid_set),
                             "rid pool should not have changed")
        finally:
            self._cleanup_join(server, "RIDALLOCTEST6", targetdir)

    def test_rid_set_dbcheck_after_seize(self):
        """Perform a join against the RID manager and assert we have a RID Set.
        We seize the RID master role, then using dbcheck, we assert that we can
        detect out of range users (and then bump the RID set as required)."""
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
        server = fsmo_owner['dns_name']

        targetdir = self._test_join(server, "RIDALLOCTEST7")
        try:
            new_ldb = self._open_offline_samdb(targetdir)
            server_ref_dn = self._server_ref_dn(new_ldb)
            rid_set_dn = self._require_rid_set_dn(new_ldb, server_ref_dn)

            # Seize the RID Manager role
            self._seize_rid_role_with_samba_tool(targetdir)

            # Add a new user (triggers RID set work)
            new_ldb.newuser("ridalloctestuser", "P@ssword!")

            # Now fetch the RID SET
            last_rid = self._pool_last_rid(
                self._fetch_rid_set(new_ldb, rid_set_dn))

            # Add user above the ridNextRid and at almost the end of the range.
            self._add_user_with_rid(new_ldb, "ridsettestuser2", last_rid - 3)

            # Add user above the ridNextRid and at the end of the range
            self._add_user_with_rid(new_ldb, "ridsettestuser3", last_rid)

            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)

            # Should have fixed two errors (wrong ridNextRid)
            self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)

            # NOTE(review): as in test_rid_set_dbcheck, this non-fixing
            # checker is built but never run - TODO confirm intent.
            chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)

            # Add another user (checks RID rollover)
            # We have seized the role, so we can do that.
            new_ldb.newuser("ridalloctestuser3", "P@ssword!")

            rid_set = self._fetch_rid_set(new_ldb, rid_set_dn)
            self.assertNotEqual(last_rid, self._pool_last_rid(rid_set),
                                "rid pool should have changed")
        finally:
            self._cleanup_join(server, "RIDALLOCTEST7", targetdir)