2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 # export DC1=dc1_dns_name
26 # export DC2=dc2_dns_name
27 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
28 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
35 from ldb import SCOPE_BASE
37 from samba.dcerpc import drsuapi, misc, drsblobs
38 from samba.drs_utils import drs_DsBind
39 from samba.ndr import ndr_unpack, ndr_pack
def _linked_attribute_compare(la1, la2):
    """Three-way comparison of two replicated linked-attribute values.

    See CompareLinks() in MS-DRSR section 4.1.10.5.17

    :param la1: ``(link, target_guid)`` pair, as collected by the sort
        tests in this file
    :param la2: second ``(link, target_guid)`` pair
    :return: a negative number, zero or a positive number when la1 sorts
        before, equal to or after la2
    """
    def _three_way(a, b):
        # Same contract as the Python 2 builtin cmp(), which no longer
        # exists on Python 3.
        return (a > b) - (a < b)

    la1, la1_target = la1
    la2, la2_target = la2

    # Ascending host object GUID
    c = _three_way(ndr_pack(la1.identifier.guid),
                   ndr_pack(la2.identifier.guid))
    if c != 0:
        return c

    # Ascending attribute ID
    if la1.attid != la2.attid:
        return -1 if la1.attid < la2.attid else 1

    la1_active = la1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
    la2_active = la2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE

    # Ascending 'is present'
    if la1_active != la2_active:
        return 1 if la1_active else -1

    # Ascending target object GUID
    return _three_way(ndr_pack(la1_target), ndr_pack(la2_target))
class AbstractLink(object):
    """Hashable description of one linked-attribute value.

    Two instances are equal when their attid, flags, identifier and
    targetGUID are all equal, which lets the tests keep the expected
    links in a set and look received links up in it.

    The hash is derived from the current field values, so an instance
    must be removed from any set before one of its fields is changed and
    re-added afterwards.
    """

    def __init__(self, attid, flags, identifier, targetGUID):
        self.attid = attid
        self.flags = flags
        self.identifier = identifier
        self.targetGUID = targetGUID

    def _key(self):
        """Return the tuple of fields that defines identity."""
        return (self.attid, self.flags, self.identifier, self.targetGUID)

    def __eq__(self, other):
        return isinstance(other, AbstractLink) and \
            self._key() == other._key()

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so spell it out.
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._key())

    def __repr__(self):
        # Makes a failed membership assertion readable.
        return "AbstractLink(attid=%r, flags=%r, identifier=%r, " \
               "targetGUID=%r)" % self._key()
