3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import print_function
24 sys.path.insert(0, "bin/python")
26 from samba.tests.subunitrun import TestProgram, SubunitOptions
28 import samba.getopt as options
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs, security
35 from samba.ndr import ndr_unpack, ndr_pack
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
42 from samba.tests import delete_force
# Build the command-line interface: the usage string names one positional
# <host> argument, and the Samba, version, credentials and subunit option
# groups are attached before the arguments are parsed.
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()
# Derive the plain and the TLS LDAP URL from a bare host name.
# NOTE(review): `host` is never assigned in the visible text and the embedded
# listing numbers jump from 54 to 62, so the argument check, the
# `host = ...` assignment and the condition guarding these two lines appear
# to be missing here -- confirm against the original file.
62 ldaphost = "ldap://%s" % host
63 ldapshost = "ldaps://%s" % host
# Drop the URL scheme ("ldap://", "ldaps://", ...) so only the host part is left.
# str.lstrip() expects a string of characters to strip, not a count, so
# passing an integer raises TypeError; slicing is what was intended.
start = host.rindex("://")
host = host[start + 3:]
# Module-level state used by the test classes below: the loadparm context
# and the credentials built from the command-line options.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
class DirsyncBaseTests(samba.tests.TestCase):
    """Common fixture for the dirsync tests.

    Opens a connection with the command-line credentials and offers helpers
    to build user DNs and to connect as a different user.
    """

    def setUp(self):
        """Connect with the command-line credentials and cache shared values."""
        super(DirsyncBaseTests, self).setUp()
        self.ldb_admin = SamDB(ldapshost, credentials=creds,
                               session_info=system_session(lp), lp=lp)
        self.base_dn = self.ldb_admin.domain_dn()
        self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
        # A single random password; subclasses use it for the accounts they create.
        self.user_pass = samba.generate_random_password(12, 16)
        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
        # used for anonymous login
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of account *name* inside CN=Users of the base DN."""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Open a new LDAP connection authenticated as another user.

        Domain, realm and workstation are copied from the command-line
        credentials; only the user name and the password differ.

        :param target_username: account name to authenticate as
        :param target_password: password of that account
        :return: a SamDB connected to ``ldaphost`` with those credentials
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                      | gensec.FEATURE_SEAL)
        # kinit is too expensive to use in a tight loop
        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
        # Callers call .search() on the result, so the connection has to be
        # handed back rather than just bound to a local name.
        return SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
107 # tests on ldap add operations
# Dirsync tests that run against three accounts created for the occasion:
# dirsync_user, simple_user and admin_user (the latter is added to
# "Domain Admins" at the end of the fixture).
108 class SimpleDirsyncTests(DirsyncBaseTests):
# NOTE(review): the `def setUp(self):` header (listing line 110) is missing
# from this text; the statements below read as its body.
111 super(SimpleDirsyncTests, self).setUp()
113 self.dirsync_user = "test_dirsync_user"
114 self.simple_user = "test_simple_user"
115 self.admin_user = "test_admin_user"
# NOTE(review): listing lines 116-117 are missing; tearDown reads
# self.ouname, which is not assigned anywhere in the visible fixture.
118 self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
119 self.ldb_admin.newuser(self.simple_user, self.user_pass)
120 self.ldb_admin.newuser(self.admin_user, self.user_pass)
# Remember the base DN's security descriptor before the DACL is modified
# below (tearDown passes this value to modify_sd_on_dn).
121 self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
123 user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
# SDDL ACE of type OA with the CR right for GUID_DRS_GET_CHANGES.
# NOTE(review): the second format argument (listing line 125) is missing;
# presumably the trustee is user_sid computed above -- confirm.
124 mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
126 self.sd_utils.dacl_add_ace(self.base_dn, mod)
128 # add admins to the Domain Admins group
129 self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
130 add_members_operation=True)
# Undo the fixture: delete the three test accounts and self.ouname, write the
# saved security descriptor back to the base DN and delete "testgroup".
# NOTE(review): the `def tearDown(self):` header (listing lines 131-132) is
# missing from this text.  Listing lines 137, 140 and 142+ are missing too;
# presumably they guard the OU and group deletions (self.ouname is not set in
# the visible setUp, and "testgroup" only exists after
# test_dirsync_linkedattributes created it) -- confirm against the original.
133 super(SimpleDirsyncTests, self).tearDown()
134 delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
135 delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
136 delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
138 delete_force(self.ldb_admin, self.ouname)
139 self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
141 self.ldb_admin.deletegroup("testgroup")
145 # def test_dirsync_errors(self):
def test_dirsync_supported(self):
    """Test that basic dirsync searches are supported.

    The admin connection and the dirsync user run a dirsync search on the
    base DN.  When the same search as the simple user raises LdbError, the
    error text must mention LDAP_INSUFFICIENT_ACCESS_RIGHTS.
    """
    self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
    self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
    # Only the absence of an exception matters for these two searches.
    self.ldb_admin.search(self.base_dn,
                          expression="samaccountname=*",
                          controls=["dirsync:1:0:1"])
    self.ldb_dirsync.search(self.base_dn,
                            expression="samaccountname=*",
                            controls=["dirsync:1:0:1"])
    try:
        self.ldb_simple.search(self.base_dn,
                               expression="samaccountname=*",
                               controls=["dirsync:1:0:1"])
    except LdbError as err:
        self.assertIn("LDAP_INSUFFICIENT_ACCESS_RIGHTS", str(err))
def test_parentGUID_referrals(self):
    """Check the parentGUID returned for the entry named Configuration.

    The first result of a dirsync search for name=Configuration must carry
    a parentGUID equal to the objectGUID of the base DN.
    """
    base_res = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE,
                                     attrs=["objectGUID"])
    config_res = self.ldb_admin.search(self.base_dn,
                                       expression="name=Configuration",
                                       controls=["dirsync:1:0:1"])
    self.assertEqual(base_res[0].get("objectGUID"),
                     config_res[0].get("parentGUID"))
def test_ok_not_rootdc(self):
    """Test if it's ok to do dirsync on another NC that is not the root DC"""
    # The search is rooted at the configuration base DN instead of the
    # domain base DN; the test passes as long as no exception is raised.
    self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
                          expression="samaccountname=*",
                          controls=["dirsync:1:0:1"])
