ca3a1e936f6c97aebd6c18762762b855504554f2
[samba.git] / source4 / dsdb / tests / python / dirsync.py
1 #!/usr/bin/env python
2 #
3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 from __future__ import print_function
22 import optparse
23 import sys
24 sys.path.insert(0, "bin/python")
25 import samba
26 from samba.tests.subunitrun import TestProgram, SubunitOptions
27
28 import samba.getopt as options
29 import base64
30
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs, security
35 from samba.ndr import ndr_unpack, ndr_pack
36
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
41 import samba.tests
42 from samba.tests import delete_force
43
# Command-line handling: the Samba, version, credentials and subunit option
# groups are all registered on a single parser; the one positional argument
# names the server the tests talk to.
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args.pop()
if "://" not in host:
    # Bare host name: derive both the plain and the TLS URL from it.
    ldaphost = "ldap://%s" % host
    ldapshost = "ldaps://%s" % host
else:
    ldaphost = host
    # Split "<scheme>://<rest>" by slicing; str.lstrip() takes a set of
    # characters, not an offset, so it cannot be used for this.
    start = host.rindex("://")
    scheme = host[:start]
    host = host[start + 3:]
    # ldapshost must exist in this branch too, because the test fixture
    # opens its admin connection through it.  An explicit ldap:// URL gets
    # an ldaps:// twin; any other scheme is reused unchanged.
    if scheme == "ldap":
        ldapshost = "ldaps://%s" % host
    else:
        ldapshost = ldaphost

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
71
72 #
73 # Tests start here
74 #
75
class DirsyncBaseTests(samba.tests.TestCase):
    """Shared fixture for the dirsync tests.

    Opens the administrative connection and caches the domain-wide values
    (base DN, domain SID, configuration DN, SD helper) used by subclasses.
    """

    def setUp(self):
        """Connect with the command-line credentials and record domain data."""
        super(DirsyncBaseTests, self).setUp()
        admin = SamDB(ldapshost,
                      credentials=creds,
                      session_info=system_session(lp),
                      lp=lp)
        self.ldb_admin = admin
        self.base_dn = admin.domain_dn()
        self.domain_sid = security.dom_sid(admin.get_domain_sid())
        # One random password shared by every account a test creates.
        self.user_pass = samba.generate_random_password(12, 16)
        self.configuration_dn = admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(admin)
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of account *name* inside CN=Users of the domain."""
        dn_parts = (name, self.base_dn)
        return "CN=%s,CN=Users,%s" % dn_parts

    def get_ldb_connection(self, target_username, target_password):
        """Open a new SamDB connection authenticated as the given account.

        Domain, realm and workstation are copied from the command-line
        credentials; sealing is requested and Kerberos is disabled.
        """
        user_creds = Credentials()
        user_creds.set_username(target_username)
        user_creds.set_password(target_password)
        user_creds.set_domain(creds.get_domain())
        user_creds.set_realm(creds.get_realm())
        user_creds.set_workstation(creds.get_workstation())
        sealed_features = user_creds.get_gensec_features() | gensec.FEATURE_SEAL
        user_creds.set_gensec_features(sealed_features)
        # kinit is too expensive to use in a tight loop
        user_creds.set_kerberos_state(DONT_USE_KERBEROS)
        return SamDB(url=ldaphost, credentials=user_creds, lp=lp)
104
105
106 #tests on ldap add operations
107 class SimpleDirsyncTests(DirsyncBaseTests):
108
109     def setUp(self):
110         super(SimpleDirsyncTests, self).setUp()
111         # Regular user
112         self.dirsync_user = "test_dirsync_user"
113         self.simple_user = "test_simple_user"
114         self.admin_user = "test_admin_user"
115         self.ouname = None
116
117         self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
118         self.ldb_admin.newuser(self.simple_user, self.user_pass)
119         self.ldb_admin.newuser(self.admin_user, self.user_pass)
120         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
121
122         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
123         mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
124                                    str(user_sid))
125         self.sd_utils.dacl_add_ace(self.base_dn, mod)
126
127         # add admins to the Domain Admins group
128         self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
129                                                 add_members_operation=True)
130
131     def tearDown(self):
132         super(SimpleDirsyncTests, self).tearDown()
133         delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
134         delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
135         delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
136         if self.ouname:
137             delete_force(self.ldb_admin, self.ouname)
138         self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
139         try:
140             self.ldb_admin.deletegroup("testgroup")
141         except Exception:
142             pass
143
144     #def test_dirsync_errors(self):
145
146     def test_dirsync_supported(self):
147         """Test the basic of the dirsync is supported"""
148         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
149         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
150         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
151         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
152         try:
153             self.ldb_simple.search(self.base_dn,
154                                    expression="samaccountname=*",
155                                    controls=["dirsync:1:0:1"])
156         except LdbError as l:
157             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
158
159     def test_parentGUID_referrals(self):
160         res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
161
162         res = self.ldb_admin.search(self.base_dn,
163                                     expression="name=Configuration",
164                                     controls=["dirsync:1:0:1"])
165         self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
166
167     def test_ok_not_rootdc(self):
168         """Test if it's ok to do dirsync on another NC that is not the root DC"""
169         self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
170                               expression="samaccountname=*",
171                               controls=["dirsync:1:0:1"])
172
173     def test_dirsync_errors(self):
174         """Test if dirsync returns the correct LDAP errors in case of pb"""
175         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
176         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
177         try:
178             self.ldb_simple.search(self.base_dn,
179                                    expression="samaccountname=*",
180                                    controls=["dirsync:1:0:1"])
181         except LdbError as l:
182             print(l)
183             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
184
185         try:
186             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
187                                    expression="samaccountname=*",
188                                    controls=["dirsync:1:0:1"])
189         except LdbError as l:
190             print(l)
191             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
192
193         try:
194             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
195                                    expression="samaccountname=*",
196                                    controls=["dirsync:1:1:1"])
197         except LdbError as l:
198             print(l)
199             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
200
201         try:
202             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
203                                     expression="samaccountname=*",
204                                     controls=["dirsync:1:0:1"])
205         except LdbError as l:
206             print(l)
207             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
208
209         try:
210             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
211                                   expression="samaccountname=*",
212                                   controls=["dirsync:1:0:1"])
213         except LdbError as l:
214             print(l)
215             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
216
217         try:
218             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
219                                   expression="samaccountname=*",
220                                   controls=["dirsync:1:1:1"])
221         except LdbError as l:
222             print(l)
223             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
224
225     def test_dirsync_attributes(self):
226         """Check behavior with some attributes """
227         res = self.ldb_admin.search(self.base_dn,
228                                     expression="samaccountname=*",
229                                     controls=["dirsync:1:0:1"])
230         # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
231         self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
232         # Check that non replicated attributes are not returned
233         self.assertTrue(res.msgs[0].get("badPwdCount") == None)
234         # Check that non forward link are not returned
235         self.assertTrue(res.msgs[0].get("memberof") == None)
236
237         # Asking for instanceType will return also objectGUID
238         res = self.ldb_admin.search(self.base_dn,
239                                     expression="samaccountname=Administrator",
240                                     attrs=["instanceType"],
241                                     controls=["dirsync:1:0:1"])
242         self.assertTrue(res.msgs[0].get("objectGUID") != None)
243         self.assertTrue(res.msgs[0].get("instanceType") != None)
244
245         # We don't return an entry if asked for objectGUID
246         res = self.ldb_admin.search(self.base_dn,
247                                     expression="(distinguishedName=%s)" % str(self.base_dn),
248                                     attrs=["objectGUID"],
249                                     controls=["dirsync:1:0:1"])
250         self.assertEquals(len(res.msgs), 0)
251
252         # a request on the root of a NC didn't return parentGUID
253         res = self.ldb_admin.search(self.base_dn,
254                                     expression="(distinguishedName=%s)" % str(self.base_dn),
255                                     attrs=["name"],
256                                     controls=["dirsync:1:0:1"])
257         self.assertTrue(res.msgs[0].get("objectGUID") != None)
258         self.assertTrue(res.msgs[0].get("name") != None)
259         self.assertTrue(res.msgs[0].get("parentGUID") == None)
260         self.assertTrue(res.msgs[0].get("instanceType") != None)
261
262         # Asking for name will return also objectGUID and parentGUID
263         # and instanceType and of course name
264         res = self.ldb_admin.search(self.base_dn,
265                                     expression="samaccountname=Administrator",
266                                     attrs=["name"],
267                                     controls=["dirsync:1:0:1"])
268         self.assertTrue(res.msgs[0].get("objectGUID") != None)
269         self.assertTrue(res.msgs[0].get("name") != None)
270         self.assertTrue(res.msgs[0].get("parentGUID") != None)
271         self.assertTrue(res.msgs[0].get("instanceType") != None)
272
273         # Asking for dn will not return not only DN but more like if attrs=*
274         # parentGUID should be returned
275         res = self.ldb_admin.search(self.base_dn,
276                                     expression="samaccountname=Administrator",
277                                     attrs=["dn"],
278                                     controls=["dirsync:1:0:1"])
279         count = len(res.msgs[0])
280         res2 = self.ldb_admin.search(self.base_dn,
281                                      expression="samaccountname=Administrator",
282                                      controls=["dirsync:1:0:1"])
283         count2 = len(res2.msgs[0])
284         self.assertEqual(count, count2)
285
286         # Asking for cn will return nothing on objects that have CN as RDN
287         res = self.ldb_admin.search(self.base_dn,
288                                     expression="samaccountname=Administrator",
289                                     attrs=["cn"],
290                                     controls=["dirsync:1:0:1"])
291         self.assertEqual(len(res.msgs), 0)
292         # Asking for parentGUID will return nothing too
293         res = self.ldb_admin.search(self.base_dn,
294                                     expression="samaccountname=Administrator",
295                                     attrs=["parentGUID"],
296                                     controls=["dirsync:1:0:1"])
297         self.assertEqual(len(res.msgs), 0)
298         ouname = "OU=testou,%s" % self.base_dn
299         self.ouname = ouname
300         self.ldb_admin.create_ou(ouname)
301         delta = Message()
302         delta.dn = Dn(self.ldb_admin, str(ouname))
303         delta["cn"] = MessageElement("test ou",
304                                      FLAG_MOD_ADD,
305                                      "cn")
306         self.ldb_admin.modify(delta)
307         res = self.ldb_admin.search(self.base_dn,
308                                     expression="name=testou",
309                                     attrs=["cn"],
310                                     controls=["dirsync:1:0:1"])
311
312         self.assertEqual(len(res.msgs), 1)
313         self.assertEqual(len(res.msgs[0]), 3)
314         delete_force(self.ldb_admin, ouname)
315
316     def test_dirsync_with_controls(self):
317         """Check that dirsync return correct informations when dealing with the NC"""
318         res = self.ldb_admin.search(self.base_dn,
319                                     expression="(distinguishedName=%s)" % str(self.base_dn),
320                                     attrs=["name"],
321                                     controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
322
323     def test_dirsync_basenc(self):
324         """Check that dirsync return correct informations when dealing with the NC"""
325         res = self.ldb_admin.search(self.base_dn,
326                                     expression="(distinguishedName=%s)" % str(self.base_dn),
327                                     attrs=["name"],
328                                     controls=["dirsync:1:0:10000"])
329         self.assertEqual(len(res.msgs), 1)
330         self.assertEqual(len(res.msgs[0]), 3)
331
332         res = self.ldb_admin.search(self.base_dn,
333                                     expression="(distinguishedName=%s)" % str(self.base_dn),
334                                     attrs=["ntSecurityDescriptor"],
335                                     controls=["dirsync:1:0:10000"])
336         self.assertEqual(len(res.msgs), 1)
337         self.assertEqual(len(res.msgs[0]), 3)
338
339     def test_dirsync_othernc(self):
340         """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
341         res = self.ldb_admin.search(self.base_dn,
342                                     expression="(objectclass=configuration)",
343                                     attrs=["name"],
344                                     controls=["dirsync:1:0:10000"])
345         self.assertEqual(len(res.msgs), 1)
346         self.assertEqual(len(res.msgs[0]), 4)
347
348         res = self.ldb_admin.search(self.base_dn,
349                                     expression="(objectclass=configuration)",
350                                     attrs=["ntSecurityDescriptor"],
351                                     controls=["dirsync:1:0:10000"])
352         self.assertEqual(len(res.msgs), 1)
353         self.assertEqual(len(res.msgs[0]), 3)
354
355         res = self.ldb_admin.search(self.base_dn,
356                                     expression="(objectclass=domaindns)",
357                                     attrs=["ntSecurityDescriptor"],
358                                     controls=["dirsync:1:0:10000"])
359         nb = len(res.msgs)
360
361         # only sub nc returns a result when asked for objectGUID
362         res = self.ldb_admin.search(self.base_dn,
363                                     expression="(objectclass=domaindns)",
364                                     attrs=["objectGUID"],
365                                     controls=["dirsync:1:0:0"])
366         self.assertEqual(len(res.msgs), nb - 1)
367         if nb > 1:
368             self.assertTrue(res.msgs[0].get("objectGUID") != None)
369         else:
370             res = self.ldb_admin.search(self.base_dn,
371                                         expression="(objectclass=configuration)",
372                                         attrs=["objectGUID"],
373                                         controls=["dirsync:1:0:0"])
374
375
376     def test_dirsync_send_delta(self):
377         """Check that dirsync return correct delta when sending the last cookie"""
378         res = self.ldb_admin.search(self.base_dn,
379                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
380                                     controls=["dirsync:1:0:10000"])
381         ctl = str(res.controls[0]).split(":")
382         ctl[1] = "1"
383         ctl[2] = "0"
384         ctl[3] = "10000"
385         control = str(":".join(ctl))
386         res = self.ldb_admin.search(self.base_dn,
387                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
388                                     controls=[control])
389         self.assertEqual(len(res), 0)
390
391         res = self.ldb_admin.search(self.base_dn,
392                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
393                                     controls=["dirsync:1:0:100000"])
394
395         ctl = str(res.controls[0]).split(":")
396         ctl[1] = "1"
397         ctl[2] = "0"
398         ctl[3] = "10000"
399         control2 = str(":".join(ctl))
400
401         # Let's create an OU
402         ouname = "OU=testou2,%s" % self.base_dn
403         self.ouname = ouname
404         self.ldb_admin.create_ou(ouname)
405         res = self.ldb_admin.search(self.base_dn,
406                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
407                                     controls=[control2])
408         self.assertEqual(len(res), 1)
409         ctl = str(res.controls[0]).split(":")
410         ctl[1] = "1"
411         ctl[2] = "0"
412         ctl[3] = "10000"
413         control3 = str(":".join(ctl))
414
415         delta = Message()
416         delta.dn = Dn(self.ldb_admin, str(ouname))
417
418         delta["cn"] = MessageElement("test ou",
419                                      FLAG_MOD_ADD,
420                                      "cn")
421         self.ldb_admin.modify(delta)
422         res = self.ldb_admin.search(self.base_dn,
423                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
424                                     controls=[control3])
425
426         self.assertEqual(len(res.msgs), 1)
427         # 3 attributes: instanceType, cn and objectGUID
428         self.assertEqual(len(res.msgs[0]), 3)
429
430         delta = Message()
431         delta.dn = Dn(self.ldb_admin, str(ouname))
432         delta["cn"] = MessageElement([],
433                                      FLAG_MOD_DELETE,
434                                      "cn")
435         self.ldb_admin.modify(delta)
436         res = self.ldb_admin.search(self.base_dn,
437                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
438                                     controls=[control3])
439
440         self.assertEqual(len(res.msgs), 1)
441         # So we won't have much attribute returned but instanceType and GUID
442         # are.
443         # 3 attributes: instanceType and objectGUID and cn but empty
444         self.assertEqual(len(res.msgs[0]), 3)
445         ouname = "OU=newouname,%s" % self.base_dn
446         self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
447         self.ouname = ouname
448         ctl = str(res.controls[0]).split(":")
449         ctl[1] = "1"
450         ctl[2] = "0"
451         ctl[3] = "10000"
452         control4 = str(":".join(ctl))
453         res = self.ldb_admin.search(self.base_dn,
454                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
455                                     controls=[control3])
456
457         self.assertTrue(res[0].get("parentGUID") != None)
458         self.assertTrue(res[0].get("name") != None)
459         delete_force(self.ldb_admin, ouname)
460
461     def test_dirsync_linkedattributes(self):
462         """Check that dirsync returnd deleted objects too"""
463         # Let's search for members
464         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
465         res = self.ldb_simple.search(self.base_dn,
466                                      expression="(name=Administrators)",
467                                      controls=["dirsync:1:1:1"])
468
469         self.assertTrue(len(res[0].get("member")) > 0)
470         size = len(res[0].get("member"))
471
472         ctl = str(res.controls[0]).split(":")
473         ctl[1] = "1"
474         ctl[2] = "1"
475         ctl[3] = "10000"
476         control1 = str(":".join(ctl))
477         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
478                                                 add_members_operation=True)
479
480         res = self.ldb_simple.search(self.base_dn,
481                                      expression="(name=Administrators)",
482                                      controls=[control1])
483
484         self.assertEqual(len(res[0].get("member")), size + 1)
485         ctl = str(res.controls[0]).split(":")
486         ctl[1] = "1"
487         ctl[2] = "1"
488         ctl[3] = "10000"
489         control1 = str(":".join(ctl))
490
491         # remove the user from the group
492         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
493                                                 add_members_operation=False)
494
495         res = self.ldb_simple.search(self.base_dn,
496                                      expression="(name=Administrators)",
497                                      controls=[control1])
498
499         self.assertEqual(len(res[0].get("member")), size)
500
501         self.ldb_admin.newgroup("testgroup")
502         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
503                                                 add_members_operation=True)
504
505         res = self.ldb_admin.search(self.base_dn,
506                                     expression="(name=testgroup)",
507                                     controls=["dirsync:1:0:1"])
508
509         self.assertEqual(len(res[0].get("member")), 1)
510         self.assertTrue(res[0].get("member") != "")
511
512         ctl = str(res.controls[0]).split(":")
513         ctl[1] = "1"
514         ctl[2] = "0"
515         ctl[3] = "1"
516         control1 = str(":".join(ctl))
517
518         # Check that reasking the same question but with an updated cookie
519         # didn't return any results.
520         print(control1)
521         res = self.ldb_admin.search(self.base_dn,
522                                     expression="(name=testgroup)",
523                                     controls=[control1])
524         self.assertEqual(len(res), 0)
525
526         ctl = str(res.controls[0]).split(":")
527         ctl[1] = "1"
528         ctl[2] = "1"
529         ctl[3] = "10000"
530         control1 = str(":".join(ctl))
531
532         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
533                                                 add_members_operation=False)
534
535         res = self.ldb_admin.search(self.base_dn,
536                                     expression="(name=testgroup)",
537                                     attrs=["member"],
538                                     controls=[control1])
539
540         self.ldb_admin.deletegroup("testgroup")
541         self.assertEqual(len(res[0].get("member")), 0)
542
543
544
545     def test_dirsync_deleted_items(self):
546         """Check that dirsync returnd deleted objects too"""
547         # Let's create an OU
548         ouname = "OU=testou3,%s" % self.base_dn
549         self.ouname = ouname
550         self.ldb_admin.create_ou(ouname)
551         res = self.ldb_admin.search(self.base_dn,
552                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
553                                     controls=["dirsync:1:0:1"])
554         guid = None
555         for e in res:
556             if str(e["name"]) == "testou3":
557                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
558
559         ctl = str(res.controls[0]).split(":")
560         ctl[1] = "1"
561         ctl[2] = "0"
562         ctl[3] = "10000"
563         control1 = str(":".join(ctl))
564
565         # So now delete the object and check that
566         # we can see the object but deleted when admin
567         delete_force(self.ldb_admin, ouname)
568
569         res = self.ldb_admin.search(self.base_dn,
570                                     expression="(objectClass=organizationalUnit)",
571                                     controls=[control1])
572         self.assertEqual(len(res), 1)
573         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
574         self.assertEqual(guid2, guid)
575         self.assertTrue(res[0].get("isDeleted"))
576         self.assertTrue(res[0].get("name") != None)
577
578     def test_cookie_from_others(self):
579         res = self.ldb_admin.search(self.base_dn,
580                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
581                                     controls=["dirsync:1:0:1"])
582         ctl = str(res.controls[0]).split(":")
583         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
584         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
585         controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
586         res = self.ldb_admin.search(self.base_dn,
587                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
588                                     controls=controls)
589
590
591 class ExtendedDirsyncTests(SimpleDirsyncTests):
592
593     def test_dirsync_linkedattributes(self):
594         flag_incr_linked = 2147483648
595         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
596         res = self.ldb_admin.search(self.base_dn,
597                                     attrs=["member"],
598                                     expression="(name=Administrators)",
599                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
600
601         self.assertTrue(res[0].get("member;range=1-1") != None)
602         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
603         size = len(res[0].get("member;range=1-1"))
604
605         ctl = str(res.controls[0]).split(":")
606         ctl[1] = "1"
607         ctl[2] = "%d" % flag_incr_linked
608         ctl[3] = "10000"
609         control1 = str(":".join(ctl))
610         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
611                                                 add_members_operation=True)
612         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
613                                                 add_members_operation=True)
614
615
616         res = self.ldb_admin.search(self.base_dn,
617                                     expression="(name=Administrators)",
618                                     controls=[control1])
619
620         self.assertEqual(len(res[0].get("member;range=1-1")), 2)
621         ctl = str(res.controls[0]).split(":")
622         ctl[1] = "1"
623         ctl[2] = "%d" % flag_incr_linked
624         ctl[3] = "10000"
625         control1 = str(":".join(ctl))
626
627         # remove the user from the group
628         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
629                                                 add_members_operation=False)
630
631         res = self.ldb_admin.search(self.base_dn,
632                                     expression="(name=Administrators)",
633                                     controls=[control1])
634
635         self.assertEqual(res[0].get("member;range=1-1"), None)
636         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
637
638         ctl = str(res.controls[0]).split(":")
639         ctl[1] = "1"
640         ctl[2] = "%d" % flag_incr_linked
641         ctl[3] = "10000"
642         control2 = str(":".join(ctl))
643
644         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
645                                                 add_members_operation=False)
646
647         res = self.ldb_admin.search(self.base_dn,
648                                     expression="(name=Administrators)",
649                                     controls=[control2])
650
651         self.assertEqual(res[0].get("member;range=1-1"), None)
652         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
653
654         res = self.ldb_admin.search(self.base_dn,
655                                     expression="(name=Administrators)",
656                                     controls=[control1])
657
658         self.assertEqual(res[0].get("member;range=1-1"), None)
659         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
660
661     def test_dirsync_deleted_items(self):
662         """Check that dirsync returnd deleted objects too"""
663         # Let's create an OU
664         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
665         ouname = "OU=testou3,%s" % self.base_dn
666         self.ouname = ouname
667         self.ldb_admin.create_ou(ouname)
668
669         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
670         res = self.ldb_simple.search(self.base_dn,
671                                      expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
672                                      controls=["dirsync:1:1:1"])
673
674         guid = None
675         for e in res:
676             if str(e["name"]) == "testou3":
677                 guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
678
679         self.assertTrue(guid != None)
680         ctl = str(res.controls[0]).split(":")
681         ctl[1] = "1"
682         ctl[2] = "1"
683         ctl[3] = "10000"
684         control1 = str(":".join(ctl))
685
686         # So now delete the object and check that
687         # we can see the object but deleted when admin
688         # we just see the objectGUID when simple user
689         delete_force(self.ldb_admin, ouname)
690
691         res = self.ldb_simple.search(self.base_dn,
692                                      expression="(objectClass=organizationalUnit)",
693                                      controls=[control1])
694         self.assertEqual(len(res), 1)
695         guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
696         self.assertEqual(guid2, guid)
697         self.assertEqual(str(res[0].dn), "")
698
699
# Install the command-line credentials for samba.tests unless the
# "listtests" option is set (presumably a list-only mode; the option may not
# exist at all, hence the getattr default).
listing_only = getattr(opts, "listtests", False)
if not listing_only:
    lp = sambaopts.get_loadparm()
    samba.tests.cmdline_credentials = credopts.get_credentials(lp)


TestProgram(module=__name__, opts=subunitopts)