torture/drs: move ExopBaseTest into DrsBaseTest and extend
[bbaumbach/samba-autobuild/.git] / source4 / torture / drs / python / getnc_exop.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 #
24 # Usage:
25 #  export DC1=dc1_dns_name
26 #  export DC2=dc2_dns_name
27 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
28 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
29 #
30
31 import drs_base
32 from drs_base import AbstractLink
33
34 import samba.tests
35
36 import ldb
37 from ldb import SCOPE_BASE
38
39 from samba.dcerpc import drsuapi, misc, drsblobs
40 from samba.drs_utils import drs_DsBind
41 from samba.ndr import ndr_unpack, ndr_pack
42
def _linked_attribute_compare(la1, la2):
    """Three-way comparison of two linked attributes.

    See CompareLinks() in MS-DRSR section 4.1.10.5.17

    :param la1: (link, target_guid) pair; link must expose
                identifier.guid, attid and flags
    :param la2: second (link, target_guid) pair
    :return: negative, zero or positive number when la1 sorts before,
             equal to, or after la2
    """
    def _three_way(a, b):
        # Same result as the Python 2 cmp() builtin (-1, 0 or 1), without
        # depending on a builtin that Python 3 no longer provides.
        return (a > b) - (a < b)

    link1, target1 = la1
    link2, target2 = la2

    # Ascending host object GUID
    c = _three_way(ndr_pack(link1.identifier.guid),
                   ndr_pack(link2.identifier.guid))
    if c != 0:
        return c

    # Ascending attribute ID
    if link1.attid != link2.attid:
        return -1 if link1.attid < link2.attid else 1

    active1 = link1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
    active2 = link2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE

    # Ascending 'is present': a link without the ACTIVE flag sorts first
    if active1 != active2:
        return 1 if active1 else -1

    # Ascending target object GUID
    return _three_way(ndr_pack(target1), ndr_pack(target2))
