2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 # export DC1=dc1_dns_name
26 # export DC2=dc2_dns_name
27 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
28 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
32 from drs_base import AbstractLink
37 from ldb import SCOPE_BASE
39 from samba.dcerpc import drsuapi, misc, drsblobs
40 from samba.drs_utils import drs_DsBind
41 from samba.ndr import ndr_unpack, ndr_pack
43 def _linked_attribute_compare(la1, la2):
44 """See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
48 # Ascending host object GUID
49 c = cmp(ndr_pack(la1.identifier.guid), ndr_pack(la2.identifier.guid))
53 # Ascending attribute ID
54 if la1.attid != la2.attid:
55 return -1 if la1.attid < la2.attid else 1
57 la1_active = la1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
58 la2_active = la2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
60 # Ascending 'is present'
61 if la1_active != la2_active:
62 return 1 if la1_active else -1
64 # Ascending target object GUID
65 return cmp(ndr_pack(la1_target), ndr_pack(la2_target))
68 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
69 """Intended as a semi-black box test case for DsGetNCChanges
70 implementation for extended operations. It should be testing
71 how DsGetNCChanges handles different input params (mostly invalid).
72 Final goal is to make DsGetNCChanges as binary compatible to
73 Windows implementation as possible"""
76 super(DrsReplicaSyncTestCase, self).setUp()
77 self.base_dn = self.ldb_dc1.get_default_basedn()
78 self.ou = "OU=test_getncchanges,%s" % self.base_dn
81 "objectclass": "organizationalUnit"})
82 (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
83 (self.default_hwm, self.default_utdv) = self._get_highest_hwm_utdv(self.ldb_dc1)
87 self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
88 except ldb.LdbError as (enum, string):
89 if enum == ldb.ERR_NO_SUCH_OBJECT:
91 super(DrsReplicaSyncTestCase, self).tearDown()
93 def _determine_fSMORoleOwner(self, fsmo_obj_dn):
94 """Returns (owner, not_owner) pair where:
95 owner: dns name for FSMO owner
96 not_owner: dns name for DC not owning the FSMO"""
97 # collect info to return later
98 fsmo_info_1 = {"dns_name": self.dnsname_dc1,
99 "invocation_id": self.ldb_dc1.get_invocation_id(),
100 "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
101 "server_dn": self.ldb_dc1.get_serverName()}
102 fsmo_info_2 = {"dns_name": self.dnsname_dc2,
103 "invocation_id": self.ldb_dc2.get_invocation_id(),
104 "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
105 "server_dn": self.ldb_dc2.get_serverName()}
107 msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
108 fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
109 fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
111 msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
112 fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
113 fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
115 # determine the owner dc
116 res = self.ldb_dc1.search(fsmo_obj_dn,
117 scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
118 assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
119 fsmo_owner = res[0]["fSMORoleOwner"][0]
120 if fsmo_owner == self.info_dc1["dsServiceName"][0]:
121 return (fsmo_info_1, fsmo_info_2)
122 return (fsmo_info_2, fsmo_info_1)
124 def _check_exop_failed(self, ctr6, expected_failure):
125 self.assertEqual(ctr6.extended_ret, expected_failure)
126 #self.assertEqual(ctr6.object_count, 0)
127 #self.assertEqual(ctr6.first_object, None)
128 self.assertEqual(ctr6.more_data, False)
129 self.assertEqual(ctr6.nc_object_count, 0)
130 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
131 self.assertEqual(ctr6.linked_attributes_count, 0)
132 self.assertEqual(ctr6.linked_attributes, [])
133 self.assertEqual(ctr6.drs_error[0], 0)
135 def test_FSMONotOwner(self):
136 """Test role transfer with against DC not owner of the role"""
137 fsmo_dn = self.ldb_dc1.get_schema_basedn()
138 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
140 req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
141 invocation_id=fsmo_not_owner["invocation_id"],
143 exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
145 (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
146 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
147 self.assertEqual(level, 6, "Expected level 6 response!")
148 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
149 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
150 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
152 def test_InvalidDestDSA(self):
153 """Test role transfer with invalid destination DSA guid"""
154 fsmo_dn = self.ldb_dc1.get_schema_basedn()
155 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
157 req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
158 invocation_id=fsmo_owner["invocation_id"],
160 exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
162 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
163 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
164 self.assertEqual(level, 6, "Expected level 6 response!")
165 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
166 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
167 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
169 class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
171 super(DrsReplicaPrefixMapTestCase, self).setUp()
172 self.base_dn = self.ldb_dc1.get_default_basedn()
173 self.ou = "ou=pfm_exop,%s" % self.base_dn
176 "objectclass": "organizationalUnit"})
177 self.user = "cn=testuser,%s" % self.ou
180 "objectclass": "user"})
183 super(DrsReplicaPrefixMapTestCase, self).tearDown()
185 self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
186 except ldb.LdbError as (enum, string):
187 if enum == ldb.ERR_NO_SUCH_OBJECT:
190 def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
191 partial_attribute_set = drsuapi.DsPartialAttributeSet()
192 partial_attribute_set.attids = attids
193 partial_attribute_set.num_attids = len(attids)
194 return partial_attribute_set
196 def test_missing_prefix_map_dsa(self):
197 partial_attribute_set = self.get_partial_attribute_set()
199 dc_guid_1 = self.ldb_dc1.get_invocation_id()
201 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
203 req8 = self._exop_req8(dest_dsa=None,
204 invocation_id=dc_guid_1,
206 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
207 partial_attribute_set=partial_attribute_set)
210 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
211 self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
213 self.fail("Missing prefixmap shouldn't have triggered an error")
215 def test_invalid_prefix_map_attid(self):
216 # Request for invalid attid
217 partial_attribute_set = self.get_partial_attribute_set([99999])
219 dc_guid_1 = self.ldb_dc1.get_invocation_id()
220 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
223 pfm = self._samdb_fetch_pfm_and_schi()
225 # On Windows, prefixMap isn't available over LDAP
226 req8 = self._exop_req8(dest_dsa=None,
227 invocation_id=dc_guid_1,
229 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
230 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
231 pfm = ctr.mapping_ctr
233 req8 = self._exop_req8(dest_dsa=None,
234 invocation_id=dc_guid_1,
236 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
237 partial_attribute_set=partial_attribute_set,
241 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
242 self.fail("Invalid attid (99999) should have triggered an error")
243 except RuntimeError as (ecode, emsg):
244 self.assertEqual(ecode, 0x000020E2, "Error code should have been "
245 "WERR_DS_DRA_SCHEMA_MISMATCH")
247 def test_secret_prefix_map_attid(self):
248 # Request for a secret attid
249 partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])
251 dc_guid_1 = self.ldb_dc1.get_invocation_id()
252 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
255 pfm = self._samdb_fetch_pfm_and_schi()
257 # On Windows, prefixMap isn't available over LDAP
258 req8 = self._exop_req8(dest_dsa=None,
259 invocation_id=dc_guid_1,
261 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
262 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
263 pfm = ctr.mapping_ctr
266 req8 = self._exop_req8(dest_dsa=None,
267 invocation_id=dc_guid_1,
269 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
270 partial_attribute_set=partial_attribute_set,
273 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
276 for attr in ctr.first_object.object.attribute_ctr.attributes:
277 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
281 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
283 for i, mapping in enumerate(pfm.mappings):
285 # objectClass: 2.5.4.0
286 if mapping.oid.binary_oid == [85, 4]:
288 # OID: 1.2.840.113556.1.4.*
289 # unicodePwd: 1.2.840.113556.1.4.90
290 elif mapping.oid.binary_oid == [42, 134, 72, 134, 247, 20, 1, 4]:
293 (pfm.mappings[idx1].id_prefix,
294 pfm.mappings[idx2].id_prefix) = (pfm.mappings[idx2].id_prefix,
295 pfm.mappings[idx1].id_prefix)
298 tmp[idx1], tmp[idx2] = tmp[idx2], tmp[idx1]
301 # 90 for unicodePwd (with new prefix = 0)
302 # 589824, 589827 for objectClass and CN
303 # Use of three ensures sorting is correct
304 partial_attribute_set = self.get_partial_attribute_set([90, 589824, 589827])
305 req8 = self._exop_req8(dest_dsa=None,
306 invocation_id=dc_guid_1,
308 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
309 partial_attribute_set=partial_attribute_set,
312 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
315 for attr in ctr.first_object.object.attribute_ctr.attributes:
316 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
320 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
322 def test_regular_prefix_map_attid(self):
323 # Request for a regular (non-secret) attid
324 partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_name])
326 dc_guid_1 = self.ldb_dc1.get_invocation_id()
327 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
330 pfm = self._samdb_fetch_pfm_and_schi()
332 # On Windows, prefixMap isn't available over LDAP
333 req8 = self._exop_req8(dest_dsa=None,
334 invocation_id=dc_guid_1,
336 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
337 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
338 pfm = ctr.mapping_ctr
341 req8 = self._exop_req8(dest_dsa=None,
342 invocation_id=dc_guid_1,
344 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
345 partial_attribute_set=partial_attribute_set,
348 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
351 for attr in ctr.first_object.object.attribute_ctr.attributes:
352 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
356 self.assertTrue(found, "Ensure we get the name attribute back")
358 for i, mapping in enumerate(pfm.mappings):
360 # objectClass: 2.5.4.0
361 if mapping.oid.binary_oid == [85, 4]:
363 # OID: 1.2.840.113556.1.4.*
364 # name: 1.2.840.113556.1.4.1
365 elif mapping.oid.binary_oid == [42, 134, 72, 134, 247, 20, 1, 4]:
368 (pfm.mappings[idx1].id_prefix,
369 pfm.mappings[idx2].id_prefix) = (pfm.mappings[idx2].id_prefix,
370 pfm.mappings[idx1].id_prefix)
373 tmp[idx1], tmp[idx2] = tmp[idx2], tmp[idx1]
376 # 1 for name (with new prefix = 0)
377 partial_attribute_set = self.get_partial_attribute_set([1])
378 req8 = self._exop_req8(dest_dsa=None,
379 invocation_id=dc_guid_1,
381 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
382 partial_attribute_set=partial_attribute_set,
385 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
388 for attr in ctr.first_object.object.attribute_ctr.attributes:
389 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
393 self.assertTrue(found, "Ensure we get the name attribute back")
395 def test_regular_prefix_map_ex_attid(self):
396 # Request for a regular (non-secret) attid
397 partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_name])
398 partial_attribute_set_ex = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])
400 dc_guid_1 = self.ldb_dc1.get_invocation_id()
401 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
404 pfm = self._samdb_fetch_pfm_and_schi()
406 # On Windows, prefixMap isn't available over LDAP
407 req8 = self._exop_req8(dest_dsa=None,
408 invocation_id=dc_guid_1,
410 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
411 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
412 pfm = ctr.mapping_ctr
415 req8 = self._exop_req8(dest_dsa=None,
416 invocation_id=dc_guid_1,
418 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
419 partial_attribute_set=partial_attribute_set,
420 partial_attribute_set_ex=partial_attribute_set_ex,
423 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
426 for attr in ctr.first_object.object.attribute_ctr.attributes:
427 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
431 self.assertTrue(found, "Ensure we get the name attribute back")
434 for attr in ctr.first_object.object.attribute_ctr.attributes:
435 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
439 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
441 for i, mapping in enumerate(pfm.mappings):
443 # objectClass: 2.5.4.0
444 if mapping.oid.binary_oid == [85, 4]:
446 # OID: 1.2.840.113556.1.4.*
447 # name: 1.2.840.113556.1.4.1
448 # unicodePwd: 1.2.840.113556.1.4.90
449 elif mapping.oid.binary_oid == [42, 134, 72, 134, 247, 20, 1, 4]:
452 (pfm.mappings[idx1].id_prefix,
453 pfm.mappings[idx2].id_prefix) = (pfm.mappings[idx2].id_prefix,
454 pfm.mappings[idx1].id_prefix)
457 tmp[idx1], tmp[idx2] = tmp[idx2], tmp[idx1]
460 # 1 for name (with new prefix = 0)
461 partial_attribute_set = self.get_partial_attribute_set([1])
462 # 90 for unicodePwd (with new prefix = 0)
463 # HOWEVER: Windows doesn't seem to respect incoming maps for PartialAttrSetEx
464 partial_attribute_set_ex = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])
465 req8 = self._exop_req8(dest_dsa=None,
466 invocation_id=dc_guid_1,
468 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
469 partial_attribute_set=partial_attribute_set,
470 partial_attribute_set_ex=partial_attribute_set_ex,
473 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
476 for attr in ctr.first_object.object.attribute_ctr.attributes:
477 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
481 self.assertTrue(found, "Ensure we get the name attribute back")
484 for attr in ctr.first_object.object.attribute_ctr.attributes:
485 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
489 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
491 def _samdb_fetch_pfm_and_schi(self):
492 """Fetch prefixMap and schemaInfo stored in SamDB using LDB connection"""
494 res = samdb.search(base=samdb.get_schema_basedn(), scope=SCOPE_BASE,
495 attrs=["prefixMap", "schemaInfo"])
497 pfm = ndr_unpack(drsblobs.prefixMapBlob,
498 str(res[0]['prefixMap']))
500 schi = drsuapi.DsReplicaOIDMapping()
503 if 'schemaInfo' in res[0]:
504 schi.oid.length = len(map(ord, str(res[0]['schemaInfo'])))
505 schi.oid.binary_oid = map(ord, str(res[0]['schemaInfo']))
507 schema_info = drsblobs.schemaInfoBlob()
508 schema_info.revision = 0
509 schema_info.marker = 0xFF
510 schema_info.invocation_id = misc.GUID(samdb.get_invocation_id())
511 schi.oid.length = len(map(ord, ndr_pack(schema_info)))
512 schi.oid.binary_oid = map(ord, ndr_pack(schema_info))
514 pfm.ctr.mappings = pfm.ctr.mappings + [schi]
515 pfm.ctr.num_mappings += 1
518 class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase):
520 super(DrsReplicaSyncSortTestCase, self).setUp()
521 self.base_dn = self.ldb_dc1.get_default_basedn()
522 self.ou = "ou=sort_exop,%s" % self.base_dn
525 "objectclass": "organizationalUnit"})
528 super(DrsReplicaSyncSortTestCase, self).tearDown()
529 # tidyup groups and users
531 self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
532 except ldb.LdbError as (enum, string):
533 if enum == ldb.ERR_NO_SUCH_OBJECT:
536 def add_linked_attribute(self, src, dest, attr='member'):
538 m.dn = ldb.Dn(self.ldb_dc1, src)
539 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
540 self.ldb_dc1.modify(m)
542 def remove_linked_attribute(self, src, dest, attr='member'):
544 m.dn = ldb.Dn(self.ldb_dc1, src)
545 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
546 self.ldb_dc1.modify(m)
548 def test_sort_behaviour_single_object(self):
549 """Testing sorting behaviour on single objects"""
551 user1_dn = "cn=test_user1,%s" % self.ou
552 user2_dn = "cn=test_user2,%s" % self.ou
553 user3_dn = "cn=test_user3,%s" % self.ou
554 group_dn = "cn=test_group,%s" % self.ou
556 self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
557 self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
558 self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
559 self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
561 u1_guid = misc.GUID(self.ldb_dc1.search(base=user1_dn,
562 attrs=["objectGUID"])[0]['objectGUID'][0])
563 u2_guid = misc.GUID(self.ldb_dc1.search(base=user2_dn,
564 attrs=["objectGUID"])[0]['objectGUID'][0])
565 u3_guid = misc.GUID(self.ldb_dc1.search(base=user3_dn,
566 attrs=["objectGUID"])[0]['objectGUID'][0])
567 g_guid = misc.GUID(self.ldb_dc1.search(base=group_dn,
568 attrs=["objectGUID"])[0]['objectGUID'][0])
570 self.add_linked_attribute(group_dn, user1_dn,
572 self.add_linked_attribute(group_dn, user2_dn,
574 self.add_linked_attribute(group_dn, user3_dn,
576 self.add_linked_attribute(group_dn, user1_dn,
578 self.add_linked_attribute(group_dn, user2_dn,
579 attr='nonSecurityMember')
580 self.add_linked_attribute(group_dn, user3_dn,
581 attr='nonSecurityMember')
583 set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
584 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
587 expected_links = set([set_inactive,
588 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
589 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
592 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
593 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
596 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
597 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
600 AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
601 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
604 AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
605 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
610 dc_guid_1 = self.ldb_dc1.get_invocation_id()
612 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
614 req8 = self._exop_req8(dest_dsa=None,
615 invocation_id=dc_guid_1,
617 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
619 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
622 for link in ctr.linked_attributes:
623 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
624 link.value.blob).guid
625 no_inactive.append((link, target_guid))
626 self.assertTrue(AbstractLink(link.attid, link.flags,
627 link.identifier.guid,
628 target_guid) in expected_links)
630 no_inactive.sort(cmp=_linked_attribute_compare)
632 # assert the two arrays are the same
633 self.assertEqual(len(expected_links), ctr.linked_attributes_count)
634 self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)
636 self.remove_linked_attribute(group_dn, user3_dn,
637 attr='nonSecurityMember')
639 # Set the link inactive
640 expected_links.remove(set_inactive)
641 set_inactive.flags = 0
642 expected_links.add(set_inactive)
645 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
646 for link in ctr.linked_attributes:
647 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
648 link.value.blob).guid
649 has_inactive.append((link, target_guid))
650 self.assertTrue(AbstractLink(link.attid, link.flags,
651 link.identifier.guid,
652 target_guid) in expected_links)
654 has_inactive.sort(cmp=_linked_attribute_compare)
656 # assert the two arrays are the same
657 self.assertEqual(len(expected_links), ctr.linked_attributes_count)
658 self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
660 def test_sort_behaviour_ncchanges(self):
661 """Testing sorting behaviour on a group of objects."""
662 user1_dn = "cn=test_user1,%s" % self.ou
663 group_dn = "cn=test_group,%s" % self.ou
664 self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
665 self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
667 self.add_linked_attribute(group_dn, user1_dn,
670 dc_guid_1 = self.ldb_dc1.get_invocation_id()
672 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
674 # Make sure the max objects count is high enough
675 req8 = self._exop_req8(dest_dsa=None,
676 invocation_id=dc_guid_1,
677 nc_dn_str=self.base_dn,
680 exop=drsuapi.DRSUAPI_EXOP_NONE)
682 # Loop until we get linked attributes, or we get to the end.
683 # Samba sends linked attributes at the end, unlike Windows.
685 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
686 if ctr.more_data == 0 or ctr.linked_attributes_count != 0:
688 req8.highwatermark = ctr.new_highwatermark
690 self.assertTrue(ctr.linked_attributes_count != 0)
693 for link in ctr.linked_attributes:
695 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
696 link.value.blob).guid
698 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
699 link.value.blob).guid
700 no_inactive.append((link, target_guid))
702 no_inactive.sort(cmp=_linked_attribute_compare)
704 # assert the two arrays are the same
705 self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)