torture/drs: move ExopBaseTest into DrsBaseTest and extend
[samba.git] / source4 / torture / drs / python / ridalloc_exop.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various RID allocation scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
9 #
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 #
23
24 #
25 # Usage:
26 #  export DC1=dc1_dns_name
27 #  export DC2=dc2_dns_name
28 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 #
31
32 import drs_base
33 import samba.tests
34
35 import ldb
36 from ldb import SCOPE_BASE
37
38 from samba.dcerpc import drsuapi, misc
39 from samba.drs_utils import drs_DsBind
40 from samba.samdb import SamDB
41
42 import shutil, tempfile, os
43 from samba.netcmd.main import cmd_sambatool
44 from samba.auth import system_session, admin_session
45 from samba.dbchecker import dbcheck
46 from samba.ndr import ndr_pack
47 from samba.dcerpc import security
48
49 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
50     """Intended as a semi-black box test case for DsGetNCChanges
51        implementation for extended operations. It should be testing
52        how DsGetNCChanges handles different input params (mostly invalid).
53        Final goal is to make DsGetNCChanges as binary compatible to
54        Windows implementation as possible"""
55
56     def setUp(self):
57         super(DrsReplicaSyncTestCase, self).setUp()
58
59     def tearDown(self):
60         super(DrsReplicaSyncTestCase, self).tearDown()
61
62     def _determine_fSMORoleOwner(self, fsmo_obj_dn):
63         """Returns (owner, not_owner) pair where:
64              owner: dns name for FSMO owner
65              not_owner: dns name for DC not owning the FSMO"""
66         # collect info to return later
67         fsmo_info_1 = {"dns_name": self.dnsname_dc1,
68                        "invocation_id": self.ldb_dc1.get_invocation_id(),
69                        "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
70                        "server_dn": self.ldb_dc1.get_serverName()}
71         fsmo_info_2 = {"dns_name": self.dnsname_dc2,
72                        "invocation_id": self.ldb_dc2.get_invocation_id(),
73                        "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
74                        "server_dn": self.ldb_dc2.get_serverName()}
75
76         msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
77         fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
78         fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
79
80         msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
81         fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
82         fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
83
84         # determine the owner dc
85         res = self.ldb_dc1.search(fsmo_obj_dn,
86                                   scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
87         assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
88         fsmo_owner = res[0]["fSMORoleOwner"][0]
89         if fsmo_owner == self.info_dc1["dsServiceName"][0]:
90             return (fsmo_info_1, fsmo_info_2)
91         return (fsmo_info_2, fsmo_info_1)
92
93     def _check_exop_failed(self, ctr6, expected_failure):
94         self.assertEqual(ctr6.extended_ret, expected_failure)
95         #self.assertEqual(ctr6.object_count, 0)
96         #self.assertEqual(ctr6.first_object, None)
97         self.assertEqual(ctr6.more_data, False)
98         self.assertEqual(ctr6.nc_object_count, 0)
99         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
100         self.assertEqual(ctr6.linked_attributes_count, 0)
101         self.assertEqual(ctr6.linked_attributes, [])
102         self.assertEqual(ctr6.drs_error[0], 0)
103
104     def test_InvalidDestDSA_ridalloc(self):
105         """Test RID allocation with invalid destination DSA guid"""
106         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
107         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
108
109         req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
110                                invocation_id=fsmo_owner["invocation_id"],
111                                nc_dn_str=fsmo_dn,
112                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
113
114         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
115         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
116         self.assertEqual(level, 6, "Expected level 6 response!")
117         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
118         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
119         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
120
121     def test_do_ridalloc(self):
122         """Test doing a RID allocation with a valid destination DSA guid"""
123         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
124         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
125
126         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
127                                invocation_id=fsmo_owner["invocation_id"],
128                                nc_dn_str=fsmo_dn,
129                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
130
131         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
132         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
133         self.assertEqual(level, 6, "Expected level 6 response!")
134         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
135         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
136         ctr6 = ctr
137         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
138         self.assertEqual(ctr6.object_count, 3)
139         self.assertNotEqual(ctr6.first_object, None)
140         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
141         self.assertNotEqual(ctr6.first_object.next_object, None)
142         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
143         second_object = ctr6.first_object.next_object.object
144         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
145         third_object = ctr6.first_object.next_object.next_object.object
146         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
147
148         self.assertEqual(ctr6.more_data, False)
149         self.assertEqual(ctr6.nc_object_count, 0)
150         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
151         self.assertEqual(ctr6.drs_error[0], 0)
152         # We don't check the linked_attributes_count as if the domain
153         # has an RODC, it can gain links on the server account object
154
155     def test_do_ridalloc_get_anc(self):
156         """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
157         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
158         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
159
160         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
161                                invocation_id=fsmo_owner["invocation_id"],
162                                nc_dn_str=fsmo_dn,
163                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
164                                replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
165
166         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
167         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
168         self.assertEqual(level, 6, "Expected level 6 response!")
169         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
170         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
171         ctr6 = ctr
172         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
173         self.assertEqual(ctr6.object_count, 3)
174         self.assertNotEqual(ctr6.first_object, None)
175         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
176         self.assertNotEqual(ctr6.first_object.next_object, None)
177         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
178         second_object = ctr6.first_object.next_object.object
179         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
180         third_object = ctr6.first_object.next_object.next_object.object
181         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
182         self.assertEqual(ctr6.more_data, False)
183         self.assertEqual(ctr6.nc_object_count, 0)
184         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
185         self.assertEqual(ctr6.drs_error[0], 0)
186         # We don't check the linked_attributes_count as if the domain
187         # has an RODC, it can gain links on the server account object
188
189     def test_edit_rid_master(self):
190         """Test doing a RID allocation after changing the RID master from the original one.
191            This should set rIDNextRID to 0 on the new RID master."""
192         # 1. a. Transfer role to non-RID master
193         #    b. Check that it succeeds correctly
194         #
195         # 2. a. Call the RID alloc against the former master.
196         #    b. Check that it succeeds.
197         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
198         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
199
200         # 1. Swap RID master role
201         m = ldb.Message()
202         m.dn = ldb.Dn(self.ldb_dc1, "")
203         m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
204                                                   "becomeRidMaster")
205
206         # Make sure that ldb_dc1 == RID Master
207
208         server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
209
210         # self.ldb_dc1 == LOCALDC
211         if server_dn == fsmo_owner['server_dn']:
212             # ldb_dc1 == VAMPIREDC
213             ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
214         else:
215             # Otherwise switch the two
216             ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
217
218         try:
219             # ldb_dc1 is now RID MASTER (as VAMPIREDC)
220             ldb_dc1.modify(m)
221         except ldb.LdbError, (num, msg):
222             self.fail("Failed to reassign RID Master " +  msg)
223
224         try:
225             # 2. Perform a RID alloc
226             req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
227                     invocation_id=fsmo_not_owner["invocation_id"],
228                     nc_dn_str=fsmo_dn,
229                     exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
230
231             (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
232             # 3. Make sure the allocation succeeds
233             try:
234                 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
235             except RuntimeError, e:
236                 self.fail("RID allocation failed: " + str(e))
237
238             fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
239
240             self.assertEqual(level, 6, "Expected level 6 response!")
241             self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
242             self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
243             ctr6 = ctr
244             self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
245             self.assertEqual(ctr6.object_count, 3)
246             self.assertNotEqual(ctr6.first_object, None)
247             self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
248             self.assertNotEqual(ctr6.first_object.next_object, None)
249             self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
250             second_object = ctr6.first_object.next_object.object
251             self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
252             third_object = ctr6.first_object.next_object.next_object.object
253             self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
254         finally:
255             # Swap the RID master back for other tests
256             m = ldb.Message()
257             m.dn = ldb.Dn(ldb_dc2, "")
258             m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
259             try:
260                 ldb_dc2.modify(m)
261             except ldb.LdbError, (num, msg):
262                 self.fail("Failed to restore RID Master " +  msg)
263
264     def test_offline_samba_tool_seized_ridalloc(self):
265         """Perform a join against the non-RID manager and then seize the RID Manager role"""
266
267         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
268         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
269
270         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
271         try:
272             # Connect to the database
273             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
274             smbconf = os.path.join(targetdir, "etc/smb.conf")
275
276             lp = self.get_loadparm()
277             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
278                             session_info=system_session(lp), lp=lp)
279
280             # 1. Get server name
281             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
282                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
283             # 2. Get server reference
284             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
285
286             # Assert that no RID Set has been set
287             res = new_ldb.search(base=server_ref_dn,
288                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
289
290             self.assertFalse("rIDSetReferences" in res[0])
291
292             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
293             self.assertCmdSuccess(result, out, err)
294             self.assertEquals(err,"","Shouldn't be any error messages")
295
296             # 3. Assert we get the RID Set
297             res = new_ldb.search(base=server_ref_dn,
298                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
299
300             self.assertTrue("rIDSetReferences" in res[0])
301         finally:
302             shutil.rmtree(targetdir, ignore_errors=True)
303             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
304
305     def _test_join(self, server, netbios_name):
306         tmpdir = os.path.join(self.tempdir, "targetdir")
307         creds = self.get_credentials()
308         cmd = cmd_sambatool.subcommands['domain'].subcommands['join']
309         result = cmd._run("samba-tool domain join",
310                           creds.get_realm(),
311                           "dc", "-U%s%%%s" % (creds.get_username(),
312                                               creds.get_password()),
313                           '--targetdir=%s' % tmpdir,
314                           '--server=%s' % server,
315                           "--option=netbios name = %s" % netbios_name)
316         return tmpdir
317
318     def _test_force_demote(self, server, netbios_name):
319         creds = self.get_credentials()
320         cmd = cmd_sambatool.subcommands['domain'].subcommands['demote']
321         result = cmd._run("samba-tool domain demote",
322                           "-U%s%%%s" % (creds.get_username(),
323                                               creds.get_password()),
324                           '--server=%s' % server,
325                           "--remove-other-dead-server=%s" % netbios_name)
326
327     def test_offline_manual_seized_ridalloc_with_dbcheck(self):
328         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
329         but do not create the RID set. Confirm that dbcheck correctly creates
330         the RID Set.
331
332         Also check
333         """
334         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
335         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
336
337         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
338         try:
339             # Connect to the database
340             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
341             lp = self.get_loadparm()
342
343             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
344                             session_info=system_session(lp), lp=lp)
345
346             serviceName = new_ldb.get_dsServiceName()
347             m = ldb.Message()
348             m.dn = fsmo_dn
349             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
350                                                    ldb.FLAG_MOD_REPLACE,
351                                                    "fSMORoleOwner")
352             new_ldb.modify(m)
353
354             # 1. Get server name
355             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
356                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
357             # 2. Get server reference
358             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
359
360             # Assert that no RID Set has been set
361             res = new_ldb.search(base=server_ref_dn,
362                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
363
364             self.assertFalse("rIDSetReferences" in res[0])
365
366             smbconf = os.path.join(targetdir, "etc/smb.conf")
367
368             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
369
370             self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
371
372             # 3. Assert we get the RID Set
373             res = new_ldb.search(base=server_ref_dn,
374                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
375
376             self.assertTrue("rIDSetReferences" in res[0])
377         finally:
378             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
379             shutil.rmtree(targetdir, ignore_errors=True)
380
381     def test_offline_manual_seized_ridalloc_add_user(self):
382         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
383         but do not create the RID set. Confirm that user-add correctly creates
384         the RID Set."""
385         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
386         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
387
388         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
389         try:
390             # Connect to the database
391             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
392             lp = self.get_loadparm()
393
394             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
395                             session_info=system_session(lp), lp=lp)
396
397             serviceName = new_ldb.get_dsServiceName()
398             m = ldb.Message()
399             m.dn = fsmo_dn
400             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
401                                                    ldb.FLAG_MOD_REPLACE,
402                                                    "fSMORoleOwner")
403             new_ldb.modify(m)
404
405             # 1. Get server name
406             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
407                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
408             # 2. Get server reference
409             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
410
411             # Assert that no RID Set has been set
412             res = new_ldb.search(base=server_ref_dn,
413                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
414
415             self.assertFalse("rIDSetReferences" in res[0])
416
417             smbconf = os.path.join(targetdir, "etc/smb.conf")
418
419             new_ldb.newuser("ridalloctestuser", "P@ssword!")
420
421             # 3. Assert we get the RID Set
422             res = new_ldb.search(base=server_ref_dn,
423                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
424
425             self.assertTrue("rIDSetReferences" in res[0])
426
427         finally:
428             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
429             shutil.rmtree(targetdir, ignore_errors=True)
430
431     def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
432         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
433         but do not create the RID set. Confirm that user-add correctly creates
434         the RID Set."""
435         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
436         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
437
438         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
439         try:
440             # Connect to the database
441             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
442             lp = self.get_loadparm()
443
444             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
445                             session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
446
447             serviceName = new_ldb.get_dsServiceName()
448             m = ldb.Message()
449             m.dn = fsmo_dn
450             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
451                                                    ldb.FLAG_MOD_REPLACE,
452                                                    "fSMORoleOwner")
453             new_ldb.modify(m)
454
455             # 1. Get server name
456             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
457                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
458             # 2. Get server reference
459             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
460
461             # Assert that no RID Set has been set
462             res = new_ldb.search(base=server_ref_dn,
463                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
464
465             self.assertFalse("rIDSetReferences" in res[0])
466
467             smbconf = os.path.join(targetdir, "etc/smb.conf")
468
469             # Create a user to allocate a RID Set for itself (the RID master)
470             new_ldb.newuser("ridalloctestuser", "P@ssword!")
471
472             # 3. Assert we get the RID Set
473             res = new_ldb.search(base=server_ref_dn,
474                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
475
476             self.assertTrue("rIDSetReferences" in res[0])
477
478         finally:
479             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
480             shutil.rmtree(targetdir, ignore_errors=True)
481
482     def test_join_time_ridalloc(self):
483         """Perform a join against the RID manager and assert we have a RID Set"""
484
485         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
486         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
487
488         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
489         try:
490             # Connect to the database
491             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
492             smbconf = os.path.join(targetdir, "etc/smb.conf")
493
494             lp = self.get_loadparm()
495             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
496                             session_info=system_session(lp), lp=lp)
497
498             # 1. Get server name
499             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
500                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
501             # 2. Get server reference
502             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
503
504             # 3. Assert we get the RID Set
505             res = new_ldb.search(base=server_ref_dn,
506                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
507
508             self.assertTrue("rIDSetReferences" in res[0])
509         finally:
510             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
511             shutil.rmtree(targetdir, ignore_errors=True)
512
513     def test_rid_set_dbcheck(self):
514         """Perform a join against the RID manager and assert we have a RID Set.
515         Using dbcheck, we assert that we can detect out of range users."""
516
517         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
518         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
519
520         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
521         try:
522             # Connect to the database
523             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
524             smbconf = os.path.join(targetdir, "etc/smb.conf")
525
526             lp = self.get_loadparm()
527             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
528                             session_info=system_session(lp), lp=lp)
529
530             # 1. Get server name
531             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
532                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
533             # 2. Get server reference
534             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
535
536             # 3. Assert we get the RID Set
537             res = new_ldb.search(base=server_ref_dn,
538                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
539
540             self.assertTrue("rIDSetReferences" in res[0])
541             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
542
543             # 4. Add a new user (triggers RID set work)
544             new_ldb.newuser("ridalloctestuser", "P@ssword!")
545
546             # 5. Now fetch the RID SET
547             rid_set_res = new_ldb.search(base=rid_set_dn,
548                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
549                                                                       'rIDAllocationPool'])
550             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
551             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
552
553             # 6. Add user above the ridNextRid and at mid-range.
554             #
555             # We can do this with safety because this is an offline DB that will be
556             # destroyed.
557             m = ldb.Message()
558             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
559             m.dn.add_base(new_ldb.get_default_basedn())
560             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
561             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
562                                                 ldb.FLAG_MOD_ADD,
563                                                 'objectSid')
564             new_ldb.add(m, controls=["relax:0"])
565
566             # 7. Check the RID Set
567             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
568
569             # Should have one error (wrong rIDNextRID)
570             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
571
572             # 8. Assert we get didn't show any other errors
573             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
574
575             rid_set_res = new_ldb.search(base=rid_set_dn,
576                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
577                                                                       'rIDAllocationPool'])
578             last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
579             self.assertEquals(last_allocated_rid, last_rid - 10)
580
581             # 9. Assert that the range wasn't thrown away
582
583             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
584             self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
585         finally:
586             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
587             shutil.rmtree(targetdir, ignore_errors=True)
588
589
590     def test_rid_set_dbcheck_after_seize(self):
591         """Perform a join against the RID manager and assert we have a RID Set.
592         We seize the RID master role, then using dbcheck, we assert that we can
593         detect out of range users (and then bump the RID set as required)."""
594
595         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
596         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
597
598         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
599         try:
600             # Connect to the database
601             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
602             smbconf = os.path.join(targetdir, "etc/smb.conf")
603
604             lp = self.get_loadparm()
605             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
606                             session_info=system_session(lp), lp=lp)
607
608             # 1. Get server name
609             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
610                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
611             # 2. Get server reference
612             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
613
614             # 3. Assert we get the RID Set
615             res = new_ldb.search(base=server_ref_dn,
616                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
617
618             self.assertTrue("rIDSetReferences" in res[0])
619             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
620
621             # 4. Seize the RID Manager role
622             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
623             self.assertCmdSuccess(result, out, err)
624             self.assertEquals(err,"","Shouldn't be any error messages")
625
626             # 5. Add a new user (triggers RID set work)
627             new_ldb.newuser("ridalloctestuser", "P@ssword!")
628
629             # 6. Now fetch the RID SET
630             rid_set_res = new_ldb.search(base=rid_set_dn,
631                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
632                                                                       'rIDAllocationPool'])
633             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
634             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
635
636             # 7. Add user above the ridNextRid and at almost the end of the range.
637             #
638             m = ldb.Message()
639             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
640             m.dn.add_base(new_ldb.get_default_basedn())
641             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
642             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
643                                                 ldb.FLAG_MOD_ADD,
644                                                 'objectSid')
645             new_ldb.add(m, controls=["relax:0"])
646
647             # 8. Add user above the ridNextRid and at the end of the range
648             m = ldb.Message()
649             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
650             m.dn.add_base(new_ldb.get_default_basedn())
651             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
652             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
653                                                 ldb.FLAG_MOD_ADD,
654                                                 'objectSid')
655             new_ldb.add(m, controls=["relax:0"])
656
657             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
658
659             # Should have fixed two errors (wrong ridNextRid)
660             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
661
662             # 9. Assert we get didn't show any other errors
663             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
664
665             # 10. Add another user (checks RID rollover)
666             # We have seized the role, so we can do that.
667             new_ldb.newuser("ridalloctestuser3", "P@ssword!")
668
669             rid_set_res = new_ldb.search(base=rid_set_dn,
670                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
671                                                                       'rIDAllocationPool'])
672             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
673             self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
674         finally:
675             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
676             shutil.rmtree(targetdir, ignore_errors=True)