66
67
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Intended as a semi-black box test case for DsGetNCChanges
       implementation for extended operations. It should be testing
       how DsGetNCChanges handles different input params (mostly invalid).
       Final goal is to make DsGetNCChanges as binary compatible to
       Windows implementation as possible"""

    def setUp(self):
        """Create the test OU on DC1 and bind to DC1's DRSUAPI pipe."""
        super(DrsReplicaSyncTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "OU=test_getncchanges,%s" % self.base_dn
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
        # Keep whatever pair _get_highest_hwm_utdv() reports for DC1 at
        # the start of the test.
        (self.default_hwm, self.default_utdv) = self._get_highest_hwm_utdv(self.ldb_dc1)

    def tearDown(self):
        """Remove the test OU (and everything below it) from DC1."""
        try:
            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError:
            # Best-effort cleanup: the OU may already be gone
            # (ldb.ERR_NO_SUCH_OBJECT).
            # NOTE(review): every other LdbError is silently ignored here
            # as well -- confirm that this is intended.
            pass
        super(DrsReplicaSyncTestCase, self).tearDown()

    def _collect_fsmo_info(self, samdb, dns_name):
        """Gather the identifying data of one DC.

        :param samdb: LDB connection to the DC
        :param dns_name: DNS name of that DC
        :return: dict with the keys dns_name, invocation_id, ntds_guid,
                 server_dn, server_acct_dn and rid_set_dn
        """
        info = {"dns_name": dns_name,
                "invocation_id": samdb.get_invocation_id(),
                "ntds_guid": samdb.get_ntds_GUID(),
                "server_dn": samdb.get_serverName()}

        msgs = samdb.search(scope=ldb.SCOPE_BASE, base=info["server_dn"],
                            attrs=["serverReference"])
        info["server_acct_dn"] = ldb.Dn(samdb, msgs[0]["serverReference"][0])
        info["rid_set_dn"] = ldb.Dn(samdb, "CN=RID Set") + info["server_acct_dn"]
        return info

    def _determine_fSMORoleOwner(self, fsmo_obj_dn):
        """Returns (owner, not_owner) pair where:
             owner: info dict (see _collect_fsmo_info) for the FSMO owner
             not_owner: info dict for the DC not owning the FSMO"""
        # collect info to return later
        fsmo_info_1 = self._collect_fsmo_info(self.ldb_dc1, self.dnsname_dc1)
        fsmo_info_2 = self._collect_fsmo_info(self.ldb_dc2, self.dnsname_dc2)

        # determine the owner dc
        res = self.ldb_dc1.search(fsmo_obj_dn,
                                  scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
        # Use a unittest assertion: a bare assert is stripped under -O
        self.assertEqual(len(res), 1,
                         "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn)
        fsmo_owner = res[0]["fSMORoleOwner"][0]
        if fsmo_owner == self.info_dc1["dsServiceName"][0]:
            return (fsmo_info_1, fsmo_info_2)
        return (fsmo_info_2, fsmo_info_1)

    def _check_exop_failed(self, ctr6, expected_failure):
        """Assert that a level 6 reply reports expected_failure in
        extended_ret and carries no link data or further data."""
        self.assertEqual(ctr6.extended_ret, expected_failure)
        #self.assertEqual(ctr6.object_count, 0)
        #self.assertEqual(ctr6.first_object, None)
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes, [])
        self.assertEqual(ctr6.drs_error[0], 0)

    def test_FSMONotOwner(self):
        """Test role transfer against a DC that is not owner of the role"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
                               invocation_id=fsmo_not_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        # Send the request to the DC which does NOT hold the role
        (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))

    def test_InvalidDestDSA(self):
        """Test role transfer with invalid destination DSA guid"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        # Hard-coded GUID used as the "invalid" destination DSA
        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
168
class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
    """Tests of DsGetNCChanges (DRSUAPI_EXOP_REPL_OBJ) with different
       partial attribute sets and prefix maps supplied by the caller."""

    # binary_oid of the prefix for OID 2.5.4.* (objectClass: 2.5.4.0)
    _PREFIX_2_5_4 = [85, 4]
    # binary_oid of the prefix for OID 1.2.840.113556.1.4.*
    # (name: 1.2.840.113556.1.4.1, unicodePwd: 1.2.840.113556.1.4.90)
    _PREFIX_1_2_840_113556_1_4 = [42, 134, 72, 134, 247, 20, 1, 4]

    def setUp(self):
        """Create the test OU and a test user below it on DC1."""
        super(DrsReplicaPrefixMapTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "ou=pfm_exop,%s" % self.base_dn
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        self.user = "cn=testuser,%s" % self.ou
        self.ldb_dc1.add({
            "dn": self.user,
            "objectclass": "user"})

    def tearDown(self):
        """Remove the test OU (and the user below it) from DC1."""
        super(DrsReplicaPrefixMapTestCase, self).tearDown()
        try:
            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError:
            # Best-effort cleanup: the OU may already be gone
            # (ldb.ERR_NO_SUCH_OBJECT).
            # NOTE(review): every other LdbError is silently ignored here
            # as well -- confirm that this is intended.
            pass

    def get_partial_attribute_set(self, attids=None):
        """Build a drsuapi.DsPartialAttributeSet.

        :param attids: list of attribute IDs; defaults to
                       [drsuapi.DRSUAPI_ATTID_objectClass]
        :return: the populated DsPartialAttributeSet
        """
        # A fresh list per call: a list literal as default argument would
        # be shared between all calls.
        if attids is None:
            attids = [drsuapi.DRSUAPI_ATTID_objectClass]
        partial_attribute_set = drsuapi.DsPartialAttributeSet()
        partial_attribute_set.attids = attids
        partial_attribute_set.num_attids = len(attids)
        return partial_attribute_set

    def _repl_obj_req8(self, invocation_id, **extra):
        """Build a DRSUAPI_EXOP_REPL_OBJ request for the test user.

        :param invocation_id: invocation id passed on to _exop_req8()
        :param extra: further keyword arguments for _exop_req8(), e.g.
                      partial_attribute_set or mapping_ctr
        """
        return self._exop_req8(dest_dsa=None,
                               invocation_id=invocation_id,
                               nc_dn_str=self.user,
                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                               **extra)

    def _fetch_prefix_map(self, drs, drs_handle, invocation_id):
        """Return the prefix map of DC1.

        The map is read over LDAP if possible; otherwise the mapping_ctr
        of a plain REPL_OBJ reply is used.
        """
        try:
            return self._samdb_fetch_pfm_and_schi()
        except KeyError:
            # On Windows, prefixMap isn't available over LDAP
            req8 = self._repl_obj_req8(invocation_id)
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
            return ctr.mapping_ctr

    def _assert_attid_returned(self, ctr, attid, msg):
        """Assert that the first object of the reply carries attid."""
        attributes = ctr.first_object.object.attribute_ctr.attributes
        found = any(attr.attid == attid for attr in attributes)
        self.assertTrue(found, msg)

    def _swap_prefixes(self, pfm):
        """Exchange the 2.5.4.* and 1.2.840.113556.1.4.* mappings.

        Both the id_prefix values and the positions of the two mappings
        inside pfm.mappings are swapped. pfm is modified in place.
        """
        idx1 = None
        idx2 = None
        for i, mapping in enumerate(pfm.mappings):
            if mapping.oid.binary_oid == self._PREFIX_2_5_4:
                idx1 = i
            elif mapping.oid.binary_oid == self._PREFIX_1_2_840_113556_1_4:
                idx2 = i

        # Fail with a clear message instead of a NameError further down
        self.assertTrue(idx1 is not None,
                        "Prefix for OID 2.5.4.* not found in prefix map")
        self.assertTrue(idx2 is not None,
                        "Prefix for OID 1.2.840.113556.1.4.* not found in prefix map")

        (pfm.mappings[idx1].id_prefix,
         pfm.mappings[idx2].id_prefix) = (pfm.mappings[idx2].id_prefix,
                                          pfm.mappings[idx1].id_prefix)

        # Swap on a copy and assign the whole list back
        tmp = pfm.mappings
        tmp[idx1], tmp[idx2] = tmp[idx2], tmp[idx1]
        pfm.mappings = tmp

    def test_missing_prefix_map_dsa(self):
        """A request without a prefix map must still succeed."""
        partial_attribute_set = self.get_partial_attribute_set()

        dc_guid_1 = self.ldb_dc1.get_invocation_id()

        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set)

        try:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        except RuntimeError:
            self.fail("Missing prefixmap shouldn't have triggered an error")
        self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)

    def test_invalid_prefix_map_attid(self):
        """An unknown attid must be rejected with error code 0x20E2."""
        # Request for invalid attid
        partial_attribute_set = self.get_partial_attribute_set([99999])

        dc_guid_1 = self.ldb_dc1.get_invocation_id()
        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        pfm = self._fetch_prefix_map(drs, drs_handle, dc_guid_1)

        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set,
                                   mapping_ctr=pfm)

        try:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        except RuntimeError as e:
            (ecode, emsg) = e.args
            self.assertEqual(ecode, 0x000020E2, "Error code should have been "
                             "WERR_DS_DRA_SCHEMA_MISMATCH")
        else:
            self.fail("Invalid attid (99999) should have triggered an error")

    def test_secret_prefix_map_attid(self):
        """unicodePwd is returned with the server's prefix map and with a
        prefix map in which two prefixes have been swapped."""
        # Request for a secret attid
        partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])

        dc_guid_1 = self.ldb_dc1.get_invocation_id()
        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        pfm = self._fetch_prefix_map(drs, drs_handle, dc_guid_1)

        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set,
                                   mapping_ctr=pfm)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)

        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_unicodePwd,
                                    "Ensure we get the unicodePwd attribute back")

        self._swap_prefixes(pfm)

        # 90 for unicodePwd (with new prefix = 0)
        # 589824, 589827 for objectClass and CN
        # Use of three ensures sorting is correct
        partial_attribute_set = self.get_partial_attribute_set([90, 589824, 589827])
        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set,
                                   mapping_ctr=pfm)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)

        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_unicodePwd,
                                    "Ensure we get the unicodePwd attribute back")

    def test_regular_prefix_map_attid(self):
        """name is returned with the server's prefix map and with a
        prefix map in which two prefixes have been swapped."""
        # Request for a regular (non-secret) attid
        partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_name])

        dc_guid_1 = self.ldb_dc1.get_invocation_id()
        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        pfm = self._fetch_prefix_map(drs, drs_handle, dc_guid_1)

        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set,
                                   mapping_ctr=pfm)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)

        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_name,
                                    "Ensure we get the name attribute back")

        self._swap_prefixes(pfm)

        # 1 for name (with new prefix = 0)
        partial_attribute_set = self.get_partial_attribute_set([1])
        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set,
                                   mapping_ctr=pfm)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)

        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_name,
                                    "Ensure we get the name attribute back")

    def test_regular_prefix_map_ex_attid(self):
        """name (partial_attribute_set) and unicodePwd
        (partial_attribute_set_ex) are both returned, with the server's
        prefix map and with two prefixes swapped."""
        # Request for a regular (non-secret) attid
        partial_attribute_set = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_name])
        partial_attribute_set_ex = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])

        dc_guid_1 = self.ldb_dc1.get_invocation_id()
        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        pfm = self._fetch_prefix_map(drs, drs_handle, dc_guid_1)

        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set,
                                   partial_attribute_set_ex=partial_attribute_set_ex,
                                   mapping_ctr=pfm)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)

        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_name,
                                    "Ensure we get the name attribute back")
        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_unicodePwd,
                                    "Ensure we get the unicodePwd attribute back")

        self._swap_prefixes(pfm)

        # 1 for name (with new prefix = 0)
        partial_attribute_set = self.get_partial_attribute_set([1])
        # 90 for unicodePwd (with new prefix = 0)
        # HOWEVER: Windows doesn't seem to respect incoming maps for PartialAttrSetEx
        partial_attribute_set_ex = self.get_partial_attribute_set([drsuapi.DRSUAPI_ATTID_unicodePwd])
        req8 = self._repl_obj_req8(dc_guid_1,
                                   partial_attribute_set=partial_attribute_set,
                                   partial_attribute_set_ex=partial_attribute_set_ex,
                                   mapping_ctr=pfm)

        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)

        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_name,
                                    "Ensure we get the name attribute back")
        self._assert_attid_returned(ctr, drsuapi.DRSUAPI_ATTID_unicodePwd,
                                    "Ensure we get the unicodePwd attribute back")

    def _samdb_fetch_pfm_and_schi(self):
        """Fetch prefixMap and schemaInfo stored in SamDB using LDB connection

        :return: the ctr of the unpacked prefixMap blob, with one extra
                 mapping (id_prefix 0) appended that carries the
                 schemaInfo bytes
        :raise KeyError: presumably when the search result has no
                         prefixMap attribute; callers catch this to fall
                         back to DRSUAPI -- verify against ldb.Message
        """
        samdb = self.ldb_dc1
        res = samdb.search(base=samdb.get_schema_basedn(), scope=SCOPE_BASE,
                           attrs=["prefixMap", "schemaInfo"])

        pfm = ndr_unpack(drsblobs.prefixMapBlob,
                         str(res[0]['prefixMap']))

        schi = drsuapi.DsReplicaOIDMapping()
        schi.id_prefix = 0

        if 'schemaInfo' in res[0]:
            schi_blob = str(res[0]['schemaInfo'])
        else:
            # No schemaInfo stored: build one with revision 0
            schema_info = drsblobs.schemaInfoBlob()
            schema_info.revision = 0
            schema_info.marker = 0xFF
            schema_info.invocation_id = misc.GUID(samdb.get_invocation_id())
            schi_blob = ndr_pack(schema_info)

        # Convert the blob into a list of byte values once and reuse it
        binary_oid = [ord(c) for c in schi_blob]
        schi.oid.length = len(binary_oid)
        schi.oid.binary_oid = binary_oid

        pfm.ctr.mappings = pfm.ctr.mappings + [schi]
        pfm.ctr.num_mappings += 1
        return pfm.ctr
