3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys.path.insert(0, "bin/python")
25 samba.ensure_external_module("testtools", "testtools")
26 samba.ensure_external_module("subunit", "subunit/python")
28 import samba.getopt as options
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs
35 from samba.ndr import ndr_unpack, ndr_pack
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
42 from samba.tests import delete_force
43 from subunit.run import SubunitTestRunner
# Command-line interface: the usage string names a single <host> argument,
# and the option groups come from samba.getopt (imported as `options`).
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

# `args` holds the positional arguments left over after option parsing.
opts, args = parser.parse_args()
62 ldaphost = "ldap://%s" % host
63 ldapshost = "ldaps://%s" % host
# Keep only what follows the last "://" separator.
# str.lstrip() expects a string of characters to strip, not an offset, so
# handing it an integer raises TypeError; slice past the separator instead.
start = host.rindex("://")
host = host[start + 3:]
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
76 class DirsyncBaseTests(samba.tests.TestCase):
79 super(DirsyncBaseTests, self).setUp()
81 self.base_dn = ldb.domain_dn()
82 self.domain_sid = security.dom_sid(ldb.get_domain_sid())
83 self.user_pass = "samba123@AAA"
84 self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
85 self.sd_utils = sd_utils.SDUtils(ldb)
86 #used for anonymous login
87 print "baseDN: %s" % self.base_dn
def get_user_dn(self, name):
    """Return the DN string of the entry called `name` under CN=Users.

    :param name: value to use for the CN component
    :return: "CN=<name>,CN=Users,<self.base_dn>"
    """
    return "CN=%s,CN=Users,%s" % (name, self.base_dn)
def get_ldb_connection(self, target_username, target_password):
    """Open a new connection to `ldaphost` authenticated as another account.

    Domain, realm and workstation are copied from the module-level `creds`;
    only the user name and the password are replaced.

    :param target_username: account name to authenticate as
    :param target_password: password of that account
    :return: the SamDB object created for that account
    """
    creds_tmp = Credentials()
    creds_tmp.set_username(target_username)
    creds_tmp.set_password(target_password)
    creds_tmp.set_domain(creds.get_domain())
    creds_tmp.set_realm(creds.get_realm())
    creds_tmp.set_workstation(creds.get_workstation())
    creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                  | gensec.FEATURE_SEAL)
    # kinit is too expensive to use in a tight loop
    creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
    # The tests store the result and call .search() on it, so the
    # connection has to be handed back to the caller.
    return SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
106 # tests for the LDAP dirsync control
107 class SimpleDirsyncTests(DirsyncBaseTests):
110 super(SimpleDirsyncTests, self).setUp()
112 self.dirsync_user = "test_dirsync_user"
113 self.simple_user = "test_simple_user"
114 self.admin_user = "test_admin_user"
117 self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
118 self.ldb_admin.newuser(self.simple_user, self.user_pass)
119 self.ldb_admin.newuser(self.admin_user, self.user_pass)
120 self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
122 user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
123 mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
124 self.sd_utils.dacl_add_ace(self.base_dn, mod)
126 # add admins to the Domain Admins group
127 self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user,
128 add_members_operation=True)
131 super(SimpleDirsyncTests, self).tearDown()
132 delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
133 delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
134 delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
136 delete_force(self.ldb_admin, self.ouname)
137 self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
139 self.ldb_admin.deletegroup("testgroup")
143 #def test_dirsync_errors(self):
def test_dirsync_supported(self):
    """Check who may issue a dirsync search on the base DN.

    The search is run through the admin connection and as the dirsync
    user without expecting an error.  When the same search as the simple
    user raises an LdbError, its text must mention
    LDAP_INSUFFICIENT_ACCESS_RIGHTS.
    """
    self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
    self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)

    # Only the absence of an exception matters here; the entries returned
    # are not inspected.
    self.ldb_admin.search(self.base_dn,
                          expression="samaccountname=*",
                          controls=["dirsync:1:0:1"])
    self.ldb_dirsync.search(self.base_dn,
                            expression="samaccountname=*",
                            controls=["dirsync:1:0:1"])

    # NOTE(review): the try/except framing was not visible in the reviewed
    # text; it is restored because the error object would otherwise be
    # unbound.  Confirm whether a search that raises nothing should fail
    # the test.
    try:
        self.ldb_simple.search(self.base_dn,
                               expression="samaccountname=*",
                               controls=["dirsync:1:0:1"])
    except LdbError as err:
        self.assertTrue(str(err).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
def test_parentGUID_referrals(self):
    """Check the parentGUID that dirsync reports for "Configuration".

    The first entry returned by a dirsync search for name=Configuration
    must carry a parentGUID equal to the objectGUID of the base DN.
    """
    base_res = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE,
                                     attrs=["objectGUID"])
    conf_res = self.ldb_admin.search(self.base_dn,
                                     expression="name=Configuration",
                                     controls=["dirsync:1:0:1"])

    # Report an empty result as a test failure with a message rather
    # than as an IndexError on the subscripts below.
    self.assertTrue(len(base_res) > 0, "base DN search returned no entry")
    self.assertTrue(len(conf_res) > 0,
                    "dirsync search for name=Configuration returned no entry")

    self.assertEqual(base_res[0].get("objectGUID"),
                     conf_res[0].get("parentGUID"))
