class ExopBaseTest:
def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
- replica_flags=0):
+ replica_flags=0, max_objects=0):
req8 = drsuapi.DsGetNCChangesRequest8()
req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
req8.highwatermark.highest_usn = 0
req8.uptodateness_vector = None
req8.replica_flags = replica_flags
- req8.max_object_count = 0
+ req8.max_object_count = max_objects
req8.max_ndr_size = 402116
req8.extended_op = exop
req8.fsmo_info = 0
if enum == ldb.ERR_NO_SUCH_OBJECT:
pass
+ def add_linked_attribute(self, src, dest, attr='member'):
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, src)
+ m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
+ self.ldb_dc1.modify(m)
+
+ def remove_linked_attribute(self, src, dest, attr='member'):
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, src)
+ m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
+ self.ldb_dc1.modify(m)
+
def test_sort_behaviour_single_object(self):
"""Testing sorting behaviour on single objects"""
+
user1_dn = "cn=test_user1,%s" % self.ou
user2_dn = "cn=test_user2,%s" % self.ou
user3_dn = "cn=test_user3,%s" % self.ou
self.assertEqual(len(expected_links), ctr.linked_attributes_count)
self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
- def add_linked_attribute(self, src, dest, attr='member'):
- m = ldb.Message()
- m.dn = ldb.Dn(self.ldb_dc1, src)
- m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
- self.ldb_dc1.modify(m)
+ def test_sort_behaviour_ncchanges(self):
+ """Testing sorting behaviour on a group of objects."""
+ user1_dn = "cn=test_user1,%s" % self.ou
+ group_dn = "cn=test_group,%s" % self.ou
+ self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
+ self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
- def remove_linked_attribute(self, src, dest, attr='member'):
- m = ldb.Message()
- m.dn = ldb.Dn(self.ldb_dc1, src)
- m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
- self.ldb_dc1.modify(m)
+ self.add_linked_attribute(group_dn, user1_dn,
+ attr='member')
+
+ dc_guid_1 = self.ldb_dc1.get_invocation_id()
+
+ drs, drs_handle = self._ds_bind(self.dnsname_dc1)
+
+ # Make sure the max objects count is high enough
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=dc_guid_1,
+ nc_dn_str=self.base_dn,
+ replica_flags=0,
+ max_objects=100,
+ exop=drsuapi.DRSUAPI_EXOP_NONE)
+
+ # Loop until we get linked attributes, or we get to the end.
+ # Samba sends linked attributes at the end, unlike Windows.
+ while True:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ if ctr.more_data == 0 or ctr.linked_attributes_count != 0:
+ break
+ req8.highwatermark = ctr.new_highwatermark
+
+ self.assertTrue(ctr.linked_attributes_count != 0)
+
+ no_inactive = []
+ for link in ctr.linked_attributes:
+ try:
+ target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
+ link.value.blob).guid
+ except:
+ target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
+ link.value.blob).guid
+ no_inactive.append((link, target_guid))
+
+ no_inactive.sort(cmp=_linked_attribute_compare)
+
+ # assert the two arrays are the same
+ self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)