517
class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase):
    """Tests that the linked attributes in a DsGetNCChanges reply are
       ordered as _linked_attribute_compare() orders them."""

    def setUp(self):
        """Create the test OU on DC1."""
        super(DrsReplicaSyncSortTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "ou=sort_exop,%s" % self.base_dn
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})

    def tearDown(self):
        """Remove the test OU with all groups and users below it."""
        super(DrsReplicaSyncSortTestCase, self).tearDown()
        # tidyup groups and users
        try:
            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError:
            # Best-effort cleanup: the OU may already be gone
            # (ldb.ERR_NO_SUCH_OBJECT).
            # NOTE(review): every other LdbError is silently ignored here
            # as well -- confirm that this is intended.
            pass

    def _modify_linked_attribute(self, src, dest, attr, flag):
        """Apply one modification (flag) of value dest to attr of src."""
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb_dc1, src)
        m[attr] = ldb.MessageElement(dest, flag, attr)
        self.ldb_dc1.modify(m)

    def add_linked_attribute(self, src, dest, attr='member'):
        """Add dest as a value of the attribute attr on the object src."""
        self._modify_linked_attribute(src, dest, attr, ldb.FLAG_MOD_ADD)

    def remove_linked_attribute(self, src, dest, attr='member'):
        """Delete the value dest from the attribute attr on the object src."""
        self._modify_linked_attribute(src, dest, attr, ldb.FLAG_MOD_DELETE)

    def _object_guid(self, dn):
        """Return the objectGUID of dn on DC1 as a misc.GUID."""
        res = self.ldb_dc1.search(base=dn, attrs=["objectGUID"])
        return misc.GUID(res[0]['objectGUID'][0])

    def _check_links(self, ctr, expected_links):
        """Check the linked attributes of a reply.

        Every returned link must be a member of expected_links, the
        number of links must match, and the links must already be in
        the order produced by _linked_attribute_compare().
        """
        links = []
        for link in ctr.linked_attributes:
            target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
                                     link.value.blob).guid
            links.append((link, target_guid))
            self.assertTrue(AbstractLink(link.attid, link.flags,
                                         link.identifier.guid,
                                         target_guid) in expected_links)

        # NOTE: sort(cmp=...) only exists on Python 2
        links.sort(cmp=_linked_attribute_compare)

        # assert the two arrays are the same
        self.assertEqual(len(expected_links), ctr.linked_attributes_count)
        self.assertEqual([x[0] for x in links], ctr.linked_attributes)

    def test_sort_behaviour_single_object(self):
        """Testing sorting behaviour on single objects"""

        user1_dn = "cn=test_user1,%s" % self.ou
        user2_dn = "cn=test_user2,%s" % self.ou
        user3_dn = "cn=test_user3,%s" % self.ou
        group_dn = "cn=test_group,%s" % self.ou

        self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})

        u1_guid = self._object_guid(user1_dn)
        u2_guid = self._object_guid(user2_dn)
        u3_guid = self._object_guid(user3_dn)
        g_guid = self._object_guid(group_dn)

        self.add_linked_attribute(group_dn, user1_dn,
                                  attr='member')
        self.add_linked_attribute(group_dn, user2_dn,
                                  attr='member')
        self.add_linked_attribute(group_dn, user3_dn,
                                  attr='member')
        self.add_linked_attribute(group_dn, user1_dn,
                                  attr='managedby')
        self.add_linked_attribute(group_dn, user2_dn,
                                  attr='nonSecurityMember')
        self.add_linked_attribute(group_dn, user3_dn,
                                  attr='nonSecurityMember')

        active = drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE

        # This link is removed again further down, which is expected to
        # turn it into an inactive link.
        set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
                                    active, g_guid, u3_guid)

        expected_links = set([
            set_inactive,
            AbstractLink(drsuapi.DRSUAPI_ATTID_member,
                         active, g_guid, u1_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_member,
                         active, g_guid, u2_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_member,
                         active, g_guid, u3_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
                         active, g_guid, u1_guid),
            AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
                         active, g_guid, u2_guid),
        ])

        dc_guid_1 = self.ldb_dc1.get_invocation_id()

        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=dc_guid_1,
                               nc_dn_str=group_dn,
                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)

        # First pass: all links are active
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self._check_links(ctr, expected_links)

        self.remove_linked_attribute(group_dn, user3_dn,
                                     attr='nonSecurityMember')

        # Set the link inactive. It is taken out of the set before its
        # flags change and put back afterwards.
        expected_links.remove(set_inactive)
        set_inactive.flags = 0
        expected_links.add(set_inactive)

        # Second pass: one link is now inactive
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self._check_links(ctr, expected_links)

    def test_sort_behaviour_ncchanges(self):
        """Testing sorting behaviour on a group of objects."""
        user1_dn = "cn=test_user1,%s" % self.ou
        group_dn = "cn=test_group,%s" % self.ou
        self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
        self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})

        self.add_linked_attribute(group_dn, user1_dn,
                                  attr='member')

        dc_guid_1 = self.ldb_dc1.get_invocation_id()

        drs, drs_handle = self._ds_bind(self.dnsname_dc1)

        # Make sure the max objects count is high enough
        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=dc_guid_1,
                               nc_dn_str=self.base_dn,
                               replica_flags=0,
                               max_objects=100,
                               exop=drsuapi.DRSUAPI_EXOP_NONE)

        # Loop until we get linked attributes, or we get to the end.
        # Samba sends linked attributes at the end, unlike Windows.
        while True:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
            if ctr.more_data == 0 or ctr.linked_attributes_count != 0:
                break
            req8.highwatermark = ctr.new_highwatermark

        self.assertTrue(ctr.linked_attributes_count != 0)

        no_inactive = []
        for link in ctr.linked_attributes:
            try:
                target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
                                         link.value.blob).guid
            except Exception:
                # Retry with the binary variant of the identifier.
                # "except Exception" rather than a bare except, so that
                # KeyboardInterrupt and SystemExit are not swallowed.
                target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
                                         link.value.blob).guid
            no_inactive.append((link, target_guid))

        # NOTE: sort(cmp=...) only exists on Python 2
        no_inactive.sort(cmp=_linked_attribute_compare)

        # assert the two arrays are the same
        self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)