def test_ok_not_rootdc(self):
    """Check that a dirsync search rooted at the configuration base DN is accepted."""
    config_dn = self.ldb_admin.get_config_basedn()
    # The test passes as long as the search raises nothing; the entries
    # returned are not inspected.
    self.ldb_admin.search(config_dn,
                          expression="samaccountname=*",
                          controls=["dirsync:1:0:1"])
173 def test_dirsync_errors(self):
174 """Test if dirsync returns the correct LDAP errors in case of pb"""
175 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
176 self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
178 self.ldb_simple.search(self.base_dn,
179 expression="samaccountname=*",
180 controls=["dirsync:1:0:1"])
183 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
186 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
187 expression="samaccountname=*",
188 controls=["dirsync:1:0:1"])
191 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
194 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
195 expression="samaccountname=*",
196 controls=["dirsync:1:1:1"])
199 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
202 self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
203 expression="samaccountname=*",
204 controls=["dirsync:1:0:1"])
207 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
210 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
211 expression="samaccountname=*",
212 controls=["dirsync:1:0:1"])
215 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
218 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
219 expression="samaccountname=*",
220 controls=["dirsync:1:1:1"])
223 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
228 def test_dirsync_attributes(self):
229 """Check behavior with some attributes """
230 res = self.ldb_admin.search(self.base_dn,
231 expression="samaccountname=*",
232 controls=["dirsync:1:0:1"])
233 # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
234 self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
235 # Check that non replicated attributes are not returned
236 self.assertTrue(res.msgs[0].get("badPwdCount") == None)
237 # Check that attributes that are not forward links (e.g. memberOf) are not returned
238 self.assertTrue(res.msgs[0].get("memberof") == None)
240 # Asking for instanceType will return also objectGUID
241 res = self.ldb_admin.search(self.base_dn,
242 expression="samaccountname=Administrator",
243 attrs=["instanceType"],
244 controls=["dirsync:1:0:1"])
245 self.assertTrue(res.msgs[0].get("objectGUID") != None)
246 self.assertTrue(res.msgs[0].get("instanceType") != None)
248 # We don't return an entry if asked for objectGUID
249 res = self.ldb_admin.search(self.base_dn,
250 expression="dn=%s" % self.base_dn,
251 attrs=["objectGUID"],
252 controls=["dirsync:1:0:1"])
253 self.assertEquals(len(res.msgs), 0)
255 # a request on the root of a NC didn't return parentGUID
256 res = self.ldb_admin.search(self.base_dn,
257 expression="dn=%s" % self.base_dn,
259 controls=["dirsync:1:0:1"])
260 self.assertTrue(res.msgs[0].get("objectGUID") != None)
261 self.assertTrue(res.msgs[0].get("name") != None)
262 self.assertTrue(res.msgs[0].get("parentGUID") == None)
263 self.assertTrue(res.msgs[0].get("instanceType") != None)
265 # Asking for name will return also objectGUID and parentGUID
266 # and instanceType and of course name
267 res = self.ldb_admin.search(self.base_dn,
268 expression="samaccountname=Administrator",
270 controls=["dirsync:1:0:1"])
271 self.assertTrue(res.msgs[0].get("objectGUID") != None)
272 self.assertTrue(res.msgs[0].get("name") != None)
273 self.assertTrue(res.msgs[0].get("parentGUID") != None)
274 self.assertTrue(res.msgs[0].get("instanceType") != None)
276 # Asking for dn does not return just the DN: the result is the same as with attrs=*
277 # (so parentGUID should be returned)
278 res = self.ldb_admin.search(self.base_dn,
279 expression="samaccountname=Administrator",
281 controls=["dirsync:1:0:1"])
282 count = len(res.msgs[0])
283 res2 = self.ldb_admin.search(self.base_dn,
284 expression="samaccountname=Administrator",
285 controls=["dirsync:1:0:1"])
286 count2 = len(res2.msgs[0])
287 self.assertEqual(count, count2)
289 # Asking for cn will return nothing on objects that have CN as RDN
290 res = self.ldb_admin.search(self.base_dn,
291 expression="samaccountname=Administrator",
293 controls=["dirsync:1:0:1"])
294 self.assertEqual(len(res.msgs), 0)
295 # Asking for parentGUID will return nothing too
296 res = self.ldb_admin.search(self.base_dn,
297 expression="samaccountname=Administrator",
298 attrs=["parentGUID"],
299 controls=["dirsync:1:0:1"])
300 self.assertEqual(len(res.msgs), 0)
301 ouname="OU=testou,%s" % self.base_dn
303 self.ldb_admin.create_ou(ouname)
305 delta.dn = Dn(self.ldb_admin, str(ouname))
306 delta["cn"] = MessageElement("test ou",
309 self.ldb_admin.modify(delta)
310 res = self.ldb_admin.search(self.base_dn,
311 expression="name=testou",
313 controls=["dirsync:1:0:1"])
315 self.assertEqual(len(res.msgs), 1)
316 self.assertEqual(len(res.msgs[0]), 3)
317 delete_force(self.ldb_admin, ouname)
319 def test_dirsync_with_controls(self):
320 """Check that dirsync return correct informations when dealing with the NC"""
321 res = self.ldb_admin.search(self.base_dn,
322 expression="(dn=%s)" % str(self.base_dn),
324 controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
326 def test_dirsync_basenc(self):
327 """Check that dirsync return correct informations when dealing with the NC"""
328 res = self.ldb_admin.search(self.base_dn,
329 expression="(dn=%s)" % str(self.base_dn),
331 controls=["dirsync:1:0:10000"])
332 self.assertEqual(len(res.msgs), 1)
333 self.assertEqual(len(res.msgs[0]), 3)
335 res = self.ldb_admin.search(self.base_dn,
336 expression="(dn=%s)" % str(self.base_dn),
337 attrs=["ntSecurityDescriptor"],
338 controls=["dirsync:1:0:10000"])
339 self.assertEqual(len(res.msgs), 1)
340 self.assertEqual(len(res.msgs[0]), 3)
342 def test_dirsync_othernc(self):
343 """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
344 res = self.ldb_admin.search(self.base_dn,
345 expression="(objectclass=configuration)",
347 controls=["dirsync:1:0:10000"])
348 self.assertEqual(len(res.msgs), 1)
349 self.assertEqual(len(res.msgs[0]), 4)
351 res = self.ldb_admin.search(self.base_dn,
352 expression="(objectclass=configuration)",
353 attrs=["ntSecurityDescriptor"],
354 controls=["dirsync:1:0:10000"])
355 self.assertEqual(len(res.msgs), 1)
356 self.assertEqual(len(res.msgs[0]), 3)
358 res = self.ldb_admin.search(self.base_dn,
359 expression="(objectclass=domaindns)",
360 attrs=["ntSecurityDescriptor"],
361 controls=["dirsync:1:0:10000"])
364 # only sub nc returns a result when asked for objectGUID
365 res = self.ldb_admin.search(self.base_dn,
366 expression="(objectclass=domaindns)",
367 attrs=["objectGUID"],
368 controls=["dirsync:1:0:0"])
369 self.assertEqual(len(res.msgs), nb - 1)
371 self.assertTrue(res.msgs[0].get("objectGUID") != None)
373 res = self.ldb_admin.search(self.base_dn,
374 expression="(objectclass=configuration)",
375 attrs=["objectGUID"],
376 controls=["dirsync:1:0:0"])
379 def test_dirsync_send_delta(self):
380 """Check that dirsync return correct delta when sending the last cookie"""
381 res = self.ldb_admin.search(self.base_dn,
382 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
383 controls=["dirsync:1:0:10000"])
384 ctl = str(res.controls[0]).split(":")
388 control = str(":".join(ctl))
389 res = self.ldb_admin.search(self.base_dn,
390 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
392 self.assertEqual(len(res), 0)
394 res = self.ldb_admin.search(self.base_dn,
395 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
396 controls=["dirsync:1:0:100000"])
398 ctl = str(res.controls[0]).split(":")
402 control2 = str(":".join(ctl))
405 ouname="OU=testou2,%s" % self.base_dn
407 self.ldb_admin.create_ou(ouname)
408 res = self.ldb_admin.search(self.base_dn,
409 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
411 self.assertEqual(len(res), 1)
412 ctl = str(res.controls[0]).split(":")
416 control3 = str(":".join(ctl))
419 delta.dn = Dn(self.ldb_admin, str(ouname))
421 delta["cn"] = MessageElement("test ou",
424 self.ldb_admin.modify(delta)
425 res = self.ldb_admin.search(self.base_dn,
426 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
429 self.assertEqual(len(res.msgs), 1)
430 # 3 attributes: instanceType, cn and objectGUID
431 self.assertEqual(len(res.msgs[0]), 3)
434 delta.dn = Dn(self.ldb_admin, str(ouname))
435 delta["cn"] = MessageElement([],
438 self.ldb_admin.modify(delta)
439 res = self.ldb_admin.search(self.base_dn,
440 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
443 self.assertEqual(len(res.msgs), 1)
444 # So we won't have much attribute returned but instanceType and GUID
446 # 3 attributes: instanceType and objectGUID and cn but empty
447 self.assertEqual(len(res.msgs[0]), 3)
448 ouname = "OU=newouname,%s" % self.base_dn
449 self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
451 ctl = str(res.controls[0]).split(":")
455 control4 = str(":".join(ctl))
456 res = self.ldb_admin.search(self.base_dn,
457 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
460 self.assertTrue(res[0].get("parentGUID") != None)
461 self.assertTrue(res[0].get("name") != None)
462 delete_force(self.ldb_admin, ouname)
464 def test_dirsync_linkedattributes(self):
465 """Check that dirsync returnd deleted objects too"""
466 # Let's search for members
467 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
468 res = self.ldb_simple.search(self.base_dn,
469 expression="(name=Administrators)",
470 controls=["dirsync:1:1:1"])
472 self.assertTrue(len(res[0].get("member")) > 0)
473 size = len(res[0].get("member"))
475 ctl = str(res.controls[0]).split(":")
479 control1 = str(":".join(ctl))
480 self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
481 add_members_operation=True)
483 res = self.ldb_simple.search(self.base_dn,
484 expression="(name=Administrators)",
487 self.assertEqual(len(res[0].get("member")), size + 1)
488 ctl = str(res.controls[0]).split(":")
492 control1 = str(":".join(ctl))
494 # remove the user from the group
495 self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
496 add_members_operation=False)
498 res = self.ldb_simple.search(self.base_dn,
499 expression="(name=Administrators)",
502 self.assertEqual(len(res[0].get("member")), size )
504 self.ldb_admin.newgroup("testgroup")
505 self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
506 add_members_operation=True)
508 res = self.ldb_admin.search(self.base_dn,
509 expression="(name=testgroup)",
510 controls=["dirsync:1:0:1"])
512 self.assertEqual(len(res[0].get("member")), 1)
513 self.assertTrue(res[0].get("member") != "" )
515 ctl = str(res.controls[0]).split(":")
519 control1 = str(":".join(ctl))
521 # Check that reasking the same question but with an updated cookie
522 # didn't return any results.
524 res = self.ldb_admin.search(self.base_dn,
525 expression="(name=testgroup)",
527 self.assertEqual(len(res), 0)
529 ctl = str(res.controls[0]).split(":")
533 control1 = str(":".join(ctl))
535 self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
536 add_members_operation=False)
538 res = self.ldb_admin.search(self.base_dn,
539 expression="(name=testgroup)",
543 self.ldb_admin.deletegroup("testgroup")
544 self.assertEqual(len(res[0].get("member")), 0)
548 def test_dirsync_deleted_items(self):
549 """Check that dirsync returnd deleted objects too"""
551 ouname="OU=testou3,%s" % self.base_dn
553 self.ldb_admin.create_ou(ouname)
554 res = self.ldb_admin.search(self.base_dn,
555 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
556 controls=["dirsync:1:0:1"])
559 if str(e["name"]) == "testou3":
560 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
562 ctl = str(res.controls[0]).split(":")
566 control1 = str(":".join(ctl))
568 # So now delete the object and check that
569 # we can see the object but deleted when admin
570 delete_force(self.ldb_admin, ouname)
572 res = self.ldb_admin.search(self.base_dn,
573 expression="(objectClass=organizationalUnit)",
575 self.assertEqual(len(res), 1)
576 guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
577 self.assertEqual(guid2, guid)
578 self.assertTrue(res[0].get("isDeleted"))
579 self.assertTrue(res[0].get("name") != None)
581 def test_cookie_from_others(self):
582 res = self.ldb_admin.search(self.base_dn,
583 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
584 controls=["dirsync:1:0:1"])
585 ctl = str(res.controls[0]).split(":")
586 cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
587 cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
588 controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie))]
589 res = self.ldb_admin.search(self.base_dn,
590 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
593 class ExtendedDirsyncTests(SimpleDirsyncTests):
594 def test_dirsync_linkedattributes(self):
595 flag_incr_linked = 2147483648
596 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
597 res = self.ldb_admin.search(self.base_dn,
599 expression="(name=Administrators)",
600 controls=["dirsync:1:%d:1" % flag_incr_linked])
602 self.assertTrue(res[0].get("member;range=1-1") != None )
603 self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
604 size = len(res[0].get("member;range=1-1"))
606 ctl = str(res.controls[0]).split(":")
608 ctl[2] = "%d" % flag_incr_linked
610 control1 = str(":".join(ctl))
611 self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
612 add_members_operation=True)
613 self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
614 add_members_operation=True)
617 res = self.ldb_admin.search(self.base_dn,
618 expression="(name=Administrators)",
621 self.assertEqual(len(res[0].get("member;range=1-1")), 2)
622 ctl = str(res.controls[0]).split(":")
624 ctl[2] = "%d" % flag_incr_linked
626 control1 = str(":".join(ctl))
628 # remove the user from the group
629 self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
630 add_members_operation=False)
632 res = self.ldb_admin.search(self.base_dn,
633 expression="(name=Administrators)",
636 self.assertEqual(res[0].get("member;range=1-1"), None )
637 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
639 ctl = str(res.controls[0]).split(":")
641 ctl[2] = "%d" % flag_incr_linked
643 control2 = str(":".join(ctl))
645 self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
646 add_members_operation=False)
648 res = self.ldb_admin.search(self.base_dn,
649 expression="(name=Administrators)",
652 self.assertEqual(res[0].get("member;range=1-1"), None )
653 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
655 res = self.ldb_admin.search(self.base_dn,
656 expression="(name=Administrators)",
659 self.assertEqual(res[0].get("member;range=1-1"), None )
660 self.assertEqual(len(res[0].get("member;range=0-0")), 2)
662 def test_dirsync_deleted_items(self):
663 """Check that dirsync returnd deleted objects too"""
665 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
666 ouname="OU=testou3,%s" % self.base_dn
668 self.ldb_admin.create_ou(ouname)
670 # Specify LDAP_DIRSYNC_OBJECT_SECURITY
671 res = self.ldb_simple.search(self.base_dn,
672 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
673 controls=["dirsync:1:1:1"])
677 if str(e["name"]) == "testou3":
678 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
680 self.assertTrue(guid != None)
681 ctl = str(res.controls[0]).split(":")
685 control1 = str(":".join(ctl))
687 # So now delete the object and check that
688 # we can see the object but deleted when admin
689 # we just see the objectGUID when simple user
690 delete_force(self.ldb_admin, ouname)
692 res = self.ldb_simple.search(self.base_dn,
693 expression="(objectClass=organizationalUnit)",
695 self.assertEqual(len(res), 1)
696 guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
697 self.assertEqual(guid2, guid)
698 self.assertEqual(str(res[0].dn), "")
701 ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
703 runner = SubunitTestRunner()
706 if not runner.run(unittest.makeSuite(SimpleDirsyncTests)).wasSuccessful():
708 if not runner.run(unittest.makeSuite(ExtendedDirsyncTests)).wasSuccessful():