class ExopBaseTest(object):
    """Mixin with the DRSUAPI plumbing shared by the exop test cases.

    The host class has to provide get_loadparm() and get_credentials().
    """

    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
                   replica_flags=0, max_objects=0, partial_attribute_set=None,
                   mapping_ctr=None):
        """Build a level 8 DsGetNCChanges request.

        :param dest_dsa: GUID string of the destination DSA; a default
            constructed misc.GUID() is sent when this is empty or None
        :param invocation_id: invocation id of the source DSA
        :param nc_dn_str: DN of the naming context (or object) requested
        :param exop: one of the drsuapi.DRSUAPI_EXOP_* values
        :param replica_flags: value stored in req8.replica_flags
        :param max_objects: value stored in req8.max_object_count
        :param partial_attribute_set: optional partial attribute set
        :param mapping_ctr: optional prefix map container; when it is not
            given the request carries an empty mapping container
        :return: the populated drsuapi.DsGetNCChangesRequest8
        """
        try:
            to_text = unicode  # Python 2 text type
        except NameError:
            to_text = str  # Python 3: str is already text

        req8 = drsuapi.DsGetNCChangesRequest8()

        req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
        req8.source_dsa_invocation_id = misc.GUID(invocation_id)
        req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
        req8.naming_context.dn = to_text(nc_dn_str)
        # Start from a zeroed high water mark with no up-to-dateness vector.
        req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
        req8.highwatermark.tmp_highest_usn = 0
        req8.highwatermark.reserved_usn = 0
        req8.highwatermark.highest_usn = 0
        req8.uptodateness_vector = None
        req8.replica_flags = replica_flags
        req8.max_object_count = max_objects
        req8.max_ndr_size = 402116
        req8.extended_op = exop
        req8.partial_attribute_set = partial_attribute_set
        req8.partial_attribute_set_ex = None
        if mapping_ctr:
            req8.mapping_ctr = mapping_ctr
        else:
            req8.mapping_ctr.num_mappings = 0
            req8.mapping_ctr.mappings = None

        return req8

    def _ds_bind(self, server_name):
        """Open a sealed ncacn_ip_tcp DRSUAPI connection and bind to it.

        :param server_name: host name placed in the binding string
        :return: (drs, drs_handle) pair
        """
        binding_str = "ncacn_ip_tcp:%s[seal]" % server_name

        drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
        # The second element returned by drs_DsBind is not needed here.
        (drs_handle, _) = drs_DsBind(drs)
        return (drs, drs_handle)
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
    """Intended as a semi-black box test case for DsGetNCChanges
       implementation for extended operations. It should be testing
       how DsGetNCChanges handles different input params (mostly invalid).
       Final goal is to make DsGetNCChanges as binary compatible to
       Windows implementation as possible"""

    def setUp(self):
        super(DrsReplicaSyncTestCase, self).setUp()

    def tearDown(self):
        super(DrsReplicaSyncTestCase, self).tearDown()

    def _dc_info(self, samdb, dns_name):
        """Collect the details of one DC that the tests need.

        :param samdb: LDB connection to the DC
        :param dns_name: DNS name of the DC
        :return: dict with the keys dns_name, invocation_id, ntds_guid,
            server_dn, server_acct_dn and rid_set_dn
        """
        info = {"dns_name": dns_name,
                "invocation_id": samdb.get_invocation_id(),
                "ntds_guid": samdb.get_ntds_GUID(),
                "server_dn": samdb.get_serverName()}

        msgs = samdb.search(scope=SCOPE_BASE, base=info["server_dn"],
                            attrs=["serverReference"])
        info["server_acct_dn"] = ldb.Dn(samdb, msgs[0]["serverReference"][0])
        info["rid_set_dn"] = ldb.Dn(samdb, "CN=RID Set") + info["server_acct_dn"]
        return info

    def _determine_fSMORoleOwner(self, fsmo_obj_dn):
        """Returns (owner, not_owner) pair where:
             owner: info dict (see _dc_info) for the DC owning the FSMO
             not_owner: info dict for the DC not owning the FSMO"""
        # collect info to return later
        fsmo_info_1 = self._dc_info(self.ldb_dc1, self.dnsname_dc1)
        fsmo_info_2 = self._dc_info(self.ldb_dc2, self.dnsname_dc2)

        # determine the owner dc
        res = self.ldb_dc1.search(fsmo_obj_dn,
                                  scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
        # assertEqual rather than a bare assert: it is not stripped by
        # "python -O" and is reported through the test framework.
        self.assertEqual(len(res), 1,
                         "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn)
        fsmo_owner = res[0]["fSMORoleOwner"][0]
        if fsmo_owner == self.info_dc1["dsServiceName"][0]:
            return (fsmo_info_1, fsmo_info_2)
        return (fsmo_info_2, fsmo_info_1)

    def _check_exop_failed(self, ctr6, expected_failure):
        """Assert that ctr6 reports expected_failure and carries no data."""
        self.assertEqual(ctr6.extended_ret, expected_failure)
        # NOTE(review): these two checks were already disabled; the reason
        # is not recorded in this file.
        #self.assertEqual(ctr6.object_count, 0)
        #self.assertEqual(ctr6.first_object, None)
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes, [])
        self.assertEqual(ctr6.drs_error[0], 0)

    def _rid_manager_dn(self):
        """Return the DN of the RID Manager$ object of DC1's domain."""
        return ldb.Dn(self.ldb_dc1,
                      "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())

    def _get_level6_reply(self, server, req8):
        """Send req8 to server and return the reply, which must be level 6."""
        (drs, drs_handle) = self._ds_bind(server["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        return ctr

    def _check_source_dsa(self, ctr, server):
        """Assert that the reply identifies server as its source DSA."""
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(server["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id,
                         misc.GUID(server["invocation_id"]))

    def test_FSMONotOwner(self):
        """Test role transfer with against DC not owner of the role"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
                               invocation_id=fsmo_not_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        ctr = self._get_level6_reply(fsmo_not_owner, req8)
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
        self._check_source_dsa(ctr, fsmo_not_owner)

    def test_InvalidDestDSA(self):
        """Test role transfer with invalid destination DSA guid"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        ctr = self._get_level6_reply(fsmo_owner, req8)
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
        self._check_source_dsa(ctr, fsmo_owner)

    def test_InvalidDestDSA_ridalloc(self):
        """Test RID allocation with invalid destination DSA guid"""
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)

        ctr = self._get_level6_reply(fsmo_owner, req8)
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
        self._check_source_dsa(ctr, fsmo_owner)

    def _do_ridalloc(self, replica_flags=0):
        """Request a RID allocation from the RID manager owner and check it.

        The reply must succeed and contain exactly three objects, in this
        order: the RID Manager$ object, the requesting DC's RID Set and
        the requesting DC's server account.

        :param replica_flags: replica flags to place in the request
        """
        fsmo_dn = self._rid_manager_dn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
                               replica_flags=replica_flags)

        ctr6 = self._get_level6_reply(fsmo_owner, req8)
        self._check_source_dsa(ctr6, fsmo_owner)

        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
        self.assertEqual(ctr6.object_count, 3)
        self.assertNotEqual(ctr6.first_object, None)
        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
        self.assertNotEqual(ctr6.first_object.next_object, None)
        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
        second_object = ctr6.first_object.next_object.object
        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
        third_object = ctr6.first_object.next_object.next_object.object
        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])

        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.drs_error[0], 0)
        # We don't check the linked_attributes_count as if the domain
        # has an RODC, it can gain links on the server account object

    def test_do_ridalloc(self):
        """Test doing a RID allocation with a valid destination DSA guid"""
        self._do_ridalloc()

    def test_do_ridalloc_get_anc(self):
        """Test doing a RID allocation with a valid destination DSA guid
        and the DRSUAPI_DRS_GET_ANC replica flag set"""
        self._do_ridalloc(replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
    """DsGetNCChanges tests around the prefix map and partial attribute
    set carried by a DRSUAPI_EXOP_REPL_OBJ request."""

    def setUp(self):
        """Create an OU holding a single test user on DC1."""
        super(DrsReplicaPrefixMapTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "ou=pfm_exop,%s" % self.base_dn
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        self.user = "cn=testuser,%s" % self.ou
        self.ldb_dc1.add({
            "dn": self.user,
            "objectclass": "user"})

    def tearDown(self):
        """Remove the test OU and everything below it."""
        super(DrsReplicaPrefixMapTestCase, self).tearDown()
        try:
            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError as e:
            # "except ... as (enum, string)" is a syntax error on Python 3;
            # unpacking e.args yields the same two values.
            (enum, _estr) = e.args
            if enum == ldb.ERR_NO_SUCH_OBJECT:
                pass
            # NOTE(review): every other LdbError is ignored here as well,
            # as before; confirm that best-effort cleanup is intended.

    def get_partial_attribute_set(self, attids=None):
        """Build a DsPartialAttributeSet from a list of attribute ids.

        :param attids: list of attids; defaults to
            [drsuapi.DRSUAPI_ATTID_objectClass]
        :return: drsuapi.DsPartialAttributeSet
        """
        if attids is None:
            # A fresh list per call instead of one shared default object.
            attids = [drsuapi.DRSUAPI_ATTID_objectClass]
        partial_attribute_set = drsuapi.DsPartialAttributeSet()
        partial_attribute_set.attids = attids
        partial_attribute_set.num_attids = len(attids)
        return partial_attribute_set

    def test_missing_prefix_map_dsa(self):
        """A REPL_OBJ request without a prefix map must still succeed."""
        partial_attribute_set = self.get_partial_attribute_set()

        dc_guid_1 = self.ldb_dc1.get_invocation_id()

        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        # NOTE(review): nc_dn_str is assumed to be the user created in
        # setUp - confirm.
        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=dc_guid_1,
                               nc_dn_str=self.user,
                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                               partial_attribute_set=partial_attribute_set)

        # Only the RPC call sits in the try, so a failing assertion below
        # can never be mistaken for an RPC error.
        try:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        except Exception:
            # NOTE(review): the exception type raised by the RPC layer is
            # not visible in this file - narrow this once confirmed.
            self.fail("Missing prefixmap shouldn't have triggered an error")
        self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)

    def test_invalid_prefix_map_attid(self):
        """An attid that is not covered by the prefix map must be rejected."""
        # Request for invalid attid
        partial_attribute_set = self.get_partial_attribute_set([99999])

        pfm = self._samdb_fetch_pfm_and_schi()

        dc_guid_1 = self.ldb_dc1.get_invocation_id()

        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        # NOTE(review): nc_dn_str is assumed to be the user created in
        # setUp - confirm.
        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=dc_guid_1,
                               nc_dn_str=self.user,
                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                               partial_attribute_set=partial_attribute_set,
                               mapping_ctr=pfm)

        try:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        except Exception as e:
            (ecode, emsg) = e.args
            self.assertEqual(ecode, 0x000020E2, "Error code should have been "
                             "WERR_DS_DRA_SCHEMA_MISMATCH")
        else:
            # Kept out of the try block: the AssertionError raised by
            # fail() would otherwise be caught by the handler above.
            self.fail("Invalid attid (99999) should have triggered an error")

    def _samdb_fetch_pfm_and_schi(self):
        """Fetch prefixMap and schemaInfo stored in SamDB using LDB connection

        :return: the mapping container of the stored prefixMap with one
            extra mapping appended that carries the schemaInfo blob
        """
        # NOTE(review): assumed to be the DC1 connection, the DC these
        # tests bind to - confirm.
        samdb = self.ldb_dc1
        res = samdb.search(base=samdb.get_schema_basedn(), scope=SCOPE_BASE,
                           attrs=["prefixMap", "schemaInfo"])

        pfm = ndr_unpack(drsblobs.prefixMapBlob,
                         str(res[0]['prefixMap']))

        schi = drsuapi.DsReplicaOIDMapping()
        if 'schemaInfo' in res[0]:
            schi_blob = str(res[0]['schemaInfo'])
        else:
            # No schemaInfo stored: send a revision 0 blob instead.
            schema_info = drsblobs.schemaInfoBlob()
            schema_info.revision = 0
            schema_info.marker = 0xFF
            schema_info.invocation_id = misc.GUID(samdb.get_invocation_id())
            schi_blob = ndr_pack(schema_info)

        # Convert once and materialise the result: len() of a bare map()
        # only works on Python 2.
        octets = list(map(ord, schi_blob))
        schi.oid.length = len(octets)
        schi.oid.binary_oid = octets

        pfm.ctr.mappings = pfm.ctr.mappings + [schi]
        pfm.ctr.num_mappings += 1
        # NOTE(review): the container (not the blob) is returned because
        # it is what carries mappings/num_mappings, like req8.mapping_ctr.
        return pfm.ctr
