2 # -*- coding: utf-8 -*-
4 # Tests various RID allocation scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 # export DC1=dc1_dns_name
27 # export DC2=dc2_dns_name
28 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
36 from ldb import SCOPE_BASE
38 from samba.dcerpc import drsuapi, misc
39 from samba.drs_utils import drs_DsBind
40 from samba.samdb import SamDB
42 import shutil, tempfile, os
43 from samba.netcmd.main import cmd_sambatool
44 from samba.auth import system_session, admin_session
45 from samba.dbchecker import dbcheck
46 from samba.ndr import ndr_pack
47 from samba.dcerpc import security
49 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
50 """Intended as a semi-black box test case for DsGetNCChanges
51 implementation for extended operations. It should be testing
52 how DsGetNCChanges handles different input params (mostly invalid).
53 Final goal is to make DsGetNCChanges as binary compatible to
54 Windows implementation as possible"""
57 super(DrsReplicaSyncTestCase, self).setUp()
60 super(DrsReplicaSyncTestCase, self).tearDown()
def _determine_fSMORoleOwner(self, fsmo_obj_dn):
    """Work out which of the two test DCs owns the given FSMO role.

    For each DC a dict is built with the keys:
      "dns_name", "invocation_id", "ntds_guid", "server_dn",
      "server_acct_dn" (DN taken from the serverReference attribute
      of the server object) and
      "rid_set_dn" ("CN=RID Set" below the server account DN).

    :param fsmo_obj_dn: DN of the object whose fSMORoleOwner
        attribute names the current role owner
    :return: (owner, not_owner) pair of the dicts described above:
        owner: info for the DC owning the FSMO role
        not_owner: info for the DC not owning the FSMO role
    """
    def collect_info(samdb, dns_name):
        # collect info about one DC to return later
        info = {"dns_name": dns_name,
                "invocation_id": samdb.get_invocation_id(),
                "ntds_guid": samdb.get_ntds_GUID(),
                "server_dn": samdb.get_serverName()}
        msgs = samdb.search(scope=ldb.SCOPE_BASE, base=info["server_dn"],
                            attrs=["serverReference"])
        info["server_acct_dn"] = ldb.Dn(samdb, msgs[0]["serverReference"][0])
        info["rid_set_dn"] = ldb.Dn(samdb, "CN=RID Set") + info["server_acct_dn"]
        return info

    fsmo_info_1 = collect_info(self.ldb_dc1, self.dnsname_dc1)
    fsmo_info_2 = collect_info(self.ldb_dc2, self.dnsname_dc2)

    # determine the owner dc
    res = self.ldb_dc1.search(fsmo_obj_dn,
                              scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
    # Use the unittest assertion (not a bare assert) so the check is
    # not stripped when Python runs with -O.
    self.assertEqual(len(res), 1,
                     "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn)
    fsmo_owner = res[0]["fSMORoleOwner"][0]
    if fsmo_owner == self.info_dc1["dsServiceName"][0]:
        return (fsmo_info_1, fsmo_info_2)
    return (fsmo_info_2, fsmo_info_1)
93 def _check_exop_failed(self, ctr6, expected_failure):
94 self.assertEqual(ctr6.extended_ret, expected_failure)
95 #self.assertEqual(ctr6.object_count, 0)
96 #self.assertEqual(ctr6.first_object, None)
97 self.assertEqual(ctr6.more_data, False)
98 self.assertEqual(ctr6.nc_object_count, 0)
99 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
100 self.assertEqual(ctr6.linked_attributes_count, 0)
101 self.assertEqual(ctr6.linked_attributes, [])
102 self.assertEqual(ctr6.drs_error[0], 0)
104 def test_InvalidDestDSA_ridalloc(self):
105 """Test RID allocation with invalid destination DSA guid"""
106 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
107 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
109 req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
110 invocation_id=fsmo_owner["invocation_id"],
112 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
114 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
115 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
116 self.assertEqual(level, 6, "Expected level 6 response!")
117 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
118 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
119 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
121 def test_do_ridalloc(self):
122 """Test doing a RID allocation with a valid destination DSA guid"""
123 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
124 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
126 req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
127 invocation_id=fsmo_owner["invocation_id"],
129 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
131 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
132 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
133 self.assertEqual(level, 6, "Expected level 6 response!")
134 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
135 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
137 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
138 self.assertEqual(ctr6.object_count, 3)
139 self.assertNotEqual(ctr6.first_object, None)
140 self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
141 self.assertNotEqual(ctr6.first_object.next_object, None)
142 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
143 second_object = ctr6.first_object.next_object.object
144 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
145 third_object = ctr6.first_object.next_object.next_object.object
146 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
148 self.assertEqual(ctr6.more_data, False)
149 self.assertEqual(ctr6.nc_object_count, 0)
150 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
151 self.assertEqual(ctr6.drs_error[0], 0)
152 # We don't check the linked_attributes_count as if the domain
153 # has an RODC, it can gain links on the server account object
155 def test_do_ridalloc_get_anc(self):
156 """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
157 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
158 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
160 req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
161 invocation_id=fsmo_owner["invocation_id"],
163 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
164 replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
166 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
167 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
168 self.assertEqual(level, 6, "Expected level 6 response!")
169 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
170 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
172 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
173 self.assertEqual(ctr6.object_count, 3)
174 self.assertNotEqual(ctr6.first_object, None)
175 self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
176 self.assertNotEqual(ctr6.first_object.next_object, None)
177 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
178 second_object = ctr6.first_object.next_object.object
179 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
180 third_object = ctr6.first_object.next_object.next_object.object
181 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
182 self.assertEqual(ctr6.more_data, False)
183 self.assertEqual(ctr6.nc_object_count, 0)
184 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
185 self.assertEqual(ctr6.drs_error[0], 0)
186 # We don't check the linked_attributes_count as if the domain
187 # has an RODC, it can gain links on the server account object
189 def test_edit_rid_master(self):
190 """Test doing a RID allocation after changing the RID master from the original one.
191 This should set rIDNextRID to 0 on the new RID master."""
192 # 1. a. Transfer role to non-RID master
193 # b. Check that it succeeds correctly
195 # 2. a. Call the RID alloc against the former master.
196 # b. Check that it succeeds.
197 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
198 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
200 # 1. Swap RID master role
202 m.dn = ldb.Dn(self.ldb_dc1, "")
203 m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
206 # Make sure that ldb_dc1 == RID Master
208 server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
210 # self.ldb_dc1 == LOCALDC
211 if server_dn == fsmo_owner['server_dn']:
212 # ldb_dc1 == VAMPIREDC
213 ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
215 # Otherwise switch the two
216 ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
219 # ldb_dc1 is now RID MASTER (as VAMPIREDC)
221 except ldb.LdbError, (num, msg):
222 self.fail("Failed to reassign RID Master " + msg)
225 # 2. Perform a RID alloc
226 req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
227 invocation_id=fsmo_not_owner["invocation_id"],
229 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
231 (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
232 # 3. Make sure the allocation succeeds
234 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
235 except RuntimeError, e:
236 self.fail("RID allocation failed: " + str(e))
238 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
240 self.assertEqual(level, 6, "Expected level 6 response!")
241 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
242 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
244 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
245 self.assertEqual(ctr6.object_count, 3)
246 self.assertNotEqual(ctr6.first_object, None)
247 self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
248 self.assertNotEqual(ctr6.first_object.next_object, None)
249 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
250 second_object = ctr6.first_object.next_object.object
251 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
252 third_object = ctr6.first_object.next_object.next_object.object
253 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
255 # Swap the RID master back for other tests
257 m.dn = ldb.Dn(ldb_dc2, "")
258 m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
261 except ldb.LdbError, (num, msg):
262 self.fail("Failed to restore RID Master " + msg)
264 def test_offline_samba_tool_seized_ridalloc(self):
265 """Perform a join against the non-RID manager and then seize the RID Manager role"""
267 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
268 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
270 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
272 # Connect to the database
273 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
274 smbconf = os.path.join(targetdir, "etc/smb.conf")
276 lp = self.get_loadparm()
277 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
278 session_info=system_session(lp), lp=lp)
281 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
282 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
283 # 2. Get server reference
284 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
286 # Assert that no RID Set has been set
287 res = new_ldb.search(base=server_ref_dn,
288 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
290 self.assertFalse("rIDSetReferences" in res[0])
292 (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
293 self.assertCmdSuccess(result, out, err)
294 self.assertEquals(err,"","Shouldn't be any error messages")
296 # 3. Assert we get the RID Set
297 res = new_ldb.search(base=server_ref_dn,
298 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
300 self.assertTrue("rIDSetReferences" in res[0])
302 shutil.rmtree(targetdir, ignore_errors=True)
303 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
305 def _test_join(self, server, netbios_name):
306 tmpdir = os.path.join(self.tempdir, "targetdir")
307 creds = self.get_credentials()
308 cmd = cmd_sambatool.subcommands['domain'].subcommands['join']
309 result = cmd._run("samba-tool domain join",
311 "dc", "-U%s%%%s" % (creds.get_username(),
312 creds.get_password()),
313 '--targetdir=%s' % tmpdir,
314 '--server=%s' % server,
315 "--option=netbios name = %s" % netbios_name)
def _test_force_demote(self, server, netbios_name):
    """Run the "samba-tool domain demote" command for a dead server.

    :param server: value passed as the --server option
    :param netbios_name: value passed as --remove-other-dead-server
    """
    creds = self.get_credentials()
    demote_cmd = cmd_sambatool.subcommands['domain'].subcommands['demote']
    user_arg = "-U%s%%%s" % (creds.get_username(), creds.get_password())
    server_arg = '--server=%s' % server
    dead_server_arg = "--remove-other-dead-server=%s" % netbios_name
    # the value returned by _run() is not inspected
    demote_cmd._run("samba-tool domain demote",
                    user_arg, server_arg, dead_server_arg)
327 def test_offline_manual_seized_ridalloc_with_dbcheck(self):
328 """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
329 but do not create the RID set. Confirm that dbcheck correctly creates
334 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
335 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
337 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
339 # Connect to the database
340 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
341 lp = self.get_loadparm()
343 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
344 session_info=system_session(lp), lp=lp)
346 serviceName = new_ldb.get_dsServiceName()
349 m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
350 ldb.FLAG_MOD_REPLACE,
355 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
356 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
357 # 2. Get server reference
358 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
360 # Assert that no RID Set has been set
361 res = new_ldb.search(base=server_ref_dn,
362 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
364 self.assertFalse("rIDSetReferences" in res[0])
366 smbconf = os.path.join(targetdir, "etc/smb.conf")
368 chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
370 self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
372 # 3. Assert we get the RID Set
373 res = new_ldb.search(base=server_ref_dn,
374 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
376 self.assertTrue("rIDSetReferences" in res[0])
378 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
379 shutil.rmtree(targetdir, ignore_errors=True)
381 def test_offline_manual_seized_ridalloc_add_user(self):
382 """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
383 but do not create the RID set. Confirm that user-add correctly creates
385 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
386 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
388 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
390 # Connect to the database
391 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
392 lp = self.get_loadparm()
394 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
395 session_info=system_session(lp), lp=lp)
397 serviceName = new_ldb.get_dsServiceName()
400 m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
401 ldb.FLAG_MOD_REPLACE,
406 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
407 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
408 # 2. Get server reference
409 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
411 # Assert that no RID Set has been set
412 res = new_ldb.search(base=server_ref_dn,
413 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
415 self.assertFalse("rIDSetReferences" in res[0])
417 smbconf = os.path.join(targetdir, "etc/smb.conf")
419 new_ldb.newuser("ridalloctestuser", "P@ssword!")
421 # 3. Assert we get the RID Set
422 res = new_ldb.search(base=server_ref_dn,
423 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
425 self.assertTrue("rIDSetReferences" in res[0])
428 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
429 shutil.rmtree(targetdir, ignore_errors=True)
431 def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
432 """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
433 but do not create the RID set. Confirm that user-add correctly creates
435 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
436 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
438 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
440 # Connect to the database
441 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
442 lp = self.get_loadparm()
444 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
445 session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
447 serviceName = new_ldb.get_dsServiceName()
450 m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
451 ldb.FLAG_MOD_REPLACE,
456 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
457 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
458 # 2. Get server reference
459 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
461 # Assert that no RID Set has been set
462 res = new_ldb.search(base=server_ref_dn,
463 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
465 self.assertFalse("rIDSetReferences" in res[0])
467 smbconf = os.path.join(targetdir, "etc/smb.conf")
469 # Create a user to allocate a RID Set for itself (the RID master)
470 new_ldb.newuser("ridalloctestuser", "P@ssword!")
472 # 3. Assert we get the RID Set
473 res = new_ldb.search(base=server_ref_dn,
474 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
476 self.assertTrue("rIDSetReferences" in res[0])
479 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
480 shutil.rmtree(targetdir, ignore_errors=True)
482 def test_join_time_ridalloc(self):
483 """Perform a join against the RID manager and assert we have a RID Set"""
485 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
486 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
488 targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
490 # Connect to the database
491 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
492 smbconf = os.path.join(targetdir, "etc/smb.conf")
494 lp = self.get_loadparm()
495 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
496 session_info=system_session(lp), lp=lp)
499 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
500 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
501 # 2. Get server reference
502 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
504 # 3. Assert we get the RID Set
505 res = new_ldb.search(base=server_ref_dn,
506 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
508 self.assertTrue("rIDSetReferences" in res[0])
510 self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
511 shutil.rmtree(targetdir, ignore_errors=True)
513 def test_rid_set_dbcheck(self):
514 """Perform a join against the RID manager and assert we have a RID Set.
515 Using dbcheck, we assert that we can detect out of range users."""
517 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
518 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
520 targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
522 # Connect to the database
523 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
524 smbconf = os.path.join(targetdir, "etc/smb.conf")
526 lp = self.get_loadparm()
527 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
528 session_info=system_session(lp), lp=lp)
531 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
532 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
533 # 2. Get server reference
534 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
536 # 3. Assert we get the RID Set
537 res = new_ldb.search(base=server_ref_dn,
538 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
540 self.assertTrue("rIDSetReferences" in res[0])
541 rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
543 # 4. Add a new user (triggers RID set work)
544 new_ldb.newuser("ridalloctestuser", "P@ssword!")
546 # 5. Now fetch the RID SET
547 rid_set_res = new_ldb.search(base=rid_set_dn,
548 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
549 'rIDAllocationPool'])
550 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
551 last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
553 # 6. Add user above the ridNextRid and at mid-range.
555 # We can do this with safety because this is an offline DB that will be
558 m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
559 m.dn.add_base(new_ldb.get_default_basedn())
560 m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
561 m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
564 new_ldb.add(m, controls=["relax:0"])
566 # 7. Check the RID Set
567 chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
569 # Should have one error (wrong rIDNextRID)
570 self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
572 # 8. Assert we get didn't show any other errors
573 chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
575 rid_set_res = new_ldb.search(base=rid_set_dn,
576 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
577 'rIDAllocationPool'])
578 last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
579 self.assertEquals(last_allocated_rid, last_rid - 10)
581 # 9. Assert that the range wasn't thrown away
583 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
584 self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
586 self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
587 shutil.rmtree(targetdir, ignore_errors=True)
590 def test_rid_set_dbcheck_after_seize(self):
591 """Perform a join against the RID manager and assert we have a RID Set.
592 We seize the RID master role, then using dbcheck, we assert that we can
593 detect out of range users (and then bump the RID set as required)."""
595 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
596 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
598 targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
600 # Connect to the database
601 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
602 smbconf = os.path.join(targetdir, "etc/smb.conf")
604 lp = self.get_loadparm()
605 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
606 session_info=system_session(lp), lp=lp)
609 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
610 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
611 # 2. Get server reference
612 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
614 # 3. Assert we get the RID Set
615 res = new_ldb.search(base=server_ref_dn,
616 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
618 self.assertTrue("rIDSetReferences" in res[0])
619 rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
621 # 4. Seize the RID Manager role
622 (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
623 self.assertCmdSuccess(result, out, err)
624 self.assertEquals(err,"","Shouldn't be any error messages")
626 # 5. Add a new user (triggers RID set work)
627 new_ldb.newuser("ridalloctestuser", "P@ssword!")
629 # 6. Now fetch the RID SET
630 rid_set_res = new_ldb.search(base=rid_set_dn,
631 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
632 'rIDAllocationPool'])
633 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
634 last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
636 # 7. Add user above the ridNextRid and at almost the end of the range.
639 m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
640 m.dn.add_base(new_ldb.get_default_basedn())
641 m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
642 m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
645 new_ldb.add(m, controls=["relax:0"])
647 # 8. Add user above the ridNextRid and at the end of the range
649 m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
650 m.dn.add_base(new_ldb.get_default_basedn())
651 m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
652 m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
655 new_ldb.add(m, controls=["relax:0"])
657 chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
659 # Should have fixed two errors (wrong ridNextRid)
660 self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
662 # 9. Assert we get didn't show any other errors
663 chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
665 # 10. Add another user (checks RID rollover)
666 # We have seized the role, so we can do that.
667 new_ldb.newuser("ridalloctestuser3", "P@ssword!")
669 rid_set_res = new_ldb.search(base=rid_set_dn,
670 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
671 'rIDAllocationPool'])
672 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
673 self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
675 self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
676 shutil.rmtree(targetdir, ignore_errors=True)