2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 # export DC1=dc1_dns_name
26 # export DC2=dc2_dns_name
27 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
28 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
35 from ldb import SCOPE_BASE
37 from samba.dcerpc import drsuapi, misc, drsblobs
38 from samba.drs_utils import drs_DsBind
39 from samba.ndr import ndr_unpack, ndr_pack
# Comparison function used to sort (link, target GUID) entries. The visible
# criteria are, in order: host object GUID, attribute ID, the ACTIVE flag and
# target object GUID.
41 def _linked_attribute_compare(la1, la2):
42 """See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
# NOTE(review): la1_target / la2_target (used by the final return) are not
# bound in the lines visible here; presumably la1 and la2 are unpacked into
# (link, target) pairs first -- confirm against the complete file.
46 # Ascending host object GUID
47 c = cmp(ndr_pack(la1.identifier.guid), ndr_pack(la2.identifier.guid))
# NOTE(review): `c` is not consulted in the visible lines; presumably a
# non-zero result is returned here -- confirm.
51 # Ascending attribute ID
52 if la1.attid != la2.attid:
53 return -1 if la1.attid < la2.attid else 1
55 la1_active = la1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
56 la2_active = la2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
58 # Ascending 'is present'
59 if la1_active != la2_active:
# An entry with the ACTIVE flag set sorts after one without it.
60 return 1 if la1_active else -1
62 # Ascending target object GUID
63 return cmp(ndr_pack(la1_target), ndr_pack(la2_target))
# Methods of the link value type (referred to as AbstractLink in __eq__).
# Equality and hashing both use the tuple
# (attid, flags, identifier, targetGUID).
# NOTE(review): only the identifier and targetGUID assignments are visible in
# __init__; __eq__ also reads self.attid and self.flags, so those are
# presumably assigned on lines missing from this view.
66 def __init__(self, attid, flags, identifier, targetGUID):
69 self.identifier = identifier
70 self.targetGUID = targetGUID
# Instances compare equal only to other AbstractLink objects with the same
# four field values.
72 def __eq__(self, other):
73 return isinstance(other, AbstractLink) and \
74 ((self.attid, self.flags, self.identifier, self.targetGUID) ==
75 (other.attid, other.flags, other.identifier, other.targetGUID))
# NOTE(review): the statement below presumably forms the body of __hash__,
# whose `def` line is not visible here. It hashes the same tuple that __eq__
# compares.
78 return hash((self.attid, self.flags, self.identifier, self.targetGUID))
# Build a drsuapi.DsGetNCChangesRequest8 for the extended operation `exop`
# on the naming context `nc_dn_str`. The high-water mark is zeroed and no
# up-to-dateness vector is sent.
81 def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
82 replica_flags=0, max_objects=0, partial_attribute_set=None,
83 partial_attribute_set_ex=None, mapping_ctr=None):
84 req8 = drsuapi.DsGetNCChangesRequest8()
# A falsy dest_dsa (e.g. None) yields a default-constructed GUID.
86 req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
87 req8.source_dsa_invocation_id = misc.GUID(invocation_id)
88 req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
89 req8.naming_context.dn = unicode(nc_dn_str)
90 req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
91 req8.highwatermark.tmp_highest_usn = 0
92 req8.highwatermark.reserved_usn = 0
93 req8.highwatermark.highest_usn = 0
94 req8.uptodateness_vector = None
95 req8.replica_flags = replica_flags
96 req8.max_object_count = max_objects
97 req8.max_ndr_size = 402116
98 req8.extended_op = exop
100 req8.partial_attribute_set = partial_attribute_set
101 req8.partial_attribute_set_ex = partial_attribute_set_ex
# NOTE(review): as visible here the supplied mapping_ctr is assigned and then
# unconditionally emptied, and nothing is returned. Presumably an if/else and
# a `return req8` are missing from this view -- confirm against the full file.
103 req8.mapping_ctr = mapping_ctr
105 req8.mapping_ctr.num_mappings = 0
106 req8.mapping_ctr.mappings = None
def _ds_bind(self, server_name):
    """Connect to the DRSUAPI endpoint on server_name and bind to it.

    The connection uses the ncacn_ip_tcp transport with the "seal" option
    and the loadparm/credentials supplied by the test case.

    Returns a (drs, drs_handle) tuple: the drsuapi connection object and
    the handle produced by drs_DsBind().
    """
    connection = drsuapi.drsuapi("ncacn_ip_tcp:%s[seal]" % server_name,
                                 self.get_loadparm(),
                                 self.get_credentials())
    # drs_DsBind() also reports the supported extensions; callers of this
    # helper only need the bind handle.
    bind_handle, _supported_extensions = drs_DsBind(connection)
    return (connection, bind_handle)
# NOTE(review): the two super() calls that follow the class docstring
# presumably sit inside setUp() and tearDown() methods whose `def` lines are
# not visible in this view -- confirm against the complete file.
118 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
119 """Intended as a semi-black box test case for DsGetNCChanges
120 implementation for extended operations. It should be testing
121 how DsGetNCChanges handles different input params (mostly invalid).
122 Final goal is to make DsGetNCChanges as binary compatible to
123 Windows implementation as possible"""
126 super(DrsReplicaSyncTestCase, self).setUp()
129 super(DrsReplicaSyncTestCase, self).tearDown()
def _determine_fSMORoleOwner(self, fsmo_obj_dn):
    """Work out which of the two test DCs owns the FSMO role on fsmo_obj_dn.

    Returns an (owner, not_owner) pair of dicts, one per DC, each holding:
        dns_name, invocation_id, ntds_guid, server_dn,
        server_acct_dn (from the server object's serverReference) and
        rid_set_dn ("CN=RID Set" below server_acct_dn).
    """
    def describe(dns_name, samdb):
        # Basic identity of one DC as reported by its own LDB connection.
        return {"dns_name": dns_name,
                "invocation_id": samdb.get_invocation_id(),
                "ntds_guid": samdb.get_ntds_GUID(),
                "server_dn": samdb.get_serverName()}

    def add_account_dns(samdb, info):
        # Follow serverReference from the server object to the DC's
        # account, then derive the RID Set DN beneath it.
        found = samdb.search(scope=ldb.SCOPE_BASE, base=info["server_dn"],
                             attrs=["serverReference"])
        info["server_acct_dn"] = ldb.Dn(samdb, found[0]["serverReference"][0])
        info["rid_set_dn"] = ldb.Dn(samdb, "CN=RID Set") + info["server_acct_dn"]

    dc1_info = describe(self.dnsname_dc1, self.ldb_dc1)
    dc2_info = describe(self.dnsname_dc2, self.ldb_dc2)
    add_account_dns(self.ldb_dc1, dc1_info)
    add_account_dns(self.ldb_dc2, dc2_info)

    # Ask DC1 who currently holds the role.
    owner_res = self.ldb_dc1.search(fsmo_obj_dn,
                                    scope=SCOPE_BASE,
                                    attrs=["fSMORoleOwner"])
    assert len(owner_res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
    role_owner = owner_res[0]["fSMORoleOwner"][0]

    dc1_is_owner = (role_owner == self.info_dc1["dsServiceName"][0])
    if not dc1_is_owner:
        return (dc2_info, dc1_info)
    return (dc1_info, dc2_info)
def _check_exop_failed(self, ctr6, expected_failure):
    """Assert that ctr6 describes an extended operation that failed.

    ctr6.extended_ret must equal expected_failure, the reply must carry no
    further data and no linked attributes, and drs_error[0] must be 0.
    object_count and first_object are not checked (the assertions for them
    were disabled in this helper).
    """
    self.assertEqual(ctr6.extended_ret, expected_failure)

    # Fields that must be "empty" in a failed reply, checked in this order.
    empty_fields = (("more_data", False),
                    ("nc_object_count", 0),
                    ("nc_linked_attributes_count", 0),
                    ("linked_attributes_count", 0),
                    ("linked_attributes", []))
    for field_name, empty_value in empty_fields:
        self.assertEqual(getattr(ctr6, field_name), empty_value)

    self.assertEqual(ctr6.drs_error[0], 0)
# Sends a FSMO_REQ_ROLE request for the schema naming context's role to the DC
# that does NOT own it and expects a level 6 reply carrying
# DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER.
173 def test_FSMONotOwner(self):
174 """Test role transfer with against DC not owner of the role"""
175 fsmo_dn = self.ldb_dc1.get_schema_basedn()
176 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
# The request names the role owner as destination DSA but is bound against
# (and carries the invocation id of) the non-owner.
# NOTE(review): an argument line of this call is not visible in this view.
178 req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
179 invocation_id=fsmo_not_owner["invocation_id"],
181 exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
183 (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
184 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
185 self.assertEqual(level, 6, "Expected level 6 response!")
186 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
# The reply must identify the DC that was actually contacted.
187 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
188 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
# Sends a FSMO_REQ_ROLE request to the role owner with a hard-coded
# destination DSA GUID and expects DRSUAPI_EXOP_ERR_UNKNOWN_CALLER in a
# level 6 reply.
190 def test_InvalidDestDSA(self):
191 """Test role transfer with invalid destination DSA guid"""
192 fsmo_dn = self.ldb_dc1.get_schema_basedn()
193 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
# NOTE(review): the literal GUID presumably matches no DSA in the test
# domain -- confirm. An argument line of this call is not visible here.
195 req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
196 invocation_id=fsmo_owner["invocation_id"],
198 exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
200 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
201 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
202 self.assertEqual(level, 6, "Expected level 6 response!")
203 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
# The reply must identify the role owner, which is the DC that was contacted.
204 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
205 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
# Test case header followed by fixture remnants: an OU "ou=pfm_exop" under
# DC1's default base DN and a user "cn=testuser" beneath it. Cleanup deletes
# the OU through DC1 with the "tree_delete:1" control.
# NOTE(review): the setUp/tearDown `def` lines, the start of the two add
# calls, the `try:` line and the body of the ERR_NO_SUCH_OBJECT branch are
# not visible in this view -- confirm against the complete file.
207 class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
209 super(DrsReplicaPrefixMapTestCase, self).setUp()
210 self.base_dn = self.ldb_dc1.get_default_basedn()
211 self.ou = "ou=pfm_exop,%s" % self.base_dn
214 "objectclass": "organizationalUnit"})
215 self.user = "cn=testuser,%s" % self.ou
218 "objectclass": "user"})
221 super(DrsReplicaPrefixMapTestCase, self).tearDown()
223 self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
224 except ldb.LdbError as (enum, string):
225 if enum == ldb.ERR_NO_SUCH_OBJECT:
def get_partial_attribute_set(self, attids=None):
    """Build a DsPartialAttributeSet listing the requested attribute IDs.

    :param attids: list of attribute IDs to request. When omitted (or
        None), a new single-element list containing
        drsuapi.DRSUAPI_ATTID_objectClass is used.
    :return: a drsuapi.DsPartialAttributeSet whose attids is that list and
        whose num_attids is its length.
    """
    # Create the default per call: a list literal in the signature is
    # built once, so every structure created with the default would be
    # handed the very same list object.
    if attids is None:
        attids = [drsuapi.DRSUAPI_ATTID_objectClass]
    partial_attribute_set = drsuapi.DsPartialAttributeSet()
    partial_attribute_set.attids = attids
    partial_attribute_set.num_attids = len(attids)
    return partial_attribute_set
# Issues a REPL_OBJ request with the default partial attribute set and, as far
# as the visible arguments show, no mapping_ctr. It expects
# DRSUAPI_EXOP_ERR_SUCCESS.
234 def test_missing_prefix_map_dsa(self):
235 partial_attribute_set = self.get_partial_attribute_set()
237 dc_guid_1 = self.ldb_dc1.get_invocation_id()
239 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
# NOTE(review): an argument line of this call is not visible in this view.
241 req8 = self._exop_req8(dest_dsa=None,
242 invocation_id=dc_guid_1,
244 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
245 partial_attribute_set=partial_attribute_set)
248 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
249 self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
# NOTE(review): presumably reached only from an except branch whose
# try/except lines are not visible here -- confirm.
251 self.fail("Missing prefixmap shouldn't have triggered an error")
# Requests attid 99999 via the partial attribute set and expects
# DsGetNCChanges to raise RuntimeError with code 0x000020E2
# (WERR_DS_DRA_SCHEMA_MISMATCH according to the assertion message).
253 def test_invalid_prefix_map_attid(self):
254 # Request for invalid attid
255 partial_attribute_set = self.get_partial_attribute_set([99999])
257 dc_guid_1 = self.ldb_dc1.get_invocation_id()
258 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
# Two ways of obtaining the prefix map appear below: from the SamDB over LDB,
# or from the mapping_ctr of a plain REPL_OBJ reply.
# NOTE(review): presumably the second is a fallback inside a try/except whose
# lines are not visible in this view -- confirm.
261 pfm = self._samdb_fetch_pfm_and_schi()
263 # On Windows, prefixMap isn't available over LDAP
264 req8 = self._exop_req8(dest_dsa=None,
265 invocation_id=dc_guid_1,
267 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
268 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
269 pfm = ctr.mapping_ctr
# NOTE(review): the closing argument line(s) of this call are not visible.
271 req8 = self._exop_req8(dest_dsa=None,
272 invocation_id=dc_guid_1,
274 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
275 partial_attribute_set=partial_attribute_set,
279 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
280 self.fail("Invalid attid (99999) should have triggered an error")
281 except RuntimeError as (ecode, emsg):
282 self.assertEqual(ecode, 0x000020E2, "Error code should have been "
283 "WERR_DS_DRA_SCHEMA_MISMATCH")
# Requests the unicodePwd attid twice: once under the fetched prefix map, and
# once after swapping the id_prefix values of two mappings and re-expressing
# the attids under the swapped prefixes. Both times the reply's first object
# is scanned for DRSUAPI_ATTID_unicodePwd.
# NOTE(review): `found`, `idx1`, `idx2` and `tmp` are used but not bound in the
# lines visible here, and several call-argument lines are missing -- confirm
# against the complete file.
285 def test_secret_prefix_map_attid(self):
286 # Request for a secret attid
287 partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])
289 dc_guid_1 = self.ldb_dc1.get_invocation_id()
290 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
# Two ways of obtaining the prefix map: from the SamDB, or from the
# mapping_ctr of a plain REPL_OBJ reply (presumably a try/except fallback
# whose try/except lines are not visible -- confirm).
293 pfm = self._samdb_fetch_pfm_and_schi()
295 # On Windows, prefixMap isn't available over LDAP
296 req8 = self._exop_req8(dest_dsa=None,
297 invocation_id=dc_guid_1,
299 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
300 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
301 pfm = ctr.mapping_ctr
# First request: unmodified prefix map.
304 req8 = self._exop_req8(dest_dsa=None,
305 invocation_id=dc_guid_1,
307 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
308 partial_attribute_set=partial_attribute_set,
311 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
314 for attr in ctr.first_object.object.attribute_ctr.attributes:
315 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
319 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
# Locate the two mappings by their binary OID prefix.
321 for i, mapping in enumerate(pfm.mappings):
323 # objectClass: 2.5.4.0
324 if mapping.oid.binary_oid == [85, 4]:
326 # OID: 1.2.840.113556.1.4.*
327 # unicodePwd: 1.2.840.113556.1.4.90
328 elif mapping.oid.binary_oid == [42, 134, 72, 134, 247, 20, 1, 4]:
# Swap the id_prefix values of the two mappings.
331 (pfm.mappings[idx1].id_prefix,
332 pfm.mappings[idx2].id_prefix) = (pfm.mappings[idx2].id_prefix,
333 pfm.mappings[idx1].id_prefix)
336 tmp[idx1], tmp[idx2] = tmp[idx2], tmp[idx1]
# Second request: attids re-expressed under the swapped prefixes.
339 # 90 for unicodePwd (with new prefix = 0)
340 # 589824, 589827 for objectClass and CN
341 # Use of three ensures sorting is correct
342 partial_attribute_set = self.get_partial_attribute_set([90, 589824, 589827])
343 req8 = self._exop_req8(dest_dsa=None,
344 invocation_id=dc_guid_1,
346 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
347 partial_attribute_set=partial_attribute_set,
350 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
353 for attr in ctr.first_object.object.attribute_ctr.attributes:
354 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
358 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
# Requests the name attid twice: once under the fetched prefix map, and once
# after swapping the id_prefix values of two mappings and re-expressing the
# attid under the swapped prefix. Both times the reply's first object is
# scanned for DRSUAPI_ATTID_name.
# NOTE(review): `found`, `idx1`, `idx2` and `tmp` are used but not bound in the
# lines visible here, and several call-argument lines are missing -- confirm
# against the complete file.
360 def test_regular_prefix_map_attid(self):
361 # Request for a regular (non-secret) attid
362 partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_name])
364 dc_guid_1 = self.ldb_dc1.get_invocation_id()
365 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
# Two ways of obtaining the prefix map: from the SamDB, or from the
# mapping_ctr of a plain REPL_OBJ reply (presumably a try/except fallback
# whose try/except lines are not visible -- confirm).
368 pfm = self._samdb_fetch_pfm_and_schi()
370 # On Windows, prefixMap isn't available over LDAP
371 req8 = self._exop_req8(dest_dsa=None,
372 invocation_id=dc_guid_1,
374 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
375 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
376 pfm = ctr.mapping_ctr
# First request: unmodified prefix map.
379 req8 = self._exop_req8(dest_dsa=None,
380 invocation_id=dc_guid_1,
382 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
383 partial_attribute_set=partial_attribute_set,
386 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
389 for attr in ctr.first_object.object.attribute_ctr.attributes:
390 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
394 self.assertTrue(found, "Ensure we get the name attribute back")
# Locate the two mappings by their binary OID prefix.
396 for i, mapping in enumerate(pfm.mappings):
398 # objectClass: 2.5.4.0
399 if mapping.oid.binary_oid == [85, 4]:
401 # OID: 1.2.840.113556.1.4.*
402 # name: 1.2.840.113556.1.4.1
403 elif mapping.oid.binary_oid == [42, 134, 72, 134, 247, 20, 1, 4]:
# Swap the id_prefix values of the two mappings.
406 (pfm.mappings[idx1].id_prefix,
407 pfm.mappings[idx2].id_prefix) = (pfm.mappings[idx2].id_prefix,
408 pfm.mappings[idx1].id_prefix)
411 tmp[idx1], tmp[idx2] = tmp[idx2], tmp[idx1]
# Second request: attid re-expressed under the swapped prefix.
414 # 1 for name (with new prefix = 0)
415 partial_attribute_set = self.get_partial_attribute_set([1])
416 req8 = self._exop_req8(dest_dsa=None,
417 invocation_id=dc_guid_1,
419 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
420 partial_attribute_set=partial_attribute_set,
423 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
426 for attr in ctr.first_object.object.attribute_ctr.attributes:
427 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
431 self.assertTrue(found, "Ensure we get the name attribute back")
# Like the plain regular-attid test, but additionally passes a
# partial_attribute_set_ex holding unicodePwd. Both before and after the
# id_prefix swap, the reply's first object is scanned for
# DRSUAPI_ATTID_name and for DRSUAPI_ATTID_unicodePwd.
# NOTE(review): `found`, `idx1`, `idx2` and `tmp` are used but not bound in the
# lines visible here, and several call-argument lines are missing -- confirm
# against the complete file.
433 def test_regular_prefix_map_ex_attid(self):
434 # Request for a regular (non-secret) attid
435 partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_name])
436 partial_attribute_set_ex = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])
438 dc_guid_1 = self.ldb_dc1.get_invocation_id()
439 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
# Two ways of obtaining the prefix map: from the SamDB, or from the
# mapping_ctr of a plain REPL_OBJ reply (presumably a try/except fallback
# whose try/except lines are not visible -- confirm).
442 pfm = self._samdb_fetch_pfm_and_schi()
444 # On Windows, prefixMap isn't available over LDAP
445 req8 = self._exop_req8(dest_dsa=None,
446 invocation_id=dc_guid_1,
448 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
449 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
450 pfm = ctr.mapping_ctr
# First request: unmodified prefix map, both attribute sets supplied.
453 req8 = self._exop_req8(dest_dsa=None,
454 invocation_id=dc_guid_1,
456 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
457 partial_attribute_set=partial_attribute_set,
458 partial_attribute_set_ex=partial_attribute_set_ex,
461 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
464 for attr in ctr.first_object.object.attribute_ctr.attributes:
465 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
469 self.assertTrue(found, "Ensure we get the name attribute back")
472 for attr in ctr.first_object.object.attribute_ctr.attributes:
473 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
477 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
# Locate the two mappings by their binary OID prefix.
479 for i, mapping in enumerate(pfm.mappings):
481 # objectClass: 2.5.4.0
482 if mapping.oid.binary_oid == [85, 4]:
484 # OID: 1.2.840.113556.1.4.*
485 # name: 1.2.840.113556.1.4.1
486 # unicodePwd: 1.2.840.113556.1.4.90
487 elif mapping.oid.binary_oid == [42, 134, 72, 134, 247, 20, 1, 4]:
# Swap the id_prefix values of the two mappings.
490 (pfm.mappings[idx1].id_prefix,
491 pfm.mappings[idx2].id_prefix) = (pfm.mappings[idx2].id_prefix,
492 pfm.mappings[idx1].id_prefix)
495 tmp[idx1], tmp[idx2] = tmp[idx2], tmp[idx1]
# Second request: only the main set is re-expressed under the swapped prefix;
# the _ex set keeps the original unicodePwd attid.
498 # 1 for name (with new prefix = 0)
499 partial_attribute_set = self.get_partial_attribute_set([1])
500 # 90 for unicodePwd (with new prefix = 0)
501 # HOWEVER: Windows doesn't seem to respect incoming maps for PartialAttrSetEx
502 partial_attribute_set_ex = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])
503 req8 = self._exop_req8(dest_dsa=None,
504 invocation_id=dc_guid_1,
506 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
507 partial_attribute_set=partial_attribute_set,
508 partial_attribute_set_ex=partial_attribute_set_ex,
511 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
514 for attr in ctr.first_object.object.attribute_ctr.attributes:
515 if attr.attid == drsuapi.DRSUAPI_ATTID_name:
519 self.assertTrue(found, "Ensure we get the name attribute back")
522 for attr in ctr.first_object.object.attribute_ctr.attributes:
523 if attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
527 self.assertTrue(found, "Ensure we get the unicodePwd attribute back")
# Reads prefixMap and schemaInfo from the schema base DN, unpacks the
# prefixMap blob and appends one extra DsReplicaOIDMapping that carries the
# schemaInfo bytes.
# NOTE(review): `samdb` is not bound in the lines visible here, and no return
# statement is visible -- confirm against the complete file.
529 def _samdb_fetch_pfm_and_schi(self):
530 """Fetch prefixMap and schemaInfo stored in SamDB using LDB connection"""
532 res = samdb.search(base=samdb.get_schema_basedn(), scope=SCOPE_BASE,
533 attrs=["prefixMap", "schemaInfo"])
535 pfm = ndr_unpack(drsblobs.prefixMapBlob,
536 str(res[0]['prefixMap']))
538 schi = drsuapi.DsReplicaOIDMapping()
# When schemaInfo is stored, its raw bytes become the mapping's binary_oid.
541 if 'schemaInfo' in res[0]:
542 schi.oid.length = len(map(ord, str(res[0]['schemaInfo'])))
543 schi.oid.binary_oid = map(ord, str(res[0]['schemaInfo']))
# NOTE(review): the lines below presumably form the `else` branch (its `else:`
# line is not visible): a schemaInfoBlob with revision 0, marker 0xFF and this
# DC's invocation id is packed and used instead -- confirm.
545 schema_info = drsblobs.schemaInfoBlob()
546 schema_info.revision = 0
547 schema_info.marker = 0xFF
548 schema_info.invocation_id = misc.GUID(samdb.get_invocation_id())
549 schi.oid.length = len(map(ord, ndr_pack(schema_info)))
550 schi.oid.binary_oid = map(ord, ndr_pack(schema_info))
# The schemaInfo mapping is appended as the last entry of the prefix map.
552 pfm.ctr.mappings = pfm.ctr.mappings + [schi]
553 pfm.ctr.num_mappings += 1
# Test case header followed by fixture remnants: an OU "ou=sort_exop" under
# DC1's default base DN. Cleanup deletes the OU through DC1 with the
# "tree_delete:1" control.
# NOTE(review): the setUp/tearDown `def` lines, the start of the add call, the
# `try:` line and the body of the ERR_NO_SUCH_OBJECT branch are not visible in
# this view -- confirm against the complete file.
556 class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
558 super(DrsReplicaSyncSortTestCase, self).setUp()
559 self.base_dn = self.ldb_dc1.get_default_basedn()
560 self.ou = "ou=sort_exop,%s" % self.base_dn
563 "objectclass": "organizationalUnit"})
566 super(DrsReplicaSyncSortTestCase, self).tearDown()
567 # tidyup groups and users
569 self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
570 except ldb.LdbError as (enum, string):
571 if enum == ldb.ERR_NO_SUCH_OBJECT:
# Adds `dest` as a value of attribute `attr` (default 'member') on the object
# `src`, via a FLAG_MOD_ADD modify sent to DC1.
# NOTE(review): `m` is not bound in the lines visible here; presumably an
# ldb message is created first -- confirm against the complete file.
574 def add_linked_attribute(self, src, dest, attr='member'):
576 m.dn = ldb.Dn(self.ldb_dc1, src)
577 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
578 self.ldb_dc1.modify(m)
# Removes `dest` from attribute `attr` (default 'member') on the object `src`,
# via a FLAG_MOD_DELETE modify sent to DC1.
# NOTE(review): `m` is not bound in the lines visible here; presumably an
# ldb message is created first -- confirm against the complete file.
580 def remove_linked_attribute(self, src, dest, attr='member'):
582 m.dn = ldb.Dn(self.ldb_dc1, src)
583 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
584 self.ldb_dc1.modify(m)
# Creates three users and a group, links the users to the group, then fetches
# the links with a REPL_OBJ request. Every returned link must be in the
# expected set, and the server's order must equal the order produced locally by
# _linked_attribute_compare. The check is repeated after one nonSecurityMember
# link has been removed.
# NOTE(review): many continuation lines (attr= arguments, AbstractLink
# arguments, a request argument) and the initialisation of `no_inactive` /
# `has_inactive` are not visible in this view -- confirm against the full file.
586 def test_sort_behaviour_single_object(self):
587 """Testing sorting behaviour on single objects"""
589 user1_dn = "cn=test_user1,%s" % self.ou
590 user2_dn = "cn=test_user2,%s" % self.ou
591 user3_dn = "cn=test_user3,%s" % self.ou
592 group_dn = "cn=test_group,%s" % self.ou
594 self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
595 self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
596 self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
597 self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
# GUIDs are kept as strings; the returned links are converted with str() the
# same way before the membership check.
599 u1_guid = str(misc.GUID(self.ldb_dc1.search(base=user1_dn,
600 attrs=["objectGUID"])[0]['objectGUID'][0]))
601 u2_guid = str(misc.GUID(self.ldb_dc1.search(base=user2_dn,
602 attrs=["objectGUID"])[0]['objectGUID'][0]))
603 u3_guid = str(misc.GUID(self.ldb_dc1.search(base=user3_dn,
604 attrs=["objectGUID"])[0]['objectGUID'][0]))
605 g_guid = str(misc.GUID(self.ldb_dc1.search(base=group_dn,
606 attrs=["objectGUID"])[0]['objectGUID'][0]))
608 self.add_linked_attribute(group_dn, user1_dn,
610 self.add_linked_attribute(group_dn, user2_dn,
612 self.add_linked_attribute(group_dn, user3_dn,
614 self.add_linked_attribute(group_dn, user1_dn,
616 self.add_linked_attribute(group_dn, user2_dn,
617 attr='nonSecurityMember')
618 self.add_linked_attribute(group_dn, user3_dn,
619 attr='nonSecurityMember')
# set_inactive is the expected link whose flags are cleared later in the test.
621 set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
622 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
625 expected_links = set([set_inactive,
626 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
627 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
630 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
631 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
634 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
635 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
638 AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
639 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
642 AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
643 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
648 dc_guid_1 = self.ldb_dc1.get_invocation_id()
650 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
652 req8 = self._exop_req8(dest_dsa=None,
653 invocation_id=dc_guid_1,
655 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
# First pass: all links are still active.
657 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
660 for link in ctr.linked_attributes:
661 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
662 link.value.blob).guid
663 no_inactive.append((link, target_guid))
664 self.assertTrue(AbstractLink(link.attid, link.flags,
665 str(link.identifier.guid),
666 str(target_guid)) in expected_links)
668 no_inactive.sort(cmp=_linked_attribute_compare)
670 # assert the two arrays are the same
671 self.assertEqual(len(expected_links), ctr.linked_attributes_count)
672 self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)
674 self.remove_linked_attribute(group_dn, user3_dn,
675 attr='nonSecurityMember')
# The link is taken out of the set before its flags change and re-added
# afterwards, because AbstractLink's hash includes the flags.
677 # Set the link inactive
678 expected_links.remove(set_inactive)
679 set_inactive.flags = 0
680 expected_links.add(set_inactive)
# Second pass: the same request now has to report the removed link with
# flags 0.
683 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
684 for link in ctr.linked_attributes:
685 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
686 link.value.blob).guid
687 has_inactive.append((link, target_guid))
688 self.assertTrue(AbstractLink(link.attid, link.flags,
689 str(link.identifier.guid),
690 str(target_guid)) in expected_links)
692 has_inactive.sort(cmp=_linked_attribute_compare)
694 # assert the two arrays are the same
695 self.assertEqual(len(expected_links), ctr.linked_attributes_count)
696 self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
# Creates a user and a group, links them, then issues plain (EXOP_NONE)
# DsGetNCChanges requests against the base DN until a reply carries linked
# attributes or reports no more data. The linked attributes in that reply must
# already be in the order produced by _linked_attribute_compare.
# NOTE(review): the loop header, the `break`, the initialisation of
# `no_inactive`, the try/except around the two ndr_unpack variants and some
# call-argument lines are not visible in this view -- confirm against the
# complete file.
698 def test_sort_behaviour_ncchanges(self):
699 """Testing sorting behaviour on a group of objects."""
700 user1_dn = "cn=test_user1,%s" % self.ou
701 group_dn = "cn=test_group,%s" % self.ou
702 self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
703 self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
705 self.add_linked_attribute(group_dn, user1_dn,
708 dc_guid_1 = self.ldb_dc1.get_invocation_id()
710 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
712 # Make sure the max objects count is high enough
713 req8 = self._exop_req8(dest_dsa=None,
714 invocation_id=dc_guid_1,
715 nc_dn_str=self.base_dn,
718 exop=drsuapi.DRSUAPI_EXOP_NONE)
720 # Loop until we get linked attributes, or we get to the end.
721 # Samba sends linked attributes at the end, unlike Windows.
723 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
724 if ctr.more_data == 0 or ctr.linked_attributes_count != 0:
# Continue from where the previous reply stopped.
726 req8.highwatermark = ctr.new_highwatermark
728 self.assertTrue(ctr.linked_attributes_count != 0)
731 for link in ctr.linked_attributes:
# The link value is decoded as DsReplicaObjectIdentifier3 or as
# DsReplicaObjectIdentifier3Binary (presumably the latter is a fallback when
# the former fails to unpack -- confirm).
733 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
734 link.value.blob).guid
736 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
737 link.value.blob).guid
738 no_inactive.append((link, target_guid))
740 no_inactive.sort(cmp=_linked_attribute_compare)
742 # assert the two arrays are the same
743 self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)