8a015f1dd05a8460cf06479c204a6566d1cdc432
[garming/samba-autobuild/.git] / source4 / torture / drs / python / ridalloc_exop.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various RID allocation scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
9 #
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 #
23
24 #
25 # Usage:
26 #  export DC1=dc1_dns_name
27 #  export DC2=dc2_dns_name
28 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 #
31
32 import drs_base
33 import samba.tests
34
35 import ldb
36 from ldb import SCOPE_BASE
37
38 from samba.dcerpc import drsuapi, misc
39 from samba.drs_utils import drs_DsBind
40 from samba.samdb import SamDB
41
42 import shutil, tempfile, os
43 from samba.auth import system_session, admin_session
44 from samba.dbchecker import dbcheck
45 from samba.ndr import ndr_pack
46 from samba.dcerpc import security
47
48
49 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
50     """Intended as a semi-black box test case for DsGetNCChanges
51        implementation for extended operations. It should be testing
52        how DsGetNCChanges handles different input params (mostly invalid).
53        Final goal is to make DsGetNCChanges as binary compatible to
54        Windows implementation as possible"""
55
56     def setUp(self):
57         super(DrsReplicaSyncTestCase, self).setUp()
58
59     def tearDown(self):
60         super(DrsReplicaSyncTestCase, self).tearDown()
61
62     def _determine_fSMORoleOwner(self, fsmo_obj_dn):
63         """Returns (owner, not_owner) pair where:
64              owner: dns name for FSMO owner
65              not_owner: dns name for DC not owning the FSMO"""
66         # collect info to return later
67         fsmo_info_1 = {"dns_name": self.dnsname_dc1,
68                        "invocation_id": self.ldb_dc1.get_invocation_id(),
69                        "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
70                        "server_dn": self.ldb_dc1.get_serverName()}
71         fsmo_info_2 = {"dns_name": self.dnsname_dc2,
72                        "invocation_id": self.ldb_dc2.get_invocation_id(),
73                        "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
74                        "server_dn": self.ldb_dc2.get_serverName()}
75
76         msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
77         fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0].decode('utf8'))
78         fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
79
80         msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
81         fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0].decode('utf8'))
82         fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
83
84         # determine the owner dc
85         res = self.ldb_dc1.search(fsmo_obj_dn,
86                                   scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
87         assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!" %fsmo_obj_dn
88         fsmo_owner = res[0]["fSMORoleOwner"][0]
89         if fsmo_owner == self.info_dc1["dsServiceName"][0]:
90             return (fsmo_info_1, fsmo_info_2)
91         return (fsmo_info_2, fsmo_info_1)
92
93     def _check_exop_failed(self, ctr6, expected_failure):
94         self.assertEqual(ctr6.extended_ret, expected_failure)
95         #self.assertEqual(ctr6.object_count, 0)
96         #self.assertEqual(ctr6.first_object, None)
97         self.assertEqual(ctr6.more_data, False)
98         self.assertEqual(ctr6.nc_object_count, 0)
99         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
100         self.assertEqual(ctr6.linked_attributes_count, 0)
101         self.assertEqual(ctr6.linked_attributes, [])
102         self.assertEqual(ctr6.drs_error[0], 0)
103
104     def test_InvalidDestDSA_ridalloc(self):
105         """Test RID allocation with invalid destination DSA guid"""
106         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
107         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
108
109         req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
110                                invocation_id=fsmo_owner["invocation_id"],
111                                nc_dn_str=fsmo_dn,
112                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
113
114         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
115         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
116         self.assertEqual(level, 6, "Expected level 6 response!")
117         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
118         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
119         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
120
121     def test_do_ridalloc(self):
122         """Test doing a RID allocation with a valid destination DSA guid"""
123         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
124         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
125
126         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
127                                invocation_id=fsmo_owner["invocation_id"],
128                                nc_dn_str=fsmo_dn,
129                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
130
131         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
132         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
133         self.assertEqual(level, 6, "Expected level 6 response!")
134         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
135         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
136         ctr6 = ctr
137         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
138         self.assertEqual(ctr6.object_count, 3)
139         self.assertNotEqual(ctr6.first_object, None)
140         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
141         self.assertNotEqual(ctr6.first_object.next_object, None)
142         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
143         second_object = ctr6.first_object.next_object.object
144         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
145         third_object = ctr6.first_object.next_object.next_object.object
146         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
147
148         self.assertEqual(ctr6.more_data, False)
149         self.assertEqual(ctr6.nc_object_count, 0)
150         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
151         self.assertEqual(ctr6.drs_error[0], 0)
152         # We don't check the linked_attributes_count as if the domain
153         # has an RODC, it can gain links on the server account object
154
155     def test_do_ridalloc_get_anc(self):
156         """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
157         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
158         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
159
160         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
161                                invocation_id=fsmo_owner["invocation_id"],
162                                nc_dn_str=fsmo_dn,
163                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
164                                replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
165
166         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
167         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
168         self.assertEqual(level, 6, "Expected level 6 response!")
169         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
170         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
171         ctr6 = ctr
172         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
173         self.assertEqual(ctr6.object_count, 3)
174         self.assertNotEqual(ctr6.first_object, None)
175         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
176         self.assertNotEqual(ctr6.first_object.next_object, None)
177         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
178         second_object = ctr6.first_object.next_object.object
179         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
180         third_object = ctr6.first_object.next_object.next_object.object
181         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
182         self.assertEqual(ctr6.more_data, False)
183         self.assertEqual(ctr6.nc_object_count, 0)
184         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
185         self.assertEqual(ctr6.drs_error[0], 0)
186         # We don't check the linked_attributes_count as if the domain
187         # has an RODC, it can gain links on the server account object
188
189     def test_edit_rid_master(self):
190         """Test doing a RID allocation after changing the RID master from the original one.
191            This should set rIDNextRID to 0 on the new RID master."""
192         # 1. a. Transfer role to non-RID master
193         #    b. Check that it succeeds correctly
194         #
195         # 2. a. Call the RID alloc against the former master.
196         #    b. Check that it succeeds.
197         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
198         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
199
200         # 1. Swap RID master role
201         m = ldb.Message()
202         m.dn = ldb.Dn(self.ldb_dc1, "")
203         m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
204                                                   "becomeRidMaster")
205
206         # Make sure that ldb_dc1 == RID Master
207
208         server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
209
210         # self.ldb_dc1 == LOCALDC
211         if server_dn == fsmo_owner['server_dn']:
212             # ldb_dc1 == VAMPIREDC
213             ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
214         else:
215             # Otherwise switch the two
216             ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
217
218         try:
219             # ldb_dc1 is now RID MASTER (as VAMPIREDC)
220             ldb_dc1.modify(m)
221         except ldb.LdbError as e1:
222             (num, msg) = e1.args
223             self.fail("Failed to reassign RID Master " + msg)
224
225         try:
226             # 2. Perform a RID alloc
227             req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
228                                    invocation_id=fsmo_not_owner["invocation_id"],
229                                    nc_dn_str=fsmo_dn,
230                                    exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
231
232             (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
233             # 3. Make sure the allocation succeeds
234             try:
235                 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
236             except RuntimeError as e:
237                 self.fail("RID allocation failed: " + str(e))
238
239             fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
240
241             self.assertEqual(level, 6, "Expected level 6 response!")
242             self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
243             self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
244             ctr6 = ctr
245             self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
246             self.assertEqual(ctr6.object_count, 3)
247             self.assertNotEqual(ctr6.first_object, None)
248             self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
249             self.assertNotEqual(ctr6.first_object.next_object, None)
250             self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
251             second_object = ctr6.first_object.next_object.object
252             self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
253             third_object = ctr6.first_object.next_object.next_object.object
254             self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
255         finally:
256             # Swap the RID master back for other tests
257             m = ldb.Message()
258             m.dn = ldb.Dn(ldb_dc2, "")
259             m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
260             try:
261                 ldb_dc2.modify(m)
262             except ldb.LdbError as e:
263                 (num, msg) = e.args
264                 self.fail("Failed to restore RID Master " + msg)
265
266     def test_offline_samba_tool_seized_ridalloc(self):
267         """Perform a join against the non-RID manager and then seize the RID Manager role"""
268
269         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
270         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
271
272         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
273         try:
274             # Connect to the database
275             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
276             smbconf = os.path.join(targetdir, "etc/smb.conf")
277
278             lp = self.get_loadparm()
279             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
280                             session_info=system_session(lp), lp=lp)
281
282             # 1. Get server name
283             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
284                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
285             # 2. Get server reference
286             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
287
288             # Assert that no RID Set has been set
289             res = new_ldb.search(base=server_ref_dn,
290                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
291
292             self.assertFalse("rIDSetReferences" in res[0])
293
294             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
295             self.assertCmdSuccess(result, out, err)
296             self.assertEquals(err, "", "Shouldn't be any error messages")
297
298             # 3. Assert we get the RID Set
299             res = new_ldb.search(base=server_ref_dn,
300                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
301
302             self.assertTrue("rIDSetReferences" in res[0])
303         finally:
304             shutil.rmtree(targetdir, ignore_errors=True)
305             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
306
307     def _test_join(self, server, netbios_name):
308         tmpdir = os.path.join(self.tempdir, "targetdir")
309         creds = self.get_credentials()
310         (result, out, err) = self.runsubcmd("domain", "join",
311                                             creds.get_realm(),
312                                             "dc", "-U%s%%%s" % (creds.get_username(),
313                                                                 creds.get_password()),
314                                             '--targetdir=%s' % tmpdir,
315                                             '--server=%s' % server,
316                                             "--option=netbios name = %s" % netbios_name)
317         self.assertCmdSuccess(result, out, err)
318         return tmpdir
319
320     def _test_force_demote(self, server, netbios_name):
321         creds = self.get_credentials()
322         (result, out, err) = self.runsubcmd("domain", "demote",
323                                             "-U%s%%%s" % (creds.get_username(),
324                                                           creds.get_password()),
325                                             '--server=%s' % server,
326                                             "--remove-other-dead-server=%s" % netbios_name)
327         self.assertCmdSuccess(result, out, err)
328
329     def test_offline_manual_seized_ridalloc_with_dbcheck(self):
330         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
331         but do not create the RID set. Confirm that dbcheck correctly creates
332         the RID Set.
333
334         Also check
335         """
336         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
337         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
338
339         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
340         try:
341             # Connect to the database
342             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
343             lp = self.get_loadparm()
344
345             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
346                             session_info=system_session(lp), lp=lp)
347
348             serviceName = new_ldb.get_dsServiceName()
349             m = ldb.Message()
350             m.dn = fsmo_dn
351             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
352                                                     ldb.FLAG_MOD_REPLACE,
353                                                     "fSMORoleOwner")
354             new_ldb.modify(m)
355
356             # 1. Get server name
357             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
358                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
359             # 2. Get server reference
360             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
361
362             # Assert that no RID Set has been set
363             res = new_ldb.search(base=server_ref_dn,
364                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
365
366             self.assertFalse("rIDSetReferences" in res[0])
367
368             smbconf = os.path.join(targetdir, "etc/smb.conf")
369
370             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
371
372             self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
373
374             # 3. Assert we get the RID Set
375             res = new_ldb.search(base=server_ref_dn,
376                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
377
378             self.assertTrue("rIDSetReferences" in res[0])
379         finally:
380             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
381             shutil.rmtree(targetdir, ignore_errors=True)
382
383     def test_offline_manual_seized_ridalloc_add_user(self):
384         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
385         but do not create the RID set. Confirm that user-add correctly creates
386         the RID Set."""
387         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
388         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
389
390         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
391         try:
392             # Connect to the database
393             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
394             lp = self.get_loadparm()
395
396             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
397                             session_info=system_session(lp), lp=lp)
398
399             serviceName = new_ldb.get_dsServiceName()
400             m = ldb.Message()
401             m.dn = fsmo_dn
402             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
403                                                     ldb.FLAG_MOD_REPLACE,
404                                                     "fSMORoleOwner")
405             new_ldb.modify(m)
406
407             # 1. Get server name
408             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
409                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
410             # 2. Get server reference
411             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
412
413             # Assert that no RID Set has been set
414             res = new_ldb.search(base=server_ref_dn,
415                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
416
417             self.assertFalse("rIDSetReferences" in res[0])
418
419             smbconf = os.path.join(targetdir, "etc/smb.conf")
420
421             new_ldb.newuser("ridalloctestuser", "P@ssword!")
422
423             # 3. Assert we get the RID Set
424             res = new_ldb.search(base=server_ref_dn,
425                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
426
427             self.assertTrue("rIDSetReferences" in res[0])
428
429         finally:
430             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
431             shutil.rmtree(targetdir, ignore_errors=True)
432
433     def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
434         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
435         but do not create the RID set. Confirm that user-add correctly creates
436         the RID Set."""
437         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
438         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
439
440         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
441         try:
442             # Connect to the database
443             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
444             lp = self.get_loadparm()
445
446             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
447                             session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
448
449             serviceName = new_ldb.get_dsServiceName()
450             m = ldb.Message()
451             m.dn = fsmo_dn
452             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
453                                                     ldb.FLAG_MOD_REPLACE,
454                                                     "fSMORoleOwner")
455             new_ldb.modify(m)
456
457             # 1. Get server name
458             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
459                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
460             # 2. Get server reference
461             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
462
463             # Assert that no RID Set has been set
464             res = new_ldb.search(base=server_ref_dn,
465                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
466
467             self.assertFalse("rIDSetReferences" in res[0])
468
469             smbconf = os.path.join(targetdir, "etc/smb.conf")
470
471             # Create a user to allocate a RID Set for itself (the RID master)
472             new_ldb.newuser("ridalloctestuser", "P@ssword!")
473
474             # 3. Assert we get the RID Set
475             res = new_ldb.search(base=server_ref_dn,
476                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
477
478             self.assertTrue("rIDSetReferences" in res[0])
479
480         finally:
481             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
482             shutil.rmtree(targetdir, ignore_errors=True)
483
484     def test_join_time_ridalloc(self):
485         """Perform a join against the RID manager and assert we have a RID Set"""
486
487         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
488         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
489
490         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
491         try:
492             # Connect to the database
493             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
494             smbconf = os.path.join(targetdir, "etc/smb.conf")
495
496             lp = self.get_loadparm()
497             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
498                             session_info=system_session(lp), lp=lp)
499
500             # 1. Get server name
501             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
502                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
503             # 2. Get server reference
504             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
505
506             # 3. Assert we get the RID Set
507             res = new_ldb.search(base=server_ref_dn,
508                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
509
510             self.assertTrue("rIDSetReferences" in res[0])
511         finally:
512             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
513             shutil.rmtree(targetdir, ignore_errors=True)
514
515     def test_rid_set_dbcheck(self):
516         """Perform a join against the RID manager and assert we have a RID Set.
517         Using dbcheck, we assert that we can detect out of range users."""
518
519         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
520         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
521
522         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
523         try:
524             # Connect to the database
525             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
526             smbconf = os.path.join(targetdir, "etc/smb.conf")
527
528             lp = self.get_loadparm()
529             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
530                             session_info=system_session(lp), lp=lp)
531
532             # 1. Get server name
533             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
534                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
535             # 2. Get server reference
536             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
537
538             # 3. Assert we get the RID Set
539             res = new_ldb.search(base=server_ref_dn,
540                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
541
542             self.assertTrue("rIDSetReferences" in res[0])
543             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8'))
544
545             # 4. Add a new user (triggers RID set work)
546             new_ldb.newuser("ridalloctestuser", "P@ssword!")
547
548             # 5. Now fetch the RID SET
549             rid_set_res = new_ldb.search(base=rid_set_dn,
550                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
551                                                                       'rIDAllocationPool'])
552             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
553             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
554
555             # 6. Add user above the ridNextRid and at mid-range.
556             #
557             # We can do this with safety because this is an offline DB that will be
558             # destroyed.
559             m = ldb.Message()
560             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
561             m.dn.add_base(new_ldb.get_default_basedn())
562             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
563             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
564                                                 ldb.FLAG_MOD_ADD,
565                                                 'objectSid')
566             new_ldb.add(m, controls=["relax:0"])
567
568             # 7. Check the RID Set
569             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
570
571             # Should have one error (wrong rIDNextRID)
572             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
573
574             # 8. Assert we get didn't show any other errors
575             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
576
577             rid_set_res = new_ldb.search(base=rid_set_dn,
578                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
579                                                                       'rIDAllocationPool'])
580             last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
581             self.assertEquals(last_allocated_rid, last_rid - 10)
582
583             # 9. Assert that the range wasn't thrown away
584
585             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
586             self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
587         finally:
588             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
589             shutil.rmtree(targetdir, ignore_errors=True)
590
591
592     def test_rid_set_dbcheck_after_seize(self):
593         """Perform a join against the RID manager and assert we have a RID Set.
594         We seize the RID master role, then using dbcheck, we assert that we can
595         detect out of range users (and then bump the RID set as required)."""
596
597         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
598         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
599
600         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
601         try:
602             # Connect to the database
603             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
604             smbconf = os.path.join(targetdir, "etc/smb.conf")
605
606             lp = self.get_loadparm()
607             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
608                             session_info=system_session(lp), lp=lp)
609
610             # 1. Get server name
611             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
612                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
613             # 2. Get server reference
614             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
615
616             # 3. Assert we get the RID Set
617             res = new_ldb.search(base=server_ref_dn,
618                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
619
620             self.assertTrue("rIDSetReferences" in res[0])
621             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8'))
622             # 4. Seize the RID Manager role
623             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
624             self.assertCmdSuccess(result, out, err)
625             self.assertEquals(err, "", "Shouldn't be any error messages")
626
627             # 5. Add a new user (triggers RID set work)
628             new_ldb.newuser("ridalloctestuser", "P@ssword!")
629
630             # 6. Now fetch the RID SET
631             rid_set_res = new_ldb.search(base=rid_set_dn,
632                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
633                                                                       'rIDAllocationPool'])
634             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
635             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
636
637             # 7. Add user above the ridNextRid and at almost the end of the range.
638             #
639             m = ldb.Message()
640             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
641             m.dn.add_base(new_ldb.get_default_basedn())
642             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
643             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
644                                                 ldb.FLAG_MOD_ADD,
645                                                 'objectSid')
646             new_ldb.add(m, controls=["relax:0"])
647
648             # 8. Add user above the ridNextRid and at the end of the range
649             m = ldb.Message()
650             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
651             m.dn.add_base(new_ldb.get_default_basedn())
652             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
653             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
654                                                 ldb.FLAG_MOD_ADD,
655                                                 'objectSid')
656             new_ldb.add(m, controls=["relax:0"])
657
658             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
659
660             # Should have fixed two errors (wrong ridNextRid)
661             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
662
663             # 9. Assert we get didn't show any other errors
664             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
665
666             # 10. Add another user (checks RID rollover)
667             # We have seized the role, so we can do that.
668             new_ldb.newuser("ridalloctestuser3", "P@ssword!")
669
670             rid_set_res = new_ldb.search(base=rid_set_dn,
671                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
672                                                                       'rIDAllocationPool'])
673             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
674             self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
675         finally:
676             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
677             shutil.rmtree(targetdir, ignore_errors=True)