# Runs six dirsync searches (three connections, two search bases, control
# flags 0 and 1) and checks which LDAP error name shows up in the LdbError text.
# NOTE(review): every `except` below lacks its `try:` line (listing lines 178,
# 186, 194, 202, 210 and 218 are missing), and one line is missing at the top
# of each handler (183, 191, 199, 207, 215, 223) -- content unknown.
# NOTE(review): the assertions sit inside the handlers, so presumably a search
# that succeeds does not fail the test -- confirm that this is intended.
174 def test_dirsync_errors(self):
175 """Test if dirsync returns the correct LDAP errors in case of pb"""
176 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
177 self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
# Case 1: simple user, base DN, flags 0.
179 self.ldb_simple.search(self.base_dn,
180 expression="samaccountname=*",
181 controls=["dirsync:1:0:1"])
182 except LdbError as l:
184 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
# Case 2: simple user, CN=Users, flags 0.
187 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
188 expression="samaccountname=*",
189 controls=["dirsync:1:0:1"])
190 except LdbError as l:
192 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
# Case 3: simple user, CN=Users, flags 1.
195 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
196 expression="samaccountname=*",
197 controls=["dirsync:1:1:1"])
198 except LdbError as l:
200 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
# Case 4: dirsync user, CN=Users, flags 0.
203 self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
204 expression="samaccountname=*",
205 controls=["dirsync:1:0:1"])
206 except LdbError as l:
208 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
# Case 5: admin connection, CN=Users, flags 0.
211 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
212 expression="samaccountname=*",
213 controls=["dirsync:1:0:1"])
214 except LdbError as l:
216 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
# Case 6: admin connection, CN=Users, flags 1.
219 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
220 expression="samaccountname=*",
221 controls=["dirsync:1:1:1"])
222 except LdbError as l:
224 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
# Checks which attributes a dirsync search returns for different `attrs`
# requests, first on existing entries and then on a freshly created OU.
# NOTE(review): several argument lines are missing from this text (listing
# lines 256, 267, 278, 290, 310 -- presumably the `attrs=[...]` arguments the
# surrounding comments talk about), as are 300, 302, 305-306 and 312; `delta`
# is used below without a visible assignment.  Confirm against the original.
226 def test_dirsync_attributes(self):
227 """Check behavior with some attributes """
228 res = self.ldb_admin.search(self.base_dn,
229 expression="samaccountname=*",
230 controls=["dirsync:1:0:1"])
231 # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
232 self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
233 # Check that non replicated attributes are not returned
234 self.assertTrue(res.msgs[0].get("badPwdCount") == None)
235 # Check that non forward link are not returned
236 self.assertTrue(res.msgs[0].get("memberof") == None)
238 # Asking for instanceType will return also objectGUID
239 res = self.ldb_admin.search(self.base_dn,
240 expression="samaccountname=Administrator",
241 attrs=["instanceType"],
242 controls=["dirsync:1:0:1"])
243 self.assertTrue(res.msgs[0].get("objectGUID") != None)
244 self.assertTrue(res.msgs[0].get("instanceType") != None)
246 # We don't return an entry if asked for objectGUID
247 res = self.ldb_admin.search(self.base_dn,
248 expression="(distinguishedName=%s)" % str(self.base_dn),
249 attrs=["objectGUID"],
250 controls=["dirsync:1:0:1"])
# NOTE(review): assertEquals is a deprecated unittest alias (removed in
# Python 3.12); the rest of the file uses assertEqual.
251 self.assertEquals(len(res.msgs), 0)
253 # a request on the root of a NC didn't return parentGUID
254 res = self.ldb_admin.search(self.base_dn,
255 expression="(distinguishedName=%s)" % str(self.base_dn),
257 controls=["dirsync:1:0:1"])
258 self.assertTrue(res.msgs[0].get("objectGUID") != None)
259 self.assertTrue(res.msgs[0].get("name") != None)
260 self.assertTrue(res.msgs[0].get("parentGUID") == None)
261 self.assertTrue(res.msgs[0].get("instanceType") != None)
263 # Asking for name will return also objectGUID and parentGUID
264 # and instanceType and of course name
265 res = self.ldb_admin.search(self.base_dn,
266 expression="samaccountname=Administrator",
268 controls=["dirsync:1:0:1"])
269 self.assertTrue(res.msgs[0].get("objectGUID") != None)
270 self.assertTrue(res.msgs[0].get("name") != None)
271 self.assertTrue(res.msgs[0].get("parentGUID") != None)
272 self.assertTrue(res.msgs[0].get("instanceType") != None)
274 # Asking for dn will not return not only DN but more like if attrs=*
275 # parentGUID should be returned
276 res = self.ldb_admin.search(self.base_dn,
277 expression="samaccountname=Administrator",
279 controls=["dirsync:1:0:1"])
280 count = len(res.msgs[0])
# Same search without an attribute list; the attribute counts must match.
281 res2 = self.ldb_admin.search(self.base_dn,
282 expression="samaccountname=Administrator",
283 controls=["dirsync:1:0:1"])
284 count2 = len(res2.msgs[0])
285 self.assertEqual(count, count2)
287 # Asking for cn will return nothing on objects that have CN as RDN
288 res = self.ldb_admin.search(self.base_dn,
289 expression="samaccountname=Administrator",
291 controls=["dirsync:1:0:1"])
292 self.assertEqual(len(res.msgs), 0)
293 # Asking for parentGUID will return nothing too
294 res = self.ldb_admin.search(self.base_dn,
295 expression="samaccountname=Administrator",
296 attrs=["parentGUID"],
297 controls=["dirsync:1:0:1"])
298 self.assertEqual(len(res.msgs), 0)
# From here on the test works on its own OU, which has OU (not CN) as RDN.
299 ouname = "OU=testou,%s" % self.base_dn
301 self.ldb_admin.create_ou(ouname)
303 delta.dn = Dn(self.ldb_admin, str(ouname))
# NOTE(review): the remaining MessageElement arguments (listing lines
# 305-306) are missing from this text.
304 delta["cn"] = MessageElement("test ou",
307 self.ldb_admin.modify(delta)
308 res = self.ldb_admin.search(self.base_dn,
309 expression="name=testou",
311 controls=["dirsync:1:0:1"])
313 self.assertEqual(len(res.msgs), 1)
314 self.assertEqual(len(res.msgs[0]), 3)
315 delete_force(self.ldb_admin, ouname)
# Runs a dirsync search on the base DN entry together with the extended_dn
# and show_deleted controls; nothing is asserted on the result in the
# visible text, so only the absence of an exception is checked.
# NOTE(review): listing line 321 (an argument between `expression` and
# `controls`) is missing from this text.
317 def test_dirsync_with_controls(self):
318 """Check that dirsync return correct informations when dealing with the NC"""
319 res = self.ldb_admin.search(self.base_dn,
320 expression="(distinguishedName=%s)" % str(self.base_dn),
322 controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
# Searches for the base DN entry twice with the dirsync control and expects
# exactly one message carrying three attributes each time.
# NOTE(review): listing line 328 (an argument of the first search, between
# `expression` and `controls`) is missing from this text.
324 def test_dirsync_basenc(self):
325 """Check that dirsync return correct informations when dealing with the NC"""
326 res = self.ldb_admin.search(self.base_dn,
327 expression="(distinguishedName=%s)" % str(self.base_dn),
329 controls=["dirsync:1:0:10000"])
330 self.assertEqual(len(res.msgs), 1)
331 self.assertEqual(len(res.msgs[0]), 3)
# Same entry, this time asking explicitly for ntSecurityDescriptor.
333 res = self.ldb_admin.search(self.base_dn,
334 expression="(distinguishedName=%s)" % str(self.base_dn),
335 attrs=["ntSecurityDescriptor"],
336 controls=["dirsync:1:0:10000"])
337 self.assertEqual(len(res.msgs), 1)
338 self.assertEqual(len(res.msgs[0]), 3)
# Dirsync searches rooted at the base DN for objectclass=configuration and
# objectclass=domaindns entries, checking message and attribute counts.
# NOTE(review): listing lines 344, 360-361, 368, 370 and 375-376 are missing
# from this text.  `nb` is used below without a visible assignment
# (presumably the message count of the preceding domaindns search), and the
# last search has no visible assertion -- confirm against the original.
340 def test_dirsync_othernc(self):
341 """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
342 res = self.ldb_admin.search(self.base_dn,
343 expression="(objectclass=configuration)",
345 controls=["dirsync:1:0:10000"])
346 self.assertEqual(len(res.msgs), 1)
347 self.assertEqual(len(res.msgs[0]), 4)
349 res = self.ldb_admin.search(self.base_dn,
350 expression="(objectclass=configuration)",
351 attrs=["ntSecurityDescriptor"],
352 controls=["dirsync:1:0:10000"])
353 self.assertEqual(len(res.msgs), 1)
354 self.assertEqual(len(res.msgs[0]), 3)
356 res = self.ldb_admin.search(self.base_dn,
357 expression="(objectclass=domaindns)",
358 attrs=["ntSecurityDescriptor"],
359 controls=["dirsync:1:0:10000"])
362 # only sub nc returns a result when asked for objectGUID
363 res = self.ldb_admin.search(self.base_dn,
364 expression="(objectclass=domaindns)",
365 attrs=["objectGUID"],
366 controls=["dirsync:1:0:0"])
367 self.assertEqual(len(res.msgs), nb - 1)
369 self.assertTrue(res.msgs[0].get("objectGUID") != None)
371 res = self.ldb_admin.search(self.base_dn,
372 expression="(objectclass=configuration)",
373 attrs=["objectGUID"],
374 controls=["dirsync:1:0:0"])
# Replays the control returned by one dirsync search in the next search and
# checks that only the changes made in between (OU creation, cn modification,
# cn removal, rename) are reported.
# NOTE(review): this text is heavily gapped.  After every
# `ctl = str(res.controls[0]).split(":")` three listing lines are missing
# (presumably edits to fields of `ctl`), the follow-up searches lack their
# closing `controls=...` argument, `delta` is used without a visible
# assignment and both MessageElement calls are cut short.  The strings
# control, control2, control3 and control4 are built but never visibly used.
# Confirm everything against the original file.
377 def test_dirsync_send_delta(self):
378 """Check that dirsync return correct delta when sending the last cookie"""
379 res = self.ldb_admin.search(self.base_dn,
380 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
381 controls=["dirsync:1:0:10000"])
382 ctl = str(res.controls[0]).split(":")
386 control = str(":".join(ctl))
387 res = self.ldb_admin.search(self.base_dn,
388 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
390 self.assertEqual(len(res), 0)
392 res = self.ldb_admin.search(self.base_dn,
393 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
394 controls=["dirsync:1:0:100000"])
396 ctl = str(res.controls[0]).split(":")
400 control2 = str(":".join(ctl))
# Create an OU; the next search is expected to report exactly that one entry.
403 ouname = "OU=testou2,%s" % self.base_dn
405 self.ldb_admin.create_ou(ouname)
406 res = self.ldb_admin.search(self.base_dn,
407 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
409 self.assertEqual(len(res), 1)
410 ctl = str(res.controls[0]).split(":")
414 control3 = str(":".join(ctl))
417 delta.dn = Dn(self.ldb_admin, str(ouname))
419 delta["cn"] = MessageElement("test ou",
422 self.ldb_admin.modify(delta)
423 res = self.ldb_admin.search(self.base_dn,
424 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
427 self.assertEqual(len(res.msgs), 1)
428 # 3 attributes: instanceType, cn and objectGUID
429 self.assertEqual(len(res.msgs[0]), 3)
432 delta.dn = Dn(self.ldb_admin, str(ouname))
433 delta["cn"] = MessageElement([],
436 self.ldb_admin.modify(delta)
437 res = self.ldb_admin.search(self.base_dn,
438 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
441 self.assertEqual(len(res.msgs), 1)
442 # So we won't have much attribute returned but instanceType and GUID
444 # 3 attributes: instanceType and objectGUID and cn but empty
445 self.assertEqual(len(res.msgs[0]), 3)
# Rename the OU; the following search must then report parentGUID and name.
446 ouname = "OU=newouname,%s" % self.base_dn
447 self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
449 ctl = str(res.controls[0]).split(":")
453 control4 = str(":".join(ctl))
454 res = self.ldb_admin.search(self.base_dn,
455 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
458 self.assertTrue(res[0].get("parentGUID") != None)
459 self.assertTrue(res[0].get("name") != None)
460 delete_force(self.ldb_admin, ouname)
# Follows the `member` attribute of "Administrators" and of a temporary
# "testgroup" across dirsync searches while a user is added and removed.
# NOTE(review): the docstring below is word for word the one of
# test_dirsync_deleted_items and does not describe this test.
# NOTE(review): after every `ctl = ... split(":")` three listing lines are
# missing, and the follow-up searches lack their closing `controls=...`
# argument; control1 is rebuilt several times without a visible use.
# Confirm against the original file.
462 def test_dirsync_linkedattributes(self):
463 """Check that dirsync returnd deleted objects too"""
464 # Let's search for members
465 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
466 res = self.ldb_simple.search(self.base_dn,
467 expression="(name=Administrators)",
468 controls=["dirsync:1:1:1"])
470 self.assertTrue(len(res[0].get("member")) > 0)
471 size = len(res[0].get("member"))
473 ctl = str(res.controls[0]).split(":")
477 control1 = str(":".join(ctl))
# Adding simple_user must grow the reported member list by one.
478 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
479 add_members_operation=True)
481 res = self.ldb_simple.search(self.base_dn,
482 expression="(name=Administrators)",
485 self.assertEqual(len(res[0].get("member")), size + 1)
486 ctl = str(res.controls[0]).split(":")
490 control1 = str(":".join(ctl))
492 # remove the user from the group
493 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
494 add_members_operation=False)
496 res = self.ldb_simple.search(self.base_dn,
497 expression="(name=Administrators)",
500 self.assertEqual(len(res[0].get("member")), size)
# Second scenario: a new group with a single member, seen through the admin connection.
502 self.ldb_admin.newgroup("testgroup")
503 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
504 add_members_operation=True)
506 res = self.ldb_admin.search(self.base_dn,
507 expression="(name=testgroup)",
508 controls=["dirsync:1:0:1"])
510 self.assertEqual(len(res[0].get("member")), 1)
511 self.assertTrue(res[0].get("member") != "")
513 ctl = str(res.controls[0]).split(":")
517 control1 = str(":".join(ctl))
519 # Check that reasking the same question but with an updated cookie
520 # didn't return any results.
522 res = self.ldb_admin.search(self.base_dn,
523 expression="(name=testgroup)",
525 self.assertEqual(len(res), 0)
527 ctl = str(res.controls[0]).split(":")
531 control1 = str(":".join(ctl))
533 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
534 add_members_operation=False)
536 res = self.ldb_admin.search(self.base_dn,
537 expression="(name=testgroup)",
# The group is deleted before the final assertion, so it is removed even
# when that assertion fails.
541 self.ldb_admin.deletegroup("testgroup")
542 self.assertEqual(len(res[0].get("member")), 0)
# Creates an OU, records its objectGUID from a dirsync search, deletes the OU
# and expects the next dirsync search to return one entry with the same GUID,
# a true isDeleted value and a name.
# NOTE(review): the loop header that binds `e` (listing lines 555-556) is
# missing from this text, as are the three lines after
# `ctl = ... split(":")` and the `controls=...` argument of the last search.
546 def test_dirsync_deleted_items(self):
547 """Check that dirsync returnd deleted objects too"""
549 ouname = "OU=testou3,%s" % self.base_dn
551 self.ldb_admin.create_ou(ouname)
552 res = self.ldb_admin.search(self.base_dn,
553 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
554 controls=["dirsync:1:0:1"])
# `guid` is only assigned when an entry named testou3 is found.
557 if str(e["name"]) == "testou3":
558 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
560 ctl = str(res.controls[0]).split(":")
564 control1 = str(":".join(ctl))
566 # So now delete the object and check that
567 # we can see the object but deleted when admin
568 delete_force(self.ldb_admin, ouname)
570 res = self.ldb_admin.search(self.base_dn,
571 expression="(objectClass=organizationalUnit)",
573 self.assertEqual(len(res), 1)
574 guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
575 self.assertEqual(guid2, guid)
576 self.assertTrue(res[0].get("isDeleted"))
577 self.assertTrue(res[0].get("name") != None)
# Takes the cookie returned by a dirsync search, overwrites guid1 in its blob
# with a fixed GUID, re-encodes it and starts a second search.
# NOTE(review): the end of the last search call (listing line 589) is missing
# from this text; presumably it passes `controls=controls`, since that list
# is built just above and not used anywhere else -- confirm.  No assertion is
# visible, so the test only checks that no exception is raised.
579 def test_cookie_from_others(self):
580 res = self.ldb_admin.search(self.base_dn,
581 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
582 controls=["dirsync:1:0:1"])
583 ctl = str(res.controls[0]).split(":")
# Field 4 of the returned control string is the base64-encoded cookie.
584 cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
585 cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
586 controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
587 res = self.ldb_admin.search(self.base_dn,
588 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
# Re-runs the SimpleDirsyncTests suite and overrides two of its tests.
592 class ExtendedDirsyncTests(SimpleDirsyncTests):
# Linked-attribute test with flag 2147483648 (0x80000000) set in the dirsync
# control; members are then read from "member;range=1-1" and
# "member;range=0-0" instead of plain "member".
# NOTE(review): the name flag_incr_linked suggests the "incremental values"
# dirsync flag -- confirm against the protocol documentation.
# NOTE(review): around every `ctl[2] = ...` one listing line is missing on
# each side (presumably assignments to other fields of `ctl`), and the
# follow-up searches lack their closing `controls=...` argument; listing line
# 598 inside the first search is missing too.  Confirm against the original.
594 def test_dirsync_linkedattributes(self):
595 flag_incr_linked = 2147483648
596 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
597 res = self.ldb_admin.search(self.base_dn,
599 expression="(name=Administrators)",
600 controls=["dirsync:1:%d:1" % flag_incr_linked])
602 self.assertTrue(res[0].get("member;range=1-1") != None)
603 self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
604 size = len(res[0].get("member;range=1-1"))
606 ctl = str(res.controls[0]).split(":")
608 ctl[2] = "%d" % flag_incr_linked
610 control1 = str(":".join(ctl))
# Two members are added, so two values are expected under range=1-1.
611 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
612 add_members_operation=True)
613 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
614 add_members_operation=True)
617 res = self.ldb_admin.search(self.base_dn,
618 expression="(name=Administrators)",
621 self.assertEqual(len(res[0].get("member;range=1-1")), 2)
622 ctl = str(res.controls[0]).split(":")
624 ctl[2] = "%d" % flag_incr_linked
626 control1 = str(":".join(ctl))
628 # remove the user from the group
629 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
630 add_members_operation=False)
632 res = self.ldb_admin.search(self.base_dn,
633 expression="(name=Administrators)",
636 self.assertEqual(res[0].get("member;range=1-1"), None)
637 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
639 ctl = str(res.controls[0]).split(":")
641 ctl[2] = "%d" % flag_incr_linked
643 control2 = str(":".join(ctl))
645 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
646 add_members_operation=False)
648 res = self.ldb_admin.search(self.base_dn,
649 expression="(name=Administrators)",
652 self.assertEqual(res[0].get("member;range=1-1"), None)
653 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
655 res = self.ldb_admin.search(self.base_dn,
656 expression="(name=Administrators)",
659 self.assertEqual(res[0].get("member;range=1-1"), None)
660 self.assertEqual(len(res[0].get("member;range=0-0")), 2)
# Variant of the deleted-items test run as the simple user with control
# flags 1: after the OU is deleted, the search must return one entry with the
# same objectGUID and an empty DN.
# NOTE(review): the loop header that binds `e` (listing lines 674-676) is
# missing from this text, as are the three lines after
# `ctl = ... split(":")` and the `controls=...` argument of the last search.
662 def test_dirsync_deleted_items(self):
663 """Check that dirsync returnd deleted objects too"""
665 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
666 ouname = "OU=testou3,%s" % self.base_dn
668 self.ldb_admin.create_ou(ouname)
670 # Specify LDAP_DIRSYNC_OBJECT_SECURITY
671 res = self.ldb_simple.search(self.base_dn,
672 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
673 controls=["dirsync:1:1:1"])
677 if str(e["name"]) == "testou3":
678 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
# NOTE(review): no initial `guid = None` is visible; if none exists and no
# entry matches, this line raises NameError instead of failing the assertion.
680 self.assertTrue(guid != None)
681 ctl = str(res.controls[0]).split(":")
685 control1 = str(":".join(ctl))
687 # So now delete the object and check that
688 # we can see the object but deleted when admin
689 # we just see the objectGUID when simple user
690 delete_force(self.ldb_admin, ouname)
692 res = self.ldb_simple.search(self.base_dn,
693 expression="(objectClass=organizationalUnit)",
695 self.assertEqual(len(res), 1)
696 guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
697 self.assertEqual(guid2, guid)
698 self.assertEqual(str(res[0].dn), "")
# Script entry point: when the parsed options have no true `listtests`
# value, the command-line credentials are stored in
# samba.tests.cmdline_credentials; TestProgram is then started with the
# subunit options.
# NOTE(review): indentation is lost in this text, so which of the statements
# below belong to the `if` body cannot be told from here -- confirm.
701 if not getattr(opts, "listtests", False):
702 lp = sambaopts.get_loadparm()
703 samba.tests.cmdline_credentials = credopts.get_credentials(lp)
706 TestProgram(module=__name__, opts=subunitopts)