selftest/drs: Show we return the correct 3 objects for DRSUAPI_EXOP_FSMO_RID_ALLOC
authorAndrew Bartlett <abartlet@samba.org>
Fri, 4 Mar 2016 00:12:42 +0000 (13:12 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 6 Jun 2016 14:36:23 +0000 (16:36 +0200)
This does not depend on DRSUAPI_DRS_GET_ANC.

This test is not new, but it was not previously being run.

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
source4/selftest/tests.py
source4/torture/drs/python/getnc_exop.py

index f7bbd04bd9e843e0379268393c81c615624801c8..4866cd72ee72dce7102d077a0e45daebeed97e6c 100755 (executable)
@@ -652,6 +652,11 @@ for env in ['vampire_dc', 'promoted_dc']:
                            name="samba4.drs.repl_move.python(%s)" % env,
                            environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
                            extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
+    planoldpythontestsuite(env, "getnc_exop",
+                           extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
+                           name="samba4.drs.getnc_exop.python(%s)" % env,
+                           environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
+                           extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
 
 
 for env in ["ad_dc_ntvfs", "s4member", "rodc", "promoted_dc", "ad_dc", "ad_member"]:
index 904c0133334719b8c2e1afaa09866fef7b7a78eb..6b796bab923f4a3e578b2ce7da009d159265e075 100644 (file)
@@ -4,6 +4,7 @@
 # Tests various schema replication scenarios
 #
 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -30,6 +31,7 @@
 import drs_base
 import samba.tests
 
+import ldb
 from ldb import SCOPE_BASE
 
 from samba.dcerpc import drsuapi, misc, drsblobs
@@ -49,7 +51,8 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
     def tearDown(self):
         super(DrsReplicaSyncTestCase, self).tearDown()
 
-    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop):
+    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
+                   replica_flags=0):
         req8 = drsuapi.DsGetNCChangesRequest8()
     
         req8.destination_dsa_guid = misc.GUID(dest_dsa)
@@ -61,7 +64,7 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
         req8.highwatermark.reserved_usn = 0
         req8.highwatermark.highest_usn = 0
         req8.uptodateness_vector = None
-        req8.replica_flags = 0
+        req8.replica_flags = replica_flags
         req8.max_object_count = 0
         req8.max_ndr_size = 402116
         req8.extended_op = exop
@@ -87,10 +90,21 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
         # collect info to return later
         fsmo_info_1 = {"dns_name": self.dnsname_dc1,
                        "invocation_id": self.ldb_dc1.get_invocation_id(),
-                       "ntds_guid": self.ldb_dc1.get_ntds_GUID()}
+                       "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
+                       "server_dn": self.ldb_dc1.get_serverName()}
         fsmo_info_2 = {"dns_name": self.dnsname_dc2,
                        "invocation_id": self.ldb_dc2.get_invocation_id(),
-                       "ntds_guid": self.ldb_dc2.get_ntds_GUID()}
+                       "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
+                       "server_dn": self.ldb_dc2.get_serverName()}
+
+        msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
+        fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
+        fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
+
+        msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
+        fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
+        fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
+
         # determine the owner dc
         res = self.ldb_dc1.search(fsmo_obj_dn,
                                   scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
@@ -108,7 +122,7 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
         self.assertEqual(ctr6.nc_object_count, 0)
         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
         self.assertEqual(ctr6.linked_attributes_count, 0)
-        self.assertEqual(ctr6.linked_attributes, None)
+        self.assertEqual(ctr6.linked_attributes, [])
         self.assertEqual(ctr6.drs_error[0], 0)
 
     def test_FSMONotOwner(self):
@@ -144,3 +158,88 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+
+    def test_InvalidDestDSA_ridalloc(self):
+        """Test RID allocation with invalid destination DSA guid"""
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+                               invocation_id=fsmo_owner["invocation_id"],
+                               nc_dn_str=fsmo_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        self.assertEqual(level, 6, "Expected level 6 response!")
+        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
+        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+
+    def test_do_ridalloc(self):
+        """Test doing a RID allocation with a valid destination DSA guid"""
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
+                               invocation_id=fsmo_owner["invocation_id"],
+                               nc_dn_str=fsmo_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        self.assertEqual(level, 6, "Expected level 6 response!")
+        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+        ctr6 = ctr
+        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
+        self.assertEqual(ctr6.object_count, 3)
+        self.assertNotEqual(ctr6.first_object, None)
+        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
+        self.assertNotEqual(ctr6.first_object.next_object, None)
+        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
+        second_object = ctr6.first_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
+        third_object = ctr6.first_object.next_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
+
+        self.assertEqual(ctr6.more_data, False)
+        self.assertEqual(ctr6.nc_object_count, 0)
+        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
+        self.assertEqual(ctr6.drs_error[0], 0)
+        # We don't check the linked_attributes_count as if the domain
+        # has an RODC, it can gain links on the server account object
+
+    def test_do_ridalloc_get_anc(self):
+        """Test doing a RID allocation with a valid destination DSA guid and """
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
+                               invocation_id=fsmo_owner["invocation_id"],
+                               nc_dn_str=fsmo_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
+                               replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
+
+        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        self.assertEqual(level, 6, "Expected level 6 response!")
+        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+        ctr6 = ctr
+        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
+        self.assertEqual(ctr6.object_count, 3)
+        self.assertNotEqual(ctr6.first_object, None)
+        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
+        self.assertNotEqual(ctr6.first_object.next_object, None)
+        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
+        second_object = ctr6.first_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
+        third_object = ctr6.first_object.next_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
+        self.assertEqual(ctr6.more_data, False)
+        self.assertEqual(ctr6.nc_object_count, 0)
+        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
+        self.assertEqual(ctr6.drs_error[0], 0)
+        # We don't check the linked_attributes_count as if the domain
+        # has an RODC, it can gain links on the server account object