class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
    """Checks that the linked attributes returned by DsGetNCChanges arrive
    in the order defined by _linked_attribute_compare()."""

    def setUp(self):
        """Create the OU that holds the users and groups of each test."""
        super(DrsReplicaSyncSortTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "ou=sort_exop,%s" % self.base_dn
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})

    def tearDown(self):
        super(DrsReplicaSyncSortTestCase, self).tearDown()
        # tidyup groups and users
        try:
            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError as e:
            # "except ... as (enum, string)" is a syntax error on Python 3;
            # unpacking e.args yields the same two values.
            (enum, _estr) = e.args
            if enum == ldb.ERR_NO_SUCH_OBJECT:
                pass
            # NOTE(review): every other LdbError is ignored here as well,
            # as before; confirm that best-effort cleanup is intended.

    def _modify_link(self, src, dest, attr, flag):
        """Apply one ldb modification of attr on src with value dest."""
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb_dc1, src)
        m[attr] = ldb.MessageElement(dest, flag, attr)
        self.ldb_dc1.modify(m)

    def add_linked_attribute(self, src, dest, attr='member'):
        """Add dest as a value of the linked attribute attr on src."""
        self._modify_link(src, dest, attr, ldb.FLAG_MOD_ADD)

    def remove_linked_attribute(self, src, dest, attr='member'):
        """Remove dest from the linked attribute attr on src."""
        self._modify_link(src, dest, attr, ldb.FLAG_MOD_DELETE)

    def _object_guid_str(self, dn):
        """Return the objectGUID of dn on DC1 as a GUID string."""
        res = self.ldb_dc1.search(base=dn, attrs=["objectGUID"])
        return str(misc.GUID(res[0]['objectGUID'][0]))

    def _check_links_expected_and_sorted(self, ctr, expected_links):
        """Assert that ctr holds exactly the expected links, in sort order.

        :param ctr: DsGetNCChanges reply
        :param expected_links: set of AbstractLink
        """
        # list.sort(cmp=...) exists on Python 2 only; cmp_to_key wraps the
        # same comparator for Python 2.7 and Python 3.
        from functools import cmp_to_key

        received = []
        for link in ctr.linked_attributes:
            target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
                                     link.value.blob).guid
            received.append((link, target_guid))
            self.assertTrue(AbstractLink(link.attid, link.flags,
                                         str(link.identifier.guid),
                                         str(target_guid)) in expected_links)

        received.sort(key=cmp_to_key(_linked_attribute_compare))

        # assert the two arrays are the same
        self.assertEqual(len(expected_links), ctr.linked_attributes_count)
        self.assertEqual([x[0] for x in received], ctr.linked_attributes)

    def test_sort_behaviour_single_object(self):
        """Testing sorting behaviour on single objects"""

        user1_dn = "cn=test_user1,%s" % self.ou
        user2_dn = "cn=test_user2,%s" % self.ou
        user3_dn = "cn=test_user3,%s" % self.ou
        group_dn = "cn=test_group,%s" % self.ou

        self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})

        u1_guid = self._object_guid_str(user1_dn)
        u2_guid = self._object_guid_str(user2_dn)
        u3_guid = self._object_guid_str(user3_dn)
        g_guid = self._object_guid_str(group_dn)

        # These six links are exactly the ones listed in expected_links.
        self.add_linked_attribute(group_dn, user1_dn,
                                  attr='member')
        self.add_linked_attribute(group_dn, user2_dn,
                                  attr='member')
        self.add_linked_attribute(group_dn, user3_dn,
                                  attr='member')
        self.add_linked_attribute(group_dn, user1_dn,
                                  attr='managedBy')
        self.add_linked_attribute(group_dn, user2_dn,
                                  attr='nonSecurityMember')
        self.add_linked_attribute(group_dn, user3_dn,
                                  attr='nonSecurityMember')

        active = drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE

        # The link that is removed (and so becomes inactive) further down.
        set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
                                    active, g_guid, u3_guid)

        expected_links = set([
            set_inactive,
            AbstractLink(drsuapi.DRSUAPI_ATTID_member,
                         active, g_guid, u1_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_member,
                         active, g_guid, u2_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_member,
                         active, g_guid, u3_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
                         active, g_guid, u1_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
                         active, g_guid, u2_guid),
        ])

        dc_guid_1 = self.ldb_dc1.get_invocation_id()

        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=dc_guid_1,
                               nc_dn_str=group_dn,
                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self._check_links_expected_and_sorted(ctr, expected_links)

        self.remove_linked_attribute(group_dn, user3_dn,
                                     attr='nonSecurityMember')

        # Set the link inactive
        # (taken out of the set first, because changing flags changes
        # the hash of the object)
        expected_links.remove(set_inactive)
        set_inactive.flags = 0
        expected_links.add(set_inactive)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self._check_links_expected_and_sorted(ctr, expected_links)

    def test_sort_behaviour_ncchanges(self):
        """Testing sorting behaviour on a group of objects."""
        from functools import cmp_to_key

        user1_dn = "cn=test_user1,%s" % self.ou
        group_dn = "cn=test_group,%s" % self.ou
        self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})

        # NOTE(review): attr is assumed to be 'member' (also the helper's
        # default) - confirm.
        self.add_linked_attribute(group_dn, user1_dn,
                                  attr='member')

        dc_guid_1 = self.ldb_dc1.get_invocation_id()

        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        # Make sure the max objects count is high enough
        # NOTE(review): the value 100 is an assumption - confirm.
        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=dc_guid_1,
                               nc_dn_str=self.base_dn,
                               max_objects=100,
                               exop=drsuapi.DRSUAPI_EXOP_NONE)

        # Loop until we get linked attributes, or we get to the end.
        # Samba sends linked attributes at the end, unlike Windows.
        while True:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
            if ctr.more_data == 0 or ctr.linked_attributes_count != 0:
                break
            req8.highwatermark = ctr.new_highwatermark

        self.assertTrue(ctr.linked_attributes_count != 0)

        no_inactive = []
        for link in ctr.linked_attributes:
            # Try the plain identifier layout first and fall back to the
            # "Binary" variant when unpacking fails.
            try:
                target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
                                         link.value.blob).guid
            except Exception:
                target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
                                         link.value.blob).guid
            no_inactive.append((link, target_guid))

        no_inactive.sort(key=cmp_to_key(_linked_attribute_compare))

        # assert the two arrays are the same
        self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)