pyldb: avoid segfault when adding an element with no name
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / ridalloc_exop.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various RID allocation scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
9 #
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 #
23
24 #
25 # Usage:
26 #  export DC1=dc1_dns_name
27 #  export DC2=dc2_dns_name
28 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 #
31
32 import drs_base
33 import samba.tests
34
35 import ldb
36 from ldb import SCOPE_BASE
37
38 from samba.dcerpc import drsuapi, misc
39 from samba.drs_utils import drs_DsBind
40 from samba.samdb import SamDB
41
42 import shutil
43 import tempfile
44 import os
45 from samba.auth import system_session, admin_session
46 from samba.dbchecker import dbcheck
47 from samba.ndr import ndr_pack
48 from samba.dcerpc import security
49
50
51 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
52     """Intended as a semi-black box test case for DsGetNCChanges
53        implementation for extended operations. It should be testing
54        how DsGetNCChanges handles different input params (mostly invalid).
55        Final goal is to make DsGetNCChanges as binary compatible to
56        Windows implementation as possible"""
57
58     def setUp(self):
59         super(DrsReplicaSyncTestCase, self).setUp()
60
61     def tearDown(self):
62         super(DrsReplicaSyncTestCase, self).tearDown()
63
64     def _determine_fSMORoleOwner(self, fsmo_obj_dn):
65         """Returns (owner, not_owner) pair where:
66              owner: dns name for FSMO owner
67              not_owner: dns name for DC not owning the FSMO"""
68         # collect info to return later
69         fsmo_info_1 = {"dns_name": self.dnsname_dc1,
70                        "invocation_id": self.ldb_dc1.get_invocation_id(),
71                        "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
72                        "server_dn": self.ldb_dc1.get_serverName()}
73         fsmo_info_2 = {"dns_name": self.dnsname_dc2,
74                        "invocation_id": self.ldb_dc2.get_invocation_id(),
75                        "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
76                        "server_dn": self.ldb_dc2.get_serverName()}
77
78         msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
79         fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0].decode('utf8'))
80         fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
81
82         msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
83         fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0].decode('utf8'))
84         fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
85
86         # determine the owner dc
87         res = self.ldb_dc1.search(fsmo_obj_dn,
88                                   scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
89         assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn
90         fsmo_owner = res[0]["fSMORoleOwner"][0]
91         if fsmo_owner == self.info_dc1["dsServiceName"][0]:
92             return (fsmo_info_1, fsmo_info_2)
93         return (fsmo_info_2, fsmo_info_1)
94
95     def _check_exop_failed(self, ctr6, expected_failure):
96         self.assertEqual(ctr6.extended_ret, expected_failure)
97         #self.assertEqual(ctr6.object_count, 0)
98         #self.assertEqual(ctr6.first_object, None)
99         self.assertEqual(ctr6.more_data, False)
100         self.assertEqual(ctr6.nc_object_count, 0)
101         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
102         self.assertEqual(ctr6.linked_attributes_count, 0)
103         self.assertEqual(ctr6.linked_attributes, [])
104         self.assertEqual(ctr6.drs_error[0], 0)
105
106     def test_InvalidDestDSA_ridalloc(self):
107         """Test RID allocation with invalid destination DSA guid"""
108         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
109         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
110
111         req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
112                                invocation_id=fsmo_owner["invocation_id"],
113                                nc_dn_str=fsmo_dn,
114                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
115
116         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
117         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
118         self.assertEqual(level, 6, "Expected level 6 response!")
119         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
120         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
121         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
122
123     def test_do_ridalloc(self):
124         """Test doing a RID allocation with a valid destination DSA guid"""
125         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
126         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
127
128         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
129                                invocation_id=fsmo_owner["invocation_id"],
130                                nc_dn_str=fsmo_dn,
131                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
132
133         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
134         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
135         self.assertEqual(level, 6, "Expected level 6 response!")
136         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
137         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
138         ctr6 = ctr
139         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
140         self.assertEqual(ctr6.object_count, 3)
141         self.assertNotEqual(ctr6.first_object, None)
142         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
143         self.assertNotEqual(ctr6.first_object.next_object, None)
144         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
145         second_object = ctr6.first_object.next_object.object
146         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
147         third_object = ctr6.first_object.next_object.next_object.object
148         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
149
150         self.assertEqual(ctr6.more_data, False)
151         self.assertEqual(ctr6.nc_object_count, 0)
152         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
153         self.assertEqual(ctr6.drs_error[0], 0)
154         # We don't check the linked_attributes_count as if the domain
155         # has an RODC, it can gain links on the server account object
156
157     def test_do_ridalloc_get_anc(self):
158         """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
159         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
160         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
161
162         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
163                                invocation_id=fsmo_owner["invocation_id"],
164                                nc_dn_str=fsmo_dn,
165                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
166                                replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
167
168         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
169         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
170         self.assertEqual(level, 6, "Expected level 6 response!")
171         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
172         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
173         ctr6 = ctr
174         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
175         self.assertEqual(ctr6.object_count, 3)
176         self.assertNotEqual(ctr6.first_object, None)
177         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
178         self.assertNotEqual(ctr6.first_object.next_object, None)
179         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
180         second_object = ctr6.first_object.next_object.object
181         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
182         third_object = ctr6.first_object.next_object.next_object.object
183         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
184         self.assertEqual(ctr6.more_data, False)
185         self.assertEqual(ctr6.nc_object_count, 0)
186         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
187         self.assertEqual(ctr6.drs_error[0], 0)
188         # We don't check the linked_attributes_count as if the domain
189         # has an RODC, it can gain links on the server account object
190
191     def test_edit_rid_master(self):
192         """Test doing a RID allocation after changing the RID master from the original one.
193            This should set rIDNextRID to 0 on the new RID master."""
194         # 1. a. Transfer role to non-RID master
195         #    b. Check that it succeeds correctly
196         #
197         # 2. a. Call the RID alloc against the former master.
198         #    b. Check that it succeeds.
199         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
200         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
201
202         # 1. Swap RID master role
203         m = ldb.Message()
204         m.dn = ldb.Dn(self.ldb_dc1, "")
205         m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
206                                                   "becomeRidMaster")
207
208         # Make sure that ldb_dc1 == RID Master
209
210         server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
211
212         # self.ldb_dc1 == LOCALDC
213         if server_dn == fsmo_owner['server_dn']:
214             # ldb_dc1 == VAMPIREDC
215             ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
216         else:
217             # Otherwise switch the two
218             ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
219
220         try:
221             # ldb_dc1 is now RID MASTER (as VAMPIREDC)
222             ldb_dc1.modify(m)
223         except ldb.LdbError as e1:
224             (num, msg) = e1.args
225             self.fail("Failed to reassign RID Master " + msg)
226
227         try:
228             # 2. Perform a RID alloc
229             req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
230                                    invocation_id=fsmo_not_owner["invocation_id"],
231                                    nc_dn_str=fsmo_dn,
232                                    exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
233
234             (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
235             # 3. Make sure the allocation succeeds
236             try:
237                 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
238             except RuntimeError as e:
239                 self.fail("RID allocation failed: " + str(e))
240
241             fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
242
243             self.assertEqual(level, 6, "Expected level 6 response!")
244             self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
245             self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
246             ctr6 = ctr
247             self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
248             self.assertEqual(ctr6.object_count, 3)
249             self.assertNotEqual(ctr6.first_object, None)
250             self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
251             self.assertNotEqual(ctr6.first_object.next_object, None)
252             self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
253             second_object = ctr6.first_object.next_object.object
254             self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
255             third_object = ctr6.first_object.next_object.next_object.object
256             self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
257         finally:
258             # Swap the RID master back for other tests
259             m = ldb.Message()
260             m.dn = ldb.Dn(ldb_dc2, "")
261             m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
262             try:
263                 ldb_dc2.modify(m)
264             except ldb.LdbError as e:
265                 (num, msg) = e.args
266                 self.fail("Failed to restore RID Master " + msg)
267
268     def test_offline_samba_tool_seized_ridalloc(self):
269         """Perform a join against the non-RID manager and then seize the RID Manager role"""
270
271         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
272         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
273
274         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
275         try:
276             # Connect to the database
277             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
278             smbconf = os.path.join(targetdir, "etc/smb.conf")
279
280             lp = self.get_loadparm()
281             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
282                             session_info=system_session(lp), lp=lp)
283
284             # 1. Get server name
285             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
286                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
287             # 2. Get server reference
288             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
289
290             # Assert that no RID Set has been set
291             res = new_ldb.search(base=server_ref_dn,
292                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
293
294             self.assertFalse("rIDSetReferences" in res[0])
295
296             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
297             self.assertCmdSuccess(result, out, err)
298             self.assertEquals(err, "", "Shouldn't be any error messages")
299
300             # 3. Assert we get the RID Set
301             res = new_ldb.search(base=server_ref_dn,
302                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
303
304             self.assertTrue("rIDSetReferences" in res[0])
305         finally:
306             shutil.rmtree(targetdir, ignore_errors=True)
307             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
308
309     def _test_join(self, server, netbios_name):
310         tmpdir = os.path.join(self.tempdir, "targetdir")
311         creds = self.get_credentials()
312         (result, out, err) = self.runsubcmd("domain", "join",
313                                             creds.get_realm(),
314                                             "dc", "-U%s%%%s" % (creds.get_username(),
315                                                                 creds.get_password()),
316                                             '--targetdir=%s' % tmpdir,
317                                             '--server=%s' % server,
318                                             "--option=netbios name = %s" % netbios_name)
319         self.assertCmdSuccess(result, out, err)
320         return tmpdir
321
322     def _test_force_demote(self, server, netbios_name):
323         creds = self.get_credentials()
324         (result, out, err) = self.runsubcmd("domain", "demote",
325                                             "-U%s%%%s" % (creds.get_username(),
326                                                           creds.get_password()),
327                                             '--server=%s' % server,
328                                             "--remove-other-dead-server=%s" % netbios_name)
329         self.assertCmdSuccess(result, out, err)
330
331     def test_offline_manual_seized_ridalloc_with_dbcheck(self):
332         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
333         but do not create the RID set. Confirm that dbcheck correctly creates
334         the RID Set.
335
336         Also check
337         """
338         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
339         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
340
341         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
342         try:
343             # Connect to the database
344             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
345             lp = self.get_loadparm()
346
347             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
348                             session_info=system_session(lp), lp=lp)
349
350             serviceName = new_ldb.get_dsServiceName()
351             m = ldb.Message()
352             m.dn = fsmo_dn
353             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
354                                                     ldb.FLAG_MOD_REPLACE,
355                                                     "fSMORoleOwner")
356             new_ldb.modify(m)
357
358             # 1. Get server name
359             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
360                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
361             # 2. Get server reference
362             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
363
364             # Assert that no RID Set has been set
365             res = new_ldb.search(base=server_ref_dn,
366                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
367
368             self.assertFalse("rIDSetReferences" in res[0])
369
370             smbconf = os.path.join(targetdir, "etc/smb.conf")
371
372             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
373
374             self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
375
376             # 3. Assert we get the RID Set
377             res = new_ldb.search(base=server_ref_dn,
378                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
379
380             self.assertTrue("rIDSetReferences" in res[0])
381         finally:
382             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
383             shutil.rmtree(targetdir, ignore_errors=True)
384
385     def test_offline_manual_seized_ridalloc_add_user(self):
386         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
387         but do not create the RID set. Confirm that user-add correctly creates
388         the RID Set."""
389         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
390         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
391
392         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
393         try:
394             # Connect to the database
395             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
396             lp = self.get_loadparm()
397
398             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
399                             session_info=system_session(lp), lp=lp)
400
401             serviceName = new_ldb.get_dsServiceName()
402             m = ldb.Message()
403             m.dn = fsmo_dn
404             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
405                                                     ldb.FLAG_MOD_REPLACE,
406                                                     "fSMORoleOwner")
407             new_ldb.modify(m)
408
409             # 1. Get server name
410             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
411                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
412             # 2. Get server reference
413             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
414
415             # Assert that no RID Set has been set
416             res = new_ldb.search(base=server_ref_dn,
417                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
418
419             self.assertFalse("rIDSetReferences" in res[0])
420
421             smbconf = os.path.join(targetdir, "etc/smb.conf")
422
423             new_ldb.newuser("ridalloctestuser", "P@ssword!")
424
425             # 3. Assert we get the RID Set
426             res = new_ldb.search(base=server_ref_dn,
427                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
428
429             self.assertTrue("rIDSetReferences" in res[0])
430
431         finally:
432             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
433             shutil.rmtree(targetdir, ignore_errors=True)
434
435     def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
436         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
437         but do not create the RID set. Confirm that user-add correctly creates
438         the RID Set."""
439         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
440         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
441
442         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
443         try:
444             # Connect to the database
445             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
446             lp = self.get_loadparm()
447
448             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
449                             session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
450
451             serviceName = new_ldb.get_dsServiceName()
452             m = ldb.Message()
453             m.dn = fsmo_dn
454             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
455                                                     ldb.FLAG_MOD_REPLACE,
456                                                     "fSMORoleOwner")
457             new_ldb.modify(m)
458
459             # 1. Get server name
460             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
461                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
462             # 2. Get server reference
463             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
464
465             # Assert that no RID Set has been set
466             res = new_ldb.search(base=server_ref_dn,
467                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
468
469             self.assertFalse("rIDSetReferences" in res[0])
470
471             smbconf = os.path.join(targetdir, "etc/smb.conf")
472
473             # Create a user to allocate a RID Set for itself (the RID master)
474             new_ldb.newuser("ridalloctestuser", "P@ssword!")
475
476             # 3. Assert we get the RID Set
477             res = new_ldb.search(base=server_ref_dn,
478                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
479
480             self.assertTrue("rIDSetReferences" in res[0])
481
482         finally:
483             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
484             shutil.rmtree(targetdir, ignore_errors=True)
485
486     def test_join_time_ridalloc(self):
487         """Perform a join against the RID manager and assert we have a RID Set"""
488
489         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
490         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
491
492         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
493         try:
494             # Connect to the database
495             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
496             smbconf = os.path.join(targetdir, "etc/smb.conf")
497
498             lp = self.get_loadparm()
499             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
500                             session_info=system_session(lp), lp=lp)
501
502             # 1. Get server name
503             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
504                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
505             # 2. Get server reference
506             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
507
508             # 3. Assert we get the RID Set
509             res = new_ldb.search(base=server_ref_dn,
510                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
511
512             self.assertTrue("rIDSetReferences" in res[0])
513         finally:
514             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
515             shutil.rmtree(targetdir, ignore_errors=True)
516
517     def test_rid_set_dbcheck(self):
518         """Perform a join against the RID manager and assert we have a RID Set.
519         Using dbcheck, we assert that we can detect out of range users."""
520
521         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
522         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
523
524         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
525         try:
526             # Connect to the database
527             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
528             smbconf = os.path.join(targetdir, "etc/smb.conf")
529
530             lp = self.get_loadparm()
531             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
532                             session_info=system_session(lp), lp=lp)
533
534             # 1. Get server name
535             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
536                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
537             # 2. Get server reference
538             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
539
540             # 3. Assert we get the RID Set
541             res = new_ldb.search(base=server_ref_dn,
542                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
543
544             self.assertTrue("rIDSetReferences" in res[0])
545             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8'))
546
547             # 4. Add a new user (triggers RID set work)
548             new_ldb.newuser("ridalloctestuser", "P@ssword!")
549
550             # 5. Now fetch the RID SET
551             rid_set_res = new_ldb.search(base=rid_set_dn,
552                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
553                                                                       'rIDAllocationPool'])
554             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
555             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
556
557             # 6. Add user above the ridNextRid and at mid-range.
558             #
559             # We can do this with safety because this is an offline DB that will be
560             # destroyed.
561             m = ldb.Message()
562             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
563             m.dn.add_base(new_ldb.get_default_basedn())
564             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
565             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
566                                                 ldb.FLAG_MOD_ADD,
567                                                 'objectSid')
568             new_ldb.add(m, controls=["relax:0"])
569
570             # 7. Check the RID Set
571             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
572
573             # Should have one error (wrong rIDNextRID)
574             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
575
576             # 8. Assert we get didn't show any other errors
577             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
578
579             rid_set_res = new_ldb.search(base=rid_set_dn,
580                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
581                                                                       'rIDAllocationPool'])
582             last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
583             self.assertEquals(last_allocated_rid, last_rid - 10)
584
585             # 9. Assert that the range wasn't thrown away
586
587             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
588             self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
589         finally:
590             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
591             shutil.rmtree(targetdir, ignore_errors=True)
592
593     def test_rid_set_dbcheck_after_seize(self):
594         """Perform a join against the RID manager and assert we have a RID Set.
595         We seize the RID master role, then using dbcheck, we assert that we can
596         detect out of range users (and then bump the RID set as required)."""
597
598         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
599         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
600
601         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
602         try:
603             # Connect to the database
604             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
605             smbconf = os.path.join(targetdir, "etc/smb.conf")
606
607             lp = self.get_loadparm()
608             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
609                             session_info=system_session(lp), lp=lp)
610
611             # 1. Get server name
612             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
613                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
614             # 2. Get server reference
615             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0].decode('utf8'))
616
617             # 3. Assert we get the RID Set
618             res = new_ldb.search(base=server_ref_dn,
619                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
620
621             self.assertTrue("rIDSetReferences" in res[0])
622             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0].decode('utf8'))
623             # 4. Seize the RID Manager role
624             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
625             self.assertCmdSuccess(result, out, err)
626             self.assertEquals(err, "", "Shouldn't be any error messages")
627
628             # 5. Add a new user (triggers RID set work)
629             new_ldb.newuser("ridalloctestuser", "P@ssword!")
630
631             # 6. Now fetch the RID SET
632             rid_set_res = new_ldb.search(base=rid_set_dn,
633                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
634                                                                       'rIDAllocationPool'])
635             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
636             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
637
638             # 7. Add user above the ridNextRid and at almost the end of the range.
639             #
640             m = ldb.Message()
641             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
642             m.dn.add_base(new_ldb.get_default_basedn())
643             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
644             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
645                                                 ldb.FLAG_MOD_ADD,
646                                                 'objectSid')
647             new_ldb.add(m, controls=["relax:0"])
648
649             # 8. Add user above the ridNextRid and at the end of the range
650             m = ldb.Message()
651             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
652             m.dn.add_base(new_ldb.get_default_basedn())
653             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
654             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
655                                                 ldb.FLAG_MOD_ADD,
656                                                 'objectSid')
657             new_ldb.add(m, controls=["relax:0"])
658
659             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
660
661             # Should have fixed two errors (wrong ridNextRid)
662             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
663
664             # 9. Assert we get didn't show any other errors
665             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
666
667             # 10. Add another user (checks RID rollover)
668             # We have seized the role, so we can do that.
669             new_ldb.newuser("ridalloctestuser3", "P@ssword!")
670
671             rid_set_res = new_ldb.search(base=rid_set_dn,
672                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
673                                                                       'rIDAllocationPool'])
674             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
675             self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
676         finally:
677             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
678             shutil.rmtree(targetdir, ignore_errors=True)