s4-selftest: Avoid running kinit for each new connection
[samba.git] / source4 / dsdb / tests / python / dirsync.py
1 #!/usr/bin/env python
2 #
3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 #
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 import optparse
22 import sys
23 sys.path.insert(0, "bin/python")
24 import samba
25 samba.ensure_external_module("testtools", "testtools")
26 samba.ensure_external_module("subunit", "subunit/python")
27
28 import samba.getopt as options
29 import base64
30
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs
35 from samba.ndr import ndr_unpack, ndr_pack
36
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
41 import samba.tests
42 from samba.tests import delete_force
43 from subunit.run import SubunitTestRunner
44 import unittest
45
# Command line handling: the standard Samba option groups plus one mandatory
# positional argument naming the server to test against.
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# The positional argument is either a bare host name or a full URL.
# Three module-level names are derived from it:
#   host      - the host part with any "<scheme>://" prefix removed
#   ldaphost  - URL used by the per-user connections
#   ldapshost - URL used by the administrative connection
host = args[0]
if "://" not in host:
    ldaphost = "ldap://%s" % host
    ldapshost = "ldaps://%s" % host
else:
    # An explicit URL is honoured as given for both kinds of connection.
    # ldapshost used to be left undefined on this path, which made the
    # administrative connection fail with a NameError.
    ldaphost = host
    ldapshost = host
    start = host.rindex("://")
    # str.lstrip() takes a set of characters, not an offset, and raises
    # TypeError when handed an integer; slice the scheme off instead.
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
71
72 #
73 # Tests start here
74 #
75
76 class DirsyncBaseTests(samba.tests.TestCase):
77
78     def setUp(self):
79         super(DirsyncBaseTests, self).setUp()
80         self.ldb_admin = ldb
81         self.base_dn = ldb.domain_dn()
82         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
83         self.user_pass = "samba123@AAA"
84         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
85         self.sd_utils = sd_utils.SDUtils(ldb)
86         #used for anonymous login
87         print "baseDN: %s" % self.base_dn
88
89     def get_user_dn(self, name):
90         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
91
92     def get_ldb_connection(self, target_username, target_password):
93         creds_tmp = Credentials()
94         creds_tmp.set_username(target_username)
95         creds_tmp.set_password(target_password)
96         creds_tmp.set_domain(creds.get_domain())
97         creds_tmp.set_realm(creds.get_realm())
98         creds_tmp.set_workstation(creds.get_workstation())
99         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
100                                       | gensec.FEATURE_SEAL)
101         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
102         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
103         return ldb_target
104
105
# tests on the LDAP dirsync control
107 class SimpleDirsyncTests(DirsyncBaseTests):
108
109     def setUp(self):
110         super(SimpleDirsyncTests, self).setUp()
111         # Regular user
112         self.dirsync_user = "test_dirsync_user"
113         self.simple_user = "test_simple_user"
114         self.admin_user = "test_admin_user"
115         self.ouname = None
116
117         self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
118         self.ldb_admin.newuser(self.simple_user, self.user_pass)
119         self.ldb_admin.newuser(self.admin_user, self.user_pass)
120         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
121
122         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
123         mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
124         self.sd_utils.dacl_add_ace(self.base_dn, mod)
125
126         # add admins to the Domain Admins group
127         self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user,
128                        add_members_operation=True)
129
130     def tearDown(self):
131         super(SimpleDirsyncTests, self).tearDown()
132         delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
133         delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
134         delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
135         if self.ouname:
136             delete_force(self.ldb_admin, self.ouname)
137         self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
138         try:
139             self.ldb_admin.deletegroup("testgroup")
140         except Exception:
141             pass
142
143     #def test_dirsync_errors(self):
144
145
146     def test_dirsync_supported(self):
147         """Test the basic of the dirsync is supported"""
148         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
149         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
150         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
151         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
152         try:
153             self.ldb_simple.search(self.base_dn,
154                 expression="samaccountname=*",
155                 controls=["dirsync:1:0:1"])
156         except LdbError,l:
157            self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
158
159     def test_parentGUID_referrals(self):
160         res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
161
162         res = self.ldb_admin.search(self.base_dn,
163                                         expression="name=Configuration",
164                                         controls=["dirsync:1:0:1"])
165         self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
166
167     def test_ok_not_rootdc(self):
168         """Test if it's ok to do dirsync on another NC that is not the root DC"""
169         self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
170                                     expression="samaccountname=*",
171                                     controls=["dirsync:1:0:1"])
172
173     def test_dirsync_errors(self):
174         """Test if dirsync returns the correct LDAP errors in case of pb"""
175         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
176         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
177         try:
178             self.ldb_simple.search(self.base_dn,
179                 expression="samaccountname=*",
180                 controls=["dirsync:1:0:1"])
181         except LdbError,l:
182             print l
183             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
184
185         try:
186             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
187                 expression="samaccountname=*",
188                 controls=["dirsync:1:0:1"])
189         except LdbError,l:
190             print l
191             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
192
193         try:
194             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
195                 expression="samaccountname=*",
196                 controls=["dirsync:1:1:1"])
197         except LdbError,l:
198             print l
199             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
200
201         try:
202             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
203                 expression="samaccountname=*",
204                 controls=["dirsync:1:0:1"])
205         except LdbError,l:
206             print l
207             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
208
209         try:
210             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
211                 expression="samaccountname=*",
212                 controls=["dirsync:1:0:1"])
213         except LdbError,l:
214             print l
215             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
216
217         try:
218             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
219                 expression="samaccountname=*",
220                 controls=["dirsync:1:1:1"])
221         except LdbError,l:
222             print l
223             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
224
225
226
227
228     def test_dirsync_attributes(self):
229         """Check behavior with some attributes """
230         res = self.ldb_admin.search(self.base_dn,
231                                     expression="samaccountname=*",
232                                     controls=["dirsync:1:0:1"])
233         # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
234         self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
235         # Check that non replicated attributes are not returned
236         self.assertTrue(res.msgs[0].get("badPwdCount") == None)
237         # Check that non forward link are not returned
238         self.assertTrue(res.msgs[0].get("memberof") == None)
239
240         # Asking for instanceType will return also objectGUID
241         res = self.ldb_admin.search(self.base_dn,
242                                     expression="samaccountname=Administrator",
243                                     attrs=["instanceType"],
244                                     controls=["dirsync:1:0:1"])
245         self.assertTrue(res.msgs[0].get("objectGUID") != None)
246         self.assertTrue(res.msgs[0].get("instanceType") != None)
247
248         # We don't return an entry if asked for objectGUID
249         res = self.ldb_admin.search(self.base_dn,
250                                     expression="dn=%s" % self.base_dn,
251                                     attrs=["objectGUID"],
252                                     controls=["dirsync:1:0:1"])
253         self.assertEquals(len(res.msgs), 0)
254
255         # a request on the root of a NC didn't return parentGUID
256         res = self.ldb_admin.search(self.base_dn,
257                                     expression="dn=%s" % self.base_dn,
258                                     attrs=["name"],
259                                     controls=["dirsync:1:0:1"])
260         self.assertTrue(res.msgs[0].get("objectGUID") != None)
261         self.assertTrue(res.msgs[0].get("name") != None)
262         self.assertTrue(res.msgs[0].get("parentGUID") == None)
263         self.assertTrue(res.msgs[0].get("instanceType") != None)
264
265          # Asking for name will return also objectGUID and parentGUID
266         # and instanceType and of course name
267         res = self.ldb_admin.search(self.base_dn,
268                                     expression="samaccountname=Administrator",
269                                     attrs=["name"],
270                                     controls=["dirsync:1:0:1"])
271         self.assertTrue(res.msgs[0].get("objectGUID") != None)
272         self.assertTrue(res.msgs[0].get("name") != None)
273         self.assertTrue(res.msgs[0].get("parentGUID") != None)
274         self.assertTrue(res.msgs[0].get("instanceType") != None)
275
276         # Asking for dn will not return not only DN but more like if attrs=*
277         # parentGUID should be returned
278         res = self.ldb_admin.search(self.base_dn,
279                                     expression="samaccountname=Administrator",
280                                     attrs=["dn"],
281                                     controls=["dirsync:1:0:1"])
282         count = len(res.msgs[0])
283         res2 = self.ldb_admin.search(self.base_dn,
284                                     expression="samaccountname=Administrator",
285                                     controls=["dirsync:1:0:1"])
286         count2 = len(res2.msgs[0])
287         self.assertEqual(count, count2)
288
289         # Asking for cn will return nothing on objects that have CN as RDN
290         res = self.ldb_admin.search(self.base_dn,
291                                     expression="samaccountname=Administrator",
292                                     attrs=["cn"],
293                                     controls=["dirsync:1:0:1"])
294         self.assertEqual(len(res.msgs), 0)
295         # Asking for parentGUID will return nothing too
296         res = self.ldb_admin.search(self.base_dn,
297                                     expression="samaccountname=Administrator",
298                                     attrs=["parentGUID"],
299                                     controls=["dirsync:1:0:1"])
300         self.assertEqual(len(res.msgs), 0)
301         ouname="OU=testou,%s" % self.base_dn
302         self.ouname = ouname
303         self.ldb_admin.create_ou(ouname)
304         delta = Message()
305         delta.dn = Dn(self.ldb_admin, str(ouname))
306         delta["cn"] = MessageElement("test ou",
307                                         FLAG_MOD_ADD,
308                                         "cn" )
309         self.ldb_admin.modify(delta)
310         res = self.ldb_admin.search(self.base_dn,
311                                     expression="name=testou",
312                                     attrs=["cn"],
313                                     controls=["dirsync:1:0:1"])
314
315         self.assertEqual(len(res.msgs), 1)
316         self.assertEqual(len(res.msgs[0]), 3)
317         delete_force(self.ldb_admin, ouname)
318
319     def test_dirsync_with_controls(self):
320         """Check that dirsync return correct informations when dealing with the NC"""
321         res = self.ldb_admin.search(self.base_dn,
322                                     expression="(dn=%s)" % str(self.base_dn),
323                                     attrs=["name"],
324                                     controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
325
326     def test_dirsync_basenc(self):
327         """Check that dirsync return correct informations when dealing with the NC"""
328         res = self.ldb_admin.search(self.base_dn,
329                                     expression="(dn=%s)" % str(self.base_dn),
330                                     attrs=["name"],
331                                     controls=["dirsync:1:0:10000"])
332         self.assertEqual(len(res.msgs), 1)
333         self.assertEqual(len(res.msgs[0]), 3)
334
335         res = self.ldb_admin.search(self.base_dn,
336                                     expression="(dn=%s)" % str(self.base_dn),
337                                     attrs=["ntSecurityDescriptor"],
338                                     controls=["dirsync:1:0:10000"])
339         self.assertEqual(len(res.msgs), 1)
340         self.assertEqual(len(res.msgs[0]), 3)
341
342     def test_dirsync_othernc(self):
343         """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
344         res = self.ldb_admin.search(self.base_dn,
345                                     expression="(objectclass=configuration)",
346                                     attrs=["name"],
347                                     controls=["dirsync:1:0:10000"])
348         self.assertEqual(len(res.msgs), 1)
349         self.assertEqual(len(res.msgs[0]), 4)
350
351         res = self.ldb_admin.search(self.base_dn,
352                                     expression="(objectclass=configuration)",
353                                     attrs=["ntSecurityDescriptor"],
354                                     controls=["dirsync:1:0:10000"])
355         self.assertEqual(len(res.msgs), 1)
356         self.assertEqual(len(res.msgs[0]), 3)
357
358         res = self.ldb_admin.search(self.base_dn,
359                                     expression="(objectclass=domaindns)",
360                                     attrs=["ntSecurityDescriptor"],
361                                     controls=["dirsync:1:0:10000"])
362         nb = len(res.msgs)
363
364         # only sub nc returns a result when asked for objectGUID
365         res = self.ldb_admin.search(self.base_dn,
366                                     expression="(objectclass=domaindns)",
367                                     attrs=["objectGUID"],
368                                     controls=["dirsync:1:0:0"])
369         self.assertEqual(len(res.msgs), nb - 1)
370         if nb > 1:
371             self.assertTrue(res.msgs[0].get("objectGUID") != None)
372         else:
373             res = self.ldb_admin.search(self.base_dn,
374                                         expression="(objectclass=configuration)",
375                                         attrs=["objectGUID"],
376                                         controls=["dirsync:1:0:0"])
377
378
379     def test_dirsync_send_delta(self):
380         """Check that dirsync return correct delta when sending the last cookie"""
381         res = self.ldb_admin.search(self.base_dn,
382                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
383                                     controls=["dirsync:1:0:10000"])
384         ctl = str(res.controls[0]).split(":")
385         ctl[1] = "1"
386         ctl[2] = "0"
387         ctl[3] = "10000"
388         control = str(":".join(ctl))
389         res = self.ldb_admin.search(self.base_dn,
390                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
391                                     controls=[control])
392         self.assertEqual(len(res), 0)
393
394         res = self.ldb_admin.search(self.base_dn,
395                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
396                                     controls=["dirsync:1:0:100000"])
397
398         ctl = str(res.controls[0]).split(":")
399         ctl[1] = "1"
400         ctl[2] = "0"
401         ctl[3] = "10000"
402         control2 = str(":".join(ctl))
403
404         # Let's create an OU
405         ouname="OU=testou2,%s" % self.base_dn
406         self.ouname = ouname
407         self.ldb_admin.create_ou(ouname)
408         res = self.ldb_admin.search(self.base_dn,
409                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
410                                     controls=[control2])
411         self.assertEqual(len(res), 1)
412         ctl = str(res.controls[0]).split(":")
413         ctl[1] = "1"
414         ctl[2] = "0"
415         ctl[3] = "10000"
416         control3 = str(":".join(ctl))
417
418         delta = Message()
419         delta.dn = Dn(self.ldb_admin, str(ouname))
420
421         delta["cn"] = MessageElement("test ou",
422                                         FLAG_MOD_ADD,
423                                         "cn" )
424         self.ldb_admin.modify(delta)
425         res = self.ldb_admin.search(self.base_dn,
426                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
427                                     controls=[control3])
428
429         self.assertEqual(len(res.msgs), 1)
430         # 3 attributes: instanceType, cn and objectGUID
431         self.assertEqual(len(res.msgs[0]), 3)
432
433         delta = Message()
434         delta.dn = Dn(self.ldb_admin, str(ouname))
435         delta["cn"] = MessageElement([],
436                                         FLAG_MOD_DELETE,
437                                         "cn" )
438         self.ldb_admin.modify(delta)
439         res = self.ldb_admin.search(self.base_dn,
440                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
441                                     controls=[control3])
442
443         self.assertEqual(len(res.msgs), 1)
444         # So we won't have much attribute returned but instanceType and GUID
445         # are.
446         # 3 attributes: instanceType and objectGUID and cn but empty
447         self.assertEqual(len(res.msgs[0]), 3)
448         ouname = "OU=newouname,%s" % self.base_dn
449         self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
450         self.ouname = ouname
451         ctl = str(res.controls[0]).split(":")
452         ctl[1] = "1"
453         ctl[2] = "0"
454         ctl[3] = "10000"
455         control4 = str(":".join(ctl))
456         res = self.ldb_admin.search(self.base_dn,
457                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
458                                     controls=[control3])
459
460         self.assertTrue(res[0].get("parentGUID") != None)
461         self.assertTrue(res[0].get("name") != None)
462         delete_force(self.ldb_admin, ouname)
463
464     def test_dirsync_linkedattributes(self):
465         """Check that dirsync returnd deleted objects too"""
466         # Let's search for members
467         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
468         res = self.ldb_simple.search(self.base_dn,
469                                     expression="(name=Administrators)",
470                                     controls=["dirsync:1:1:1"])
471
472         self.assertTrue(len(res[0].get("member")) > 0)
473         size = len(res[0].get("member"))
474
475         ctl = str(res.controls[0]).split(":")
476         ctl[1] = "1"
477         ctl[2] = "1"
478         ctl[3] = "10000"
479         control1 = str(":".join(ctl))
480         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
481                        add_members_operation=True)
482
483         res = self.ldb_simple.search(self.base_dn,
484                                     expression="(name=Administrators)",
485                                     controls=[control1])
486
487         self.assertEqual(len(res[0].get("member")), size + 1)
488         ctl = str(res.controls[0]).split(":")
489         ctl[1] = "1"
490         ctl[2] = "1"
491         ctl[3] = "10000"
492         control1 = str(":".join(ctl))
493
494         # remove the user from the group
495         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
496                        add_members_operation=False)
497
498         res = self.ldb_simple.search(self.base_dn,
499                                     expression="(name=Administrators)",
500                                     controls=[control1])
501
502         self.assertEqual(len(res[0].get("member")), size )
503
504         self.ldb_admin.newgroup("testgroup")
505         self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
506                        add_members_operation=True)
507
508         res = self.ldb_admin.search(self.base_dn,
509                                     expression="(name=testgroup)",
510                                     controls=["dirsync:1:0:1"])
511
512         self.assertEqual(len(res[0].get("member")), 1)
513         self.assertTrue(res[0].get("member") != "" )
514
515         ctl = str(res.controls[0]).split(":")
516         ctl[1] = "1"
517         ctl[2] = "0"
518         ctl[3] = "1"
519         control1 = str(":".join(ctl))
520
521         # Check that reasking the same question but with an updated cookie
522         # didn't return any results.
523         print control1
524         res = self.ldb_admin.search(self.base_dn,
525                                     expression="(name=testgroup)",
526                                     controls=[control1])
527         self.assertEqual(len(res), 0)
528
529         ctl = str(res.controls[0]).split(":")
530         ctl[1] = "1"
531         ctl[2] = "1"
532         ctl[3] = "10000"
533         control1 = str(":".join(ctl))
534
535         self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
536                        add_members_operation=False)
537
538         res = self.ldb_admin.search(self.base_dn,
539                                     expression="(name=testgroup)",
540                                     attrs=["member"],
541                                     controls=[control1])
542
543         self.ldb_admin.deletegroup("testgroup")
544         self.assertEqual(len(res[0].get("member")), 0)
545
546
547
548     def test_dirsync_deleted_items(self):
549         """Check that dirsync returnd deleted objects too"""
550         # Let's create an OU
551         ouname="OU=testou3,%s" % self.base_dn
552         self.ouname = ouname
553         self.ldb_admin.create_ou(ouname)
554         res = self.ldb_admin.search(self.base_dn,
555                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
556                                     controls=["dirsync:1:0:1"])
557         guid = None
558         for e in res:
559             if str(e["name"]) == "testou3":
560                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
561
562         ctl = str(res.controls[0]).split(":")
563         ctl[1] = "1"
564         ctl[2] = "0"
565         ctl[3] = "10000"
566         control1 = str(":".join(ctl))
567
568         # So now delete the object and check that
569         # we can see the object but deleted when admin
570         delete_force(self.ldb_admin, ouname)
571
572         res = self.ldb_admin.search(self.base_dn,
573                                     expression="(objectClass=organizationalUnit)",
574                                     controls=[control1])
575         self.assertEqual(len(res), 1)
576         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
577         self.assertEqual(guid2, guid)
578         self.assertTrue(res[0].get("isDeleted"))
579         self.assertTrue(res[0].get("name") != None)
580
581     def test_cookie_from_others(self):
582         res = self.ldb_admin.search(self.base_dn,
583                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
584                                     controls=["dirsync:1:0:1"])
585         ctl = str(res.controls[0]).split(":")
586         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
587         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
588         controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie))]
589         res = self.ldb_admin.search(self.base_dn,
590                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
591                                     controls=controls)
592
593 class ExtendedDirsyncTests(SimpleDirsyncTests):
594     def test_dirsync_linkedattributes(self):
595         flag_incr_linked = 2147483648
596         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
597         res = self.ldb_admin.search(self.base_dn,
598                                     attrs=["member"],
599                                     expression="(name=Administrators)",
600                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
601
602         self.assertTrue(res[0].get("member;range=1-1") != None )
603         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
604         size = len(res[0].get("member;range=1-1"))
605
606         ctl = str(res.controls[0]).split(":")
607         ctl[1] = "1"
608         ctl[2] = "%d" % flag_incr_linked
609         ctl[3] = "10000"
610         control1 = str(":".join(ctl))
611         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
612                        add_members_operation=True)
613         self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
614                        add_members_operation=True)
615
616
617         res = self.ldb_admin.search(self.base_dn,
618                                     expression="(name=Administrators)",
619                                     controls=[control1])
620
621         self.assertEqual(len(res[0].get("member;range=1-1")), 2)
622         ctl = str(res.controls[0]).split(":")
623         ctl[1] = "1"
624         ctl[2] = "%d" % flag_incr_linked
625         ctl[3] = "10000"
626         control1 = str(":".join(ctl))
627
628         # remove the user from the group
629         self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
630                        add_members_operation=False)
631
632         res = self.ldb_admin.search(self.base_dn,
633                                     expression="(name=Administrators)",
634                                     controls=[control1])
635
636         self.assertEqual(res[0].get("member;range=1-1"), None )
637         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
638
639         ctl = str(res.controls[0]).split(":")
640         ctl[1] = "1"
641         ctl[2] = "%d" % flag_incr_linked
642         ctl[3] = "10000"
643         control2 = str(":".join(ctl))
644
645         self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
646                        add_members_operation=False)
647
648         res = self.ldb_admin.search(self.base_dn,
649                                     expression="(name=Administrators)",
650                                     controls=[control2])
651
652         self.assertEqual(res[0].get("member;range=1-1"), None )
653         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
654
655         res = self.ldb_admin.search(self.base_dn,
656                                     expression="(name=Administrators)",
657                                     controls=[control1])
658
659         self.assertEqual(res[0].get("member;range=1-1"), None )
660         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
661
662     def test_dirsync_deleted_items(self):
663         """Check that dirsync returnd deleted objects too"""
664         # Let's create an OU
665         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
666         ouname="OU=testou3,%s" % self.base_dn
667         self.ouname = ouname
668         self.ldb_admin.create_ou(ouname)
669
670         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
671         res = self.ldb_simple.search(self.base_dn,
672                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
673                                     controls=["dirsync:1:1:1"])
674
675         guid = None
676         for e in res:
677             if str(e["name"]) == "testou3":
678                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
679
680         self.assertTrue(guid != None)
681         ctl = str(res.controls[0]).split(":")
682         ctl[1] = "1"
683         ctl[2] = "1"
684         ctl[3] = "10000"
685         control1 = str(":".join(ctl))
686
687         # So now delete the object and check that
688         # we can see the object but deleted when admin
689         # we just see the objectGUID when simple user
690         delete_force(self.ldb_admin, ouname)
691
692         res = self.ldb_simple.search(self.base_dn,
693                                     expression="(objectClass=organizationalUnit)",
694                                     controls=[control1])
695         self.assertEqual(len(res), 1)
696         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
697         self.assertEqual(guid2, guid)
698         self.assertEqual(str(res[0].dn), "")
699
700
# Administrative connection shared by every test (read as "ldb" in setUp).
ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)

# Run both suites even if the first one fails; exit with 1 if either failed.
runner = SubunitTestRunner()
rc = 0
for suite_class in (SimpleDirsyncTests, ExtendedDirsyncTests):
    outcome = runner.run(unittest.makeSuite(suite_class))
    if not outcome.wasSuccessful():
        rc = 1

sys.exit(rc)