96c6950841803aca834679738d2329a582b51a7c
[kai/samba.git] / source4 / dsdb / tests / python / dirsync.py
1 #!/usr/bin/env python
2 #
3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 #
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 import optparse
22 import sys
23 sys.path.insert(0, "bin/python")
24 import samba
25 samba.ensure_external_module("testtools", "testtools")
26 samba.ensure_external_module("subunit", "subunit/python")
27
28 import samba.getopt as options
29 import base64
30
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs
35 from samba.ndr import ndr_unpack, ndr_pack
36
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials
41 import samba.tests
42 from samba.tests import delete_force
43 from subunit.run import SubunitTestRunner
44 import unittest
45
46 parser = optparse.OptionParser("dirsync.py [options] <host>")
47 sambaopts = options.SambaOptions(parser)
48 parser.add_option_group(sambaopts)
49 parser.add_option_group(options.VersionOptions(parser))
50
51 # use command line creds if available
52 credopts = options.CredentialsOptions(parser)
53 parser.add_option_group(credopts)
54 opts, args = parser.parse_args()
55
56 if len(args) < 1:
57     parser.print_usage()
58     sys.exit(1)
59
60 host = args[0]
61 if not "://" in host:
62     ldaphost = "ldap://%s" % host
63     ldapshost = "ldaps://%s" % host
64 else:
65     ldaphost = host
66     start = host.rindex("://")
67     host = host.lstrip(start+3)
68
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
71
72 #
73 # Tests start here
74 #
75
76 class DirsyncBaseTests(samba.tests.TestCase):
77
78     def setUp(self):
79         super(DirsyncBaseTests, self).setUp()
80         self.ldb_admin = ldb
81         self.base_dn = ldb.domain_dn()
82         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
83         self.user_pass = "samba123@AAA"
84         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
85         self.sd_utils = sd_utils.SDUtils(ldb)
86         #used for anonymous login
87         print "baseDN: %s" % self.base_dn
88
89     def get_user_dn(self, name):
90         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
91
92     def get_ldb_connection(self, target_username, target_password):
93         creds_tmp = Credentials()
94         creds_tmp.set_username(target_username)
95         creds_tmp.set_password(target_password)
96         creds_tmp.set_domain(creds.get_domain())
97         creds_tmp.set_realm(creds.get_realm())
98         creds_tmp.set_workstation(creds.get_workstation())
99         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
100                                       | gensec.FEATURE_SEAL)
101         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
102         return ldb_target
103
104
105 #tests on ldap add operations
106 class SimpleDirsyncTests(DirsyncBaseTests):
107
108     def setUp(self):
109         super(SimpleDirsyncTests, self).setUp()
110         # Regular user
111         self.dirsync_user = "test_dirsync_user"
112         self.simple_user = "test_simple_user"
113         self.admin_user = "test_admin_user"
114         self.ouname = None
115
116         self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
117         self.ldb_admin.newuser(self.simple_user, self.user_pass)
118         self.ldb_admin.newuser(self.admin_user, self.user_pass)
119         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
120
121         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
122         mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
123         self.sd_utils.dacl_add_ace(self.base_dn, mod)
124
125         # add admins to the Domain Admins group
126         self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user,
127                        add_members_operation=True)
128
129     def tearDown(self):
130         super(SimpleDirsyncTests, self).tearDown()
131         delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
132         delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
133         delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
134         if self.ouname:
135             delete_force(self.ldb_admin, self.ouname)
136         self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
137         try:
138             self.ldb_admin.deletegroup("testgroup")
139         except:
140             pass
141
142     #def test_dirsync_errors(self):
143
144
145     def test_dirsync_supported(self):
146         """Test the basic of the dirsync is supported"""
147         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
148         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
149         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
150         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
151         try:
152             self.ldb_simple.search(self.base_dn,
153                 expression="samaccountname=*",
154                 controls=["dirsync:1:0:1"])
155         except LdbError,l:
156            self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
157
158     def test_parentGUID_referrals(self):
159         res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
160
161         res = self.ldb_admin.search(self.base_dn,
162                                         expression="name=Configuration",
163                                         controls=["dirsync:1:0:1"])
164         self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
165
166     def test_ok_not_rootdc(self):
167         """Test if it's ok to do dirsync on another NC that is not the root DC"""
168         try:
169             res = self.ldb_admin.search("CN=Configuration, %s" % self.base_dn,
170                                         expression="samaccountname=*",
171                                         controls=["dirsync:1:0:1"])
172         except:
173             self.assertTrue(False)
174
175     def test_dirsync_errors(self):
176         """Test if dirsync returns the correct LDAP errors in case of pb"""
177         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
178         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
179         try:
180             self.ldb_simple.search(self.base_dn,
181                 expression="samaccountname=*",
182                 controls=["dirsync:1:0:1"])
183         except LdbError,l:
184             print l
185             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
186
187         try:
188             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
189                 expression="samaccountname=*",
190                 controls=["dirsync:1:0:1"])
191         except LdbError,l:
192             print l
193             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
194
195         try:
196             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
197                 expression="samaccountname=*",
198                 controls=["dirsync:1:1:1"])
199         except LdbError,l:
200             print l
201             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
202
203         try:
204             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
205                 expression="samaccountname=*",
206                 controls=["dirsync:1:0:1"])
207         except LdbError,l:
208             print l
209             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
210
211         try:
212             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
213                 expression="samaccountname=*",
214                 controls=["dirsync:1:0:1"])
215         except LdbError,l:
216             print l
217             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
218
219         try:
220             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
221                 expression="samaccountname=*",
222                 controls=["dirsync:1:1:1"])
223         except LdbError,l:
224             print l
225             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
226
227
228
229
230     def test_dirsync_attributes(self):
231         """Check behavior with some attributes """
232         res = self.ldb_admin.search(self.base_dn,
233                                     expression="samaccountname=*",
234                                     controls=["dirsync:1:0:1"])
235         # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
236         self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
237         # Check that non replicated attributes are not returned
238         self.assertTrue(res.msgs[0].get("badPwdCount") == None)
239         # Check that non forward link are not returned
240         self.assertTrue(res.msgs[0].get("memberof") == None)
241
242         # Asking for instanceType will return also objectGUID
243         res = self.ldb_admin.search(self.base_dn,
244                                     expression="samaccountname=Administrator",
245                                     attrs=["instanceType"],
246                                     controls=["dirsync:1:0:1"])
247         self.assertTrue(res.msgs[0].get("objectGUID") != None)
248         self.assertTrue(res.msgs[0].get("instanceType") != None)
249
250         # We don't return an entry if asked for objectGUID
251         res = self.ldb_admin.search(self.base_dn,
252                                     expression="dn=%s" % self.base_dn,
253                                     attrs=["objectGUID"],
254                                     controls=["dirsync:1:0:1"])
255         self.assertEquals(len(res.msgs), 0)
256
257         # a request on the root of a NC didn't return parentGUID
258         res = self.ldb_admin.search(self.base_dn,
259                                     expression="dn=%s" % self.base_dn,
260                                     attrs=["name"],
261                                     controls=["dirsync:1:0:1"])
262         self.assertTrue(res.msgs[0].get("objectGUID") != None)
263         self.assertTrue(res.msgs[0].get("name") != None)
264         self.assertTrue(res.msgs[0].get("parentGUID") == None)
265         self.assertTrue(res.msgs[0].get("instanceType") != None)
266
267          # Asking for name will return also objectGUID and parentGUID
268         # and instanceType and of course name
269         res = self.ldb_admin.search(self.base_dn,
270                                     expression="samaccountname=Administrator",
271                                     attrs=["name"],
272                                     controls=["dirsync:1:0:1"])
273         self.assertTrue(res.msgs[0].get("objectGUID") != None)
274         self.assertTrue(res.msgs[0].get("name") != None)
275         self.assertTrue(res.msgs[0].get("parentGUID") != None)
276         self.assertTrue(res.msgs[0].get("instanceType") != None)
277
278         # Asking for dn will not return not only DN but more like if attrs=*
279         # parentGUID should be returned
280         res = self.ldb_admin.search(self.base_dn,
281                                     expression="samaccountname=Administrator",
282                                     attrs=["dn"],
283                                     controls=["dirsync:1:0:1"])
284         count = len(res.msgs[0])
285         res2 = self.ldb_admin.search(self.base_dn,
286                                     expression="samaccountname=Administrator",
287                                     controls=["dirsync:1:0:1"])
288         count2 = len(res2.msgs[0])
289         self.assertEqual(count, count2)
290
291         # Asking for cn will return nothing on objects that have CN as RDN
292         res = self.ldb_admin.search(self.base_dn,
293                                     expression="samaccountname=Administrator",
294                                     attrs=["cn"],
295                                     controls=["dirsync:1:0:1"])
296         self.assertEqual(len(res.msgs), 0)
297         # Asking for parentGUID will return nothing too
298         res = self.ldb_admin.search(self.base_dn,
299                                     expression="samaccountname=Administrator",
300                                     attrs=["parentGUID"],
301                                     controls=["dirsync:1:0:1"])
302         self.assertEqual(len(res.msgs), 0)
303         ouname="OU=testou,%s" % self.base_dn
304         self.ouname = ouname
305         self.ldb_admin.create_ou(ouname)
306         delta = Message()
307         delta.dn = Dn(self.ldb_admin, str(ouname))
308         delta["cn"] = MessageElement("test ou",
309                                         FLAG_MOD_ADD,
310                                         "cn" )
311         self.ldb_admin.modify(delta)
312         res = self.ldb_admin.search(self.base_dn,
313                                     expression="name=testou",
314                                     attrs=["cn"],
315                                     controls=["dirsync:1:0:1"])
316
317         self.assertEqual(len(res.msgs), 1)
318         self.assertEqual(len(res.msgs[0]), 3)
319         delete_force(self.ldb_admin, ouname)
320
321     def test_dirsync_with_controls(self):
322         """Check that dirsync return correct informations when dealing with the NC"""
323         res = self.ldb_admin.search(self.base_dn,
324                                     expression="(dn=%s)" % str(self.base_dn),
325                                     attrs=["name"],
326                                     controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
327
328     def test_dirsync_basenc(self):
329         """Check that dirsync return correct informations when dealing with the NC"""
330         res = self.ldb_admin.search(self.base_dn,
331                                     expression="(dn=%s)" % str(self.base_dn),
332                                     attrs=["name"],
333                                     controls=["dirsync:1:0:10000"])
334         self.assertEqual(len(res.msgs), 1)
335         self.assertEqual(len(res.msgs[0]), 3)
336
337         res = self.ldb_admin.search(self.base_dn,
338                                     expression="(dn=%s)" % str(self.base_dn),
339                                     attrs=["ntSecurityDescriptor"],
340                                     controls=["dirsync:1:0:10000"])
341         self.assertEqual(len(res.msgs), 1)
342         self.assertEqual(len(res.msgs[0]), 3)
343
344     def test_dirsync_othernc(self):
345         """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
346         res = self.ldb_admin.search(self.base_dn,
347                                     expression="(objectclass=configuration)",
348                                     attrs=["name"],
349                                     controls=["dirsync:1:0:10000"])
350         self.assertEqual(len(res.msgs), 1)
351         self.assertEqual(len(res.msgs[0]), 4)
352
353         res = self.ldb_admin.search(self.base_dn,
354                                     expression="(objectclass=configuration)",
355                                     attrs=["ntSecurityDescriptor"],
356                                     controls=["dirsync:1:0:10000"])
357         self.assertEqual(len(res.msgs), 1)
358         self.assertEqual(len(res.msgs[0]), 3)
359
360         res = self.ldb_admin.search(self.base_dn,
361                                     expression="(objectclass=domaindns)",
362                                     attrs=["ntSecurityDescriptor"],
363                                     controls=["dirsync:1:0:10000"])
364         nb = len(res.msgs)
365
366         # only sub nc returns a result when asked for objectGUID
367         res = self.ldb_admin.search(self.base_dn,
368                                     expression="(objectclass=domaindns)",
369                                     attrs=["objectGUID"],
370                                     controls=["dirsync:1:0:0"])
371         self.assertEqual(len(res.msgs), nb - 1)
372         if nb > 1:
373             self.assertTrue(res.msgs[0].get("objectGUID") != None)
374         else:
375             res = self.ldb_admin.search(self.base_dn,
376                                         expression="(objectclass=configuration)",
377                                         attrs=["objectGUID"],
378                                         controls=["dirsync:1:0:0"])
379
380
381     def test_dirsync_send_delta(self):
382         """Check that dirsync return correct delta when sending the last cookie"""
383         res = self.ldb_admin.search(self.base_dn,
384                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
385                                     controls=["dirsync:1:0:10000"])
386         ctl = str(res.controls[0]).split(":")
387         ctl[1] = "1"
388         ctl[2] = "0"
389         ctl[3] = "10000"
390         control = str(":".join(ctl))
391         res = self.ldb_admin.search(self.base_dn,
392                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
393                                     controls=[control])
394         self.assertEqual(len(res), 0)
395
396         res = self.ldb_admin.search(self.base_dn,
397                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
398                                     controls=["dirsync:1:0:100000"])
399
400         ctl = str(res.controls[0]).split(":")
401         ctl[1] = "1"
402         ctl[2] = "0"
403         ctl[3] = "10000"
404         control2 = str(":".join(ctl))
405
406         # Let's create an OU
407         ouname="OU=testou2,%s" % self.base_dn
408         self.ouname = ouname
409         self.ldb_admin.create_ou(ouname)
410         res = self.ldb_admin.search(self.base_dn,
411                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
412                                     controls=[control2])
413         self.assertEqual(len(res), 1)
414         ctl = str(res.controls[0]).split(":")
415         ctl[1] = "1"
416         ctl[2] = "0"
417         ctl[3] = "10000"
418         control3 = str(":".join(ctl))
419
420         delta = Message()
421         delta.dn = Dn(self.ldb_admin, str(ouname))
422
423         delta["cn"] = MessageElement("test ou",
424                                         FLAG_MOD_ADD,
425                                         "cn" )
426         self.ldb_admin.modify(delta)
427         res = self.ldb_admin.search(self.base_dn,
428                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
429                                     controls=[control3])
430
431         self.assertEqual(len(res.msgs), 1)
432         # 3 attributes: instanceType, cn and objectGUID
433         self.assertEqual(len(res.msgs[0]), 3)
434
435         delta = Message()
436         delta.dn = Dn(self.ldb_admin, str(ouname))
437         delta["cn"] = MessageElement([],
438                                         FLAG_MOD_DELETE,
439                                         "cn" )
440         self.ldb_admin.modify(delta)
441         res = self.ldb_admin.search(self.base_dn,
442                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
443                                     controls=[control3])
444
445         self.assertEqual(len(res.msgs), 1)
446         # So we won't have much attribute returned but instanceType and GUID
447         # are.
448         # 3 attributes: instanceType and objectGUID and cn but empty
449         self.assertEqual(len(res.msgs[0]), 3)
450         ouname = "OU=newouname,%s" % self.base_dn
451         self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
452         self.ouname = ouname
453         ctl = str(res.controls[0]).split(":")
454         ctl[1] = "1"
455         ctl[2] = "0"
456         ctl[3] = "10000"
457         control4 = str(":".join(ctl))
458         res = self.ldb_admin.search(self.base_dn,
459                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
460                                     controls=[control3])
461
462         self.assertTrue(res[0].get("parentGUID") != None)
463         self.assertTrue(res[0].get("name") != None)
464         delete_force(self.ldb_admin, ouname)
465
466     def test_dirsync_linkedattributes(self):
467         """Check that dirsync returnd deleted objects too"""
468         # Let's search for members
469         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
470         res = self.ldb_simple.search(self.base_dn,
471                                     expression="(name=Administrators)",
472                                     controls=["dirsync:1:1:1"])
473
474         self.assertTrue(len(res[0].get("member")) > 0)
475         size = len(res[0].get("member"))
476
477         ctl = str(res.controls[0]).split(":")
478         ctl[1] = "1"
479         ctl[2] = "1"
480         ctl[3] = "10000"
481         control1 = str(":".join(ctl))
482         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
483                        add_members_operation=True)
484
485         res = self.ldb_simple.search(self.base_dn,
486                                     expression="(name=Administrators)",
487                                     controls=[control1])
488
489         self.assertEqual(len(res[0].get("member")), size + 1)
490         ctl = str(res.controls[0]).split(":")
491         ctl[1] = "1"
492         ctl[2] = "1"
493         ctl[3] = "10000"
494         control1 = str(":".join(ctl))
495
496         # remove the user from the group
497         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
498                        add_members_operation=False)
499
500         res = self.ldb_simple.search(self.base_dn,
501                                     expression="(name=Administrators)",
502                                     controls=[control1])
503
504         self.assertEqual(len(res[0].get("member")), size )
505
506         self.ldb_admin.newgroup("testgroup")
507         self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
508                        add_members_operation=True)
509
510         res = self.ldb_admin.search(self.base_dn,
511                                     expression="(name=testgroup)",
512                                     controls=["dirsync:1:0:1"])
513
514         self.assertEqual(len(res[0].get("member")), 1)
515         self.assertTrue(res[0].get("member") != "" )
516
517         ctl = str(res.controls[0]).split(":")
518         ctl[1] = "1"
519         ctl[2] = "0"
520         ctl[3] = "1"
521         control1 = str(":".join(ctl))
522
523         # Check that reasking the same question but with an updated cookie
524         # didn't return any results.
525         print control1
526         res = self.ldb_admin.search(self.base_dn,
527                                     expression="(name=testgroup)",
528                                     controls=[control1])
529         self.assertEqual(len(res), 0)
530
531         ctl = str(res.controls[0]).split(":")
532         ctl[1] = "1"
533         ctl[2] = "1"
534         ctl[3] = "10000"
535         control1 = str(":".join(ctl))
536
537         self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
538                        add_members_operation=False)
539
540         res = self.ldb_admin.search(self.base_dn,
541                                     expression="(name=testgroup)",
542                                     attrs=["member"],
543                                     controls=[control1])
544
545         self.ldb_admin.deletegroup("testgroup")
546         self.assertEqual(len(res[0].get("member")), 0)
547
548
549
550     def test_dirsync_deleted_items(self):
551         """Check that dirsync returnd deleted objects too"""
552         # Let's create an OU
553         ouname="OU=testou3,%s" % self.base_dn
554         self.ouname = ouname
555         self.ldb_admin.create_ou(ouname)
556         res = self.ldb_admin.search(self.base_dn,
557                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
558                                     controls=["dirsync:1:0:1"])
559         guid = None
560         for e in res:
561             if str(e["name"]) == "testou3":
562                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
563
564         ctl = str(res.controls[0]).split(":")
565         ctl[1] = "1"
566         ctl[2] = "0"
567         ctl[3] = "10000"
568         control1 = str(":".join(ctl))
569
570         # So now delete the object and check that
571         # we can see the object but deleted when admin
572         delete_force(self.ldb_admin, ouname)
573
574         res = self.ldb_admin.search(self.base_dn,
575                                     expression="(objectClass=organizationalUnit)",
576                                     controls=[control1])
577         self.assertEqual(len(res), 1)
578         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
579         self.assertEqual(guid2, guid)
580         self.assertTrue(res[0].get("isDeleted"))
581         self.assertTrue(res[0].get("name") != None)
582
583     def test_cookie_from_others(self):
584         res = self.ldb_admin.search(self.base_dn,
585                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
586                                     controls=["dirsync:1:0:1"])
587         ctl = str(res.controls[0]).split(":")
588         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
589         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
590         controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie))]
591         res = self.ldb_admin.search(self.base_dn,
592                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
593                                     controls=controls)
594
595 class ExtendedDirsyncTests(SimpleDirsyncTests):
596     def test_dirsync_linkedattributes(self):
597         flag_incr_linked = 2147483648
598         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
599         res = self.ldb_admin.search(self.base_dn,
600                                     attrs=["member"],
601                                     expression="(name=Administrators)",
602                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
603
604         self.assertTrue(res[0].get("member;range=1-1") != None )
605         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
606         size = len(res[0].get("member;range=1-1"))
607
608         ctl = str(res.controls[0]).split(":")
609         ctl[1] = "1"
610         ctl[2] = "%d" % flag_incr_linked
611         ctl[3] = "10000"
612         control1 = str(":".join(ctl))
613         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
614                        add_members_operation=True)
615         self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
616                        add_members_operation=True)
617
618
619         res = self.ldb_admin.search(self.base_dn,
620                                     expression="(name=Administrators)",
621                                     controls=[control1])
622
623         self.assertEqual(len(res[0].get("member;range=1-1")), 2)
624         ctl = str(res.controls[0]).split(":")
625         ctl[1] = "1"
626         ctl[2] = "%d" % flag_incr_linked
627         ctl[3] = "10000"
628         control1 = str(":".join(ctl))
629
630         # remove the user from the group
631         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
632                        add_members_operation=False)
633
634         res = self.ldb_admin.search(self.base_dn,
635                                     expression="(name=Administrators)",
636                                     controls=[control1])
637
638         self.assertEqual(res[0].get("member;range=1-1"), None )
639         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
640
641         ctl = str(res.controls[0]).split(":")
642         ctl[1] = "1"
643         ctl[2] = "%d" % flag_incr_linked
644         ctl[3] = "10000"
645         control2 = str(":".join(ctl))
646
647         self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
648                        add_members_operation=False)
649
650         res = self.ldb_admin.search(self.base_dn,
651                                     expression="(name=Administrators)",
652                                     controls=[control2])
653
654         self.assertEqual(res[0].get("member;range=1-1"), None )
655         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
656
657         res = self.ldb_admin.search(self.base_dn,
658                                     expression="(name=Administrators)",
659                                     controls=[control1])
660
661         self.assertEqual(res[0].get("member;range=1-1"), None )
662         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
663
664     def test_dirsync_deleted_items(self):
665         """Check that dirsync returnd deleted objects too"""
666         # Let's create an OU
667         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
668         ouname="OU=testou3,%s" % self.base_dn
669         self.ouname = ouname
670         self.ldb_admin.create_ou(ouname)
671
672         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
673         res = self.ldb_simple.search(self.base_dn,
674                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
675                                     controls=["dirsync:1:1:1"])
676
677         guid = None
678         for e in res:
679             if str(e["name"]) == "testou3":
680                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
681
682         self.assertTrue(guid != None)
683         ctl = str(res.controls[0]).split(":")
684         ctl[1] = "1"
685         ctl[2] = "1"
686         ctl[3] = "10000"
687         control1 = str(":".join(ctl))
688
689         # So now delete the object and check that
690         # we can see the object but deleted when admin
691         # we just see the objectGUID when simple user
692         delete_force(self.ldb_admin, ouname)
693
694         res = self.ldb_simple.search(self.base_dn,
695                                     expression="(objectClass=organizationalUnit)",
696                                     controls=[control1])
697         self.assertEqual(len(res), 1)
698         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
699         self.assertEqual(guid2, guid)
700         self.assertEqual(str(res[0].dn), "")
701
702
703 ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
704
705 runner = SubunitTestRunner()
706 rc = 0
707 #
708 if not runner.run(unittest.makeSuite(SimpleDirsyncTests)).wasSuccessful():
709     rc = 1
710 if not runner.run(unittest.makeSuite(ExtendedDirsyncTests)).wasSuccessful():
711     rc = 1
712
713 sys.exit(rc)