PEP8: fix E302: expected 2 blank lines, found 1
[samba.git] / source4 / dsdb / tests / python / dirsync.py
1 #!/usr/bin/env python
2 #
3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 from __future__ import print_function
22 import optparse
23 import sys
24 sys.path.insert(0, "bin/python")
25 import samba
26 from samba.tests.subunitrun import TestProgram, SubunitOptions
27
28 import samba.getopt as options
29 import base64
30
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs, security
35 from samba.ndr import ndr_unpack, ndr_pack
36
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
41 import samba.tests
42 from samba.tests import delete_force
43
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# The last positional argument names the server under test, either as a bare
# host name or as a complete URL.
host = args.pop()
if "://" not in host:
    # Bare host name: build both the plain and the TLS URL from it.
    ldaphost = "ldap://%s" % host
    ldapshost = "ldaps://%s" % host
else:
    # A full URL was given: use it as-is for both connections, so that
    # ldapshost is always defined for DirsyncBaseTests.setUp().
    ldaphost = host
    ldapshost = host
    start = host.rindex("://")
    # str.lstrip() expects a set of characters, not an offset; slice to drop
    # the "<scheme>://" prefix.
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
71
72 #
73 # Tests start here
74 #
75
76
class DirsyncBaseTests(samba.tests.TestCase):
    """Shared fixture for the dirsync tests.

    Opens the administrative connection and caches the domain details
    that the individual test cases read.
    """

    def setUp(self):
        """Connect with the command line credentials and record domain data."""
        super(DirsyncBaseTests, self).setUp()
        admin = SamDB(ldapshost,
                      credentials=creds,
                      session_info=system_session(lp),
                      lp=lp)
        self.ldb_admin = admin
        self.base_dn = admin.domain_dn()
        self.domain_sid = security.dom_sid(admin.get_domain_sid())
        # Random password, generated once per test.
        self.user_pass = samba.generate_random_password(12, 16)
        self.configuration_dn = admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(admin)
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of account ``name`` under CN=Users of the domain."""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Return a new SamDB connection to ``ldaphost`` for the given user.

        Domain, realm and workstation are copied from the command line
        credentials; only the user name and password differ.
        """
        user_creds = Credentials()
        user_creds.set_username(target_username)
        user_creds.set_password(target_password)
        user_creds.set_domain(creds.get_domain())
        user_creds.set_realm(creds.get_realm())
        user_creds.set_workstation(creds.get_workstation())
        # Add sealing to whatever gensec features are already enabled.
        features = user_creds.get_gensec_features() | gensec.FEATURE_SEAL
        user_creds.set_gensec_features(features)
        # kinit is too expensive to use in a tight loop
        user_creds.set_kerberos_state(DONT_USE_KERBEROS)
        return SamDB(url=ldaphost, credentials=user_creds, lp=lp)
