3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import print_function
24 sys.path.insert(0, "bin/python")
26 from samba.tests.subunitrun import TestProgram, SubunitOptions
28 import samba.getopt as options
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs, security
35 from samba.ndr import ndr_unpack, ndr_pack
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
42 from samba.tests import delete_force
44 parser = optparse.OptionParser("dirsync.py [options] <host>")
45 sambaopts = options.SambaOptions(parser)
46 parser.add_option_group(sambaopts)
47 parser.add_option_group(options.VersionOptions(parser))
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
54 opts, args = parser.parse_args()
62 ldaphost = "ldap://%s" % host
63 ldapshost = "ldaps://%s" % host
66 start = host.rindex("://")
67 host = host.lstrip(start+3)
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
76 class DirsyncBaseTests(samba.tests.TestCase):
79 super(DirsyncBaseTests, self).setUp()
80 self.ldb_admin = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
81 self.base_dn = self.ldb_admin.domain_dn()
82 self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
83 self.user_pass = samba.generate_random_password(12, 16)
84 self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
85 self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
86 #used for anonymous login
87 print("baseDN: %s" % self.base_dn)
89 def get_user_dn(self, name):
90 return "CN=%s,CN=Users,%s" % (name, self.base_dn)
92 def get_ldb_connection(self, target_username, target_password):
93 creds_tmp = Credentials()
94 creds_tmp.set_username(target_username)
95 creds_tmp.set_password(target_password)
96 creds_tmp.set_domain(creds.get_domain())
97 creds_tmp.set_realm(creds.get_realm())
98 creds_tmp.set_workstation(creds.get_workstation())
99 creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
100 | gensec.FEATURE_SEAL)
101 creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
102 ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
106 #tests on ldap add operations
107 class SimpleDirsyncTests(DirsyncBaseTests):
110 super(SimpleDirsyncTests, self).setUp()
112 self.dirsync_user = "test_dirsync_user"
113 self.simple_user = "test_simple_user"
114 self.admin_user = "test_admin_user"
117 self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
118 self.ldb_admin.newuser(self.simple_user, self.user_pass)
119 self.ldb_admin.newuser(self.admin_user, self.user_pass)
120 self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
122 user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
123 mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
125 self.sd_utils.dacl_add_ace(self.base_dn, mod)
127 # add admins to the Domain Admins group
128 self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
129 add_members_operation=True)
132 super(SimpleDirsyncTests, self).tearDown()
133 delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
134 delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
135 delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
137 delete_force(self.ldb_admin, self.ouname)
138 self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
140 self.ldb_admin.deletegroup("testgroup")
144 #def test_dirsync_errors(self):
146 def test_dirsync_supported(self):
147 """Test the basic of the dirsync is supported"""
148 self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
149 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
150 res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
151 res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
153 self.ldb_simple.search(self.base_dn,
154 expression="samaccountname=*",
155 controls=["dirsync:1:0:1"])
156 except LdbError as l:
157 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
159 def test_parentGUID_referrals(self):
160 res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
162 res = self.ldb_admin.search(self.base_dn,
163 expression="name=Configuration",
164 controls=["dirsync:1:0:1"])
165 self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
167 def test_ok_not_rootdc(self):
168 """Test if it's ok to do dirsync on another NC that is not the root DC"""
169 self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
170 expression="samaccountname=*",
171 controls=["dirsync:1:0:1"])
173 def test_dirsync_errors(self):
174 """Test if dirsync returns the correct LDAP errors in case of pb"""
175 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
176 self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
178 self.ldb_simple.search(self.base_dn,
179 expression="samaccountname=*",
180 controls=["dirsync:1:0:1"])
181 except LdbError as l:
183 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
186 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
187 expression="samaccountname=*",
188 controls=["dirsync:1:0:1"])
189 except LdbError as l:
191 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
194 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
195 expression="samaccountname=*",
196 controls=["dirsync:1:1:1"])
197 except LdbError as l:
199 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
202 self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
203 expression="samaccountname=*",
204 controls=["dirsync:1:0:1"])
205 except LdbError as l:
207 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
210 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
211 expression="samaccountname=*",
212 controls=["dirsync:1:0:1"])
213 except LdbError as l:
215 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
218 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
219 expression="samaccountname=*",
220 controls=["dirsync:1:1:1"])
221 except LdbError as l:
223 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
225 def test_dirsync_attributes(self):
226 """Check behavior with some attributes """
227 res = self.ldb_admin.search(self.base_dn,
228 expression="samaccountname=*",
229 controls=["dirsync:1:0:1"])
230 # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
231 self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
232 # Check that non replicated attributes are not returned
233 self.assertTrue(res.msgs[0].get("badPwdCount") == None)
234 # Check that non forward link are not returned
235 self.assertTrue(res.msgs[0].get("memberof") == None)
237 # Asking for instanceType will return also objectGUID
238 res = self.ldb_admin.search(self.base_dn,
239 expression="samaccountname=Administrator",
240 attrs=["instanceType"],
241 controls=["dirsync:1:0:1"])
242 self.assertTrue(res.msgs[0].get("objectGUID") != None)
243 self.assertTrue(res.msgs[0].get("instanceType") != None)
245 # We don't return an entry if asked for objectGUID
246 res = self.ldb_admin.search(self.base_dn,
247 expression="(distinguishedName=%s)" % str(self.base_dn),
248 attrs=["objectGUID"],
249 controls=["dirsync:1:0:1"])
250 self.assertEquals(len(res.msgs), 0)
252 # a request on the root of a NC didn't return parentGUID
253 res = self.ldb_admin.search(self.base_dn,
254 expression="(distinguishedName=%s)" % str(self.base_dn),
256 controls=["dirsync:1:0:1"])
257 self.assertTrue(res.msgs[0].get("objectGUID") != None)
258 self.assertTrue(res.msgs[0].get("name") != None)
259 self.assertTrue(res.msgs[0].get("parentGUID") == None)
260 self.assertTrue(res.msgs[0].get("instanceType") != None)
262 # Asking for name will return also objectGUID and parentGUID
263 # and instanceType and of course name
264 res = self.ldb_admin.search(self.base_dn,
265 expression="samaccountname=Administrator",
267 controls=["dirsync:1:0:1"])
268 self.assertTrue(res.msgs[0].get("objectGUID") != None)
269 self.assertTrue(res.msgs[0].get("name") != None)
270 self.assertTrue(res.msgs[0].get("parentGUID") != None)
271 self.assertTrue(res.msgs[0].get("instanceType") != None)
273 # Asking for dn will not return not only DN but more like if attrs=*
274 # parentGUID should be returned
275 res = self.ldb_admin.search(self.base_dn,
276 expression="samaccountname=Administrator",
278 controls=["dirsync:1:0:1"])
279 count = len(res.msgs[0])
280 res2 = self.ldb_admin.search(self.base_dn,
281 expression="samaccountname=Administrator",
282 controls=["dirsync:1:0:1"])
283 count2 = len(res2.msgs[0])
284 self.assertEqual(count, count2)
286 # Asking for cn will return nothing on objects that have CN as RDN
287 res = self.ldb_admin.search(self.base_dn,
288 expression="samaccountname=Administrator",
290 controls=["dirsync:1:0:1"])
291 self.assertEqual(len(res.msgs), 0)
292 # Asking for parentGUID will return nothing too
293 res = self.ldb_admin.search(self.base_dn,
294 expression="samaccountname=Administrator",
295 attrs=["parentGUID"],
296 controls=["dirsync:1:0:1"])
297 self.assertEqual(len(res.msgs), 0)
298 ouname="OU=testou,%s" % self.base_dn
300 self.ldb_admin.create_ou(ouname)
302 delta.dn = Dn(self.ldb_admin, str(ouname))
303 delta["cn"] = MessageElement("test ou",
306 self.ldb_admin.modify(delta)
307 res = self.ldb_admin.search(self.base_dn,
308 expression="name=testou",
310 controls=["dirsync:1:0:1"])
312 self.assertEqual(len(res.msgs), 1)
313 self.assertEqual(len(res.msgs[0]), 3)
314 delete_force(self.ldb_admin, ouname)
316 def test_dirsync_with_controls(self):
317 """Check that dirsync return correct informations when dealing with the NC"""
318 res = self.ldb_admin.search(self.base_dn,
319 expression="(distinguishedName=%s)" % str(self.base_dn),
321 controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
323 def test_dirsync_basenc(self):
324 """Check that dirsync return correct informations when dealing with the NC"""
325 res = self.ldb_admin.search(self.base_dn,
326 expression="(distinguishedName=%s)" % str(self.base_dn),
328 controls=["dirsync:1:0:10000"])
329 self.assertEqual(len(res.msgs), 1)
330 self.assertEqual(len(res.msgs[0]), 3)
332 res = self.ldb_admin.search(self.base_dn,
333 expression="(distinguishedName=%s)" % str(self.base_dn),
334 attrs=["ntSecurityDescriptor"],
335 controls=["dirsync:1:0:10000"])
336 self.assertEqual(len(res.msgs), 1)
337 self.assertEqual(len(res.msgs[0]), 3)
339 def test_dirsync_othernc(self):
340 """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
341 res = self.ldb_admin.search(self.base_dn,
342 expression="(objectclass=configuration)",
344 controls=["dirsync:1:0:10000"])
345 self.assertEqual(len(res.msgs), 1)
346 self.assertEqual(len(res.msgs[0]), 4)
348 res = self.ldb_admin.search(self.base_dn,
349 expression="(objectclass=configuration)",
350 attrs=["ntSecurityDescriptor"],
351 controls=["dirsync:1:0:10000"])
352 self.assertEqual(len(res.msgs), 1)
353 self.assertEqual(len(res.msgs[0]), 3)
355 res = self.ldb_admin.search(self.base_dn,
356 expression="(objectclass=domaindns)",
357 attrs=["ntSecurityDescriptor"],
358 controls=["dirsync:1:0:10000"])
361 # only sub nc returns a result when asked for objectGUID
362 res = self.ldb_admin.search(self.base_dn,
363 expression="(objectclass=domaindns)",
364 attrs=["objectGUID"],
365 controls=["dirsync:1:0:0"])
366 self.assertEqual(len(res.msgs), nb - 1)
368 self.assertTrue(res.msgs[0].get("objectGUID") != None)
370 res = self.ldb_admin.search(self.base_dn,
371 expression="(objectclass=configuration)",
372 attrs=["objectGUID"],
373 controls=["dirsync:1:0:0"])
376 def test_dirsync_send_delta(self):
377 """Check that dirsync return correct delta when sending the last cookie"""
378 res = self.ldb_admin.search(self.base_dn,
379 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
380 controls=["dirsync:1:0:10000"])
381 ctl = str(res.controls[0]).split(":")
385 control = str(":".join(ctl))
386 res = self.ldb_admin.search(self.base_dn,
387 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
389 self.assertEqual(len(res), 0)
391 res = self.ldb_admin.search(self.base_dn,
392 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
393 controls=["dirsync:1:0:100000"])
395 ctl = str(res.controls[0]).split(":")
399 control2 = str(":".join(ctl))
402 ouname="OU=testou2,%s" % self.base_dn
404 self.ldb_admin.create_ou(ouname)
405 res = self.ldb_admin.search(self.base_dn,
406 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
408 self.assertEqual(len(res), 1)
409 ctl = str(res.controls[0]).split(":")
413 control3 = str(":".join(ctl))
416 delta.dn = Dn(self.ldb_admin, str(ouname))
418 delta["cn"] = MessageElement("test ou",
421 self.ldb_admin.modify(delta)
422 res = self.ldb_admin.search(self.base_dn,
423 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
426 self.assertEqual(len(res.msgs), 1)
427 # 3 attributes: instanceType, cn and objectGUID
428 self.assertEqual(len(res.msgs[0]), 3)
431 delta.dn = Dn(self.ldb_admin, str(ouname))
432 delta["cn"] = MessageElement([],
435 self.ldb_admin.modify(delta)
436 res = self.ldb_admin.search(self.base_dn,
437 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
440 self.assertEqual(len(res.msgs), 1)
441 # So we won't have much attribute returned but instanceType and GUID
443 # 3 attributes: instanceType and objectGUID and cn but empty
444 self.assertEqual(len(res.msgs[0]), 3)
445 ouname = "OU=newouname,%s" % self.base_dn
446 self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
448 ctl = str(res.controls[0]).split(":")
452 control4 = str(":".join(ctl))
453 res = self.ldb_admin.search(self.base_dn,
454 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
457 self.assertTrue(res[0].get("parentGUID") != None)
458 self.assertTrue(res[0].get("name") != None)
459 delete_force(self.ldb_admin, ouname)
461 def test_dirsync_linkedattributes(self):
462 """Check that dirsync returnd deleted objects too"""
463 # Let's search for members
464 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
465 res = self.ldb_simple.search(self.base_dn,
466 expression="(name=Administrators)",
467 controls=["dirsync:1:1:1"])
469 self.assertTrue(len(res[0].get("member")) > 0)
470 size = len(res[0].get("member"))
472 ctl = str(res.controls[0]).split(":")
476 control1 = str(":".join(ctl))
477 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
478 add_members_operation=True)
480 res = self.ldb_simple.search(self.base_dn,
481 expression="(name=Administrators)",
484 self.assertEqual(len(res[0].get("member")), size + 1)
485 ctl = str(res.controls[0]).split(":")
489 control1 = str(":".join(ctl))
491 # remove the user from the group
492 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
493 add_members_operation=False)
495 res = self.ldb_simple.search(self.base_dn,
496 expression="(name=Administrators)",
499 self.assertEqual(len(res[0].get("member")), size )
501 self.ldb_admin.newgroup("testgroup")
502 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
503 add_members_operation=True)
505 res = self.ldb_admin.search(self.base_dn,
506 expression="(name=testgroup)",
507 controls=["dirsync:1:0:1"])
509 self.assertEqual(len(res[0].get("member")), 1)
510 self.assertTrue(res[0].get("member") != "" )
512 ctl = str(res.controls[0]).split(":")
516 control1 = str(":".join(ctl))
518 # Check that reasking the same question but with an updated cookie
519 # didn't return any results.
521 res = self.ldb_admin.search(self.base_dn,
522 expression="(name=testgroup)",
524 self.assertEqual(len(res), 0)
526 ctl = str(res.controls[0]).split(":")
530 control1 = str(":".join(ctl))
532 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
533 add_members_operation=False)
535 res = self.ldb_admin.search(self.base_dn,
536 expression="(name=testgroup)",
540 self.ldb_admin.deletegroup("testgroup")
541 self.assertEqual(len(res[0].get("member")), 0)
545 def test_dirsync_deleted_items(self):
546 """Check that dirsync returnd deleted objects too"""
548 ouname="OU=testou3,%s" % self.base_dn
550 self.ldb_admin.create_ou(ouname)
551 res = self.ldb_admin.search(self.base_dn,
552 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
553 controls=["dirsync:1:0:1"])
556 if str(e["name"]) == "testou3":
557 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
559 ctl = str(res.controls[0]).split(":")
563 control1 = str(":".join(ctl))
565 # So now delete the object and check that
566 # we can see the object but deleted when admin
567 delete_force(self.ldb_admin, ouname)
569 res = self.ldb_admin.search(self.base_dn,
570 expression="(objectClass=organizationalUnit)",
572 self.assertEqual(len(res), 1)
573 guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
574 self.assertEqual(guid2, guid)
575 self.assertTrue(res[0].get("isDeleted"))
576 self.assertTrue(res[0].get("name") != None)
578 def test_cookie_from_others(self):
579 res = self.ldb_admin.search(self.base_dn,
580 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
581 controls=["dirsync:1:0:1"])
582 ctl = str(res.controls[0]).split(":")
583 cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
584 cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
585 controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
586 res = self.ldb_admin.search(self.base_dn,
587 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
591 class ExtendedDirsyncTests(SimpleDirsyncTests):
593 def test_dirsync_linkedattributes(self):
594 flag_incr_linked = 2147483648
595 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
596 res = self.ldb_admin.search(self.base_dn,
598 expression="(name=Administrators)",
599 controls=["dirsync:1:%d:1" % flag_incr_linked])
601 self.assertTrue(res[0].get("member;range=1-1") != None )
602 self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
603 size = len(res[0].get("member;range=1-1"))
605 ctl = str(res.controls[0]).split(":")
607 ctl[2] = "%d" % flag_incr_linked
609 control1 = str(":".join(ctl))
610 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
611 add_members_operation=True)
612 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
613 add_members_operation=True)
616 res = self.ldb_admin.search(self.base_dn,
617 expression="(name=Administrators)",
620 self.assertEqual(len(res[0].get("member;range=1-1")), 2)
621 ctl = str(res.controls[0]).split(":")
623 ctl[2] = "%d" % flag_incr_linked
625 control1 = str(":".join(ctl))
627 # remove the user from the group
628 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
629 add_members_operation=False)
631 res = self.ldb_admin.search(self.base_dn,
632 expression="(name=Administrators)",
635 self.assertEqual(res[0].get("member;range=1-1"), None )
636 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
638 ctl = str(res.controls[0]).split(":")
640 ctl[2] = "%d" % flag_incr_linked
642 control2 = str(":".join(ctl))
644 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
645 add_members_operation=False)
647 res = self.ldb_admin.search(self.base_dn,
648 expression="(name=Administrators)",
651 self.assertEqual(res[0].get("member;range=1-1"), None )
652 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
654 res = self.ldb_admin.search(self.base_dn,
655 expression="(name=Administrators)",
658 self.assertEqual(res[0].get("member;range=1-1"), None )
659 self.assertEqual(len(res[0].get("member;range=0-0")), 2)
661 def test_dirsync_deleted_items(self):
662 """Check that dirsync returnd deleted objects too"""
664 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
665 ouname="OU=testou3,%s" % self.base_dn
667 self.ldb_admin.create_ou(ouname)
669 # Specify LDAP_DIRSYNC_OBJECT_SECURITY
670 res = self.ldb_simple.search(self.base_dn,
671 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
672 controls=["dirsync:1:1:1"])
676 if str(e["name"]) == "testou3":
677 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
679 self.assertTrue(guid != None)
680 ctl = str(res.controls[0]).split(":")
684 control1 = str(":".join(ctl))
686 # So now delete the object and check that
687 # we can see the object but deleted when admin
688 # we just see the objectGUID when simple user
689 delete_force(self.ldb_admin, ouname)
691 res = self.ldb_simple.search(self.base_dn,
692 expression="(objectClass=organizationalUnit)",
694 self.assertEqual(len(res), 1)
695 guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
696 self.assertEqual(guid2, guid)
697 self.assertEqual(str(res[0].dn), "")
700 if not getattr(opts, "listtests", False):
701 lp = sambaopts.get_loadparm()
702 samba.tests.cmdline_credentials = credopts.get_credentials(lp)
705 TestProgram(module=__name__, opts=subunitopts)