2 # -*- coding: utf-8 -*-
4 # Tests various RID allocation scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 # export DC1=dc1_dns_name
27 # export DC2=dc2_dns_name
28 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
36 from ldb import SCOPE_BASE
38 from samba.dcerpc import drsuapi, misc
39 from samba.drs_utils import drs_DsBind
40 from samba.samdb import SamDB
42 import shutil, tempfile, os
43 from samba.netcmd.main import cmd_sambatool
44 from samba.auth import system_session, admin_session
45 from samba.dbchecker import dbcheck
46 from samba.ndr import ndr_pack
47 from samba.dcerpc import security
50 def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
51 replica_flags=0, max_objects=0, partial_attribute_set=None,
52 partial_attribute_set_ex=None, mapping_ctr=None):
53 req8 = drsuapi.DsGetNCChangesRequest8()
55 req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
56 req8.source_dsa_invocation_id = misc.GUID(invocation_id)
57 req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
58 req8.naming_context.dn = unicode(nc_dn_str)
59 req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
60 req8.highwatermark.tmp_highest_usn = 0
61 req8.highwatermark.reserved_usn = 0
62 req8.highwatermark.highest_usn = 0
63 req8.uptodateness_vector = None
64 req8.replica_flags = replica_flags
65 req8.max_object_count = max_objects
66 req8.max_ndr_size = 402116
67 req8.extended_op = exop
69 req8.partial_attribute_set = partial_attribute_set
70 req8.partial_attribute_set_ex = partial_attribute_set_ex
72 req8.mapping_ctr = mapping_ctr
74 req8.mapping_ctr.num_mappings = 0
75 req8.mapping_ctr.mappings = None
79 def _ds_bind(self, server_name):
80 binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
82 drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
83 (drs_handle, supported_extensions) = drs_DsBind(drs)
84 return (drs, drs_handle)
87 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
88 """Intended as a semi-black box test case for DsGetNCChanges
89 implementation for extended operations. It should be testing
90 how DsGetNCChanges handles different input params (mostly invalid).
91 Final goal is to make DsGetNCChanges as binary compatible to
92 Windows implementation as possible"""
95 super(DrsReplicaSyncTestCase, self).setUp()
98 super(DrsReplicaSyncTestCase, self).tearDown()
100 def _determine_fSMORoleOwner(self, fsmo_obj_dn):
101 """Returns (owner, not_owner) pair where:
102 owner: dns name for FSMO owner
103 not_owner: dns name for DC not owning the FSMO"""
104 # collect info to return later
105 fsmo_info_1 = {"dns_name": self.dnsname_dc1,
106 "invocation_id": self.ldb_dc1.get_invocation_id(),
107 "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
108 "server_dn": self.ldb_dc1.get_serverName()}
109 fsmo_info_2 = {"dns_name": self.dnsname_dc2,
110 "invocation_id": self.ldb_dc2.get_invocation_id(),
111 "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
112 "server_dn": self.ldb_dc2.get_serverName()}
114 msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
115 fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
116 fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
118 msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
119 fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
120 fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
122 # determine the owner dc
123 res = self.ldb_dc1.search(fsmo_obj_dn,
124 scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
125 assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
126 fsmo_owner = res[0]["fSMORoleOwner"][0]
127 if fsmo_owner == self.info_dc1["dsServiceName"][0]:
128 return (fsmo_info_1, fsmo_info_2)
129 return (fsmo_info_2, fsmo_info_1)
131 def _check_exop_failed(self, ctr6, expected_failure):
132 self.assertEqual(ctr6.extended_ret, expected_failure)
133 #self.assertEqual(ctr6.object_count, 0)
134 #self.assertEqual(ctr6.first_object, None)
135 self.assertEqual(ctr6.more_data, False)
136 self.assertEqual(ctr6.nc_object_count, 0)
137 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
138 self.assertEqual(ctr6.linked_attributes_count, 0)
139 self.assertEqual(ctr6.linked_attributes, [])
140 self.assertEqual(ctr6.drs_error[0], 0)
142 def test_InvalidDestDSA_ridalloc(self):
143 """Test RID allocation with invalid destination DSA guid"""
144 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
145 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
147 req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
148 invocation_id=fsmo_owner["invocation_id"],
150 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
152 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
153 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
154 self.assertEqual(level, 6, "Expected level 6 response!")
155 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
156 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
157 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
159 def test_do_ridalloc(self):
160 """Test doing a RID allocation with a valid destination DSA guid"""
161 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
162 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
164 req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
165 invocation_id=fsmo_owner["invocation_id"],
167 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
169 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
170 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
171 self.assertEqual(level, 6, "Expected level 6 response!")
172 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
173 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
175 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
176 self.assertEqual(ctr6.object_count, 3)
177 self.assertNotEqual(ctr6.first_object, None)
178 self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
179 self.assertNotEqual(ctr6.first_object.next_object, None)
180 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
181 second_object = ctr6.first_object.next_object.object
182 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
183 third_object = ctr6.first_object.next_object.next_object.object
184 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
186 self.assertEqual(ctr6.more_data, False)
187 self.assertEqual(ctr6.nc_object_count, 0)
188 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
189 self.assertEqual(ctr6.drs_error[0], 0)
190 # We don't check the linked_attributes_count as if the domain
191 # has an RODC, it can gain links on the server account object
193 def test_do_ridalloc_get_anc(self):
194 """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
195 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
196 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
198 req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
199 invocation_id=fsmo_owner["invocation_id"],
201 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
202 replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
204 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
205 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
206 self.assertEqual(level, 6, "Expected level 6 response!")
207 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
208 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
210 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
211 self.assertEqual(ctr6.object_count, 3)
212 self.assertNotEqual(ctr6.first_object, None)
213 self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
214 self.assertNotEqual(ctr6.first_object.next_object, None)
215 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
216 second_object = ctr6.first_object.next_object.object
217 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
218 third_object = ctr6.first_object.next_object.next_object.object
219 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
220 self.assertEqual(ctr6.more_data, False)
221 self.assertEqual(ctr6.nc_object_count, 0)
222 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
223 self.assertEqual(ctr6.drs_error[0], 0)
224 # We don't check the linked_attributes_count as if the domain
225 # has an RODC, it can gain links on the server account object
227 def test_edit_rid_master(self):
228 """Test doing a RID allocation after changing the RID master from the original one.
229 This should set rIDNextRID to 0 on the new RID master."""
230 # 1. a. Transfer role to non-RID master
231 # b. Check that it succeeds correctly
233 # 2. a. Call the RID alloc against the former master.
234 # b. Check that it succeeds.
235 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
236 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
238 # 1. Swap RID master role
240 m.dn = ldb.Dn(self.ldb_dc1, "")
241 m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
244 # Make sure that ldb_dc1 == RID Master
246 server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
248 # self.ldb_dc1 == LOCALDC
249 if server_dn == fsmo_owner['server_dn']:
250 # ldb_dc1 == VAMPIREDC
251 ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
253 # Otherwise switch the two
254 ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
257 # ldb_dc1 is now RID MASTER (as VAMPIREDC)
259 except ldb.LdbError, (num, msg):
260 self.fail("Failed to reassign RID Master " + msg)
263 # 2. Perform a RID alloc
264 req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
265 invocation_id=fsmo_not_owner["invocation_id"],
267 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
269 (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
270 # 3. Make sure the allocation succeeds
272 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
273 except RuntimeError, e:
274 self.fail("RID allocation failed: " + str(e))
276 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
278 self.assertEqual(level, 6, "Expected level 6 response!")
279 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
280 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
282 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
283 self.assertEqual(ctr6.object_count, 3)
284 self.assertNotEqual(ctr6.first_object, None)
285 self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
286 self.assertNotEqual(ctr6.first_object.next_object, None)
287 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
288 second_object = ctr6.first_object.next_object.object
289 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
290 third_object = ctr6.first_object.next_object.next_object.object
291 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
293 # Swap the RID master back for other tests
295 m.dn = ldb.Dn(ldb_dc2, "")
296 m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
299 except ldb.LdbError, (num, msg):
300 self.fail("Failed to restore RID Master " + msg)
302 def test_offline_samba_tool_seized_ridalloc(self):
303 """Perform a join against the non-RID manager and then seize the RID Manager role"""
305 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
306 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
308 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
310 # Connect to the database
311 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
312 smbconf = os.path.join(targetdir, "etc/smb.conf")
314 lp = self.get_loadparm()
315 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
316 session_info=system_session(lp), lp=lp)
319 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
320 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
321 # 2. Get server reference
322 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
324 # Assert that no RID Set has been set
325 res = new_ldb.search(base=server_ref_dn,
326 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
328 self.assertFalse("rIDSetReferences" in res[0])
330 (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
331 self.assertCmdSuccess(result, out, err)
332 self.assertEquals(err,"","Shouldn't be any error messages")
334 # 3. Assert we get the RID Set
335 res = new_ldb.search(base=server_ref_dn,
336 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
338 self.assertTrue("rIDSetReferences" in res[0])
340 shutil.rmtree(targetdir, ignore_errors=True)
341 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
343 def _test_join(self, server, netbios_name):
344 tmpdir = os.path.join(self.tempdir, "targetdir")
345 creds = self.get_credentials()
346 cmd = cmd_sambatool.subcommands['domain'].subcommands['join']
347 result = cmd._run("samba-tool domain join",
349 "dc", "-U%s%%%s" % (creds.get_username(),
350 creds.get_password()),
351 '--targetdir=%s' % tmpdir,
352 '--server=%s' % server,
353 "--option=netbios name = %s" % netbios_name)
356 def _test_force_demote(self, server, netbios_name):
357 creds = self.get_credentials()
358 cmd = cmd_sambatool.subcommands['domain'].subcommands['demote']
359 result = cmd._run("samba-tool domain demote",
360 "-U%s%%%s" % (creds.get_username(),
361 creds.get_password()),
362 '--server=%s' % server,
363 "--remove-other-dead-server=%s" % netbios_name)
365 def test_offline_manual_seized_ridalloc_with_dbcheck(self):
366 """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
367 but do not create the RID set. Confirm that dbcheck correctly creates
372 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
373 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
375 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
377 # Connect to the database
378 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
379 lp = self.get_loadparm()
381 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
382 session_info=system_session(lp), lp=lp)
384 serviceName = new_ldb.get_dsServiceName()
387 m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
388 ldb.FLAG_MOD_REPLACE,
393 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
394 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
395 # 2. Get server reference
396 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
398 # Assert that no RID Set has been set
399 res = new_ldb.search(base=server_ref_dn,
400 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
402 self.assertFalse("rIDSetReferences" in res[0])
404 smbconf = os.path.join(targetdir, "etc/smb.conf")
406 chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
408 self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
410 # 3. Assert we get the RID Set
411 res = new_ldb.search(base=server_ref_dn,
412 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
414 self.assertTrue("rIDSetReferences" in res[0])
416 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
417 shutil.rmtree(targetdir, ignore_errors=True)
419 def test_offline_manual_seized_ridalloc_add_user(self):
420 """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
421 but do not create the RID set. Confirm that user-add correctly creates
423 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
424 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
426 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
428 # Connect to the database
429 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
430 lp = self.get_loadparm()
432 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
433 session_info=system_session(lp), lp=lp)
435 serviceName = new_ldb.get_dsServiceName()
438 m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
439 ldb.FLAG_MOD_REPLACE,
444 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
445 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
446 # 2. Get server reference
447 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
449 # Assert that no RID Set has been set
450 res = new_ldb.search(base=server_ref_dn,
451 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
453 self.assertFalse("rIDSetReferences" in res[0])
455 smbconf = os.path.join(targetdir, "etc/smb.conf")
457 new_ldb.newuser("ridalloctestuser", "P@ssword!")
459 # 3. Assert we get the RID Set
460 res = new_ldb.search(base=server_ref_dn,
461 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
463 self.assertTrue("rIDSetReferences" in res[0])
466 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
467 shutil.rmtree(targetdir, ignore_errors=True)
469 def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
470 """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
471 but do not create the RID set. Confirm that user-add correctly creates
473 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
474 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
476 targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
478 # Connect to the database
479 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
480 lp = self.get_loadparm()
482 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
483 session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
485 serviceName = new_ldb.get_dsServiceName()
488 m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
489 ldb.FLAG_MOD_REPLACE,
494 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
495 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
496 # 2. Get server reference
497 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
499 # Assert that no RID Set has been set
500 res = new_ldb.search(base=server_ref_dn,
501 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
503 self.assertFalse("rIDSetReferences" in res[0])
505 smbconf = os.path.join(targetdir, "etc/smb.conf")
507 # Create a user to allocate a RID Set for itself (the RID master)
508 new_ldb.newuser("ridalloctestuser", "P@ssword!")
510 # 3. Assert we get the RID Set
511 res = new_ldb.search(base=server_ref_dn,
512 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
514 self.assertTrue("rIDSetReferences" in res[0])
517 self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
518 shutil.rmtree(targetdir, ignore_errors=True)
520 def test_join_time_ridalloc(self):
521 """Perform a join against the RID manager and assert we have a RID Set"""
523 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
524 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
526 targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
528 # Connect to the database
529 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
530 smbconf = os.path.join(targetdir, "etc/smb.conf")
532 lp = self.get_loadparm()
533 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
534 session_info=system_session(lp), lp=lp)
537 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
538 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
539 # 2. Get server reference
540 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
542 # 3. Assert we get the RID Set
543 res = new_ldb.search(base=server_ref_dn,
544 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
546 self.assertTrue("rIDSetReferences" in res[0])
548 self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
549 shutil.rmtree(targetdir, ignore_errors=True)
551 def test_rid_set_dbcheck(self):
552 """Perform a join against the RID manager and assert we have a RID Set.
553 Using dbcheck, we assert that we can detect out of range users."""
555 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
556 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
558 targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
560 # Connect to the database
561 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
562 smbconf = os.path.join(targetdir, "etc/smb.conf")
564 lp = self.get_loadparm()
565 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
566 session_info=system_session(lp), lp=lp)
569 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
570 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
571 # 2. Get server reference
572 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
574 # 3. Assert we get the RID Set
575 res = new_ldb.search(base=server_ref_dn,
576 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
578 self.assertTrue("rIDSetReferences" in res[0])
579 rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
581 # 4. Add a new user (triggers RID set work)
582 new_ldb.newuser("ridalloctestuser", "P@ssword!")
584 # 5. Now fetch the RID SET
585 rid_set_res = new_ldb.search(base=rid_set_dn,
586 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
587 'rIDAllocationPool'])
588 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
589 last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
591 # 6. Add user above the ridNextRid and at mid-range.
593 # We can do this with safety because this is an offline DB that will be
596 m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
597 m.dn.add_base(new_ldb.get_default_basedn())
598 m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
599 m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
602 new_ldb.add(m, controls=["relax:0"])
604 # 7. Check the RID Set
605 chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
607 # Should have one error (wrong rIDNextRID)
608 self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
610 # 8. Assert we get didn't show any other errors
611 chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
613 rid_set_res = new_ldb.search(base=rid_set_dn,
614 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
615 'rIDAllocationPool'])
616 last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
617 self.assertEquals(last_allocated_rid, last_rid - 10)
619 # 9. Assert that the range wasn't thrown away
621 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
622 self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
624 self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
625 shutil.rmtree(targetdir, ignore_errors=True)
628 def test_rid_set_dbcheck_after_seize(self):
629 """Perform a join against the RID manager and assert we have a RID Set.
630 We seize the RID master role, then using dbcheck, we assert that we can
631 detect out of range users (and then bump the RID set as required)."""
633 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
634 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
636 targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
638 # Connect to the database
639 ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
640 smbconf = os.path.join(targetdir, "etc/smb.conf")
642 lp = self.get_loadparm()
643 new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
644 session_info=system_session(lp), lp=lp)
647 res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
648 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
649 # 2. Get server reference
650 server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
652 # 3. Assert we get the RID Set
653 res = new_ldb.search(base=server_ref_dn,
654 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
656 self.assertTrue("rIDSetReferences" in res[0])
657 rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
659 # 4. Seize the RID Manager role
660 (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
661 self.assertCmdSuccess(result, out, err)
662 self.assertEquals(err,"","Shouldn't be any error messages")
664 # 5. Add a new user (triggers RID set work)
665 new_ldb.newuser("ridalloctestuser", "P@ssword!")
667 # 6. Now fetch the RID SET
668 rid_set_res = new_ldb.search(base=rid_set_dn,
669 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
670 'rIDAllocationPool'])
671 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
672 last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
674 # 7. Add user above the ridNextRid and at almost the end of the range.
677 m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
678 m.dn.add_base(new_ldb.get_default_basedn())
679 m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
680 m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
683 new_ldb.add(m, controls=["relax:0"])
685 # 8. Add user above the ridNextRid and at the end of the range
687 m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
688 m.dn.add_base(new_ldb.get_default_basedn())
689 m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
690 m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
693 new_ldb.add(m, controls=["relax:0"])
695 chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
697 # Should have fixed two errors (wrong ridNextRid)
698 self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
700 # 9. Assert we get didn't show any other errors
701 chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
703 # 10. Add another user (checks RID rollover)
704 # We have seized the role, so we can do that.
705 new_ldb.newuser("ridalloctestuser3", "P@ssword!")
707 rid_set_res = new_ldb.search(base=rid_set_dn,
708 scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
709 'rIDAllocationPool'])
710 next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
711 self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
713 self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
714 shutil.rmtree(targetdir, ignore_errors=True)