tests/getnc_exop: Ensure that all attids are valid in a given PAS
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / getnc_exop.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 #
24 # Usage:
25 #  export DC1=dc1_dns_name
26 #  export DC2=dc2_dns_name
27 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
28 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
29 #
30
31 import drs_base
32 import samba.tests
33
34 import ldb
35 from ldb import SCOPE_BASE
36
37 from samba.dcerpc import drsuapi, misc, drsblobs
38 from samba.drs_utils import drs_DsBind
39 from samba.ndr import ndr_unpack, ndr_pack
40
41 def _linked_attribute_compare(la1, la2):
42     """See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
43     la1, la1_target = la1
44     la2, la2_target = la2
45
46     # Ascending host object GUID
47     c = cmp(ndr_pack(la1.identifier.guid), ndr_pack(la2.identifier.guid))
48     if c != 0:
49         return c
50
51     # Ascending attribute ID
52     if la1.attid != la2.attid:
53         return -1 if la1.attid < la2.attid else 1
54
55     la1_active = la1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
56     la2_active = la2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
57
58     # Ascending 'is present'
59     if la1_active != la2_active:
60         return 1 if la1_active else -1
61
62     # Ascending target object GUID
63     return cmp(ndr_pack(la1_target), ndr_pack(la2_target))
64
65 class AbstractLink:
66     def __init__(self, attid, flags, identifier, targetGUID):
67         self.attid = attid
68         self.flags = flags
69         self.identifier = identifier
70         self.targetGUID = targetGUID
71
72     def __eq__(self, other):
73         return isinstance(other, AbstractLink) and \
74             ((self.attid, self.flags, self.identifier, self.targetGUID) ==
75              (other.attid, other.flags, other.identifier, other.targetGUID))
76
77     def __hash__(self):
78         return hash((self.attid, self.flags, self.identifier, self.targetGUID))
79
80 class ExopBaseTest:
81     def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
82                    replica_flags=0, max_objects=0, partial_attribute_set=None,
83                    mapping_ctr=None):
84         req8 = drsuapi.DsGetNCChangesRequest8()
85
86         req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
87         req8.source_dsa_invocation_id = misc.GUID(invocation_id)
88         req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
89         req8.naming_context.dn = unicode(nc_dn_str)
90         req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
91         req8.highwatermark.tmp_highest_usn = 0
92         req8.highwatermark.reserved_usn = 0
93         req8.highwatermark.highest_usn = 0
94         req8.uptodateness_vector = None
95         req8.replica_flags = replica_flags
96         req8.max_object_count = max_objects
97         req8.max_ndr_size = 402116
98         req8.extended_op = exop
99         req8.fsmo_info = 0
100         req8.partial_attribute_set = partial_attribute_set
101         req8.partial_attribute_set_ex = None
102         if mapping_ctr:
103             req8.mapping_ctr = mapping_ctr
104         else:
105             req8.mapping_ctr.num_mappings = 0
106             req8.mapping_ctr.mappings = None
107
108         return req8
109
110     def _ds_bind(self, server_name):
111         binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
112
113         drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
114         (drs_handle, supported_extensions) = drs_DsBind(drs)
115         return (drs, drs_handle)
116
117
118 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
119     """Intended as a semi-black box test case for DsGetNCChanges
120        implementation for extended operations. It should be testing
121        how DsGetNCChanges handles different input params (mostly invalid).
122        Final goal is to make DsGetNCChanges as binary compatible to
123        Windows implementation as possible"""
124
125     def setUp(self):
126         super(DrsReplicaSyncTestCase, self).setUp()
127
128     def tearDown(self):
129         super(DrsReplicaSyncTestCase, self).tearDown()
130
131     def _determine_fSMORoleOwner(self, fsmo_obj_dn):
132         """Returns (owner, not_owner) pair where:
133              owner: dns name for FSMO owner
134              not_owner: dns name for DC not owning the FSMO"""
135         # collect info to return later
136         fsmo_info_1 = {"dns_name": self.dnsname_dc1,
137                        "invocation_id": self.ldb_dc1.get_invocation_id(),
138                        "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
139                        "server_dn": self.ldb_dc1.get_serverName()}
140         fsmo_info_2 = {"dns_name": self.dnsname_dc2,
141                        "invocation_id": self.ldb_dc2.get_invocation_id(),
142                        "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
143                        "server_dn": self.ldb_dc2.get_serverName()}
144
145         msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
146         fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
147         fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
148
149         msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
150         fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
151         fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
152
153         # determine the owner dc
154         res = self.ldb_dc1.search(fsmo_obj_dn,
155                                   scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
156         assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
157         fsmo_owner = res[0]["fSMORoleOwner"][0]
158         if fsmo_owner == self.info_dc1["dsServiceName"][0]:
159             return (fsmo_info_1, fsmo_info_2)
160         return (fsmo_info_2, fsmo_info_1)
161
162     def _check_exop_failed(self, ctr6, expected_failure):
163         self.assertEqual(ctr6.extended_ret, expected_failure)
164         #self.assertEqual(ctr6.object_count, 0)
165         #self.assertEqual(ctr6.first_object, None)
166         self.assertEqual(ctr6.more_data, False)
167         self.assertEqual(ctr6.nc_object_count, 0)
168         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
169         self.assertEqual(ctr6.linked_attributes_count, 0)
170         self.assertEqual(ctr6.linked_attributes, [])
171         self.assertEqual(ctr6.drs_error[0], 0)
172
173     def test_FSMONotOwner(self):
174         """Test role transfer with against DC not owner of the role"""
175         fsmo_dn = self.ldb_dc1.get_schema_basedn()
176         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
177
178         req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
179                                invocation_id=fsmo_not_owner["invocation_id"],
180                                nc_dn_str=fsmo_dn,
181                                exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
182
183         (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
184         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
185         self.assertEqual(level, 6, "Expected level 6 response!")
186         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
187         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
188         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
189
190     def test_InvalidDestDSA(self):
191         """Test role transfer with invalid destination DSA guid"""
192         fsmo_dn = self.ldb_dc1.get_schema_basedn()
193         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
194
195         req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
196                                invocation_id=fsmo_owner["invocation_id"],
197                                nc_dn_str=fsmo_dn,
198                                exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
199
200         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
201         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
202         self.assertEqual(level, 6, "Expected level 6 response!")
203         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
204         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
205         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
206
207     def test_InvalidDestDSA_ridalloc(self):
208         """Test RID allocation with invalid destination DSA guid"""
209         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
210         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
211
212         req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
213                                invocation_id=fsmo_owner["invocation_id"],
214                                nc_dn_str=fsmo_dn,
215                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
216
217         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
218         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
219         self.assertEqual(level, 6, "Expected level 6 response!")
220         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
221         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
222         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
223
224     def test_do_ridalloc(self):
225         """Test doing a RID allocation with a valid destination DSA guid"""
226         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
227         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
228
229         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
230                                invocation_id=fsmo_owner["invocation_id"],
231                                nc_dn_str=fsmo_dn,
232                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
233
234         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
235         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
236         self.assertEqual(level, 6, "Expected level 6 response!")
237         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
238         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
239         ctr6 = ctr
240         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
241         self.assertEqual(ctr6.object_count, 3)
242         self.assertNotEqual(ctr6.first_object, None)
243         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
244         self.assertNotEqual(ctr6.first_object.next_object, None)
245         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
246         second_object = ctr6.first_object.next_object.object
247         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
248         third_object = ctr6.first_object.next_object.next_object.object
249         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
250
251         self.assertEqual(ctr6.more_data, False)
252         self.assertEqual(ctr6.nc_object_count, 0)
253         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
254         self.assertEqual(ctr6.drs_error[0], 0)
255         # We don't check the linked_attributes_count as if the domain
256         # has an RODC, it can gain links on the server account object
257
258     def test_do_ridalloc_get_anc(self):
259         """Test doing a RID allocation with a valid destination DSA guid and """
260         fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
261         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
262
263         req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
264                                invocation_id=fsmo_owner["invocation_id"],
265                                nc_dn_str=fsmo_dn,
266                                exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
267                                replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
268
269         (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
270         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
271         self.assertEqual(level, 6, "Expected level 6 response!")
272         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
273         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
274         ctr6 = ctr
275         self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
276         self.assertEqual(ctr6.object_count, 3)
277         self.assertNotEqual(ctr6.first_object, None)
278         self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
279         self.assertNotEqual(ctr6.first_object.next_object, None)
280         self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
281         second_object = ctr6.first_object.next_object.object
282         self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
283         third_object = ctr6.first_object.next_object.next_object.object
284         self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
285         self.assertEqual(ctr6.more_data, False)
286         self.assertEqual(ctr6.nc_object_count, 0)
287         self.assertEqual(ctr6.nc_linked_attributes_count, 0)
288         self.assertEqual(ctr6.drs_error[0], 0)
289         # We don't check the linked_attributes_count as if the domain
290         # has an RODC, it can gain links on the server account object
291
292 class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
293     def setUp(self):
294         super(DrsReplicaPrefixMapTestCase, self).setUp()
295         self.base_dn = self.ldb_dc1.get_default_basedn()
296         self.ou = "ou=pfm_exop,%s" % self.base_dn
297         self.ldb_dc1.add({
298             "dn": self.ou,
299             "objectclass": "organizationalUnit"})
300         self.user = "cn=testuser,%s" % self.ou
301         self.ldb_dc1.add({
302             "dn": self.user,
303             "objectclass": "user"})
304
305     def tearDown(self):
306         super(DrsReplicaPrefixMapTestCase, self).tearDown()
307         try:
308             self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
309         except ldb.LdbError as (enum, string):
310             if enum == ldb.ERR_NO_SUCH_OBJECT:
311                 pass
312
313     def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
314         partial_attribute_set = drsuapi.DsPartialAttributeSet()
315         partial_attribute_set.attids = attids
316         partial_attribute_set.num_attids = len(attids)
317         return partial_attribute_set
318
319     def test_missing_prefix_map_dsa(self):
320         partial_attribute_set = self.get_partial_attribute_set()
321
322         dc_guid_1 = self.ldb_dc1.get_invocation_id()
323
324         drs, drs_handle = self._ds_bind(self.dnsname_dc1)
325
326         req8 = self._exop_req8(dest_dsa=None,
327                                invocation_id=dc_guid_1,
328                                nc_dn_str=self.user,
329                                exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
330                                partial_attribute_set=partial_attribute_set)
331
332         try:
333             (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
334             self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
335         except Exception:
336             self.fail("Missing prefixmap shouldn't have triggered an error")
337
338     def test_invalid_prefix_map_attid(self):
339         # Request for invalid attid
340         partial_attribute_set = self.get_partial_attribute_set([99999])
341
342         pfm = self._samdb_fetch_pfm_and_schi()
343
344         dc_guid_1 = self.ldb_dc1.get_invocation_id()
345
346         drs, drs_handle = self._ds_bind(self.dnsname_dc1)
347
348         req8 = self._exop_req8(dest_dsa=None,
349                                invocation_id=dc_guid_1,
350                                nc_dn_str=self.user,
351                                exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
352                                partial_attribute_set=partial_attribute_set,
353                                mapping_ctr=pfm)
354
355         try:
356             (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
357             self.fail("Invalid attid (99999) should have triggered an error")
358         except Exception as (ecode, emsg):
359             self.assertEqual(ecode, 0x000020E2, "Error code should have been "
360                              "WERR_DS_DRA_SCHEMA_MISMATCH")
361
362     def _samdb_fetch_pfm_and_schi(self):
363         """Fetch prefixMap and schemaInfo stored in SamDB using LDB connection"""
364         samdb = self.ldb_dc1
365         res = samdb.search(base=samdb.get_schema_basedn(), scope=SCOPE_BASE,
366                            attrs=["prefixMap", "schemaInfo"])
367
368         pfm = ndr_unpack(drsblobs.prefixMapBlob,
369                          str(res[0]['prefixMap']))
370
371         schi = drsuapi.DsReplicaOIDMapping()
372         schi.id_prefix = 0
373
374         if 'schemaInfo' in res[0]:
375             schi.oid.length = len(map(ord, str(res[0]['schemaInfo'])))
376             schi.oid.binary_oid = map(ord, str(res[0]['schemaInfo']))
377         else:
378             schema_info = drsblobs.schemaInfoBlob()
379             schema_info.revision = 0
380             schema_info.marker = 0xFF
381             schema_info.invocation_id = misc.GUID(samdb.get_invocation_id())
382             schi.oid.length = len(map(ord, ndr_pack(schema_info)))
383             schi.oid.binary_oid = map(ord, ndr_pack(schema_info))
384
385         pfm.ctr.mappings = pfm.ctr.mappings + [schi]
386         pfm.ctr.num_mappings += 1
387         return pfm.ctr
388
389 class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
390     def setUp(self):
391         super(DrsReplicaSyncSortTestCase, self).setUp()
392         self.base_dn = self.ldb_dc1.get_default_basedn()
393         self.ou = "ou=sort_exop,%s" % self.base_dn
394         self.ldb_dc1.add({
395             "dn": self.ou,
396             "objectclass": "organizationalUnit"})
397
398     def tearDown(self):
399         super(DrsReplicaSyncSortTestCase, self).tearDown()
400         # tidyup groups and users
401         try:
402             self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
403         except ldb.LdbError as (enum, string):
404             if enum == ldb.ERR_NO_SUCH_OBJECT:
405                 pass
406
407     def add_linked_attribute(self, src, dest, attr='member'):
408         m = ldb.Message()
409         m.dn = ldb.Dn(self.ldb_dc1, src)
410         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
411         self.ldb_dc1.modify(m)
412
413     def remove_linked_attribute(self, src, dest, attr='member'):
414         m = ldb.Message()
415         m.dn = ldb.Dn(self.ldb_dc1, src)
416         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
417         self.ldb_dc1.modify(m)
418
419     def test_sort_behaviour_single_object(self):
420         """Testing sorting behaviour on single objects"""
421
422         user1_dn = "cn=test_user1,%s" % self.ou
423         user2_dn = "cn=test_user2,%s" % self.ou
424         user3_dn = "cn=test_user3,%s" % self.ou
425         group_dn = "cn=test_group,%s" % self.ou
426
427         self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
428         self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
429         self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
430         self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
431
432         u1_guid = str(misc.GUID(self.ldb_dc1.search(base=user1_dn,
433                       attrs=["objectGUID"])[0]['objectGUID'][0]))
434         u2_guid = str(misc.GUID(self.ldb_dc1.search(base=user2_dn,
435                       attrs=["objectGUID"])[0]['objectGUID'][0]))
436         u3_guid = str(misc.GUID(self.ldb_dc1.search(base=user3_dn,
437                       attrs=["objectGUID"])[0]['objectGUID'][0]))
438         g_guid = str(misc.GUID(self.ldb_dc1.search(base=group_dn,
439                      attrs=["objectGUID"])[0]['objectGUID'][0]))
440
441         self.add_linked_attribute(group_dn, user1_dn,
442                                   attr='member')
443         self.add_linked_attribute(group_dn, user2_dn,
444                                   attr='member')
445         self.add_linked_attribute(group_dn, user3_dn,
446                                   attr='member')
447         self.add_linked_attribute(group_dn, user1_dn,
448                                   attr='managedby')
449         self.add_linked_attribute(group_dn, user2_dn,
450                                   attr='nonSecurityMember')
451         self.add_linked_attribute(group_dn, user3_dn,
452                                   attr='nonSecurityMember')
453
454         set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
455                                     drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
456                                     g_guid, u3_guid)
457
458         expected_links = set([set_inactive,
459         AbstractLink(drsuapi.DRSUAPI_ATTID_member,
460                      drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
461                      g_guid,
462                      u1_guid),
463         AbstractLink(drsuapi.DRSUAPI_ATTID_member,
464                      drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
465                      g_guid,
466                      u2_guid),
467         AbstractLink(drsuapi.DRSUAPI_ATTID_member,
468                      drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
469                      g_guid,
470                      u3_guid),
471         AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
472                      drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
473                      g_guid,
474                      u1_guid),
475         AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
476                      drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
477                      g_guid,
478                      u2_guid),
479         ])
480
481         dc_guid_1 = self.ldb_dc1.get_invocation_id()
482
483         drs, drs_handle = self._ds_bind(self.dnsname_dc1)
484
485         req8 = self._exop_req8(dest_dsa=None,
486                 invocation_id=dc_guid_1,
487                 nc_dn_str=group_dn,
488                 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
489
490         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
491
492         no_inactive = []
493         for link in ctr.linked_attributes:
494             target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
495                                      link.value.blob).guid
496             no_inactive.append((link, target_guid))
497             self.assertTrue(AbstractLink(link.attid, link.flags,
498                                          str(link.identifier.guid),
499                                          str(target_guid)) in expected_links)
500
501         no_inactive.sort(cmp=_linked_attribute_compare)
502
503         # assert the two arrays are the same
504         self.assertEqual(len(expected_links), ctr.linked_attributes_count)
505         self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)
506
507         self.remove_linked_attribute(group_dn, user3_dn,
508                                      attr='nonSecurityMember')
509
510         # Set the link inactive
511         expected_links.remove(set_inactive)
512         set_inactive.flags = 0
513         expected_links.add(set_inactive)
514
515         has_inactive = []
516         (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
517         for link in ctr.linked_attributes:
518             target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
519                                      link.value.blob).guid
520             has_inactive.append((link, target_guid))
521             self.assertTrue(AbstractLink(link.attid, link.flags,
522                                          str(link.identifier.guid),
523                                          str(target_guid)) in expected_links)
524
525         has_inactive.sort(cmp=_linked_attribute_compare)
526
527         # assert the two arrays are the same
528         self.assertEqual(len(expected_links), ctr.linked_attributes_count)
529         self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
530
531     def test_sort_behaviour_ncchanges(self):
532         """Testing sorting behaviour on a group of objects."""
533         user1_dn = "cn=test_user1,%s" % self.ou
534         group_dn = "cn=test_group,%s" % self.ou
535         self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
536         self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
537
538         self.add_linked_attribute(group_dn, user1_dn,
539                                   attr='member')
540
541         dc_guid_1 = self.ldb_dc1.get_invocation_id()
542
543         drs, drs_handle = self._ds_bind(self.dnsname_dc1)
544
545         # Make sure the max objects count is high enough
546         req8 = self._exop_req8(dest_dsa=None,
547                                invocation_id=dc_guid_1,
548                                nc_dn_str=self.base_dn,
549                                replica_flags=0,
550                                max_objects=100,
551                                exop=drsuapi.DRSUAPI_EXOP_NONE)
552
553         # Loop until we get linked attributes, or we get to the end.
554         # Samba sends linked attributes at the end, unlike Windows.
555         while True:
556             (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
557             if ctr.more_data == 0 or ctr.linked_attributes_count != 0:
558                 break
559             req8.highwatermark = ctr.new_highwatermark
560
561         self.assertTrue(ctr.linked_attributes_count != 0)
562
563         no_inactive = []
564         for link in ctr.linked_attributes:
565             try:
566                 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
567                                      link.value.blob).guid
568             except:
569                 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
570                                          link.value.blob).guid
571             no_inactive.append((link, target_guid))
572
573         no_inactive.sort(cmp=_linked_attribute_compare)
574
575         # assert the two arrays are the same
576         self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)