105
106
# tests of searches using the dirsync control
108 class SimpleDirsyncTests(DirsyncBaseTests):
109
110     def setUp(self):
111         super(SimpleDirsyncTests, self).setUp()
112         # Regular user
113         self.dirsync_user = "test_dirsync_user"
114         self.simple_user = "test_simple_user"
115         self.admin_user = "test_admin_user"
116         self.ouname = None
117
118         self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
119         self.ldb_admin.newuser(self.simple_user, self.user_pass)
120         self.ldb_admin.newuser(self.admin_user, self.user_pass)
121         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
122
123         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
124         mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
125                                    str(user_sid))
126         self.sd_utils.dacl_add_ace(self.base_dn, mod)
127
128         # add admins to the Domain Admins group
129         self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
130                                                 add_members_operation=True)
131
132     def tearDown(self):
133         super(SimpleDirsyncTests, self).tearDown()
134         delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
135         delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
136         delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
137         if self.ouname:
138             delete_force(self.ldb_admin, self.ouname)
139         self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
140         try:
141             self.ldb_admin.deletegroup("testgroup")
142         except Exception:
143             pass
144
145     # def test_dirsync_errors(self):
146
147     def test_dirsync_supported(self):
148         """Test the basic of the dirsync is supported"""
149         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
150         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
151         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
152         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
153         try:
154             self.ldb_simple.search(self.base_dn,
155                                    expression="samaccountname=*",
156                                    controls=["dirsync:1:0:1"])
157         except LdbError as l:
158             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
159
160     def test_parentGUID_referrals(self):
161         res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
162
163         res = self.ldb_admin.search(self.base_dn,
164                                     expression="name=Configuration",
165                                     controls=["dirsync:1:0:1"])
166         self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
167
168     def test_ok_not_rootdc(self):
169         """Test if it's ok to do dirsync on another NC that is not the root DC"""
170         self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
171                               expression="samaccountname=*",
172                               controls=["dirsync:1:0:1"])
173
174     def test_dirsync_errors(self):
175         """Test if dirsync returns the correct LDAP errors in case of pb"""
176         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
177         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
178         try:
179             self.ldb_simple.search(self.base_dn,
180                                    expression="samaccountname=*",
181                                    controls=["dirsync:1:0:1"])
182         except LdbError as l:
183             print(l)
184             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
185
186         try:
187             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
188                                    expression="samaccountname=*",
189                                    controls=["dirsync:1:0:1"])
190         except LdbError as l:
191             print(l)
192             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
193
194         try:
195             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
196                                    expression="samaccountname=*",
197                                    controls=["dirsync:1:1:1"])
198         except LdbError as l:
199             print(l)
200             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
201
202         try:
203             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
204                                     expression="samaccountname=*",
205                                     controls=["dirsync:1:0:1"])
206         except LdbError as l:
207             print(l)
208             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
209
210         try:
211             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
212                                   expression="samaccountname=*",
213                                   controls=["dirsync:1:0:1"])
214         except LdbError as l:
215             print(l)
216             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
217
218         try:
219             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
220                                   expression="samaccountname=*",
221                                   controls=["dirsync:1:1:1"])
222         except LdbError as l:
223             print(l)
224             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
225
226     def test_dirsync_attributes(self):
227         """Check behavior with some attributes """
228         res = self.ldb_admin.search(self.base_dn,
229                                     expression="samaccountname=*",
230                                     controls=["dirsync:1:0:1"])
231         # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
232         self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
233         # Check that non replicated attributes are not returned
234         self.assertTrue(res.msgs[0].get("badPwdCount") == None)
235         # Check that non forward link are not returned
236         self.assertTrue(res.msgs[0].get("memberof") == None)
237
238         # Asking for instanceType will return also objectGUID
239         res = self.ldb_admin.search(self.base_dn,
240                                     expression="samaccountname=Administrator",
241                                     attrs=["instanceType"],
242                                     controls=["dirsync:1:0:1"])
243         self.assertTrue(res.msgs[0].get("objectGUID") != None)
244         self.assertTrue(res.msgs[0].get("instanceType") != None)
245
246         # We don't return an entry if asked for objectGUID
247         res = self.ldb_admin.search(self.base_dn,
248                                     expression="(distinguishedName=%s)" % str(self.base_dn),
249                                     attrs=["objectGUID"],
250                                     controls=["dirsync:1:0:1"])
251         self.assertEquals(len(res.msgs), 0)
252
253         # a request on the root of a NC didn't return parentGUID
254         res = self.ldb_admin.search(self.base_dn,
255                                     expression="(distinguishedName=%s)" % str(self.base_dn),
256                                     attrs=["name"],
257                                     controls=["dirsync:1:0:1"])
258         self.assertTrue(res.msgs[0].get("objectGUID") != None)
259         self.assertTrue(res.msgs[0].get("name") != None)
260         self.assertTrue(res.msgs[0].get("parentGUID") == None)
261         self.assertTrue(res.msgs[0].get("instanceType") != None)
262
263         # Asking for name will return also objectGUID and parentGUID
264         # and instanceType and of course name
265         res = self.ldb_admin.search(self.base_dn,
266                                     expression="samaccountname=Administrator",
267                                     attrs=["name"],
268                                     controls=["dirsync:1:0:1"])
269         self.assertTrue(res.msgs[0].get("objectGUID") != None)
270         self.assertTrue(res.msgs[0].get("name") != None)
271         self.assertTrue(res.msgs[0].get("parentGUID") != None)
272         self.assertTrue(res.msgs[0].get("instanceType") != None)
273
274         # Asking for dn will not return not only DN but more like if attrs=*
275         # parentGUID should be returned
276         res = self.ldb_admin.search(self.base_dn,
277                                     expression="samaccountname=Administrator",
278                                     attrs=["dn"],
279                                     controls=["dirsync:1:0:1"])
280         count = len(res.msgs[0])
281         res2 = self.ldb_admin.search(self.base_dn,
282                                      expression="samaccountname=Administrator",
283                                      controls=["dirsync:1:0:1"])
284         count2 = len(res2.msgs[0])
285         self.assertEqual(count, count2)
286
287         # Asking for cn will return nothing on objects that have CN as RDN
288         res = self.ldb_admin.search(self.base_dn,
289                                     expression="samaccountname=Administrator",
290                                     attrs=["cn"],
291                                     controls=["dirsync:1:0:1"])
292         self.assertEqual(len(res.msgs), 0)
293         # Asking for parentGUID will return nothing too
294         res = self.ldb_admin.search(self.base_dn,
295                                     expression="samaccountname=Administrator",
296                                     attrs=["parentGUID"],
297                                     controls=["dirsync:1:0:1"])
298         self.assertEqual(len(res.msgs), 0)
299         ouname = "OU=testou,%s" % self.base_dn
300         self.ouname = ouname
301         self.ldb_admin.create_ou(ouname)
302         delta = Message()
303         delta.dn = Dn(self.ldb_admin, str(ouname))
304         delta["cn"] = MessageElement("test ou",
305                                      FLAG_MOD_ADD,
306                                      "cn")
307         self.ldb_admin.modify(delta)
308         res = self.ldb_admin.search(self.base_dn,
309                                     expression="name=testou",
310                                     attrs=["cn"],
311                                     controls=["dirsync:1:0:1"])
312
313         self.assertEqual(len(res.msgs), 1)
314         self.assertEqual(len(res.msgs[0]), 3)
315         delete_force(self.ldb_admin, ouname)
316
317     def test_dirsync_with_controls(self):
318         """Check that dirsync return correct informations when dealing with the NC"""
319         res = self.ldb_admin.search(self.base_dn,
320                                     expression="(distinguishedName=%s)" % str(self.base_dn),
321                                     attrs=["name"],
322                                     controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
323
324     def test_dirsync_basenc(self):
325         """Check that dirsync return correct informations when dealing with the NC"""
326         res = self.ldb_admin.search(self.base_dn,
327                                     expression="(distinguishedName=%s)" % str(self.base_dn),
328                                     attrs=["name"],
329                                     controls=["dirsync:1:0:10000"])
330         self.assertEqual(len(res.msgs), 1)
331         self.assertEqual(len(res.msgs[0]), 3)
332
333         res = self.ldb_admin.search(self.base_dn,
334                                     expression="(distinguishedName=%s)" % str(self.base_dn),
335                                     attrs=["ntSecurityDescriptor"],
336                                     controls=["dirsync:1:0:10000"])
337         self.assertEqual(len(res.msgs), 1)
338         self.assertEqual(len(res.msgs[0]), 3)
339
340     def test_dirsync_othernc(self):
341         """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
342         res = self.ldb_admin.search(self.base_dn,
343                                     expression="(objectclass=configuration)",
344                                     attrs=["name"],
345                                     controls=["dirsync:1:0:10000"])
346         self.assertEqual(len(res.msgs), 1)
347         self.assertEqual(len(res.msgs[0]), 4)
348
349         res = self.ldb_admin.search(self.base_dn,
350                                     expression="(objectclass=configuration)",
351                                     attrs=["ntSecurityDescriptor"],
352                                     controls=["dirsync:1:0:10000"])
353         self.assertEqual(len(res.msgs), 1)
354         self.assertEqual(len(res.msgs[0]), 3)
355
356         res = self.ldb_admin.search(self.base_dn,
357                                     expression="(objectclass=domaindns)",
358                                     attrs=["ntSecurityDescriptor"],
359                                     controls=["dirsync:1:0:10000"])
360         nb = len(res.msgs)
361
362         # only sub nc returns a result when asked for objectGUID
363         res = self.ldb_admin.search(self.base_dn,
364                                     expression="(objectclass=domaindns)",
365                                     attrs=["objectGUID"],
366                                     controls=["dirsync:1:0:0"])
367         self.assertEqual(len(res.msgs), nb - 1)
368         if nb > 1:
369             self.assertTrue(res.msgs[0].get("objectGUID") != None)
370         else:
371             res = self.ldb_admin.search(self.base_dn,
372                                         expression="(objectclass=configuration)",
373                                         attrs=["objectGUID"],
374                                         controls=["dirsync:1:0:0"])
375
376
377     def test_dirsync_send_delta(self):
378         """Check that dirsync return correct delta when sending the last cookie"""
379         res = self.ldb_admin.search(self.base_dn,
380                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
381                                     controls=["dirsync:1:0:10000"])
382         ctl = str(res.controls[0]).split(":")
383         ctl[1] = "1"
384         ctl[2] = "0"
385         ctl[3] = "10000"
386         control = str(":".join(ctl))
387         res = self.ldb_admin.search(self.base_dn,
388                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
389                                     controls=[control])
390         self.assertEqual(len(res), 0)
391
392         res = self.ldb_admin.search(self.base_dn,
393                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
394                                     controls=["dirsync:1:0:100000"])
395
396         ctl = str(res.controls[0]).split(":")
397         ctl[1] = "1"
398         ctl[2] = "0"
399         ctl[3] = "10000"
400         control2 = str(":".join(ctl))
401
402         # Let's create an OU
403         ouname = "OU=testou2,%s" % self.base_dn
404         self.ouname = ouname
405         self.ldb_admin.create_ou(ouname)
406         res = self.ldb_admin.search(self.base_dn,
407                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
408                                     controls=[control2])
409         self.assertEqual(len(res), 1)
410         ctl = str(res.controls[0]).split(":")
411         ctl[1] = "1"
412         ctl[2] = "0"
413         ctl[3] = "10000"
414         control3 = str(":".join(ctl))
415
416         delta = Message()
417         delta.dn = Dn(self.ldb_admin, str(ouname))
418
419         delta["cn"] = MessageElement("test ou",
420                                      FLAG_MOD_ADD,
421                                      "cn")
422         self.ldb_admin.modify(delta)
423         res = self.ldb_admin.search(self.base_dn,
424                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
425                                     controls=[control3])
426
427         self.assertEqual(len(res.msgs), 1)
428         # 3 attributes: instanceType, cn and objectGUID
429         self.assertEqual(len(res.msgs[0]), 3)
430
431         delta = Message()
432         delta.dn = Dn(self.ldb_admin, str(ouname))
433         delta["cn"] = MessageElement([],
434                                      FLAG_MOD_DELETE,
435                                      "cn")
436         self.ldb_admin.modify(delta)
437         res = self.ldb_admin.search(self.base_dn,
438                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
439                                     controls=[control3])
440
441         self.assertEqual(len(res.msgs), 1)
442         # So we won't have much attribute returned but instanceType and GUID
443         # are.
444         # 3 attributes: instanceType and objectGUID and cn but empty
445         self.assertEqual(len(res.msgs[0]), 3)
446         ouname = "OU=newouname,%s" % self.base_dn
447         self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
448         self.ouname = ouname
449         ctl = str(res.controls[0]).split(":")
450         ctl[1] = "1"
451         ctl[2] = "0"
452         ctl[3] = "10000"
453         control4 = str(":".join(ctl))
454         res = self.ldb_admin.search(self.base_dn,
455                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
456                                     controls=[control3])
457
458         self.assertTrue(res[0].get("parentGUID") != None)
459         self.assertTrue(res[0].get("name") != None)
460         delete_force(self.ldb_admin, ouname)
461
462     def test_dirsync_linkedattributes(self):
463         """Check that dirsync returnd deleted objects too"""
464         # Let's search for members
465         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
466         res = self.ldb_simple.search(self.base_dn,
467                                      expression="(name=Administrators)",
468                                      controls=["dirsync:1:1:1"])
469
470         self.assertTrue(len(res[0].get("member")) > 0)
471         size = len(res[0].get("member"))
472
473         ctl = str(res.controls[0]).split(":")
474         ctl[1] = "1"
475         ctl[2] = "1"
476         ctl[3] = "10000"
477         control1 = str(":".join(ctl))
478         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
479                                                 add_members_operation=True)
480
481         res = self.ldb_simple.search(self.base_dn,
482                                      expression="(name=Administrators)",
483                                      controls=[control1])
484
485         self.assertEqual(len(res[0].get("member")), size + 1)
486         ctl = str(res.controls[0]).split(":")
487         ctl[1] = "1"
488         ctl[2] = "1"
489         ctl[3] = "10000"
490         control1 = str(":".join(ctl))
491
492         # remove the user from the group
493         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
494                                                 add_members_operation=False)
495
496         res = self.ldb_simple.search(self.base_dn,
497                                      expression="(name=Administrators)",
498                                      controls=[control1])
499
500         self.assertEqual(len(res[0].get("member")), size)
501
502         self.ldb_admin.newgroup("testgroup")
503         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
504                                                 add_members_operation=True)
505
506         res = self.ldb_admin.search(self.base_dn,
507                                     expression="(name=testgroup)",
508                                     controls=["dirsync:1:0:1"])
509
510         self.assertEqual(len(res[0].get("member")), 1)
511         self.assertTrue(res[0].get("member") != "")
512
513         ctl = str(res.controls[0]).split(":")
514         ctl[1] = "1"
515         ctl[2] = "0"
516         ctl[3] = "1"
517         control1 = str(":".join(ctl))
518
519         # Check that reasking the same question but with an updated cookie
520         # didn't return any results.
521         print(control1)
522         res = self.ldb_admin.search(self.base_dn,
523                                     expression="(name=testgroup)",
524                                     controls=[control1])
525         self.assertEqual(len(res), 0)
526
527         ctl = str(res.controls[0]).split(":")
528         ctl[1] = "1"
529         ctl[2] = "1"
530         ctl[3] = "10000"
531         control1 = str(":".join(ctl))
532
533         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
534                                                 add_members_operation=False)
535
536         res = self.ldb_admin.search(self.base_dn,
537                                     expression="(name=testgroup)",
538                                     attrs=["member"],
539                                     controls=[control1])
540
541         self.ldb_admin.deletegroup("testgroup")
542         self.assertEqual(len(res[0].get("member")), 0)
543
544
545
546     def test_dirsync_deleted_items(self):
547         """Check that dirsync returnd deleted objects too"""
548         # Let's create an OU
549         ouname = "OU=testou3,%s" % self.base_dn
550         self.ouname = ouname
551         self.ldb_admin.create_ou(ouname)
552         res = self.ldb_admin.search(self.base_dn,
553                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
554                                     controls=["dirsync:1:0:1"])
555         guid = None
556         for e in res:
557             if str(e["name"]) == "testou3":
558                 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
559
560         ctl = str(res.controls[0]).split(":")
561         ctl[1] = "1"
562         ctl[2] = "0"
563         ctl[3] = "10000"
564         control1 = str(":".join(ctl))
565
566         # So now delete the object and check that
567         # we can see the object but deleted when admin
568         delete_force(self.ldb_admin, ouname)
569
570         res = self.ldb_admin.search(self.base_dn,
571                                     expression="(objectClass=organizationalUnit)",
572                                     controls=[control1])
573         self.assertEqual(len(res), 1)
574         guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
575         self.assertEqual(guid2, guid)
576         self.assertTrue(res[0].get("isDeleted"))
577         self.assertTrue(res[0].get("name") != None)
578
579     def test_cookie_from_others(self):
580         res = self.ldb_admin.search(self.base_dn,
581                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
582                                     controls=["dirsync:1:0:1"])
583         ctl = str(res.controls[0]).split(":")
584         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
585         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
586         controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
587         res = self.ldb_admin.search(self.base_dn,
588                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
589                                     controls=controls)
590
591
592 class ExtendedDirsyncTests(SimpleDirsyncTests):
593
594     def test_dirsync_linkedattributes(self):
595         flag_incr_linked = 2147483648
596         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
597         res = self.ldb_admin.search(self.base_dn,
598                                     attrs=["member"],
599                                     expression="(name=Administrators)",
600                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
601
602         self.assertTrue(res[0].get("member;range=1-1") != None)
603         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
604         size = len(res[0].get("member;range=1-1"))
605
606         ctl = str(res.controls[0]).split(":")
607         ctl[1] = "1"
608         ctl[2] = "%d" % flag_incr_linked
609         ctl[3] = "10000"
610         control1 = str(":".join(ctl))
611         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
612                                                 add_members_operation=True)
613         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
614                                                 add_members_operation=True)
615
616
617         res = self.ldb_admin.search(self.base_dn,
618                                     expression="(name=Administrators)",
619                                     controls=[control1])
620
621         self.assertEqual(len(res[0].get("member;range=1-1")), 2)
622         ctl = str(res.controls[0]).split(":")
623         ctl[1] = "1"
624         ctl[2] = "%d" % flag_incr_linked
625         ctl[3] = "10000"
626         control1 = str(":".join(ctl))
627
628         # remove the user from the group
629         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
630                                                 add_members_operation=False)
631
632         res = self.ldb_admin.search(self.base_dn,
633                                     expression="(name=Administrators)",
634                                     controls=[control1])
635
636         self.assertEqual(res[0].get("member;range=1-1"), None)
637         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
638
639         ctl = str(res.controls[0]).split(":")
640         ctl[1] = "1"
641         ctl[2] = "%d" % flag_incr_linked
642         ctl[3] = "10000"
643         control2 = str(":".join(ctl))
644
645         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
646                                                 add_members_operation=False)
647
648         res = self.ldb_admin.search(self.base_dn,
649                                     expression="(name=Administrators)",
650                                     controls=[control2])
651
652         self.assertEqual(res[0].get("member;range=1-1"), None)
653         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
654
655         res = self.ldb_admin.search(self.base_dn,
656                                     expression="(name=Administrators)",
657                                     controls=[control1])
658
659         self.assertEqual(res[0].get("member;range=1-1"), None)
660         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
661
662     def test_dirsync_deleted_items(self):
663         """Check that dirsync returnd deleted objects too"""
664         # Let's create an OU
665         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
666         ouname = "OU=testou3,%s" % self.base_dn
667         self.ouname = ouname
668         self.ldb_admin.create_ou(ouname)
669
670         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
671         res = self.ldb_simple.search(self.base_dn,
672                                      expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
673                                      controls=["dirsync:1:1:1"])
674
675         guid = None
676         for e in res:
677             if str(e["name"]) == "testou3":
678                 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
679
680         self.assertTrue(guid != None)
681         ctl = str(res.controls[0]).split(":")
682         ctl[1] = "1"
683         ctl[2] = "1"
684         ctl[3] = "10000"
685         control1 = str(":".join(ctl))
686
687         # So now delete the object and check that
688         # we can see the object but deleted when admin
689         # we just see the objectGUID when simple user
690         delete_force(self.ldb_admin, ouname)
691
692         res = self.ldb_simple.search(self.base_dn,
693                                      expression="(objectClass=organizationalUnit)",
694                                      controls=[control1])
695         self.assertEqual(len(res), 1)
696         guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
697         self.assertEqual(guid2, guid)
698         self.assertEqual(str(res[0].dn), "")
699
700
# When the "listtests" option is absent or false, publish the credentials
# through samba.tests.cmdline_credentials.
listing_only = getattr(opts, "listtests", False)
if not listing_only:
    lp = sambaopts.get_loadparm()
    samba.tests.cmdline_credentials = credopts.get_credentials(lp)


# Hand control to the subunit test runner for this module.
TestProgram(module=__name__, opts=subunitopts)