tests/drs: assert sorted identifier GUIDs across getncchanges
authorGarming Sam <garming@catalyst.net.nz>
Thu, 9 Jun 2016 02:55:57 +0000 (14:55 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Fri, 17 Jun 2016 12:13:18 +0000 (14:13 +0200)
Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11960

source4/torture/drs/python/getnc_exop.py

index 78fc14ecc58826284be13711d9740bc9f2f5eb12..0ddac8b9244f0712d06ce6b60d77c670304f7d54 100644 (file)
@@ -79,7 +79,7 @@ class AbstractLink:
 
 class ExopBaseTest:
     def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
-                   replica_flags=0):
+                   replica_flags=0, max_objects=0):
         req8 = drsuapi.DsGetNCChangesRequest8()
 
         req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
@@ -92,7 +92,7 @@ class ExopBaseTest:
         req8.highwatermark.highest_usn = 0
         req8.uptodateness_vector = None
         req8.replica_flags = replica_flags
-        req8.max_object_count = 0
+        req8.max_object_count = max_objects
         req8.max_ndr_size = 402116
         req8.extended_op = exop
         req8.fsmo_info = 0
@@ -303,8 +303,21 @@ class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
             if enum == ldb.ERR_NO_SUCH_OBJECT:
                 pass
 
+    def add_linked_attribute(self, src, dest, attr='member'):
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb_dc1, src)
+        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
+        self.ldb_dc1.modify(m)
+
+    def remove_linked_attribute(self, src, dest, attr='member'):
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb_dc1, src)
+        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
+        self.ldb_dc1.modify(m)
+
     def test_sort_behaviour_single_object(self):
         """Testing sorting behaviour on single objects"""
+
         user1_dn = "cn=test_user1,%s" % self.ou
         user2_dn = "cn=test_user2,%s" % self.ou
         user3_dn = "cn=test_user3,%s" % self.ou
@@ -414,14 +427,49 @@ class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
         self.assertEqual(len(expected_links), ctr.linked_attributes_count)
         self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
 
-    def add_linked_attribute(self, src, dest, attr='member'):
-        m = ldb.Message()
-        m.dn = ldb.Dn(self.ldb_dc1, src)
-        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
-        self.ldb_dc1.modify(m)
+    def test_sort_behaviour_ncchanges(self):
+        """Testing sorting behaviour on a group of objects."""
+        user1_dn = "cn=test_user1,%s" % self.ou
+        group_dn = "cn=test_group,%s" % self.ou
+        self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
+        self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
 
-    def remove_linked_attribute(self, src, dest, attr='member'):
-        m = ldb.Message()
-        m.dn = ldb.Dn(self.ldb_dc1, src)
-        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
-        self.ldb_dc1.modify(m)
+        self.add_linked_attribute(group_dn, user1_dn,
+                                  attr='member')
+
+        dc_guid_1 = self.ldb_dc1.get_invocation_id()
+
+        drs, drs_handle = self._ds_bind(self.dnsname_dc1)
+
+        # Make sure the max objects count is high enough
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=dc_guid_1,
+                               nc_dn_str=self.base_dn,
+                               replica_flags=0,
+                               max_objects=100,
+                               exop=drsuapi.DRSUAPI_EXOP_NONE)
+
+        # Loop until we get linked attributes, or we get to the end.
+        # Samba sends linked attributes at the end, unlike Windows.
+        while True:
+            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+            if ctr.more_data == 0 or ctr.linked_attributes_count != 0:
+                break
+            req8.highwatermark = ctr.new_highwatermark
+
+        self.assertTrue(ctr.linked_attributes_count != 0)
+
+        no_inactive = []
+        for link in ctr.linked_attributes:
+            try:
+                target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
+                                     link.value.blob).guid
+            except:
+                target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
+                                         link.value.blob).guid
+            no_inactive.append((link, target_guid))
+
+        no_inactive.sort(cmp=_linked_attribute_compare)
+
+        # assert the two arrays are the same
+        self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)