tests/ridalloc_exop: Add a new suite of tests for RID allocation
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / getnc_exop.py
index 941d323382003fa0df4ff656d15feeeb40ed2aa2..246d8593358bca5df26b80c2d1e422cfa5805ea0 100644 (file)
@@ -204,167 +204,6 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
 
-    def test_InvalidDestDSA_ridalloc(self):
-        """Test RID allocation with invalid destination DSA guid"""
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
-                               invocation_id=fsmo_owner["invocation_id"],
-                               nc_dn_str=fsmo_dn,
-                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
-
-        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
-        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-        self.assertEqual(level, 6, "Expected level 6 response!")
-        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
-        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
-        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
-
-    def test_do_ridalloc(self):
-        """Test doing a RID allocation with a valid destination DSA guid"""
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
-                               invocation_id=fsmo_owner["invocation_id"],
-                               nc_dn_str=fsmo_dn,
-                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
-
-        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
-        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-        self.assertEqual(level, 6, "Expected level 6 response!")
-        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
-        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
-        ctr6 = ctr
-        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
-        self.assertEqual(ctr6.object_count, 3)
-        self.assertNotEqual(ctr6.first_object, None)
-        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
-        self.assertNotEqual(ctr6.first_object.next_object, None)
-        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
-        second_object = ctr6.first_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
-        third_object = ctr6.first_object.next_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
-
-        self.assertEqual(ctr6.more_data, False)
-        self.assertEqual(ctr6.nc_object_count, 0)
-        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
-        self.assertEqual(ctr6.drs_error[0], 0)
-        # We don't check the linked_attributes_count as if the domain
-        # has an RODC, it can gain links on the server account object
-
-    def test_do_ridalloc_get_anc(self):
-        """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
-                               invocation_id=fsmo_owner["invocation_id"],
-                               nc_dn_str=fsmo_dn,
-                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
-                               replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
-
-        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
-        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-        self.assertEqual(level, 6, "Expected level 6 response!")
-        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
-        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
-        ctr6 = ctr
-        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
-        self.assertEqual(ctr6.object_count, 3)
-        self.assertNotEqual(ctr6.first_object, None)
-        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
-        self.assertNotEqual(ctr6.first_object.next_object, None)
-        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
-        second_object = ctr6.first_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
-        third_object = ctr6.first_object.next_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
-        self.assertEqual(ctr6.more_data, False)
-        self.assertEqual(ctr6.nc_object_count, 0)
-        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
-        self.assertEqual(ctr6.drs_error[0], 0)
-        # We don't check the linked_attributes_count as if the domain
-        # has an RODC, it can gain links on the server account object
-
-    def test_edit_rid_master(self):
-        """Test doing a RID allocation after changing the RID master from the original one.
-           This should set rIDNextRID to 0 on the new RID master."""
-        # 1. a. Transfer role to non-RID master
-        #    b. Check that it succeeds correctly
-        #
-        # 2. a. Call the RID alloc against the former master.
-        #    b. Check that it succeeds.
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        # 1. Swap RID master role
-        m = ldb.Message()
-        m.dn = ldb.Dn(self.ldb_dc1, "")
-        m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
-                                                  "becomeRidMaster")
-
-        # Make sure that ldb_dc1 == RID Master
-
-        server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
-
-        # self.ldb_dc1 == LOCALDC
-        if server_dn == fsmo_owner['server_dn']:
-            # ldb_dc1 == VAMPIREDC
-            ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
-        else:
-            # Otherwise switch the two
-            ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
-
-        try:
-            # ldb_dc1 is now RID MASTER (as VAMPIREDC)
-            ldb_dc1.modify(m)
-        except ldb.LdbError, (num, msg):
-            self.fail("Failed to reassign RID Master " +  msg)
-
-        try:
-            # 2. Perform a RID alloc
-            req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
-                    invocation_id=fsmo_not_owner["invocation_id"],
-                    nc_dn_str=fsmo_dn,
-                    exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
-
-            (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
-            # 3. Make sure the allocation succeeds
-            try:
-                (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-            except RuntimeError, e:
-                self.fail("RID allocation failed: " + str(e))
-
-            fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-
-            self.assertEqual(level, 6, "Expected level 6 response!")
-            self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
-            self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
-            ctr6 = ctr
-            self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
-            self.assertEqual(ctr6.object_count, 3)
-            self.assertNotEqual(ctr6.first_object, None)
-            self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
-            self.assertNotEqual(ctr6.first_object.next_object, None)
-            self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
-            second_object = ctr6.first_object.next_object.object
-            self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
-            third_object = ctr6.first_object.next_object.next_object.object
-            self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
-        finally:
-            # Swap the RID master back for other tests
-            m = ldb.Message()
-            m.dn = ldb.Dn(ldb_dc2, "")
-            m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
-            try:
-                ldb_dc2.modify(m)
-            except ldb.LdbError, (num, msg):
-                self.fail("Failed to restore RID Master " +  msg)
-
-
 class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
     def setUp(self):
         super(DrsReplicaPrefixMapTestCase, self).setUp()