3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 # Copyright (C) Catalyst.Net Ltd
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 sys.path.insert(0, "bin/python")
26 from samba.tests.subunitrun import TestProgram, SubunitOptions
28 import samba.getopt as options
32 from ldb import LdbError, SCOPE_BASE
33 from ldb import Message, MessageElement, Dn
34 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
35 from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_RODC_ATTRIBUTE
36 from samba.dcerpc import security, misc, drsblobs
37 from samba.ndr import ndr_unpack, ndr_pack
39 from samba.auth import system_session
40 from samba import gensec, sd_utils
41 from samba.samdb import SamDB
42 from samba.credentials import Credentials, DONT_USE_KERBEROS
44 from samba.tests import delete_force
46 parser = optparse.OptionParser("dirsync.py [options] <host>")
47 sambaopts = options.SambaOptions(parser)
48 parser.add_option_group(sambaopts)
49 parser.add_option_group(options.VersionOptions(parser))
51 # use command line creds if available
52 credopts = options.CredentialsOptions(parser)
53 parser.add_option_group(credopts)
54 subunitopts = SubunitOptions(parser)
55 parser.add_option_group(subunitopts)
56 opts, args = parser.parse_args()
64 ldaphost = "ldap://%s" % host
67 start = host.rindex("://")
68 host = host.lstrip(start + 3)
70 lp = sambaopts.get_loadparm()
71 creds = credopts.get_credentials(lp)
78 class DirsyncBaseTests(samba.tests.TestCase):
82 self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
83 self.base_dn = self.ldb_admin.domain_dn()
84 self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
85 self.user_pass = samba.generate_random_password(12, 16)
86 self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
87 self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
88 # used for anonymous login
89 print("baseDN: %s" % self.base_dn)
91 userou = "OU=dirsync-test"
92 self.ou = f"{userou},{self.base_dn}"
93 samba.tests.delete_force(self.ldb_admin, self.ou, controls=['tree_delete:1'])
94 self.ldb_admin.create_ou(self.ou)
95 self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou, controls=['tree_delete:1'])
98 self.dirsync_user = "test_dirsync_user"
99 self.simple_user = "test_simple_user"
100 self.admin_user = "test_admin_user"
101 self.dirsync_pass = self.user_pass
102 self.simple_pass = self.user_pass
103 self.admin_pass = self.user_pass
105 self.ldb_admin.newuser(self.dirsync_user, self.dirsync_pass, userou=userou)
106 self.ldb_admin.newuser(self.simple_user, self.simple_pass, userou=userou)
107 self.ldb_admin.newuser(self.admin_user, self.admin_pass, userou=userou)
108 self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
110 user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
111 mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
113 self.sd_utils.dacl_add_ace(self.base_dn, mod)
114 self.addCleanup(self.sd_utils.dacl_delete_aces, self.base_dn, mod)
116 # add admins to the Domain Admins group
117 self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
118 add_members_operation=True)
120 def get_user_dn(self, name):
121 return ldb.Dn(self.ldb_admin, "CN={0},{1}".format(name, self.ou))
123 def get_ldb_connection(self, target_username, target_password):
124 creds_tmp = Credentials()
125 creds_tmp.set_username(target_username)
126 creds_tmp.set_password(target_password)
127 creds_tmp.set_domain(creds.get_domain())
128 creds_tmp.set_realm(creds.get_realm())
129 creds_tmp.set_workstation(creds.get_workstation())
130 creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
131 | gensec.FEATURE_SEAL)
132 creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
133 ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
136 # tests on ldap add operations
137 class SimpleDirsyncTests(DirsyncBaseTests):
139 # def test_dirsync_errors(self):
141 def test_dirsync_supported(self):
142 """Test the basic of the dirsync is supported"""
143 self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
144 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
145 res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
146 res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
148 self.ldb_simple.search(self.base_dn,
149 expression="samaccountname=*",
150 controls=["dirsync:1:0:1"])
151 except LdbError as l:
152 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
154 def test_parentGUID_referrals(self):
155 res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
157 res = self.ldb_admin.search(self.base_dn,
158 expression="name=Configuration",
159 controls=["dirsync:1:0:1"])
160 self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
162 def test_ok_not_rootdc(self):
163 """Test if it's ok to do dirsync on another NC that is not the root DC"""
164 self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
165 expression="samaccountname=*",
166 controls=["dirsync:1:0:1"])
168 def test_dirsync_errors(self):
169 """Test if dirsync returns the correct LDAP errors in case of pb"""
170 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
171 self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
173 self.ldb_simple.search(self.base_dn,
174 expression="samaccountname=*",
175 controls=["dirsync:1:0:1"])
176 except LdbError as l:
178 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
181 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
182 expression="samaccountname=*",
183 controls=["dirsync:1:0:1"])
184 except LdbError as l:
186 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
189 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
190 expression="samaccountname=*",
191 controls=["dirsync:1:1:1"])
192 except LdbError as l:
194 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
197 self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
198 expression="samaccountname=*",
199 controls=["dirsync:1:0:1"])
200 except LdbError as l:
202 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
205 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
206 expression="samaccountname=*",
207 controls=["dirsync:1:0:1"])
208 except LdbError as l:
210 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
213 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
214 expression="samaccountname=*",
215 controls=["dirsync:1:1:1"])
216 except LdbError as l:
218 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
220 def test_dirsync_attributes(self):
221 """Check behavior with some attributes """
222 res = self.ldb_admin.search(self.base_dn,
223 expression="samaccountname=*",
224 controls=["dirsync:1:0:1"])
225 # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
226 self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") is not None)
227 # Check that non replicated attributes are not returned
228 self.assertTrue(res.msgs[0].get("badPwdCount") is None)
229 # Check that non forward link are not returned
230 self.assertTrue(res.msgs[0].get("memberof") is None)
232 # Asking for instanceType will return also objectGUID
233 res = self.ldb_admin.search(self.base_dn,
234 expression="samaccountname=Administrator",
235 attrs=["instanceType"],
236 controls=["dirsync:1:0:1"])
237 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
238 self.assertTrue(res.msgs[0].get("instanceType") is not None)
240 # We don't return an entry if asked for objectGUID
241 res = self.ldb_admin.search(self.base_dn,
242 expression="(distinguishedName=%s)" % str(self.base_dn),
243 attrs=["objectGUID"],
244 controls=["dirsync:1:0:1"])
245 self.assertEqual(len(res.msgs), 0)
247 # a request on the root of a NC didn't return parentGUID
248 res = self.ldb_admin.search(self.base_dn,
249 expression="(distinguishedName=%s)" % str(self.base_dn),
251 controls=["dirsync:1:0:1"])
252 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
253 self.assertTrue(res.msgs[0].get("name") is not None)
254 self.assertTrue(res.msgs[0].get("parentGUID") is None)
255 self.assertTrue(res.msgs[0].get("instanceType") is not None)
257 # Asking for name will return also objectGUID and parentGUID
258 # and instanceType and of course name
259 res = self.ldb_admin.search(self.base_dn,
260 expression="samaccountname=Administrator",
262 controls=["dirsync:1:0:1"])
263 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
264 self.assertTrue(res.msgs[0].get("name") is not None)
265 self.assertTrue(res.msgs[0].get("parentGUID") is not None)
266 self.assertTrue(res.msgs[0].get("instanceType") is not None)
268 # Asking for dn will not return not only DN but more like if attrs=*
269 # parentGUID should be returned
270 res = self.ldb_admin.search(self.base_dn,
271 expression="samaccountname=Administrator",
273 controls=["dirsync:1:0:1"])
274 count = len(res.msgs[0])
275 res2 = self.ldb_admin.search(self.base_dn,
276 expression="samaccountname=Administrator",
277 controls=["dirsync:1:0:1"])
278 count2 = len(res2.msgs[0])
279 self.assertEqual(count, count2)
281 # Asking for cn will return nothing on objects that have CN as RDN
282 res = self.ldb_admin.search(self.base_dn,
283 expression="samaccountname=Administrator",
285 controls=["dirsync:1:0:1"])
286 self.assertEqual(len(res.msgs), 0)
287 # Asking for parentGUID will return nothing too
288 res = self.ldb_admin.search(self.base_dn,
289 expression="samaccountname=Administrator",
290 attrs=["parentGUID"],
291 controls=["dirsync:1:0:1"])
292 self.assertEqual(len(res.msgs), 0)
293 ouname = "OU=testou,%s" % self.ou
295 self.ldb_admin.create_ou(ouname)
297 delta.dn = Dn(self.ldb_admin, ouname)
298 delta["cn"] = MessageElement("test ou",
301 self.ldb_admin.modify(delta)
302 res = self.ldb_admin.search(self.base_dn,
303 expression="name=testou",
305 controls=["dirsync:1:0:1"])
307 self.assertEqual(len(res.msgs), 1)
308 self.assertEqual(len(res.msgs[0]), 3)
309 delete_force(self.ldb_admin, ouname)
311 def test_dirsync_with_controls(self):
312 """Check that dirsync return correct information when dealing with the NC"""
313 res = self.ldb_admin.search(self.base_dn,
314 expression="(distinguishedName=%s)" % str(self.base_dn),
316 controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
318 def test_dirsync_basenc(self):
319 """Check that dirsync return correct information when dealing with the NC"""
320 res = self.ldb_admin.search(self.base_dn,
321 expression="(distinguishedName=%s)" % str(self.base_dn),
323 controls=["dirsync:1:0:10000"])
324 self.assertEqual(len(res.msgs), 1)
325 self.assertEqual(len(res.msgs[0]), 3)
327 res = self.ldb_admin.search(self.base_dn,
328 expression="(distinguishedName=%s)" % str(self.base_dn),
329 attrs=["ntSecurityDescriptor"],
330 controls=["dirsync:1:0:10000"])
331 self.assertEqual(len(res.msgs), 1)
332 self.assertEqual(len(res.msgs[0]), 3)
334 def test_dirsync_othernc(self):
335 """Check that dirsync return information for entries that are normally referrals (ie. other NCs)"""
336 res = self.ldb_admin.search(self.base_dn,
337 expression="(objectclass=configuration)",
339 controls=["dirsync:1:0:10000"])
340 self.assertEqual(len(res.msgs), 1)
341 self.assertEqual(len(res.msgs[0]), 4)
343 res = self.ldb_admin.search(self.base_dn,
344 expression="(objectclass=configuration)",
345 attrs=["ntSecurityDescriptor"],
346 controls=["dirsync:1:0:10000"])
347 self.assertEqual(len(res.msgs), 1)
348 self.assertEqual(len(res.msgs[0]), 3)
350 res = self.ldb_admin.search(self.base_dn,
351 expression="(objectclass=domaindns)",
352 attrs=["ntSecurityDescriptor"],
353 controls=["dirsync:1:0:10000"])
356 # only sub nc returns a result when asked for objectGUID
357 res = self.ldb_admin.search(self.base_dn,
358 expression="(objectclass=domaindns)",
359 attrs=["objectGUID"],
360 controls=["dirsync:1:0:0"])
361 self.assertEqual(len(res.msgs), nb - 1)
363 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
365 res = self.ldb_admin.search(self.base_dn,
366 expression="(objectclass=configuration)",
367 attrs=["objectGUID"],
368 controls=["dirsync:1:0:0"])
370 def test_dirsync_send_delta(self):
371 """Check that dirsync return correct delta when sending the last cookie"""
372 res = self.ldb_admin.search(self.base_dn,
373 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
374 controls=["dirsync:1:0:10000"])
375 ctl = str(res.controls[0]).split(":")
379 control = str(":".join(ctl))
380 res = self.ldb_admin.search(self.base_dn,
381 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
383 self.assertEqual(len(res), 0)
385 res = self.ldb_admin.search(self.base_dn,
386 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
387 controls=["dirsync:1:0:100000"])
389 ctl = str(res.controls[0]).split(":")
393 control2 = str(":".join(ctl))
396 ouname = "OU=testou2,%s" % self.base_dn
398 self.ldb_admin.create_ou(ouname)
399 res = self.ldb_admin.search(self.base_dn,
400 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
402 self.assertEqual(len(res), 1)
403 ctl = str(res.controls[0]).split(":")
407 control3 = str(":".join(ctl))
410 delta.dn = Dn(self.ldb_admin, str(ouname))
412 delta["cn"] = MessageElement("test ou",
415 self.ldb_admin.modify(delta)
416 res = self.ldb_admin.search(self.base_dn,
417 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
420 self.assertEqual(len(res.msgs), 1)
421 # 3 attributes: instanceType, cn and objectGUID
422 self.assertEqual(len(res.msgs[0]), 3)
425 delta.dn = Dn(self.ldb_admin, str(ouname))
426 delta["cn"] = MessageElement([],
429 self.ldb_admin.modify(delta)
430 res = self.ldb_admin.search(self.base_dn,
431 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
434 self.assertEqual(len(res.msgs), 1)
435 # So we won't have much attribute returned but instanceType and GUID
437 # 3 attributes: instanceType and objectGUID and cn but empty
438 self.assertEqual(len(res.msgs[0]), 3)
439 ouname = "OU=newouname,%s" % self.base_dn
440 self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
442 ctl = str(res.controls[0]).split(":")
446 control4 = str(":".join(ctl))
447 res = self.ldb_admin.search(self.base_dn,
448 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
451 self.assertTrue(res[0].get("parentGUID") is not None)
452 self.assertTrue(res[0].get("name") is not None)
453 delete_force(self.ldb_admin, ouname)
455 def test_dirsync_linkedattributes_OBJECT_SECURITY(self):
456 """Check that dirsync returned deleted objects too"""
457 # Let's search for members
458 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
459 res = self.ldb_simple.search(self.base_dn,
460 expression="(name=Administrators)",
461 controls=["dirsync:1:1:1"])
463 self.assertTrue(len(res[0].get("member")) > 0)
464 size = len(res[0].get("member"))
466 ctl = str(res.controls[0]).split(":")
470 control1 = str(":".join(ctl))
471 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
472 add_members_operation=True)
474 res = self.ldb_simple.search(self.base_dn,
475 expression="(name=Administrators)",
478 self.assertEqual(len(res[0].get("member")), size + 1)
479 ctl = str(res.controls[0]).split(":")
483 control1 = str(":".join(ctl))
485 # remove the user from the group
486 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
487 add_members_operation=False)
489 res = self.ldb_simple.search(self.base_dn,
490 expression="(name=Administrators)",
493 self.assertEqual(len(res[0].get("member")), size)
495 self.ldb_admin.newgroup("testgroup")
496 self.addCleanup(self.ldb_admin.deletegroup, "testgroup")
497 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
498 add_members_operation=True)
500 res = self.ldb_admin.search(self.base_dn,
501 expression="(name=testgroup)",
502 controls=["dirsync:1:0:1"])
504 self.assertEqual(len(res[0].get("member")), 1)
505 self.assertTrue(res[0].get("member") != "")
507 ctl = str(res.controls[0]).split(":")
511 control1 = str(":".join(ctl))
513 # Check that reasking the same question but with an updated cookie
514 # didn't return any results.
516 res = self.ldb_admin.search(self.base_dn,
517 expression="(name=testgroup)",
519 self.assertEqual(len(res), 0)
521 ctl = str(res.controls[0]).split(":")
525 control1 = str(":".join(ctl))
527 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
528 add_members_operation=False)
530 res = self.ldb_admin.search(self.base_dn,
531 expression="(name=testgroup)",
535 self.assertEqual(len(res[0].get("member")), 0)
537 def test_dirsync_deleted_items(self):
538 """Check that dirsync returned deleted objects too"""
540 ouname = "OU=testou3,%s" % self.base_dn
542 self.ldb_admin.create_ou(ouname)
543 res = self.ldb_admin.search(self.base_dn,
544 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
545 controls=["dirsync:1:0:1"])
548 if str(e["name"]) == "testou3":
549 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
551 ctl = str(res.controls[0]).split(":")
555 control1 = str(":".join(ctl))
557 # So now delete the object and check that
558 # we can see the object but deleted when admin
559 delete_force(self.ldb_admin, ouname)
561 res = self.ldb_admin.search(self.base_dn,
562 expression="(objectClass=organizationalUnit)",
564 self.assertEqual(len(res), 1)
565 guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
566 self.assertEqual(guid2, guid)
567 self.assertTrue(res[0].get("isDeleted"))
568 self.assertTrue(res[0].get("name") is not None)
570 def test_cookie_from_others(self):
571 res = self.ldb_admin.search(self.base_dn,
572 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
573 controls=["dirsync:1:0:1"])
574 ctl = str(res.controls[0]).split(":")
575 cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
576 cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
577 controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
578 res = self.ldb_admin.search(self.base_dn,
579 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
582 def test_dirsync_linkedattributes_range(self):
583 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
584 res = self.ldb_admin.search(self.base_dn,
585 attrs=["member;range=1-1"],
586 expression="(name=Administrators)",
587 controls=["dirsync:1:0:0"])
589 self.assertTrue(len(res) > 0)
590 self.assertTrue(res[0].get("member;range=1-1") is None)
591 self.assertTrue(res[0].get("member") is not None)
592 self.assertTrue(len(res[0].get("member")) > 0)
594 def test_dirsync_linkedattributes_range_user(self):
595 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
597 res = self.ldb_simple.search(self.base_dn,
598 attrs=["member;range=1-1"],
599 expression="(name=Administrators)",
600 controls=["dirsync:1:0:0"])
601 except LdbError as e:
603 self.assertEqual(num, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
607 def test_dirsync_linkedattributes(self):
608 flag_incr_linked = 2147483648
609 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
610 res = self.ldb_admin.search(self.base_dn,
612 expression="(name=Administrators)",
613 controls=["dirsync:1:%d:1" % flag_incr_linked])
615 self.assertTrue(res[0].get("member;range=1-1") is not None)
616 self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
617 size = len(res[0].get("member;range=1-1"))
619 ctl = str(res.controls[0]).split(":")
621 ctl[2] = "%d" % flag_incr_linked
623 control1 = str(":".join(ctl))
624 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
625 add_members_operation=True)
626 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
627 add_members_operation=True)
629 res = self.ldb_admin.search(self.base_dn,
630 expression="(name=Administrators)",
633 self.assertEqual(len(res[0].get("member;range=1-1")), 2)
634 ctl = str(res.controls[0]).split(":")
636 ctl[2] = "%d" % flag_incr_linked
638 control1 = str(":".join(ctl))
640 # remove the user from the group
641 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
642 add_members_operation=False)
644 res = self.ldb_admin.search(self.base_dn,
645 expression="(name=Administrators)",
648 self.assertEqual(res[0].get("member;range=1-1"), None)
649 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
651 ctl = str(res.controls[0]).split(":")
653 ctl[2] = "%d" % flag_incr_linked
655 control2 = str(":".join(ctl))
657 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
658 add_members_operation=False)
660 res = self.ldb_admin.search(self.base_dn,
661 expression="(name=Administrators)",
664 self.assertEqual(res[0].get("member;range=1-1"), None)
665 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
667 res = self.ldb_admin.search(self.base_dn,
668 expression="(name=Administrators)",
671 self.assertEqual(res[0].get("member;range=1-1"), None)
672 self.assertEqual(len(res[0].get("member;range=0-0")), 2)
674 def test_dirsync_extended_dn(self):
675 """Check that dirsync works together with the extended_dn control"""
676 # Let's search for members
677 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
678 res = self.ldb_simple.search(self.base_dn,
679 expression="(name=Administrators)",
680 controls=["dirsync:1:1:1"])
682 self.assertTrue(len(res[0].get("member")) > 0)
683 size = len(res[0].get("member"))
685 resEX1 = self.ldb_simple.search(self.base_dn,
686 expression="(name=Administrators)",
687 controls=["dirsync:1:1:1","extended_dn:1:1"])
688 self.assertTrue(len(resEX1[0].get("member")) > 0)
689 sizeEX1 = len(resEX1[0].get("member"))
690 self.assertEqual(sizeEX1, size)
691 self.assertIn(res[0]["member"][0], resEX1[0]["member"][0])
692 self.assertIn(b"<GUID=", resEX1[0]["member"][0])
693 self.assertIn(b">;<SID=S-1-5-21-", resEX1[0]["member"][0])
695 resEX0 = self.ldb_simple.search(self.base_dn,
696 expression="(name=Administrators)",
697 controls=["dirsync:1:1:1","extended_dn:1:0"])
698 self.assertTrue(len(resEX0[0].get("member")) > 0)
699 sizeEX0 = len(resEX0[0].get("member"))
700 self.assertEqual(sizeEX0, size)
701 self.assertIn(res[0]["member"][0], resEX0[0]["member"][0])
702 self.assertIn(b"<GUID=", resEX0[0]["member"][0])
703 self.assertIn(b">;<SID=010500000000000515", resEX0[0]["member"][0])
705 def test_dirsync_deleted_items_OBJECT_SECURITY(self):
706 """Check that dirsync returned deleted objects too"""
708 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
709 ouname = "OU=testou3,%s" % self.base_dn
711 self.ldb_admin.create_ou(ouname)
713 # Specify LDAP_DIRSYNC_OBJECT_SECURITY
714 res = self.ldb_simple.search(self.base_dn,
715 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
716 controls=["dirsync:1:1:1"])
720 if str(e["name"]) == "testou3":
721 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
723 self.assertTrue(guid is not None)
724 ctl = str(res.controls[0]).split(":")
728 control1 = str(":".join(ctl))
730 # So now delete the object and check that
731 # we can see the object but deleted when admin
732 # we just see the objectGUID when simple user
733 delete_force(self.ldb_admin, ouname)
735 res = self.ldb_simple.search(self.base_dn,
736 expression="(objectClass=organizationalUnit)",
738 self.assertEqual(len(res), 1)
739 guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
740 self.assertEqual(guid2, guid)
741 self.assertEqual(str(res[0].dn), "")
743 class SpecialDirsyncTests(DirsyncBaseTests):
748 self.schema_dn = self.ldb_admin.get_schema_basedn()
750 # the tests work by setting the 'Confidential' or 'RODC Filtered' bit in the searchFlags
751 # for an existing schema attribute. This only works against Windows if
752 # the systemFlags does not have FLAG_SCHEMA_BASE_OBJECT set for the
753 # schema attribute being modified. There are only a few attributes that
754 # meet this criteria (most of which only apply to 'user' objects)
755 self.conf_attr = "homePostalAddress"
756 attr_cn = "CN=Address-Home"
757 # schemaIdGuid for homePostalAddress (used for ACE tests)
758 self.attr_dn = f"{attr_cn},{self.schema_dn}"
760 userou = "OU=conf-attr-test"
761 self.ou = "{0},{1}".format(userou, self.base_dn)
762 samba.tests.delete_force(self.ldb_admin, self.ou, controls=['tree_delete:1'])
763 self.ldb_admin.create_ou(self.ou)
764 self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou, controls=['tree_delete:1'])
766 # add a test object with this attribute set
767 self.conf_value = "abcdef"
768 self.conf_user = "conf-user"
769 self.ldb_admin.newuser(self.conf_user, self.user_pass, userou=userou)
770 self.conf_dn = self.get_user_dn(self.conf_user)
771 self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
773 # sanity-check the flag is not already set (this'll cause problems if
774 # previous test run didn't clean up properly)
776 search_flags = int(self.get_attr_search_flags(self.attr_dn))
777 if search_flags & SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE:
778 self.set_attr_search_flags(self.attr_dn, str(search_flags &~ (SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE)))
779 search_flags = int(self.get_attr_search_flags(self.attr_dn))
780 self.assertEqual(0, search_flags & (SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE),
781 f"{self.conf_attr} searchFlags did not reset to omit SEARCH_FLAG_CONFIDENTIAL and SEARCH_FLAG_RODC_ATTRIBUTE ({search_flags})")
783 # work out the original 'searchFlags' value before we overwrite it
784 old_value = self.get_attr_search_flags(self.attr_dn)
786 self.set_attr_search_flags(self.attr_dn, str(self.flag_under_test))
788 # reset the value after the test completes
789 self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)
791 def add_attr(self, dn, attr, value):
794 m[attr] = MessageElement(value, FLAG_MOD_ADD, attr)
795 self.ldb_admin.modify(m)
797 def set_attr_search_flags(self, attr_dn, flags):
798 """Modifies the searchFlags for an object in the schema"""
800 m.dn = Dn(self.ldb_admin, attr_dn)
801 m['searchFlags'] = MessageElement(flags, FLAG_MOD_REPLACE,
803 self.ldb_admin.modify(m)
805 # note we have to update the schema for this change to take effect (on
807 self.ldb_admin.set_schema_update_now()
809 def get_attr_search_flags(self, attr_dn):
810 res = self.ldb_admin.search(attr_dn, scope=SCOPE_BASE,
811 attrs=['searchFlags'])
812 return res[0]['searchFlags'][0]
814 def find_under_current_ou(self, res):
816 if msg.dn == self.conf_dn:
818 self.fail(f"Failed to find object {self.conf_dn} in {len(res)} results")
821 class ConfidentialDirsyncTests(SpecialDirsyncTests):
824 self.flag_under_test = SEARCH_FLAG_CONFIDENTIAL
827 def test_unicodePwd_normal(self):
828 res = self.ldb_admin.search(self.base_dn,
829 attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
830 expression=f"(samAccountName={self.conf_user})")
834 self.assertTrue("samAccountName" in msg)
835 # This form ensures this is a case insensitive comparison
836 self.assertTrue(msg.get("samAccountName"))
837 self.assertTrue(msg.get("unicodePwd") is None)
838 self.assertTrue(msg.get("supplementalCredentials") is None)
840 def _test_dirsync_unicodePwd(self, ldb_conn, control=None, insist_on_empty_element=False):
841 res = ldb_conn.search(self.base_dn,
842 attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
843 expression=f"(samAccountName={self.conf_user})",
846 msg = self.find_under_current_ou(res)
848 self.assertTrue("samAccountName" in msg)
849 # This form ensures this is a case insensitive comparison
850 self.assertTrue(msg.get("samAccountName"))
851 if insist_on_empty_element:
852 self.assertTrue(msg.get("unicodePwd") is not None)
853 self.assertEqual(len(msg.get("unicodePwd")), 0)
854 self.assertTrue(msg.get("supplementalCredentials") is not None)
855 self.assertEqual(len(msg.get("supplementalCredentials")), 0)
857 self.assertTrue(msg.get("unicodePwd") is None
858 or len(msg.get("unicodePwd")) == 0)
859 self.assertTrue(msg.get("supplementalCredentials") is None
860 or len(msg.get("supplementalCredentials")) == 0)
862 def test_dirsync_unicodePwd_OBJ_SEC(self):
863 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
864 self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0")
866 def test_dirsync_unicodePwd_OBJ_SEC_insist_on_empty_element(self):
867 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
868 self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0", insist_on_empty_element=True)
870 def test_dirsync_unicodePwd_with_GET_CHANGES_OBJ_SEC(self):
871 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
872 self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0")
874 def test_dirsync_unicodePwd_with_GET_CHANGES_OBJ_SEC_insist_on_empty_element(self):
875 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
876 self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0", insist_on_empty_element=True)
878 def test_dirsync_unicodePwd_with_GET_CHANGES(self):
879 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
880 self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:0:0")
882 def test_dirsync_unicodePwd_with_GET_CHANGES_insist_on_empty_element(self):
883 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
884 self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:0:0", insist_on_empty_element=True)
886 def test_normal(self):
887 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
888 res = ldb_conn.search(self.base_dn,
889 attrs=[self.conf_attr, "samAccountName"],
890 expression=f"(samAccountName={self.conf_user})")
893 self.assertTrue("samAccountName" in msg)
894 # This form ensures this is a case insensitive comparison
895 self.assertTrue(msg.get("samAccountName"))
896 self.assertTrue(msg.get(self.conf_attr) is None)
898 def _test_dirsync_OBJECT_SECURITY(self, ldb_conn, insist_on_empty_element=False):
899 res = ldb_conn.search(self.base_dn,
900 attrs=[self.conf_attr, "samAccountName"],
901 expression=f"(samAccountName={self.conf_user})",
902 controls=["dirsync:1:1:0"])
904 msg = self.find_under_current_ou(res)
905 self.assertTrue("samAccountName" in msg)
906 # This form ensures this is a case insensitive comparison
907 self.assertTrue(msg.get("samAccountName"))
908 if insist_on_empty_element:
909 self.assertTrue(msg.get(self.conf_attr) is not None)
910 self.assertEqual(len(msg.get(self.conf_attr)), 0)
912 self.assertTrue(msg.get(self.conf_attr) is None
913 or len(msg.get(self.conf_attr)) == 0)
915 def test_dirsync_OBJECT_SECURITY(self):
916 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
917 self._test_dirsync_OBJECT_SECURITY(ldb_conn)
919 def test_dirsync_OBJECT_SECURITY_insist_on_empty_element(self):
920 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
921 self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
923 def test_dirsync_with_GET_CHANGES(self):
924 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
925 res = ldb_conn.search(self.base_dn,
926 attrs=[self.conf_attr, "samAccountName"],
927 expression=f"(samAccountName={self.conf_user})",
928 controls=["dirsync:1:0:0"])
930 msg = self.find_under_current_ou(res)
931 # This form ensures this is a case insensitive comparison
932 self.assertTrue(msg.get("samAccountName"))
933 self.assertTrue(msg.get(self.conf_attr))
934 self.assertEqual(len(msg.get(self.conf_attr)), 1)
936 def test_dirsync_with_GET_CHANGES_OBJECT_SECURITY(self):
937 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
938 self._test_dirsync_OBJECT_SECURITY(ldb_conn)
940 def test_dirsync_with_GET_CHANGES_OBJECT_SECURITY_insist_on_empty_element(self):
941 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
942 self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
944 class FilteredDirsyncTests(SpecialDirsyncTests):
947 self.flag_under_test = SEARCH_FLAG_RODC_ATTRIBUTE
951 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
952 res = ldb_conn.search(self.base_dn,
953 attrs=[self.conf_attr, "samAccountName"],
954 expression=f"(samAccountName={self.conf_user})")
957 self.assertTrue("samAccountName" in msg)
958 # This form ensures this is a case insensitive comparison
959 self.assertTrue(msg.get("samAccountName"))
960 self.assertTrue(msg.get(self.conf_attr))
961 self.assertEqual(len(msg.get(self.conf_attr)), 1)
963 def _test_dirsync_OBJECT_SECURITY(self, ldb_conn):
964 res = ldb_conn.search(self.base_dn,
965 attrs=[self.conf_attr, "samAccountName"],
966 expression=f"(samAccountName={self.conf_user})",
967 controls=["dirsync:1:1:0"])
969 msg = self.find_under_current_ou(res)
970 self.assertTrue("samAccountName" in msg)
971 # This form ensures this is a case insensitive comparison
972 self.assertTrue(msg.get("samAccountName"))
973 self.assertTrue(msg.get(self.conf_attr))
974 self.assertEqual(len(msg.get(self.conf_attr)), 1)
976 def test_dirsync_OBJECT_SECURITY(self):
977 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
978 self._test_dirsync_OBJECT_SECURITY(ldb_conn)
980 def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES(self):
981 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
982 self._test_dirsync_OBJECT_SECURITY(ldb_conn)
984 def _test_dirsync_with_GET_CHANGES(self, insist_on_empty_element=False):
985 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
986 res = ldb_conn.search(self.base_dn,
987 expression=f"(samAccountName={self.conf_user})",
988 controls=["dirsync:1:0:0"])
990 msg = self.find_under_current_ou(res)
991 # This form ensures this is a case insensitive comparison
992 self.assertTrue(msg.get("samAccountName"))
993 if insist_on_empty_element:
994 self.assertTrue(msg.get(self.conf_attr) is not None)
995 self.assertEqual(len(msg.get(self.conf_attr)), 0)
997 self.assertTrue(msg.get(self.conf_attr) is None
998 or len(msg.get(self.conf_attr)) == 0)
1000 def test_dirsync_with_GET_CHANGES(self):
1001 self._test_dirsync_with_GET_CHANGES()
1003 def test_dirsync_with_GET_CHANGES_insist_on_empty_element(self):
1004 self._test_dirsync_with_GET_CHANGES(insist_on_empty_element=True)
1006 def test_dirsync_with_GET_CHANGES_attr(self):
1007 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1009 res = ldb_conn.search(self.base_dn,
1010 attrs=[self.conf_attr, "samAccountName"],
1011 expression=f"(samAccountName={self.conf_user})",
1012 controls=["dirsync:1:0:0"])
1013 self.fail("ldb.search() should have failed with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1014 except ldb.LdbError as e:
1015 (errno, errstr) = e.args
1016 self.assertEqual(errno, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
1018 class ConfidentialFilteredDirsyncTests(SpecialDirsyncTests):
1021 self.flag_under_test = SEARCH_FLAG_RODC_ATTRIBUTE|SEARCH_FLAG_CONFIDENTIAL
1024 def test_attr(self):
1025 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
1026 res = ldb_conn.search(self.base_dn,
1027 attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
1028 expression=f"(samAccountName={self.conf_user})")
1031 self.assertTrue(msg.get("samAccountName"))
1032 self.assertTrue(msg.get(self.conf_attr) is None)
1034 def _test_dirsync_OBJECT_SECURITY(self, ldb_conn, insist_on_empty_element=False):
1035 res = ldb_conn.search(self.base_dn,
1036 attrs=[self.conf_attr, "samAccountName"],
1037 expression=f"(samAccountName={self.conf_user})",
1038 controls=["dirsync:1:1:0"])
1040 msg = self.find_under_current_ou(res)
1041 self.assertTrue("samAccountName" in msg)
1042 # This form ensures this is a case insensitive comparison
1043 self.assertTrue(msg.get("samAccountName"))
1044 if insist_on_empty_element:
1045 self.assertTrue(msg.get(self.conf_attr) is not None)
1046 self.assertEqual(len(msg.get(self.conf_attr)), 0)
1048 self.assertTrue(msg.get(self.conf_attr) is None
1049 or len(msg.get(self.conf_attr)) == 0)
1051 def test_dirsync_OBJECT_SECURITY(self):
1052 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
1053 self._test_dirsync_OBJECT_SECURITY(ldb_conn)
1055 def test_dirsync_OBJECT_SECURITY_insist_on_empty_element(self):
1056 ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
1057 self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
1059 def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES(self):
1060 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1061 self._test_dirsync_OBJECT_SECURITY(ldb_conn)
1063 def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES_insist_on_empty_element(self):
1064 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1065 self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
1067 def _test_dirsync_with_GET_CHANGES(self, insist_on_empty_element=False):
1068 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1069 res = ldb_conn.search(self.base_dn,
1070 expression=f"(samAccountName={self.conf_user})",
1071 controls=["dirsync:1:0:0"])
1073 msg = self.find_under_current_ou(res)
1074 # This form ensures this is a case insensitive comparison
1075 self.assertTrue(msg.get("samAccountName"))
1076 if insist_on_empty_element:
1077 self.assertTrue(msg.get(self.conf_attr) is not None)
1078 self.assertEqual(len(msg.get(self.conf_attr)), 0)
1080 self.assertTrue(msg.get(self.conf_attr) is None
1081 or len(msg.get(self.conf_attr)) == 0)
1083 def test_dirsync_with_GET_CHANGES(self):
1084 self._test_dirsync_with_GET_CHANGES()
1086 def test_dirsync_with_GET_CHANGES_insist_on_empty_element(self):
1087 self._test_dirsync_with_GET_CHANGES(insist_on_empty_element=True)
1089 def test_dirsync_with_GET_CHANGES_attr(self):
1090 ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1092 res = ldb_conn.search(self.base_dn,
1093 attrs=[self.conf_attr, "samAccountName"],
1094 expression=f"(samAccountName={self.conf_user})",
1095 controls=["dirsync:1:0:0"])
1096 self.fail("ldb.search() should have failed with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1097 except ldb.LdbError as e:
1098 (errno, errstr) = e.args
1099 self.assertEqual(errno, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
1102 if not getattr(opts, "listtests", False):
1103 lp = sambaopts.get_loadparm()
1104 samba.tests.cmdline_credentials = credopts.get_credentials(lp)
1107 TestProgram(module=__name__, opts=subunitopts)