5a273fc1bb8b38eecfb4c245693e66bdd27d84b4
[samba.git] / source4 / torture / drs / python / ridalloc_exop.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various RID allocation scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # Copyright (C) Catalyst IT Ltd. 2016
9 #
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 #
23
24 #
25 # Usage:
26 #  export DC1=dc1_dns_name
27 #  export DC2=dc2_dns_name
28 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
29 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 #
31
32 import drs_base
33 import samba.tests
34
35 import ldb
36 from ldb import SCOPE_BASE
37
38 from samba.dcerpc import drsuapi, misc
39 from samba.drs_utils import drs_DsBind
40 from samba.samdb import SamDB
41
42 import shutil, tempfile, os
43 from samba.netcmd.main import cmd_sambatool
44 from samba.auth import system_session, admin_session
45 from samba.dbchecker import dbcheck
46 from samba.ndr import ndr_pack
47 from samba.dcerpc import security
48
49 class ExopBaseTest:
50     def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
51                    replica_flags=0, max_objects=0, partial_attribute_set=None,
52                    partial_attribute_set_ex=None, mapping_ctr=None):
53         req8 = drsuapi.DsGetNCChangesRequest8()
54
55         req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
56         req8.source_dsa_invocation_id = misc.GUID(invocation_id)
57         req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
58         req8.naming_context.dn = unicode(nc_dn_str)
59         req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
60         req8.highwatermark.tmp_highest_usn = 0
61         req8.highwatermark.reserved_usn = 0
62         req8.highwatermark.highest_usn = 0
63         req8.uptodateness_vector = None
64         req8.replica_flags = replica_flags
65         req8.max_object_count = max_objects
66         req8.max_ndr_size = 402116
67         req8.extended_op = exop
68         req8.fsmo_info = 0
69         req8.partial_attribute_set = partial_attribute_set
70         req8.partial_attribute_set_ex = partial_attribute_set_ex
71         if mapping_ctr:
72             req8.mapping_ctr = mapping_ctr
73         else:
74             req8.mapping_ctr.num_mappings = 0
75             req8.mapping_ctr.mappings = None
76
77         return req8
78
79     def _ds_bind(self, server_name):
80         binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
81
82         drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
83         (drs_handle, supported_extensions) = drs_DsBind(drs)
84         return (drs, drs_handle)
85
86
87 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
88     """Intended as a semi-black box test case for DsGetNCChanges
89        implementation for extended operations. It should be testing
90        how DsGetNCChanges handles different input params (mostly invalid).
91        Final goal is to make DsGetNCChanges as binary compatible to
92        Windows implementation as possible"""
93
94     def setUp(self):
95         super(DrsReplicaSyncTestCase, self).setUp()
96
97     def tearDown(self):
98         super(DrsReplicaSyncTestCase, self).tearDown()
99
100     def _determine_fSMORoleOwner(self, fsmo_obj_dn):
101         """Returns (owner, not_owner) pair where:
102              owner: dns name for FSMO owner
103              not_owner: dns name for DC not owning the FSMO"""
104         # collect info to return later
105         fsmo_info_1 = {"dns_name": self.dnsname_dc1,
106                        "invocation_id": self.ldb_dc1.get_invocation_id(),
107                        "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
108                        "server_dn": self.ldb_dc1.get_serverName()}
109         fsmo_info_2 = {"dns_name": self.dnsname_dc2,
110                        "invocation_id": self.ldb_dc2.get_invocation_id(),
111                        "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
112                        "server_dn": self.ldb_dc2.get_serverName()}
113
114         msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
115         fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
116         fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
117
118         msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
119         fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
120         fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
121
122         # determine the owner dc
123         res = self.ldb_dc1.search(fsmo_obj_dn,
124                                   scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
125         assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
126         fsmo_owner = res[0]["fSMORoleOwner"][0]
127         if fsmo_owner == self.info_dc1["dsServiceName"][0]:
128             return (fsmo_info_1, fsmo_info_2)
129         return (fsmo_info_2, fsmo_info_1)
130
131     def _check_exop_failed(self, ctr6, expected_failure):
132         self.assertEqual(ctr6.extended_ret, expected_failure)
133         #self.assertEqual(ctr6.object_count, 0)
134         #self.assertEqual(ctr6.first_object, None)
135         self.assertEqual(ctr6.more_data, False)
136         self.assertEqual(ctr6.nc_object_count, 0)
137         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
138         self.assertEqual(ctr6.linked_attributes_count, 0)
139         self.assertEqual(ctr6.linked_attributes, [])
140         self.assertEqual(ctr6.drs_error[0], 0)
141
142     def test_InvalidDestDSA_ridalloc(self):
143         """Test RID allocation with invalid destination DSA guid"""
144         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
145         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
146
147         req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
148                                invocation_id=fsmo_owner["invocation_id"],
149                                nc_dn_str=fsmo_dn,
150                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
151
152         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
153         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
154         self.assertEqual(level, 6, "Expected level 6 response!")
155         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
156         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
157         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
158
159     def test_do_ridalloc(self):
160         """Test doing a RID allocation with a valid destination DSA guid"""
161         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
162         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
163
164         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
165                                invocation_id=fsmo_owner["invocation_id"],
166                                nc_dn_str=fsmo_dn,
167                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
168
169         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
170         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
171         self.assertEqual(level, 6, "Expected level 6 response!")
172         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
173         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
174         ctr6 = ctr
175         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
176         self.assertEqual(ctr6.object_count, 3)
177         self.assertNotEqual(ctr6.first_object, None)
178         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
179         self.assertNotEqual(ctr6.first_object.next_object, None)
180         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
181         second_object = ctr6.first_object.next_object.object
182         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
183         third_object = ctr6.first_object.next_object.next_object.object
184         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
185
186         self.assertEqual(ctr6.more_data, False)
187         self.assertEqual(ctr6.nc_object_count, 0)
188         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
189         self.assertEqual(ctr6.drs_error[0], 0)
190         # We don't check the linked_attributes_count as if the domain
191         # has an RODC, it can gain links on the server account object
192
193     def test_do_ridalloc_get_anc(self):
194         """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
195         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
196         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
197
198         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
199                                invocation_id=fsmo_owner["invocation_id"],
200                                nc_dn_str=fsmo_dn,
201                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
202                                replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
203
204         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
205         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
206         self.assertEqual(level, 6, "Expected level 6 response!")
207         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
208         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
209         ctr6 = ctr
210         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
211         self.assertEqual(ctr6.object_count, 3)
212         self.assertNotEqual(ctr6.first_object, None)
213         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
214         self.assertNotEqual(ctr6.first_object.next_object, None)
215         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
216         second_object = ctr6.first_object.next_object.object
217         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
218         third_object = ctr6.first_object.next_object.next_object.object
219         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
220         self.assertEqual(ctr6.more_data, False)
221         self.assertEqual(ctr6.nc_object_count, 0)
222         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
223         self.assertEqual(ctr6.drs_error[0], 0)
224         # We don't check the linked_attributes_count as if the domain
225         # has an RODC, it can gain links on the server account object
226
227     def test_edit_rid_master(self):
228         """Test doing a RID allocation after changing the RID master from the original one.
229            This should set rIDNextRID to 0 on the new RID master."""
230         # 1. a. Transfer role to non-RID master
231         #    b. Check that it succeeds correctly
232         #
233         # 2. a. Call the RID alloc against the former master.
234         #    b. Check that it succeeds.
235         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
236         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
237
238         # 1. Swap RID master role
239         m = ldb.Message()
240         m.dn = ldb.Dn(self.ldb_dc1, "")
241         m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
242                                                   "becomeRidMaster")
243
244         # Make sure that ldb_dc1 == RID Master
245
246         server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
247
248         # self.ldb_dc1 == LOCALDC
249         if server_dn == fsmo_owner['server_dn']:
250             # ldb_dc1 == VAMPIREDC
251             ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
252         else:
253             # Otherwise switch the two
254             ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
255
256         try:
257             # ldb_dc1 is now RID MASTER (as VAMPIREDC)
258             ldb_dc1.modify(m)
259         except ldb.LdbError, (num, msg):
260             self.fail("Failed to reassign RID Master " +  msg)
261
262         try:
263             # 2. Perform a RID alloc
264             req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
265                     invocation_id=fsmo_not_owner["invocation_id"],
266                     nc_dn_str=fsmo_dn,
267                     exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
268
269             (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
270             # 3. Make sure the allocation succeeds
271             try:
272                 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
273             except RuntimeError, e:
274                 self.fail("RID allocation failed: " + str(e))
275
276             fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
277
278             self.assertEqual(level, 6, "Expected level 6 response!")
279             self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
280             self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
281             ctr6 = ctr
282             self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
283             self.assertEqual(ctr6.object_count, 3)
284             self.assertNotEqual(ctr6.first_object, None)
285             self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
286             self.assertNotEqual(ctr6.first_object.next_object, None)
287             self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
288             second_object = ctr6.first_object.next_object.object
289             self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
290             third_object = ctr6.first_object.next_object.next_object.object
291             self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
292         finally:
293             # Swap the RID master back for other tests
294             m = ldb.Message()
295             m.dn = ldb.Dn(ldb_dc2, "")
296             m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
297             try:
298                 ldb_dc2.modify(m)
299             except ldb.LdbError, (num, msg):
300                 self.fail("Failed to restore RID Master " +  msg)
301
302     def test_offline_samba_tool_seized_ridalloc(self):
303         """Perform a join against the non-RID manager and then seize the RID Manager role"""
304
305         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
306         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
307
308         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
309         try:
310             # Connect to the database
311             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
312             smbconf = os.path.join(targetdir, "etc/smb.conf")
313
314             lp = self.get_loadparm()
315             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
316                             session_info=system_session(lp), lp=lp)
317
318             # 1. Get server name
319             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
320                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
321             # 2. Get server reference
322             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
323
324             # Assert that no RID Set has been set
325             res = new_ldb.search(base=server_ref_dn,
326                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
327
328             self.assertFalse("rIDSetReferences" in res[0])
329
330             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
331             self.assertCmdSuccess(result, out, err)
332             self.assertEquals(err,"","Shouldn't be any error messages")
333
334             # 3. Assert we get the RID Set
335             res = new_ldb.search(base=server_ref_dn,
336                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
337
338             self.assertTrue("rIDSetReferences" in res[0])
339         finally:
340             shutil.rmtree(targetdir, ignore_errors=True)
341             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
342
343     def _test_join(self, server, netbios_name):
344         tmpdir = os.path.join(self.tempdir, "targetdir")
345         creds = self.get_credentials()
346         cmd = cmd_sambatool.subcommands['domain'].subcommands['join']
347         result = cmd._run("samba-tool domain join",
348                           creds.get_realm(),
349                           "dc", "-U%s%%%s" % (creds.get_username(),
350                                               creds.get_password()),
351                           '--targetdir=%s' % tmpdir,
352                           '--server=%s' % server,
353                           "--option=netbios name = %s" % netbios_name)
354         return tmpdir
355
356     def _test_force_demote(self, server, netbios_name):
357         creds = self.get_credentials()
358         cmd = cmd_sambatool.subcommands['domain'].subcommands['demote']
359         result = cmd._run("samba-tool domain demote",
360                           "-U%s%%%s" % (creds.get_username(),
361                                               creds.get_password()),
362                           '--server=%s' % server,
363                           "--remove-other-dead-server=%s" % netbios_name)
364
365     def test_offline_manual_seized_ridalloc_with_dbcheck(self):
366         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
367         but do not create the RID set. Confirm that dbcheck correctly creates
368         the RID Set.
369
370         Also check
371         """
372         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
373         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
374
375         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
376         try:
377             # Connect to the database
378             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
379             lp = self.get_loadparm()
380
381             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
382                             session_info=system_session(lp), lp=lp)
383
384             serviceName = new_ldb.get_dsServiceName()
385             m = ldb.Message()
386             m.dn = fsmo_dn
387             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
388                                                    ldb.FLAG_MOD_REPLACE,
389                                                    "fSMORoleOwner")
390             new_ldb.modify(m)
391
392             # 1. Get server name
393             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
394                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
395             # 2. Get server reference
396             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
397
398             # Assert that no RID Set has been set
399             res = new_ldb.search(base=server_ref_dn,
400                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
401
402             self.assertFalse("rIDSetReferences" in res[0])
403
404             smbconf = os.path.join(targetdir, "etc/smb.conf")
405
406             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
407
408             self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
409
410             # 3. Assert we get the RID Set
411             res = new_ldb.search(base=server_ref_dn,
412                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
413
414             self.assertTrue("rIDSetReferences" in res[0])
415         finally:
416             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
417             shutil.rmtree(targetdir, ignore_errors=True)
418
419     def test_offline_manual_seized_ridalloc_add_user(self):
420         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
421         but do not create the RID set. Confirm that user-add correctly creates
422         the RID Set."""
423         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
424         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
425
426         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
427         try:
428             # Connect to the database
429             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
430             lp = self.get_loadparm()
431
432             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
433                             session_info=system_session(lp), lp=lp)
434
435             serviceName = new_ldb.get_dsServiceName()
436             m = ldb.Message()
437             m.dn = fsmo_dn
438             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
439                                                    ldb.FLAG_MOD_REPLACE,
440                                                    "fSMORoleOwner")
441             new_ldb.modify(m)
442
443             # 1. Get server name
444             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
445                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
446             # 2. Get server reference
447             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
448
449             # Assert that no RID Set has been set
450             res = new_ldb.search(base=server_ref_dn,
451                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
452
453             self.assertFalse("rIDSetReferences" in res[0])
454
455             smbconf = os.path.join(targetdir, "etc/smb.conf")
456
457             new_ldb.newuser("ridalloctestuser", "P@ssword!")
458
459             # 3. Assert we get the RID Set
460             res = new_ldb.search(base=server_ref_dn,
461                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
462
463             self.assertTrue("rIDSetReferences" in res[0])
464
465         finally:
466             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
467             shutil.rmtree(targetdir, ignore_errors=True)
468
469     def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
470         """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
471         but do not create the RID set. Confirm that user-add correctly creates
472         the RID Set."""
473         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
474         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
475
476         targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
477         try:
478             # Connect to the database
479             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
480             lp = self.get_loadparm()
481
482             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
483                             session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
484
485             serviceName = new_ldb.get_dsServiceName()
486             m = ldb.Message()
487             m.dn = fsmo_dn
488             m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
489                                                    ldb.FLAG_MOD_REPLACE,
490                                                    "fSMORoleOwner")
491             new_ldb.modify(m)
492
493             # 1. Get server name
494             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
495                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
496             # 2. Get server reference
497             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
498
499             # Assert that no RID Set has been set
500             res = new_ldb.search(base=server_ref_dn,
501                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
502
503             self.assertFalse("rIDSetReferences" in res[0])
504
505             smbconf = os.path.join(targetdir, "etc/smb.conf")
506
507             # Create a user to allocate a RID Set for itself (the RID master)
508             new_ldb.newuser("ridalloctestuser", "P@ssword!")
509
510             # 3. Assert we get the RID Set
511             res = new_ldb.search(base=server_ref_dn,
512                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
513
514             self.assertTrue("rIDSetReferences" in res[0])
515
516         finally:
517             self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
518             shutil.rmtree(targetdir, ignore_errors=True)
519
520     def test_join_time_ridalloc(self):
521         """Perform a join against the RID manager and assert we have a RID Set"""
522
523         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
524         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
525
526         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
527         try:
528             # Connect to the database
529             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
530             smbconf = os.path.join(targetdir, "etc/smb.conf")
531
532             lp = self.get_loadparm()
533             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
534                             session_info=system_session(lp), lp=lp)
535
536             # 1. Get server name
537             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
538                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
539             # 2. Get server reference
540             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
541
542             # 3. Assert we get the RID Set
543             res = new_ldb.search(base=server_ref_dn,
544                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
545
546             self.assertTrue("rIDSetReferences" in res[0])
547         finally:
548             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
549             shutil.rmtree(targetdir, ignore_errors=True)
550
551     def test_rid_set_dbcheck(self):
552         """Perform a join against the RID manager and assert we have a RID Set.
553         Using dbcheck, we assert that we can detect out of range users."""
554
555         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
556         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
557
558         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
559         try:
560             # Connect to the database
561             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
562             smbconf = os.path.join(targetdir, "etc/smb.conf")
563
564             lp = self.get_loadparm()
565             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
566                             session_info=system_session(lp), lp=lp)
567
568             # 1. Get server name
569             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
570                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
571             # 2. Get server reference
572             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
573
574             # 3. Assert we get the RID Set
575             res = new_ldb.search(base=server_ref_dn,
576                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
577
578             self.assertTrue("rIDSetReferences" in res[0])
579             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
580
581             # 4. Add a new user (triggers RID set work)
582             new_ldb.newuser("ridalloctestuser", "P@ssword!")
583
584             # 5. Now fetch the RID SET
585             rid_set_res = new_ldb.search(base=rid_set_dn,
586                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
587                                                                       'rIDAllocationPool'])
588             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
589             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
590
591             # 6. Add user above the ridNextRid and at mid-range.
592             #
593             # We can do this with safety because this is an offline DB that will be
594             # destroyed.
595             m = ldb.Message()
596             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
597             m.dn.add_base(new_ldb.get_default_basedn())
598             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
599             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
600                                                 ldb.FLAG_MOD_ADD,
601                                                 'objectSid')
602             new_ldb.add(m, controls=["relax:0"])
603
604             # 7. Check the RID Set
605             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
606
607             # Should have one error (wrong rIDNextRID)
608             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
609
610             # 8. Assert we get didn't show any other errors
611             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
612
613             rid_set_res = new_ldb.search(base=rid_set_dn,
614                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
615                                                                       'rIDAllocationPool'])
616             last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
617             self.assertEquals(last_allocated_rid, last_rid - 10)
618
619             # 9. Assert that the range wasn't thrown away
620
621             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
622             self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
623         finally:
624             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
625             shutil.rmtree(targetdir, ignore_errors=True)
626
627
628     def test_rid_set_dbcheck_after_seize(self):
629         """Perform a join against the RID manager and assert we have a RID Set.
630         We seize the RID master role, then using dbcheck, we assert that we can
631         detect out of range users (and then bump the RID set as required)."""
632
633         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
634         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
635
636         targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
637         try:
638             # Connect to the database
639             ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
640             smbconf = os.path.join(targetdir, "etc/smb.conf")
641
642             lp = self.get_loadparm()
643             new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
644                             session_info=system_session(lp), lp=lp)
645
646             # 1. Get server name
647             res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
648                                  scope=ldb.SCOPE_BASE, attrs=["serverReference"])
649             # 2. Get server reference
650             server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
651
652             # 3. Assert we get the RID Set
653             res = new_ldb.search(base=server_ref_dn,
654                                  scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
655
656             self.assertTrue("rIDSetReferences" in res[0])
657             rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
658
659             # 4. Seize the RID Manager role
660             (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
661             self.assertCmdSuccess(result, out, err)
662             self.assertEquals(err,"","Shouldn't be any error messages")
663
664             # 5. Add a new user (triggers RID set work)
665             new_ldb.newuser("ridalloctestuser", "P@ssword!")
666
667             # 6. Now fetch the RID SET
668             rid_set_res = new_ldb.search(base=rid_set_dn,
669                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
670                                                                       'rIDAllocationPool'])
671             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
672             last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
673
674             # 7. Add user above the ridNextRid and at almost the end of the range.
675             #
676             m = ldb.Message()
677             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
678             m.dn.add_base(new_ldb.get_default_basedn())
679             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
680             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
681                                                 ldb.FLAG_MOD_ADD,
682                                                 'objectSid')
683             new_ldb.add(m, controls=["relax:0"])
684
685             # 8. Add user above the ridNextRid and at the end of the range
686             m = ldb.Message()
687             m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
688             m.dn.add_base(new_ldb.get_default_basedn())
689             m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
690             m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
691                                                 ldb.FLAG_MOD_ADD,
692                                                 'objectSid')
693             new_ldb.add(m, controls=["relax:0"])
694
695             chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
696
697             # Should have fixed two errors (wrong ridNextRid)
698             self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
699
700             # 9. Assert we get didn't show any other errors
701             chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
702
703             # 10. Add another user (checks RID rollover)
704             # We have seized the role, so we can do that.
705             new_ldb.newuser("ridalloctestuser3", "P@ssword!")
706
707             rid_set_res = new_ldb.search(base=rid_set_dn,
708                                          scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
709                                                                       'rIDAllocationPool'])
710             next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
711             self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
712         finally:
713             self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
714             shutil.rmtree(targetdir, ignore_errors=True)