PEP8: fix E128: continuation line under-indented for visual indent
[amitay/samba.git] / source4 / torture / drs / python / ridalloc_exop.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various RID allocation scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
9 #
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 #
23
24 #
25 # Usage:
26 #  export DC1=dc1_dns_name
27 #  export DC2=dc2_dns_name
28 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 #
31
32 import drs_base
33 import samba.tests
34
35 import ldb
36 from ldb import SCOPE_BASE
37
38 from samba.dcerpc import drsuapi, misc
39 from samba.drs_utils import drs_DsBind
40 from samba.samdb import SamDB
41
42 import shutil, tempfile, os
43 from samba.auth import system_session, admin_session
44 from samba.dbchecker import dbcheck
45 from samba.ndr import ndr_pack
46 from samba.dcerpc import security
47
48 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
49     """Intended as a semi-black box test case for DsGetNCChanges
50        implementation for extended operations. It should be testing
51        how DsGetNCChanges handles different input params (mostly invalid).
52        Final goal is to make DsGetNCChanges as binary compatible to
53        Windows implementation as possible"""
54
55     def setUp(self):
56         super(DrsReplicaSyncTestCase, self).setUp()
57
58     def tearDown(self):
59         super(DrsReplicaSyncTestCase, self).tearDown()
60
61     def _determine_fSMORoleOwner(self, fsmo_obj_dn):
62         """Returns (owner, not_owner) pair where:
63              owner: dns name for FSMO owner
64              not_owner: dns name for DC not owning the FSMO"""
65         # collect info to return later
66         fsmo_info_1 = {"dns_name": self.dnsname_dc1,
67                        "invocation_id": self.ldb_dc1.get_invocation_id(),
68                        "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
69                        "server_dn": self.ldb_dc1.get_serverName()}
70         fsmo_info_2 = {"dns_name": self.dnsname_dc2,
71                        "invocation_id": self.ldb_dc2.get_invocation_id(),
72                        "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
73                        "server_dn": self.ldb_dc2.get_serverName()}
74
75         msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
76         fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0].decode('utf8'))
77         fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
78
79         msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
80         fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0].decode('utf8'))
81         fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
82
83         # determine the owner dc
84         res = self.ldb_dc1.search(fsmo_obj_dn,
85                                   scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
86         assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
87         fsmo_owner = res[0]["fSMORoleOwner"][0]
88         if fsmo_owner == self.info_dc1["dsServiceName"][0]:
89             return (fsmo_info_1, fsmo_info_2)
90         return (fsmo_info_2, fsmo_info_1)
91
92     def _check_exop_failed(self, ctr6, expected_failure):
93         self.assertEqual(ctr6.extended_ret, expected_failure)
94         #self.assertEqual(ctr6.object_count, 0)
95         #self.assertEqual(ctr6.first_object, None)
96         self.assertEqual(ctr6.more_data, False)
97         self.assertEqual(ctr6.nc_object_count, 0)
98         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
99         self.assertEqual(ctr6.linked_attributes_count, 0)
100         self.assertEqual(ctr6.linked_attributes, [])
101         self.assertEqual(ctr6.drs_error[0], 0)
102
103     def test_InvalidDestDSA_ridalloc(self):
104         """Test RID allocation with invalid destination DSA guid"""
105         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
106         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
107
108         req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
109                                invocation_id=fsmo_owner["invocation_id"],
110                                nc_dn_str=fsmo_dn,
111                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
112
113         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
114         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
115         self.assertEqual(level, 6, "Expected level 6 response!")
116         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
117         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
118         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
119
120     def test_do_ridalloc(self):
121         """Test doing a RID allocation with a valid destination DSA guid"""
122         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
123         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
124
125         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
126                                invocation_id=fsmo_owner["invocation_id"],
127                                nc_dn_str=fsmo_dn,
128                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
129
130         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
131         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
132         self.assertEqual(level, 6, "Expected level 6 response!")
133         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
134         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
135         ctr6 = ctr
136         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
137         self.assertEqual(ctr6.object_count, 3)
138         self.assertNotEqual(ctr6.first_object, None)
139         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
140         self.assertNotEqual(ctr6.first_object.next_object, None)
141         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
142         second_object = ctr6.first_object.next_object.object
143         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
144         third_object = ctr6.first_object.next_object.next_object.object
145         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
146
147         self.assertEqual(ctr6.more_data, False)
148         self.assertEqual(ctr6.nc_object_count, 0)
149         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
150         self.assertEqual(ctr6.drs_error[0], 0)
151         # We don't check the linked_attributes_count as if the domain
152         # has an RODC, it can gain links on the server account object
153
154     def test_do_ridalloc_get_anc(self):
155         """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
156         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
157         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
158
159         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
160                                invocation_id=fsmo_owner["invocation_id"],
161                                nc_dn_str=fsmo_dn,
162                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
163                                replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
164
165         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
166         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
167         self.assertEqual(level, 6, "Expected level 6 response!")
168         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
169         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
170         ctr6 = ctr
171         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
172         self.assertEqual(ctr6.object_count, 3)
173         self.assertNotEqual(ctr6.first_object, None)
174         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
175         self.assertNotEqual(ctr6.first_object.next_object, None)
176         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
177         second_object = ctr6.first_object.next_object.object
178         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
179         third_object = ctr6.first_object.next_object.next_object.object
180         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
181         self.assertEqual(ctr6.more_data, False)
182         self.assertEqual(ctr6.nc_object_count, 0)
183         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
184         self.assertEqual(ctr6.drs_error[0], 0)
185         # We don't check the linked_attributes_count as if the domain
186         # has an RODC, it can gain links on the server account object
187
188     def test_edit_rid_master(self):
189         """Test doing a RID allocation after changing the RID master from the original one.
190            This should set rIDNextRID to 0 on the new RID master."""
191         # 1. a. Transfer role to non-RID master
192         #    b. Check that it succeeds correctly
193         #
194         # 2. a. Call the RID alloc against the former master.
195         #    b. Check that it succeeds.
196         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
197         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
198
199         # 1. Swap RID master role
200         m = ldb.Message()
201         m.dn = ldb.Dn(self.ldb_dc1, "")
202         m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
203                                                   "becomeRidMaster")
204
205         # Make sure that ldb_dc1 == RID Master
206
207         server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
208
209         # self.ldb_dc1 == LOCALDC
210         if server_dn == fsmo_owner['server_dn']:
211             # ldb_dc1 == VAMPIREDC
212             ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
213         else:
214             # Otherwise switch the two
215             ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
216
217         try:
218             # ldb_dc1 is now RID MASTER (as VAMPIREDC)
219             ldb_dc1.modify(m)
220         except ldb.LdbError as e1:
221             (num, msg) = e1.args
222             self.fail("Failed to reassign RID Master " +  msg)
223
224         try:
225             # 2. Perform a RID alloc
226             req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
227                                    invocation_id=fsmo_not_owner["invocation_id"],
228                                    nc_dn_str=fsmo_dn,
229                                    exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
230
231             (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
232             # 3. Make sure the allocation succeeds
233             try:
234                 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
235             except RuntimeError as e:
236                 self.fail("RID allocation failed: " + str(e))
237
238             fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
239
240             self.assertEqual(level, 6, "Expected level 6 response!")
241             self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
242             self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
243             ctr6 = ctr
244             self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
245             self.assertEqual(ctr6.object_count, 3)
246             self.assertNotEqual(ctr6.first_object, None)
247             self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
248             self.assertNotEqual(ctr6.first_object.next_object, None)
249             self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
250             second_object = ctr6.first_object.next_object.object
251             self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
252             third_object = ctr6.first_object.next_object.next_object.object
253             self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
254         finally:
255             # Swap the RID master back for other tests
256             m = ldb.Message()
257             m.dn = ldb.Dn(ldb_dc2, "")
258             m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
259             try:
260                 ldb_dc2.modify(m)
261             except ldb.LdbError as e:
262                 (num, msg) = e.args
263                 self.fail("Failed to restore RID Master " +  msg)
264
265     def test_offline_samba_tool_seized_ridalloc(self):
266         """Perform a join against the non-RID manager and then seize the RID Manager role"""
267
268         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
269         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
270
271         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
272         try:
273             # Connect to the database
274             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
275             smbconf = os.path.join(targetdir, "etc/smb.conf")
276
277             lp = self.get_loadparm()
278             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
279                             session_info=system_session(lp), lp=lp)
280
281             # 1. Get server name
282             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
283                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
284             # 2. Get server reference
285             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
286
287             # Assert that no RID Set has been set
288             res = new_ldb.search(base=server_ref_dn,
289                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
290
291             self.assertFalse("rIDSetReferences" in res[0])
292
293             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
294             self.assertCmdSuccess(result, out, err)
295             self.assertEquals(err,"","Shouldn't be any error messages")
296
297             # 3. Assert we get the RID Set
298             res = new_ldb.search(base=server_ref_dn,
299                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
300
301             self.assertTrue("rIDSetReferences" in res[0])
302         finally:
303             shutil.rmtree(targetdir, ignore_errors=True)
304             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
305
306     def _test_join(self, server, netbios_name):
307         tmpdir = os.path.join(self.tempdir, "targetdir")
308         creds = self.get_credentials()
309         (result, out, err) = self.runsubcmd("domain", "join",
310                                             creds.get_realm(),
311                                             "dc", "-U%s%%%s" % (creds.get_username(),
312                                                                 creds.get_password()),
313                                             '--targetdir=%s' % tmpdir,
314                                             '--server=%s' % server,
315                                             "--option=netbios name = %s" % netbios_name)
316         self.assertCmdSuccess(result, out, err)
317         return tmpdir
318
319     def _test_force_demote(self, server, netbios_name):
320         creds = self.get_credentials()
321         (result, out, err) = self.runsubcmd("domain", "demote",
322                                             "-U%s%%%s" % (creds.get_username(),
323                                                           creds.get_password()),
324                                             '--server=%s' % server,
325                                             "--remove-other-dead-server=%s" % netbios_name)
326         self.assertCmdSuccess(result, out, err)
327
328     def test_offline_manual_seized_ridalloc_with_dbcheck(self):
329         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
330         but do not create the RID set. Confirm that dbcheck correctly creates
331         the RID Set.
332
333         Also check
334         """
335         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
336         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
337
338         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
339         try:
340             # Connect to the database
341             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
342             lp = self.get_loadparm()
343
344             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
345                             session_info=system_session(lp), lp=lp)
346
347             serviceName = new_ldb.get_dsServiceName()
348             m = ldb.Message()
349             m.dn = fsmo_dn
350             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
351                                                     ldb.FLAG_MOD_REPLACE,
352                                                     "fSMORoleOwner")
353             new_ldb.modify(m)
354
355             # 1. Get server name
356             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
357                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
358             # 2. Get server reference
359             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
360
361             # Assert that no RID Set has been set
362             res = new_ldb.search(base=server_ref_dn,
363                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
364
365             self.assertFalse("rIDSetReferences" in res[0])
366
367             smbconf = os.path.join(targetdir, "etc/smb.conf")
368
369             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
370
371             self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
372
373             # 3. Assert we get the RID Set
374             res = new_ldb.search(base=server_ref_dn,
375                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
376
377             self.assertTrue("rIDSetReferences" in res[0])
378         finally:
379             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
380             shutil.rmtree(targetdir, ignore_errors=True)
381
382     def test_offline_manual_seized_ridalloc_add_user(self):
383         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
384         but do not create the RID set. Confirm that user-add correctly creates
385         the RID Set."""
386         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
387         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
388
389         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
390         try:
391             # Connect to the database
392             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
393             lp = self.get_loadparm()
394
395             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
396                             session_info=system_session(lp), lp=lp)
397
398             serviceName = new_ldb.get_dsServiceName()
399             m = ldb.Message()
400             m.dn = fsmo_dn
401             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
402                                                     ldb.FLAG_MOD_REPLACE,
403                                                     "fSMORoleOwner")
404             new_ldb.modify(m)
405
406             # 1. Get server name
407             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
408                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
409             # 2. Get server reference
410             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
411
412             # Assert that no RID Set has been set
413             res = new_ldb.search(base=server_ref_dn,
414                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
415
416             self.assertFalse("rIDSetReferences" in res[0])
417
418             smbconf = os.path.join(targetdir, "etc/smb.conf")
419
420             new_ldb.newuser("ridalloctestuser", "P@ssword!")
421
422             # 3. Assert we get the RID Set
423             res = new_ldb.search(base=server_ref_dn,
424                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
425
426             self.assertTrue("rIDSetReferences" in res[0])
427
428         finally:
429             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
430             shutil.rmtree(targetdir, ignore_errors=True)
431
432     def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
433         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
434         but do not create the RID set. Confirm that user-add correctly creates
435         the RID Set."""
436         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
437         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
438
439         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
440         try:
441             # Connect to the database
442             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
443             lp = self.get_loadparm()
444
445             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
446                             session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
447
448             serviceName = new_ldb.get_dsServiceName()
449             m = ldb.Message()
450             m.dn = fsmo_dn
451             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
452                                                     ldb.FLAG_MOD_REPLACE,
453                                                     "fSMORoleOwner")
454             new_ldb.modify(m)
455
456             # 1. Get server name
457             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
458                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
459             # 2. Get server reference
460             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
461
462             # Assert that no RID Set has been set
463             res = new_ldb.search(base=server_ref_dn,
464                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
465
466             self.assertFalse("rIDSetReferences" in res[0])
467
468             smbconf = os.path.join(targetdir, "etc/smb.conf")
469
470             # Create a user to allocate a RID Set for itself (the RID master)
471             new_ldb.newuser("ridalloctestuser", "P@ssword!")
472
473             # 3. Assert we get the RID Set
474             res = new_ldb.search(base=server_ref_dn,
475                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
476
477             self.assertTrue("rIDSetReferences" in res[0])
478
479         finally:
480             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
481             shutil.rmtree(targetdir, ignore_errors=True)
482
483     def test_join_time_ridalloc(self):
484         """Perform a join against the RID manager and assert we have a RID Set"""
485
486         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
487         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
488
489         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
490         try:
491             # Connect to the database
492             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
493             smbconf = os.path.join(targetdir, "etc/smb.conf")
494
495             lp = self.get_loadparm()
496             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
497                             session_info=system_session(lp), lp=lp)
498
499             # 1. Get server name
500             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
501                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
502             # 2. Get server reference
503             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
504
505             # 3. Assert we get the RID Set
506             res = new_ldb.search(base=server_ref_dn,
507                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
508
509             self.assertTrue("rIDSetReferences" in res[0])
510         finally:
511             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
512             shutil.rmtree(targetdir, ignore_errors=True)
513
514     def test_rid_set_dbcheck(self):
515         """Perform a join against the RID manager and assert we have a RID Set.
516         Using dbcheck, we assert that we can detect out of range users."""
517
518         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
519         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
520
521         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
522         try:
523             # Connect to the database
524             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
525             smbconf = os.path.join(targetdir, "etc/smb.conf")
526
527             lp = self.get_loadparm()
528             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
529                             session_info=system_session(lp), lp=lp)
530
531             # 1. Get server name
532             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
533                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
534             # 2. Get server reference
535             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
536
537             # 3. Assert we get the RID Set
538             res = new_ldb.search(base=server_ref_dn,
539                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
540
541             self.assertTrue("rIDSetReferences" in res[0])
542             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8'))
543
544             # 4. Add a new user (triggers RID set work)
545             new_ldb.newuser("ridalloctestuser", "P@ssword!")
546
547             # 5. Now fetch the RID SET
548             rid_set_res = new_ldb.search(base=rid_set_dn,
549                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
550                                                                       'rIDAllocationPool'])
551             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
552             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
553
554             # 6. Add user above the ridNextRid and at mid-range.
555             #
556             # We can do this with safety because this is an offline DB that will be
557             # destroyed.
558             m = ldb.Message()
559             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
560             m.dn.add_base(new_ldb.get_default_basedn())
561             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
562             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
563                                                 ldb.FLAG_MOD_ADD,
564                                                 'objectSid')
565             new_ldb.add(m, controls=["relax:0"])
566
567             # 7. Check the RID Set
568             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
569
570             # Should have one error (wrong rIDNextRID)
571             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
572
573             # 8. Assert we get didn't show any other errors
574             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
575
576             rid_set_res = new_ldb.search(base=rid_set_dn,
577                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
578                                                                       'rIDAllocationPool'])
579             last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
580             self.assertEquals(last_allocated_rid, last_rid - 10)
581
582             # 9. Assert that the range wasn't thrown away
583
584             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
585             self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
586         finally:
587             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
588             shutil.rmtree(targetdir, ignore_errors=True)
589
590
591     def test_rid_set_dbcheck_after_seize(self):
592         """Perform a join against the RID manager and assert we have a RID Set.
593         We seize the RID master role, then using dbcheck, we assert that we can
594         detect out of range users (and then bump the RID set as required)."""
595
596         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
597         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
598
599         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
600         try:
601             # Connect to the database
602             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
603             smbconf = os.path.join(targetdir, "etc/smb.conf")
604
605             lp = self.get_loadparm()
606             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
607                             session_info=system_session(lp), lp=lp)
608
609             # 1. Get server name
610             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
611                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
612             # 2. Get server reference
613             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
614
615             # 3. Assert we get the RID Set
616             res = new_ldb.search(base=server_ref_dn,
617                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
618
619             self.assertTrue("rIDSetReferences" in res[0])
620             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8'))
621             # 4. Seize the RID Manager role
622             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
623             self.assertCmdSuccess(result, out, err)
624             self.assertEquals(err,"","Shouldn't be any error messages")
625
626             # 5. Add a new user (triggers RID set work)
627             new_ldb.newuser("ridalloctestuser", "P@ssword!")
628
629             # 6. Now fetch the RID SET
630             rid_set_res = new_ldb.search(base=rid_set_dn,
631                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
632                                                                       'rIDAllocationPool'])
633             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
634             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
635
636             # 7. Add user above the ridNextRid and at almost the end of the range.
637             #
638             m = ldb.Message()
639             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
640             m.dn.add_base(new_ldb.get_default_basedn())
641             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
642             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
643                                                 ldb.FLAG_MOD_ADD,
644                                                 'objectSid')
645             new_ldb.add(m, controls=["relax:0"])
646
647             # 8. Add user above the ridNextRid and at the end of the range
648             m = ldb.Message()
649             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
650             m.dn.add_base(new_ldb.get_default_basedn())
651             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
652             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
653                                                 ldb.FLAG_MOD_ADD,
654                                                 'objectSid')
655             new_ldb.add(m, controls=["relax:0"])
656
657             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
658
659             # Should have fixed two errors (wrong ridNextRid)
660             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
661
662             # 9. Assert we get didn't show any other errors
663             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
664
665             # 10. Add another user (checks RID rollover)
666             # We have seized the role, so we can do that.
667             new_ldb.newuser("ridalloctestuser3", "P@ssword!")
668
669             rid_set_res = new_ldb.search(base=rid_set_dn,
670                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
671                                                                       'rIDAllocationPool'])
672             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
673             self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
674         finally:
675             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
676             shutil.rmtree(targetdir, ignore_errors=True)