2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 # export DC1=dc1_dns_name
26 # export DC2=dc2_dns_name
27 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
28 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
35 from ldb import SCOPE_BASE
37 from samba.dcerpc import drsuapi, misc, drsblobs
38 from samba.drs_utils import drs_DsBind
39 from samba.ndr import ndr_unpack
41 def _linked_attribute_compare(la1, la2):
42 """See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
46 # Ascending host object GUID
47 c = cmp(la1.identifier.guid, la2.identifier.guid)
51 # Ascending attribute ID
52 if la1.attid != la2.attid:
53 return -1 if la1.attid < la2.attid else 1
55 la1_active = la1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
56 la2_active = la2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
58 # Ascending 'is present'
59 if la1_active != la2_active:
60 return 1 if la1_active else -1
62 # Ascending target object GUID
63 return cmp(la1_target, la2_target)
66 def __init__(self, attid, flags, identifier, targetGUID):
69 self.identifier = identifier
70 self.targetGUID = targetGUID
72 def __eq__(self, other):
73 return isinstance(other, AbstractLink) and \
74 ((self.attid, self.flags, self.identifier, self.targetGUID) ==
75 (other.attid, other.flags, other.identifier, other.targetGUID))
78 return hash((self.attid, self.flags, self.identifier, self.targetGUID))
81 def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
83 req8 = drsuapi.DsGetNCChangesRequest8()
85 req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
86 req8.source_dsa_invocation_id = misc.GUID(invocation_id)
87 req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
88 req8.naming_context.dn = unicode(nc_dn_str)
89 req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
90 req8.highwatermark.tmp_highest_usn = 0
91 req8.highwatermark.reserved_usn = 0
92 req8.highwatermark.highest_usn = 0
93 req8.uptodateness_vector = None
94 req8.replica_flags = replica_flags
95 req8.max_object_count = 0
96 req8.max_ndr_size = 402116
97 req8.extended_op = exop
99 req8.partial_attribute_set = None
100 req8.partial_attribute_set_ex = None
101 req8.mapping_ctr.num_mappings = 0
102 req8.mapping_ctr.mappings = None
106 def _ds_bind(self, server_name):
107 binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
109 drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
110 (drs_handle, supported_extensions) = drs_DsBind(drs)
111 return (drs, drs_handle)
114 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
115 """Intended as a semi-black box test case for DsGetNCChanges
116 implementation for extended operations. It should be testing
117 how DsGetNCChanges handles different input params (mostly invalid).
118 Final goal is to make DsGetNCChanges as binary compatible to
119 Windows implementation as possible"""
122 super(DrsReplicaSyncTestCase, self).setUp()
125 super(DrsReplicaSyncTestCase, self).tearDown()
127 def _determine_fSMORoleOwner(self, fsmo_obj_dn):
128 """Returns (owner, not_owner) pair where:
129 owner: dns name for FSMO owner
130 not_owner: dns name for DC not owning the FSMO"""
131 # collect info to return later
132 fsmo_info_1 = {"dns_name": self.dnsname_dc1,
133 "invocation_id": self.ldb_dc1.get_invocation_id(),
134 "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
135 "server_dn": self.ldb_dc1.get_serverName()}
136 fsmo_info_2 = {"dns_name": self.dnsname_dc2,
137 "invocation_id": self.ldb_dc2.get_invocation_id(),
138 "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
139 "server_dn": self.ldb_dc2.get_serverName()}
141 msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
142 fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
143 fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
145 msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
146 fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
147 fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
149 # determine the owner dc
150 res = self.ldb_dc1.search(fsmo_obj_dn,
151 scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
152 assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
153 fsmo_owner = res[0]["fSMORoleOwner"][0]
154 if fsmo_owner == self.info_dc1["dsServiceName"][0]:
155 return (fsmo_info_1, fsmo_info_2)
156 return (fsmo_info_2, fsmo_info_1)
158 def _check_exop_failed(self, ctr6, expected_failure):
159 self.assertEqual(ctr6.extended_ret, expected_failure)
160 #self.assertEqual(ctr6.object_count, 0)
161 #self.assertEqual(ctr6.first_object, None)
162 self.assertEqual(ctr6.more_data, False)
163 self.assertEqual(ctr6.nc_object_count, 0)
164 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
165 self.assertEqual(ctr6.linked_attributes_count, 0)
166 self.assertEqual(ctr6.linked_attributes, [])
167 self.assertEqual(ctr6.drs_error[0], 0)
169 def test_FSMONotOwner(self):
170 """Test role transfer with against DC not owner of the role"""
171 fsmo_dn = self.ldb_dc1.get_schema_basedn()
172 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
174 req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
175 invocation_id=fsmo_not_owner["invocation_id"],
177 exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
179 (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
180 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
181 self.assertEqual(level, 6, "Expected level 6 response!")
182 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
183 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
184 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
186 def test_InvalidDestDSA(self):
187 """Test role transfer with invalid destination DSA guid"""
188 fsmo_dn = self.ldb_dc1.get_schema_basedn()
189 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
191 req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
192 invocation_id=fsmo_owner["invocation_id"],
194 exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
196 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
197 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
198 self.assertEqual(level, 6, "Expected level 6 response!")
199 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
200 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
201 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
203 def test_InvalidDestDSA_ridalloc(self):
204 """Test RID allocation with invalid destination DSA guid"""
205 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
206 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
208 req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
209 invocation_id=fsmo_owner["invocation_id"],
211 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
213 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
214 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
215 self.assertEqual(level, 6, "Expected level 6 response!")
216 self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
217 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
218 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
220 def test_do_ridalloc(self):
221 """Test doing a RID allocation with a valid destination DSA guid"""
222 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
223 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
225 req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
226 invocation_id=fsmo_owner["invocation_id"],
228 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
230 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
231 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
232 self.assertEqual(level, 6, "Expected level 6 response!")
233 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
234 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
236 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
237 self.assertEqual(ctr6.object_count, 3)
238 self.assertNotEqual(ctr6.first_object, None)
239 self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
240 self.assertNotEqual(ctr6.first_object.next_object, None)
241 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
242 second_object = ctr6.first_object.next_object.object
243 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
244 third_object = ctr6.first_object.next_object.next_object.object
245 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
247 self.assertEqual(ctr6.more_data, False)
248 self.assertEqual(ctr6.nc_object_count, 0)
249 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
250 self.assertEqual(ctr6.drs_error[0], 0)
251 # We don't check the linked_attributes_count as if the domain
252 # has an RODC, it can gain links on the server account object
254 def test_do_ridalloc_get_anc(self):
255 """Test doing a RID allocation with a valid destination DSA guid and """
256 fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
257 (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
259 req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
260 invocation_id=fsmo_owner["invocation_id"],
262 exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
263 replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
265 (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
266 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
267 self.assertEqual(level, 6, "Expected level 6 response!")
268 self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
269 self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
271 self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
272 self.assertEqual(ctr6.object_count, 3)
273 self.assertNotEqual(ctr6.first_object, None)
274 self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
275 self.assertNotEqual(ctr6.first_object.next_object, None)
276 self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
277 second_object = ctr6.first_object.next_object.object
278 self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
279 third_object = ctr6.first_object.next_object.next_object.object
280 self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
281 self.assertEqual(ctr6.more_data, False)
282 self.assertEqual(ctr6.nc_object_count, 0)
283 self.assertEqual(ctr6.nc_linked_attributes_count, 0)
284 self.assertEqual(ctr6.drs_error[0], 0)
285 # We don't check the linked_attributes_count as if the domain
286 # has an RODC, it can gain links on the server account object
288 class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
290 super(DrsReplicaSyncSortTestCase, self).setUp()
291 self.base_dn = self.ldb_dc1.get_default_basedn()
292 self.ou = "ou=sort_exop,%s" % self.base_dn
295 "objectclass": "organizationalUnit"})
298 super(DrsReplicaSyncSortTestCase, self).tearDown()
299 # tidyup groups and users
301 self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
302 except ldb.LdbError as (enum, string):
303 if enum == ldb.ERR_NO_SUCH_OBJECT:
306 def test_sort_behaviour_single_object(self):
307 """Testing sorting behaviour on single objects"""
308 user1_dn = "cn=test_user1,%s" % self.ou
309 user2_dn = "cn=test_user2,%s" % self.ou
310 user3_dn = "cn=test_user3,%s" % self.ou
311 group_dn = "cn=test_group,%s" % self.ou
313 self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
314 self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
315 self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
316 self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
318 u1_guid = str(misc.GUID(self.ldb_dc1.search(base=user1_dn,
319 attrs=["objectGUID"])[0]['objectGUID'][0]))
320 u2_guid = str(misc.GUID(self.ldb_dc1.search(base=user2_dn,
321 attrs=["objectGUID"])[0]['objectGUID'][0]))
322 u3_guid = str(misc.GUID(self.ldb_dc1.search(base=user3_dn,
323 attrs=["objectGUID"])[0]['objectGUID'][0]))
324 g_guid = str(misc.GUID(self.ldb_dc1.search(base=group_dn,
325 attrs=["objectGUID"])[0]['objectGUID'][0]))
327 self.add_linked_attribute(group_dn, user1_dn,
329 self.add_linked_attribute(group_dn, user2_dn,
331 self.add_linked_attribute(group_dn, user3_dn,
333 self.add_linked_attribute(group_dn, user1_dn,
335 self.add_linked_attribute(group_dn, user2_dn,
336 attr='nonSecurityMember')
337 self.add_linked_attribute(group_dn, user3_dn,
338 attr='nonSecurityMember')
340 set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
341 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
344 expected_links = set([set_inactive,
345 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
346 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
349 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
350 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
353 AbstractLink(drsuapi.DRSUAPI_ATTID_member,
354 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
357 AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
358 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
361 AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
362 drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
367 dc_guid_1 = self.ldb_dc1.get_invocation_id()
369 drs, drs_handle = self._ds_bind(self.dnsname_dc1)
371 req8 = self._exop_req8(dest_dsa=None,
372 invocation_id=dc_guid_1,
374 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
376 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
379 for link in ctr.linked_attributes:
380 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
381 link.value.blob).guid
382 no_inactive.append((link, target_guid))
383 self.assertTrue(AbstractLink(link.attid, link.flags,
384 str(link.identifier.guid),
385 str(target_guid)) in expected_links)
387 no_inactive.sort(cmp=_linked_attribute_compare)
389 # assert the two arrays are the same
390 self.assertEqual(len(expected_links), ctr.linked_attributes_count)
391 self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)
393 self.remove_linked_attribute(group_dn, user3_dn,
394 attr='nonSecurityMember')
396 # Set the link inactive
397 expected_links.remove(set_inactive)
398 set_inactive.flags = 0
399 expected_links.add(set_inactive)
402 (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
403 for link in ctr.linked_attributes:
404 target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
405 link.value.blob).guid
406 has_inactive.append((link, target_guid))
407 self.assertTrue(AbstractLink(link.attid, link.flags,
408 str(link.identifier.guid),
409 str(target_guid)) in expected_links)
411 has_inactive.sort(cmp=_linked_attribute_compare)
413 # assert the two arrays are the same
414 self.assertEqual(len(expected_links), ctr.linked_attributes_count)
415 self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
417 def add_linked_attribute(self, src, dest, attr='member'):
419 m.dn = ldb.Dn(self.ldb_dc1, src)
420 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
421 self.ldb_dc1.modify(m)
423 def remove_linked_attribute(self, src, dest, attr='member'):
425 m.dn = ldb.Dn(self.ldb_dc1, src)
426 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
427 self.ldb_dc1.modify(m)