64b847f4306fede087f30b4a41ee3d98701d3628
[ira/wip.git] / source4 / dsdb / tests / python / dirsync.py
1 #!/usr/bin/env python
2 #
3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 #
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 import optparse
22 import sys
23 sys.path.insert(0, "bin/python")
24 import samba
25 samba.ensure_external_module("testtools", "testtools")
26 samba.ensure_external_module("subunit", "subunit/python")
27
28 import samba.getopt as options
29 import base64
30
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs
35 from samba.ndr import ndr_unpack, ndr_pack
36
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials
41 import samba.tests
42 from samba.tests import delete_force
43 from subunit.run import SubunitTestRunner
44 import unittest
45
46 parser = optparse.OptionParser("dirsync.py [options] <host>")
47 sambaopts = options.SambaOptions(parser)
48 parser.add_option_group(sambaopts)
49 parser.add_option_group(options.VersionOptions(parser))
50
51 # use command line creds if available
52 credopts = options.CredentialsOptions(parser)
53 parser.add_option_group(credopts)
54 opts, args = parser.parse_args()
55
56 if len(args) < 1:
57     parser.print_usage()
58     sys.exit(1)
59
60 host = args[0]
61 if not "://" in host:
62     ldaphost = "ldap://%s" % host
63     ldapshost = "ldaps://%s" % host
64 else:
65     ldaphost = host
66     start = host.rindex("://")
67     host = host.lstrip(start+3)
68
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
71
72 #
73 # Tests start here
74 #
75
76 class DirsyncBaseTests(samba.tests.TestCase):
77
78     def setUp(self):
79         super(DirsyncBaseTests, self).setUp()
80         self.ldb_admin = ldb
81         self.base_dn = ldb.domain_dn()
82         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
83         self.user_pass = "samba123@AAA"
84         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
85         self.sd_utils = sd_utils.SDUtils(ldb)
86         #used for anonymous login
87         print "baseDN: %s" % self.base_dn
88
89     def get_user_dn(self, name):
90         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
91
92     def get_ldb_connection(self, target_username, target_password):
93         creds_tmp = Credentials()
94         creds_tmp.set_username(target_username)
95         creds_tmp.set_password(target_password)
96         creds_tmp.set_domain(creds.get_domain())
97         creds_tmp.set_realm(creds.get_realm())
98         creds_tmp.set_workstation(creds.get_workstation())
99         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
100                                       | gensec.FEATURE_SEAL)
101         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
102         return ldb_target
103
104
105 #tests on ldap add operations
106 class SimpleDirsyncTests(DirsyncBaseTests):
107
108     def setUp(self):
109         super(SimpleDirsyncTests, self).setUp()
110         # Regular user
111         self.dirsync_user = "test_dirsync_user"
112         self.simple_user = "test_simple_user"
113         self.admin_user = "test_admin_user"
114         self.ouname = None
115
116         self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
117         self.ldb_admin.newuser(self.simple_user, self.user_pass)
118         self.ldb_admin.newuser(self.admin_user, self.user_pass)
119         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
120
121         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
122         mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
123         self.sd_utils.dacl_add_ace(self.base_dn, mod)
124
125         # add admins to the Domain Admins group
126         self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user,
127                        add_members_operation=True)
128
129     def tearDown(self):
130         super(SimpleDirsyncTests, self).tearDown()
131         delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
132         delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
133         delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
134         if self.ouname:
135             delete_force(self.ldb_admin, self.ouname)
136         self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
137         try:
138             self.ldb_admin.deletegroup("testgroup")
139         except Exception:
140             pass
141
142     #def test_dirsync_errors(self):
143
144
145     def test_dirsync_supported(self):
146         """Test the basic of the dirsync is supported"""
147         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
148         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
149         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
150         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
151         try:
152             self.ldb_simple.search(self.base_dn,
153                 expression="samaccountname=*",
154                 controls=["dirsync:1:0:1"])
155         except LdbError,l:
156            self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
157
158     def test_parentGUID_referrals(self):
159         res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
160
161         res = self.ldb_admin.search(self.base_dn,
162                                         expression="name=Configuration",
163                                         controls=["dirsync:1:0:1"])
164         self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
165
166     def test_ok_not_rootdc(self):
167         """Test if it's ok to do dirsync on another NC that is not the root DC"""
168         self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
169                                     expression="samaccountname=*",
170                                     controls=["dirsync:1:0:1"])
171
172     def test_dirsync_errors(self):
173         """Test if dirsync returns the correct LDAP errors in case of pb"""
174         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
175         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
176         try:
177             self.ldb_simple.search(self.base_dn,
178                 expression="samaccountname=*",
179                 controls=["dirsync:1:0:1"])
180         except LdbError,l:
181             print l
182             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
183
184         try:
185             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
186                 expression="samaccountname=*",
187                 controls=["dirsync:1:0:1"])
188         except LdbError,l:
189             print l
190             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
191
192         try:
193             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
194                 expression="samaccountname=*",
195                 controls=["dirsync:1:1:1"])
196         except LdbError,l:
197             print l
198             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
199
200         try:
201             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
202                 expression="samaccountname=*",
203                 controls=["dirsync:1:0:1"])
204         except LdbError,l:
205             print l
206             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
207
208         try:
209             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
210                 expression="samaccountname=*",
211                 controls=["dirsync:1:0:1"])
212         except LdbError,l:
213             print l
214             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
215
216         try:
217             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
218                 expression="samaccountname=*",
219                 controls=["dirsync:1:1:1"])
220         except LdbError,l:
221             print l
222             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
223
224
225
226
227     def test_dirsync_attributes(self):
228         """Check behavior with some attributes """
229         res = self.ldb_admin.search(self.base_dn,
230                                     expression="samaccountname=*",
231                                     controls=["dirsync:1:0:1"])
232         # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
233         self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
234         # Check that non replicated attributes are not returned
235         self.assertTrue(res.msgs[0].get("badPwdCount") == None)
236         # Check that non forward link are not returned
237         self.assertTrue(res.msgs[0].get("memberof") == None)
238
239         # Asking for instanceType will return also objectGUID
240         res = self.ldb_admin.search(self.base_dn,
241                                     expression="samaccountname=Administrator",
242                                     attrs=["instanceType"],
243                                     controls=["dirsync:1:0:1"])
244         self.assertTrue(res.msgs[0].get("objectGUID") != None)
245         self.assertTrue(res.msgs[0].get("instanceType") != None)
246
247         # We don't return an entry if asked for objectGUID
248         res = self.ldb_admin.search(self.base_dn,
249                                     expression="dn=%s" % self.base_dn,
250                                     attrs=["objectGUID"],
251                                     controls=["dirsync:1:0:1"])
252         self.assertEquals(len(res.msgs), 0)
253
254         # a request on the root of a NC didn't return parentGUID
255         res = self.ldb_admin.search(self.base_dn,
256                                     expression="dn=%s" % self.base_dn,
257                                     attrs=["name"],
258                                     controls=["dirsync:1:0:1"])
259         self.assertTrue(res.msgs[0].get("objectGUID") != None)
260         self.assertTrue(res.msgs[0].get("name") != None)
261         self.assertTrue(res.msgs[0].get("parentGUID") == None)
262         self.assertTrue(res.msgs[0].get("instanceType") != None)
263
264          # Asking for name will return also objectGUID and parentGUID
265         # and instanceType and of course name
266         res = self.ldb_admin.search(self.base_dn,
267                                     expression="samaccountname=Administrator",
268                                     attrs=["name"],
269                                     controls=["dirsync:1:0:1"])
270         self.assertTrue(res.msgs[0].get("objectGUID") != None)
271         self.assertTrue(res.msgs[0].get("name") != None)
272         self.assertTrue(res.msgs[0].get("parentGUID") != None)
273         self.assertTrue(res.msgs[0].get("instanceType") != None)
274
275         # Asking for dn will not return not only DN but more like if attrs=*
276         # parentGUID should be returned
277         res = self.ldb_admin.search(self.base_dn,
278                                     expression="samaccountname=Administrator",
279                                     attrs=["dn"],
280                                     controls=["dirsync:1:0:1"])
281         count = len(res.msgs[0])
282         res2 = self.ldb_admin.search(self.base_dn,
283                                     expression="samaccountname=Administrator",
284                                     controls=["dirsync:1:0:1"])
285         count2 = len(res2.msgs[0])
286         self.assertEqual(count, count2)
287
288         # Asking for cn will return nothing on objects that have CN as RDN
289         res = self.ldb_admin.search(self.base_dn,
290                                     expression="samaccountname=Administrator",
291                                     attrs=["cn"],
292                                     controls=["dirsync:1:0:1"])
293         self.assertEqual(len(res.msgs), 0)
294         # Asking for parentGUID will return nothing too
295         res = self.ldb_admin.search(self.base_dn,
296                                     expression="samaccountname=Administrator",
297                                     attrs=["parentGUID"],
298                                     controls=["dirsync:1:0:1"])
299         self.assertEqual(len(res.msgs), 0)
300         ouname="OU=testou,%s" % self.base_dn
301         self.ouname = ouname
302         self.ldb_admin.create_ou(ouname)
303         delta = Message()
304         delta.dn = Dn(self.ldb_admin, str(ouname))
305         delta["cn"] = MessageElement("test ou",
306                                         FLAG_MOD_ADD,
307                                         "cn" )
308         self.ldb_admin.modify(delta)
309         res = self.ldb_admin.search(self.base_dn,
310                                     expression="name=testou",
311                                     attrs=["cn"],
312                                     controls=["dirsync:1:0:1"])
313
314         self.assertEqual(len(res.msgs), 1)
315         self.assertEqual(len(res.msgs[0]), 3)
316         delete_force(self.ldb_admin, ouname)
317
318     def test_dirsync_with_controls(self):
319         """Check that dirsync return correct informations when dealing with the NC"""
320         res = self.ldb_admin.search(self.base_dn,
321                                     expression="(dn=%s)" % str(self.base_dn),
322                                     attrs=["name"],
323                                     controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
324
325     def test_dirsync_basenc(self):
326         """Check that dirsync return correct informations when dealing with the NC"""
327         res = self.ldb_admin.search(self.base_dn,
328                                     expression="(dn=%s)" % str(self.base_dn),
329                                     attrs=["name"],
330                                     controls=["dirsync:1:0:10000"])
331         self.assertEqual(len(res.msgs), 1)
332         self.assertEqual(len(res.msgs[0]), 3)
333
334         res = self.ldb_admin.search(self.base_dn,
335                                     expression="(dn=%s)" % str(self.base_dn),
336                                     attrs=["ntSecurityDescriptor"],
337                                     controls=["dirsync:1:0:10000"])
338         self.assertEqual(len(res.msgs), 1)
339         self.assertEqual(len(res.msgs[0]), 3)
340
341     def test_dirsync_othernc(self):
342         """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
343         res = self.ldb_admin.search(self.base_dn,
344                                     expression="(objectclass=configuration)",
345                                     attrs=["name"],
346                                     controls=["dirsync:1:0:10000"])
347         self.assertEqual(len(res.msgs), 1)
348         self.assertEqual(len(res.msgs[0]), 4)
349
350         res = self.ldb_admin.search(self.base_dn,
351                                     expression="(objectclass=configuration)",
352                                     attrs=["ntSecurityDescriptor"],
353                                     controls=["dirsync:1:0:10000"])
354         self.assertEqual(len(res.msgs), 1)
355         self.assertEqual(len(res.msgs[0]), 3)
356
357         res = self.ldb_admin.search(self.base_dn,
358                                     expression="(objectclass=domaindns)",
359                                     attrs=["ntSecurityDescriptor"],
360                                     controls=["dirsync:1:0:10000"])
361         nb = len(res.msgs)
362
363         # only sub nc returns a result when asked for objectGUID
364         res = self.ldb_admin.search(self.base_dn,
365                                     expression="(objectclass=domaindns)",
366                                     attrs=["objectGUID"],
367                                     controls=["dirsync:1:0:0"])
368         self.assertEqual(len(res.msgs), nb - 1)
369         if nb > 1:
370             self.assertTrue(res.msgs[0].get("objectGUID") != None)
371         else:
372             res = self.ldb_admin.search(self.base_dn,
373                                         expression="(objectclass=configuration)",
374                                         attrs=["objectGUID"],
375                                         controls=["dirsync:1:0:0"])
376
377
378     def test_dirsync_send_delta(self):
379         """Check that dirsync return correct delta when sending the last cookie"""
380         res = self.ldb_admin.search(self.base_dn,
381                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
382                                     controls=["dirsync:1:0:10000"])
383         ctl = str(res.controls[0]).split(":")
384         ctl[1] = "1"
385         ctl[2] = "0"
386         ctl[3] = "10000"
387         control = str(":".join(ctl))
388         res = self.ldb_admin.search(self.base_dn,
389                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
390                                     controls=[control])
391         self.assertEqual(len(res), 0)
392
393         res = self.ldb_admin.search(self.base_dn,
394                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
395                                     controls=["dirsync:1:0:100000"])
396
397         ctl = str(res.controls[0]).split(":")
398         ctl[1] = "1"
399         ctl[2] = "0"
400         ctl[3] = "10000"
401         control2 = str(":".join(ctl))
402
403         # Let's create an OU
404         ouname="OU=testou2,%s" % self.base_dn
405         self.ouname = ouname
406         self.ldb_admin.create_ou(ouname)
407         res = self.ldb_admin.search(self.base_dn,
408                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
409                                     controls=[control2])
410         self.assertEqual(len(res), 1)
411         ctl = str(res.controls[0]).split(":")
412         ctl[1] = "1"
413         ctl[2] = "0"
414         ctl[3] = "10000"
415         control3 = str(":".join(ctl))
416
417         delta = Message()
418         delta.dn = Dn(self.ldb_admin, str(ouname))
419
420         delta["cn"] = MessageElement("test ou",
421                                         FLAG_MOD_ADD,
422                                         "cn" )
423         self.ldb_admin.modify(delta)
424         res = self.ldb_admin.search(self.base_dn,
425                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
426                                     controls=[control3])
427
428         self.assertEqual(len(res.msgs), 1)
429         # 3 attributes: instanceType, cn and objectGUID
430         self.assertEqual(len(res.msgs[0]), 3)
431
432         delta = Message()
433         delta.dn = Dn(self.ldb_admin, str(ouname))
434         delta["cn"] = MessageElement([],
435                                         FLAG_MOD_DELETE,
436                                         "cn" )
437         self.ldb_admin.modify(delta)
438         res = self.ldb_admin.search(self.base_dn,
439                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
440                                     controls=[control3])
441
442         self.assertEqual(len(res.msgs), 1)
443         # So we won't have much attribute returned but instanceType and GUID
444         # are.
445         # 3 attributes: instanceType and objectGUID and cn but empty
446         self.assertEqual(len(res.msgs[0]), 3)
447         ouname = "OU=newouname,%s" % self.base_dn
448         self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
449         self.ouname = ouname
450         ctl = str(res.controls[0]).split(":")
451         ctl[1] = "1"
452         ctl[2] = "0"
453         ctl[3] = "10000"
454         control4 = str(":".join(ctl))
455         res = self.ldb_admin.search(self.base_dn,
456                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
457                                     controls=[control3])
458
459         self.assertTrue(res[0].get("parentGUID") != None)
460         self.assertTrue(res[0].get("name") != None)
461         delete_force(self.ldb_admin, ouname)
462
463     def test_dirsync_linkedattributes(self):
464         """Check that dirsync returnd deleted objects too"""
465         # Let's search for members
466         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
467         res = self.ldb_simple.search(self.base_dn,
468                                     expression="(name=Administrators)",
469                                     controls=["dirsync:1:1:1"])
470
471         self.assertTrue(len(res[0].get("member")) > 0)
472         size = len(res[0].get("member"))
473
474         ctl = str(res.controls[0]).split(":")
475         ctl[1] = "1"
476         ctl[2] = "1"
477         ctl[3] = "10000"
478         control1 = str(":".join(ctl))
479         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
480                        add_members_operation=True)
481
482         res = self.ldb_simple.search(self.base_dn,
483                                     expression="(name=Administrators)",
484                                     controls=[control1])
485
486         self.assertEqual(len(res[0].get("member")), size + 1)
487         ctl = str(res.controls[0]).split(":")
488         ctl[1] = "1"
489         ctl[2] = "1"
490         ctl[3] = "10000"
491         control1 = str(":".join(ctl))
492
493         # remove the user from the group
494         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
495                        add_members_operation=False)
496
497         res = self.ldb_simple.search(self.base_dn,
498                                     expression="(name=Administrators)",
499                                     controls=[control1])
500
501         self.assertEqual(len(res[0].get("member")), size )
502
503         self.ldb_admin.newgroup("testgroup")
504         self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
505                        add_members_operation=True)
506
507         res = self.ldb_admin.search(self.base_dn,
508                                     expression="(name=testgroup)",
509                                     controls=["dirsync:1:0:1"])
510
511         self.assertEqual(len(res[0].get("member")), 1)
512         self.assertTrue(res[0].get("member") != "" )
513
514         ctl = str(res.controls[0]).split(":")
515         ctl[1] = "1"
516         ctl[2] = "0"
517         ctl[3] = "1"
518         control1 = str(":".join(ctl))
519
520         # Check that reasking the same question but with an updated cookie
521         # didn't return any results.
522         print control1
523         res = self.ldb_admin.search(self.base_dn,
524                                     expression="(name=testgroup)",
525                                     controls=[control1])
526         self.assertEqual(len(res), 0)
527
528         ctl = str(res.controls[0]).split(":")
529         ctl[1] = "1"
530         ctl[2] = "1"
531         ctl[3] = "10000"
532         control1 = str(":".join(ctl))
533
534         self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
535                        add_members_operation=False)
536
537         res = self.ldb_admin.search(self.base_dn,
538                                     expression="(name=testgroup)",
539                                     attrs=["member"],
540                                     controls=[control1])
541
542         self.ldb_admin.deletegroup("testgroup")
543         self.assertEqual(len(res[0].get("member")), 0)
544
545
546
547     def test_dirsync_deleted_items(self):
548         """Check that dirsync returnd deleted objects too"""
549         # Let's create an OU
550         ouname="OU=testou3,%s" % self.base_dn
551         self.ouname = ouname
552         self.ldb_admin.create_ou(ouname)
553         res = self.ldb_admin.search(self.base_dn,
554                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
555                                     controls=["dirsync:1:0:1"])
556         guid = None
557         for e in res:
558             if str(e["name"]) == "testou3":
559                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
560
561         ctl = str(res.controls[0]).split(":")
562         ctl[1] = "1"
563         ctl[2] = "0"
564         ctl[3] = "10000"
565         control1 = str(":".join(ctl))
566
567         # So now delete the object and check that
568         # we can see the object but deleted when admin
569         delete_force(self.ldb_admin, ouname)
570
571         res = self.ldb_admin.search(self.base_dn,
572                                     expression="(objectClass=organizationalUnit)",
573                                     controls=[control1])
574         self.assertEqual(len(res), 1)
575         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
576         self.assertEqual(guid2, guid)
577         self.assertTrue(res[0].get("isDeleted"))
578         self.assertTrue(res[0].get("name") != None)
579
580     def test_cookie_from_others(self):
581         res = self.ldb_admin.search(self.base_dn,
582                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
583                                     controls=["dirsync:1:0:1"])
584         ctl = str(res.controls[0]).split(":")
585         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
586         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
587         controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie))]
588         res = self.ldb_admin.search(self.base_dn,
589                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
590                                     controls=controls)
591
592 class ExtendedDirsyncTests(SimpleDirsyncTests):
593     def test_dirsync_linkedattributes(self):
594         flag_incr_linked = 2147483648
595         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
596         res = self.ldb_admin.search(self.base_dn,
597                                     attrs=["member"],
598                                     expression="(name=Administrators)",
599                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
600
601         self.assertTrue(res[0].get("member;range=1-1") != None )
602         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
603         size = len(res[0].get("member;range=1-1"))
604
605         ctl = str(res.controls[0]).split(":")
606         ctl[1] = "1"
607         ctl[2] = "%d" % flag_incr_linked
608         ctl[3] = "10000"
609         control1 = str(":".join(ctl))
610         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
611                        add_members_operation=True)
612         self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
613                        add_members_operation=True)
614
615
616         res = self.ldb_admin.search(self.base_dn,
617                                     expression="(name=Administrators)",
618                                     controls=[control1])
619
620         self.assertEqual(len(res[0].get("member;range=1-1")), 2)
621         ctl = str(res.controls[0]).split(":")
622         ctl[1] = "1"
623         ctl[2] = "%d" % flag_incr_linked
624         ctl[3] = "10000"
625         control1 = str(":".join(ctl))
626
627         # remove the user from the group
628         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
629                        add_members_operation=False)
630
631         res = self.ldb_admin.search(self.base_dn,
632                                     expression="(name=Administrators)",
633                                     controls=[control1])
634
635         self.assertEqual(res[0].get("member;range=1-1"), None )
636         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
637
638         ctl = str(res.controls[0]).split(":")
639         ctl[1] = "1"
640         ctl[2] = "%d" % flag_incr_linked
641         ctl[3] = "10000"
642         control2 = str(":".join(ctl))
643
644         self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
645                        add_members_operation=False)
646
647         res = self.ldb_admin.search(self.base_dn,
648                                     expression="(name=Administrators)",
649                                     controls=[control2])
650
651         self.assertEqual(res[0].get("member;range=1-1"), None )
652         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
653
654         res = self.ldb_admin.search(self.base_dn,
655                                     expression="(name=Administrators)",
656                                     controls=[control1])
657
658         self.assertEqual(res[0].get("member;range=1-1"), None )
659         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
660
661     def test_dirsync_deleted_items(self):
662         """Check that dirsync returnd deleted objects too"""
663         # Let's create an OU
664         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
665         ouname="OU=testou3,%s" % self.base_dn
666         self.ouname = ouname
667         self.ldb_admin.create_ou(ouname)
668
669         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
670         res = self.ldb_simple.search(self.base_dn,
671                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
672                                     controls=["dirsync:1:1:1"])
673
674         guid = None
675         for e in res:
676             if str(e["name"]) == "testou3":
677                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
678
679         self.assertTrue(guid != None)
680         ctl = str(res.controls[0]).split(":")
681         ctl[1] = "1"
682         ctl[2] = "1"
683         ctl[3] = "10000"
684         control1 = str(":".join(ctl))
685
686         # So now delete the object and check that
687         # we can see the object but deleted when admin
688         # we just see the objectGUID when simple user
689         delete_force(self.ldb_admin, ouname)
690
691         res = self.ldb_simple.search(self.base_dn,
692                                     expression="(objectClass=organizationalUnit)",
693                                     controls=[control1])
694         self.assertEqual(len(res), 1)
695         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
696         self.assertEqual(guid2, guid)
697         self.assertEqual(str(res[0].dn), "")
698
699
700 ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
701
702 runner = SubunitTestRunner()
703 rc = 0
704 #
705 if not runner.run(unittest.makeSuite(SimpleDirsyncTests)).wasSuccessful():
706     rc = 1
707 if not runner.run(unittest.makeSuite(ExtendedDirsyncTests)).wasSuccessful():
708     rc = 1
709
710 sys.exit(rc)