python:tests: Store keys as bytes rather than as lists of ints
[samba.git] / source4 / dsdb / tests / python / dirsync.py
1 #!/usr/bin/env python3
2 #
3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 # Copyright (C) Catalyst.Net Ltd
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21
22 import optparse
23 import sys
24 sys.path.insert(0, "bin/python")
25 import samba
26 from samba.tests.subunitrun import TestProgram, SubunitOptions
27
28 import samba.getopt as options
29 import base64
30
31 import ldb
32 from ldb import LdbError, SCOPE_BASE
33 from ldb import Message, MessageElement, Dn
34 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
35 from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_RODC_ATTRIBUTE
36 from samba.dcerpc import security, misc, drsblobs
37 from samba.ndr import ndr_unpack, ndr_pack
38
39 from samba.auth import system_session
40 from samba import gensec, sd_utils
41 from samba.samdb import SamDB
42 from samba.credentials import Credentials, DONT_USE_KERBEROS
43 import samba.tests
44 from samba.tests import delete_force
45
46 parser = optparse.OptionParser("dirsync.py [options] <host>")
47 sambaopts = options.SambaOptions(parser)
48 parser.add_option_group(sambaopts)
49 parser.add_option_group(options.VersionOptions(parser))
50
51 # use command line creds if available
52 credopts = options.CredentialsOptions(parser)
53 parser.add_option_group(credopts)
54 subunitopts = SubunitOptions(parser)
55 parser.add_option_group(subunitopts)
56 opts, args = parser.parse_args()
57
58 if len(args) < 1:
59     parser.print_usage()
60     sys.exit(1)
61
62 host = args.pop()
63 if "://" not in host:
64     ldaphost = "ldap://%s" % host
65 else:
66     ldaphost = host
67     start = host.rindex("://")
68     host = host.lstrip(start + 3)
69
70 lp = sambaopts.get_loadparm()
71 creds = credopts.get_credentials(lp)
72
73 #
74 # Tests start here
75 #
76
77
78 class DirsyncBaseTests(samba.tests.TestCase):
79
80     def setUp(self):
81         super().setUp()
82         self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
83         self.base_dn = self.ldb_admin.domain_dn()
84         self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
85         self.user_pass = samba.generate_random_password(12, 16)
86         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
87         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
88         # used for anonymous login
89         print("baseDN: %s" % self.base_dn)
90
91         userou = "OU=dirsync-test"
92         self.ou = f"{userou},{self.base_dn}"
93         samba.tests.delete_force(self.ldb_admin, self.ou, controls=['tree_delete:1'])
94         self.ldb_admin.create_ou(self.ou)
95         self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou, controls=['tree_delete:1'])
96
97         # Regular user
98         self.dirsync_user = "test_dirsync_user"
99         self.simple_user = "test_simple_user"
100         self.admin_user = "test_admin_user"
101         self.dirsync_pass = self.user_pass
102         self.simple_pass = self.user_pass
103         self.admin_pass = self.user_pass
104
105         self.ldb_admin.newuser(self.dirsync_user, self.dirsync_pass, userou=userou)
106         self.ldb_admin.newuser(self.simple_user, self.simple_pass, userou=userou)
107         self.ldb_admin.newuser(self.admin_user, self.admin_pass, userou=userou)
108         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
109
110         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
111         mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
112                                    str(user_sid))
113         self.sd_utils.dacl_add_ace(self.base_dn, mod)
114         self.addCleanup(self.sd_utils.dacl_delete_aces, self.base_dn, mod)
115
116         # add admins to the Domain Admins group
117         self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
118                                                 add_members_operation=True)
119
120     def get_user_dn(self, name):
121         return ldb.Dn(self.ldb_admin, "CN={0},{1}".format(name, self.ou))
122
123     def get_ldb_connection(self, target_username, target_password):
124         creds_tmp = Credentials()
125         creds_tmp.set_username(target_username)
126         creds_tmp.set_password(target_password)
127         creds_tmp.set_domain(creds.get_domain())
128         creds_tmp.set_realm(creds.get_realm())
129         creds_tmp.set_workstation(creds.get_workstation())
130         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
131                                       | gensec.FEATURE_SEAL)
132         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)  # kinit is too expensive to use in a tight loop
133         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
134         return ldb_target
135
136 # tests on ldap add operations
137 class SimpleDirsyncTests(DirsyncBaseTests):
138
139     # def test_dirsync_errors(self):
140
141     def test_dirsync_supported(self):
142         """Test the basic of the dirsync is supported"""
143         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
144         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
145         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
146         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
147         try:
148             self.ldb_simple.search(self.base_dn,
149                                    expression="samaccountname=*",
150                                    controls=["dirsync:1:0:1"])
151         except LdbError as l:
152             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
153
154     def test_parentGUID_referrals(self):
155         res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
156
157         res = self.ldb_admin.search(self.base_dn,
158                                     expression="name=Configuration",
159                                     controls=["dirsync:1:0:1"])
160         self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
161
162     def test_ok_not_rootdc(self):
163         """Test if it's ok to do dirsync on another NC that is not the root DC"""
164         self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
165                               expression="samaccountname=*",
166                               controls=["dirsync:1:0:1"])
167
168     def test_dirsync_errors(self):
169         """Test if dirsync returns the correct LDAP errors in case of pb"""
170         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
171         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
172         try:
173             self.ldb_simple.search(self.base_dn,
174                                    expression="samaccountname=*",
175                                    controls=["dirsync:1:0:1"])
176         except LdbError as l:
177             print(l)
178             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
179
180         try:
181             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
182                                    expression="samaccountname=*",
183                                    controls=["dirsync:1:0:1"])
184         except LdbError as l:
185             print(l)
186             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
187
188         try:
189             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
190                                    expression="samaccountname=*",
191                                    controls=["dirsync:1:1:1"])
192         except LdbError as l:
193             print(l)
194             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
195
196         try:
197             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
198                                     expression="samaccountname=*",
199                                     controls=["dirsync:1:0:1"])
200         except LdbError as l:
201             print(l)
202             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
203
204         try:
205             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
206                                   expression="samaccountname=*",
207                                   controls=["dirsync:1:0:1"])
208         except LdbError as l:
209             print(l)
210             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
211
212         try:
213             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
214                                   expression="samaccountname=*",
215                                   controls=["dirsync:1:1:1"])
216         except LdbError as l:
217             print(l)
218             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
219
220     def test_dirsync_attributes(self):
221         """Check behavior with some attributes """
222         res = self.ldb_admin.search(self.base_dn,
223                                     expression="samaccountname=*",
224                                     controls=["dirsync:1:0:1"])
225         # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
226         self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") is not None)
227         # Check that non replicated attributes are not returned
228         self.assertTrue(res.msgs[0].get("badPwdCount") is None)
229         # Check that non forward link are not returned
230         self.assertTrue(res.msgs[0].get("memberof") is None)
231
232         # Asking for instanceType will return also objectGUID
233         res = self.ldb_admin.search(self.base_dn,
234                                     expression="samaccountname=Administrator",
235                                     attrs=["instanceType"],
236                                     controls=["dirsync:1:0:1"])
237         self.assertTrue(res.msgs[0].get("objectGUID") is not None)
238         self.assertTrue(res.msgs[0].get("instanceType") is not None)
239
240         # We don't return an entry if asked for objectGUID
241         res = self.ldb_admin.search(self.base_dn,
242                                     expression="(distinguishedName=%s)" % str(self.base_dn),
243                                     attrs=["objectGUID"],
244                                     controls=["dirsync:1:0:1"])
245         self.assertEqual(len(res.msgs), 0)
246
247         # a request on the root of a NC didn't return parentGUID
248         res = self.ldb_admin.search(self.base_dn,
249                                     expression="(distinguishedName=%s)" % str(self.base_dn),
250                                     attrs=["name"],
251                                     controls=["dirsync:1:0:1"])
252         self.assertTrue(res.msgs[0].get("objectGUID") is not None)
253         self.assertTrue(res.msgs[0].get("name") is not None)
254         self.assertTrue(res.msgs[0].get("parentGUID") is None)
255         self.assertTrue(res.msgs[0].get("instanceType") is not None)
256
257         # Asking for name will return also objectGUID and parentGUID
258         # and instanceType and of course name
259         res = self.ldb_admin.search(self.base_dn,
260                                     expression="samaccountname=Administrator",
261                                     attrs=["name"],
262                                     controls=["dirsync:1:0:1"])
263         self.assertTrue(res.msgs[0].get("objectGUID") is not None)
264         self.assertTrue(res.msgs[0].get("name") is not None)
265         self.assertTrue(res.msgs[0].get("parentGUID") is not None)
266         self.assertTrue(res.msgs[0].get("instanceType") is not None)
267
268         # Asking for dn will not return not only DN but more like if attrs=*
269         # parentGUID should be returned
270         res = self.ldb_admin.search(self.base_dn,
271                                     expression="samaccountname=Administrator",
272                                     attrs=["dn"],
273                                     controls=["dirsync:1:0:1"])
274         count = len(res.msgs[0])
275         res2 = self.ldb_admin.search(self.base_dn,
276                                      expression="samaccountname=Administrator",
277                                      controls=["dirsync:1:0:1"])
278         count2 = len(res2.msgs[0])
279         self.assertEqual(count, count2)
280
281         # Asking for cn will return nothing on objects that have CN as RDN
282         res = self.ldb_admin.search(self.base_dn,
283                                     expression="samaccountname=Administrator",
284                                     attrs=["cn"],
285                                     controls=["dirsync:1:0:1"])
286         self.assertEqual(len(res.msgs), 0)
287         # Asking for parentGUID will return nothing too
288         res = self.ldb_admin.search(self.base_dn,
289                                     expression="samaccountname=Administrator",
290                                     attrs=["parentGUID"],
291                                     controls=["dirsync:1:0:1"])
292         self.assertEqual(len(res.msgs), 0)
293         ouname = "OU=testou,%s" % self.ou
294         self.ouname = ouname
295         self.ldb_admin.create_ou(ouname)
296         delta = Message()
297         delta.dn = Dn(self.ldb_admin, ouname)
298         delta["cn"] = MessageElement("test ou",
299                                      FLAG_MOD_ADD,
300                                      "cn")
301         self.ldb_admin.modify(delta)
302         res = self.ldb_admin.search(self.base_dn,
303                                     expression="name=testou",
304                                     attrs=["cn"],
305                                     controls=["dirsync:1:0:1"])
306
307         self.assertEqual(len(res.msgs), 1)
308         self.assertEqual(len(res.msgs[0]), 3)
309         delete_force(self.ldb_admin, ouname)
310
311     def test_dirsync_with_controls(self):
312         """Check that dirsync return correct information when dealing with the NC"""
313         res = self.ldb_admin.search(self.base_dn,
314                                     expression="(distinguishedName=%s)" % str(self.base_dn),
315                                     attrs=["name"],
316                                     controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
317
318     def test_dirsync_basenc(self):
319         """Check that dirsync return correct information when dealing with the NC"""
320         res = self.ldb_admin.search(self.base_dn,
321                                     expression="(distinguishedName=%s)" % str(self.base_dn),
322                                     attrs=["name"],
323                                     controls=["dirsync:1:0:10000"])
324         self.assertEqual(len(res.msgs), 1)
325         self.assertEqual(len(res.msgs[0]), 3)
326
327         res = self.ldb_admin.search(self.base_dn,
328                                     expression="(distinguishedName=%s)" % str(self.base_dn),
329                                     attrs=["ntSecurityDescriptor"],
330                                     controls=["dirsync:1:0:10000"])
331         self.assertEqual(len(res.msgs), 1)
332         self.assertEqual(len(res.msgs[0]), 3)
333
334     def test_dirsync_othernc(self):
335         """Check that dirsync return information for entries that are normally referrals (ie. other NCs)"""
336         res = self.ldb_admin.search(self.base_dn,
337                                     expression="(objectclass=configuration)",
338                                     attrs=["name"],
339                                     controls=["dirsync:1:0:10000"])
340         self.assertEqual(len(res.msgs), 1)
341         self.assertEqual(len(res.msgs[0]), 4)
342
343         res = self.ldb_admin.search(self.base_dn,
344                                     expression="(objectclass=configuration)",
345                                     attrs=["ntSecurityDescriptor"],
346                                     controls=["dirsync:1:0:10000"])
347         self.assertEqual(len(res.msgs), 1)
348         self.assertEqual(len(res.msgs[0]), 3)
349
350         res = self.ldb_admin.search(self.base_dn,
351                                     expression="(objectclass=domaindns)",
352                                     attrs=["ntSecurityDescriptor"],
353                                     controls=["dirsync:1:0:10000"])
354         nb = len(res.msgs)
355
356         # only sub nc returns a result when asked for objectGUID
357         res = self.ldb_admin.search(self.base_dn,
358                                     expression="(objectclass=domaindns)",
359                                     attrs=["objectGUID"],
360                                     controls=["dirsync:1:0:0"])
361         self.assertEqual(len(res.msgs), nb - 1)
362         if nb > 1:
363             self.assertTrue(res.msgs[0].get("objectGUID") is not None)
364         else:
365             res = self.ldb_admin.search(self.base_dn,
366                                         expression="(objectclass=configuration)",
367                                         attrs=["objectGUID"],
368                                         controls=["dirsync:1:0:0"])
369
370     def test_dirsync_send_delta(self):
371         """Check that dirsync return correct delta when sending the last cookie"""
372         res = self.ldb_admin.search(self.base_dn,
373                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
374                                     controls=["dirsync:1:0:10000"])
375         ctl = str(res.controls[0]).split(":")
376         ctl[1] = "1"
377         ctl[2] = "0"
378         ctl[3] = "10000"
379         control = str(":".join(ctl))
380         res = self.ldb_admin.search(self.base_dn,
381                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
382                                     controls=[control])
383         self.assertEqual(len(res), 0)
384
385         res = self.ldb_admin.search(self.base_dn,
386                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
387                                     controls=["dirsync:1:0:100000"])
388
389         ctl = str(res.controls[0]).split(":")
390         ctl[1] = "1"
391         ctl[2] = "0"
392         ctl[3] = "10000"
393         control2 = str(":".join(ctl))
394
395         # Let's create an OU
396         ouname = "OU=testou2,%s" % self.base_dn
397         self.ouname = ouname
398         self.ldb_admin.create_ou(ouname)
399         res = self.ldb_admin.search(self.base_dn,
400                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
401                                     controls=[control2])
402         self.assertEqual(len(res), 1)
403         ctl = str(res.controls[0]).split(":")
404         ctl[1] = "1"
405         ctl[2] = "0"
406         ctl[3] = "10000"
407         control3 = str(":".join(ctl))
408
409         delta = Message()
410         delta.dn = Dn(self.ldb_admin, str(ouname))
411
412         delta["cn"] = MessageElement("test ou",
413                                      FLAG_MOD_ADD,
414                                      "cn")
415         self.ldb_admin.modify(delta)
416         res = self.ldb_admin.search(self.base_dn,
417                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
418                                     controls=[control3])
419
420         self.assertEqual(len(res.msgs), 1)
421         # 3 attributes: instanceType, cn and objectGUID
422         self.assertEqual(len(res.msgs[0]), 3)
423
424         delta = Message()
425         delta.dn = Dn(self.ldb_admin, str(ouname))
426         delta["cn"] = MessageElement([],
427                                      FLAG_MOD_DELETE,
428                                      "cn")
429         self.ldb_admin.modify(delta)
430         res = self.ldb_admin.search(self.base_dn,
431                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
432                                     controls=[control3])
433
434         self.assertEqual(len(res.msgs), 1)
435         # So we won't have much attribute returned but instanceType and GUID
436         # are.
437         # 3 attributes: instanceType and objectGUID and cn but empty
438         self.assertEqual(len(res.msgs[0]), 3)
439         ouname = "OU=newouname,%s" % self.base_dn
440         self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
441         self.ouname = ouname
442         ctl = str(res.controls[0]).split(":")
443         ctl[1] = "1"
444         ctl[2] = "0"
445         ctl[3] = "10000"
446         control4 = str(":".join(ctl))
447         res = self.ldb_admin.search(self.base_dn,
448                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
449                                     controls=[control4])
450
451         self.assertTrue(res[0].get("parentGUID") is not None)
452         self.assertTrue(res[0].get("name") is not None)
453         delete_force(self.ldb_admin, ouname)
454
455     def test_dirsync_linkedattributes_OBJECT_SECURITY(self):
456         """Check that dirsync returned deleted objects too"""
457         # Let's search for members
458         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
459         res = self.ldb_simple.search(self.base_dn,
460                                      expression="(name=Administrators)",
461                                      controls=["dirsync:1:1:1"])
462
463         self.assertTrue(len(res[0].get("member")) > 0)
464         size = len(res[0].get("member"))
465
466         ctl = str(res.controls[0]).split(":")
467         ctl[1] = "1"
468         ctl[2] = "1"
469         ctl[3] = "10000"
470         control1 = str(":".join(ctl))
471         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
472                                                 add_members_operation=True)
473
474         res = self.ldb_simple.search(self.base_dn,
475                                      expression="(name=Administrators)",
476                                      controls=[control1])
477
478         self.assertEqual(len(res[0].get("member")), size + 1)
479         ctl = str(res.controls[0]).split(":")
480         ctl[1] = "1"
481         ctl[2] = "1"
482         ctl[3] = "10000"
483         control1 = str(":".join(ctl))
484
485         # remove the user from the group
486         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
487                                                 add_members_operation=False)
488
489         res = self.ldb_simple.search(self.base_dn,
490                                      expression="(name=Administrators)",
491                                      controls=[control1])
492
493         self.assertEqual(len(res[0].get("member")), size)
494
495         self.ldb_admin.newgroup("testgroup")
496         self.addCleanup(self.ldb_admin.deletegroup, "testgroup")
497         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
498                                                 add_members_operation=True)
499
500         res = self.ldb_admin.search(self.base_dn,
501                                     expression="(name=testgroup)",
502                                     controls=["dirsync:1:0:1"])
503
504         self.assertEqual(len(res[0].get("member")), 1)
505         self.assertTrue(res[0].get("member") != "")
506
507         ctl = str(res.controls[0]).split(":")
508         ctl[1] = "1"
509         ctl[2] = "0"
510         ctl[3] = "1"
511         control1 = str(":".join(ctl))
512
513         # Check that reasking the same question but with an updated cookie
514         # didn't return any results.
515         print(control1)
516         res = self.ldb_admin.search(self.base_dn,
517                                     expression="(name=testgroup)",
518                                     controls=[control1])
519         self.assertEqual(len(res), 0)
520
521         ctl = str(res.controls[0]).split(":")
522         ctl[1] = "1"
523         ctl[2] = "1"
524         ctl[3] = "10000"
525         control1 = str(":".join(ctl))
526
527         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
528                                                 add_members_operation=False)
529
530         res = self.ldb_admin.search(self.base_dn,
531                                     expression="(name=testgroup)",
532                                     attrs=["member"],
533                                     controls=[control1])
534
535         self.assertEqual(len(res[0].get("member")), 0)
536
537     def test_dirsync_deleted_items(self):
538         """Check that dirsync returned deleted objects too"""
539         # Let's create an OU
540         ouname = "OU=testou3,%s" % self.base_dn
541         self.ouname = ouname
542         self.ldb_admin.create_ou(ouname)
543         res = self.ldb_admin.search(self.base_dn,
544                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
545                                     controls=["dirsync:1:0:1"])
546         guid = None
547         for e in res:
548             if str(e["name"]) == "testou3":
549                 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
550
551         ctl = str(res.controls[0]).split(":")
552         ctl[1] = "1"
553         ctl[2] = "0"
554         ctl[3] = "10000"
555         control1 = str(":".join(ctl))
556
557         # So now delete the object and check that
558         # we can see the object but deleted when admin
559         delete_force(self.ldb_admin, ouname)
560
561         res = self.ldb_admin.search(self.base_dn,
562                                     expression="(objectClass=organizationalUnit)",
563                                     controls=[control1])
564         self.assertEqual(len(res), 1)
565         guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
566         self.assertEqual(guid2, guid)
567         self.assertTrue(res[0].get("isDeleted"))
568         self.assertTrue(res[0].get("name") is not None)
569
570     def test_cookie_from_others(self):
571         res = self.ldb_admin.search(self.base_dn,
572                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
573                                     controls=["dirsync:1:0:1"])
574         ctl = str(res.controls[0]).split(":")
575         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
576         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
577         controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
578         res = self.ldb_admin.search(self.base_dn,
579                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
580                                     controls=controls)
581
582     def test_dirsync_linkedattributes_range(self):
583         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
584         res = self.ldb_admin.search(self.base_dn,
585                                     attrs=["member;range=1-1"],
586                                     expression="(name=Administrators)",
587                                     controls=["dirsync:1:0:0"])
588
589         self.assertTrue(len(res) > 0)
590         self.assertTrue(res[0].get("member;range=1-1") is None)
591         self.assertTrue(res[0].get("member") is not None)
592         self.assertTrue(len(res[0].get("member")) > 0)
593
594     def test_dirsync_linkedattributes_range_user(self):
595         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
596         try:
597             res = self.ldb_simple.search(self.base_dn,
598                                          attrs=["member;range=1-1"],
599                                          expression="(name=Administrators)",
600                                         controls=["dirsync:1:0:0"])
601         except LdbError as e:
602             (num, _) = e.args
603             self.assertEqual(num, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
604         else:
605             self.fail()
606
607     def test_dirsync_linkedattributes(self):
608         flag_incr_linked = 2147483648
609         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
610         res = self.ldb_admin.search(self.base_dn,
611                                     attrs=["member"],
612                                     expression="(name=Administrators)",
613                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
614
615         self.assertTrue(res[0].get("member;range=1-1") is not None)
616         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
617         size = len(res[0].get("member;range=1-1"))
618
619         ctl = str(res.controls[0]).split(":")
620         ctl[1] = "1"
621         ctl[2] = "%d" % flag_incr_linked
622         ctl[3] = "10000"
623         control1 = str(":".join(ctl))
624         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
625                                                 add_members_operation=True)
626         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
627                                                 add_members_operation=True)
628
629         res = self.ldb_admin.search(self.base_dn,
630                                     expression="(name=Administrators)",
631                                     controls=[control1])
632
633         self.assertEqual(len(res[0].get("member;range=1-1")), 2)
634         ctl = str(res.controls[0]).split(":")
635         ctl[1] = "1"
636         ctl[2] = "%d" % flag_incr_linked
637         ctl[3] = "10000"
638         control1 = str(":".join(ctl))
639
640         # remove the user from the group
641         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
642                                                 add_members_operation=False)
643
644         res = self.ldb_admin.search(self.base_dn,
645                                     expression="(name=Administrators)",
646                                     controls=[control1])
647
648         self.assertEqual(res[0].get("member;range=1-1"), None)
649         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
650
651         ctl = str(res.controls[0]).split(":")
652         ctl[1] = "1"
653         ctl[2] = "%d" % flag_incr_linked
654         ctl[3] = "10000"
655         control2 = str(":".join(ctl))
656
657         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
658                                                 add_members_operation=False)
659
660         res = self.ldb_admin.search(self.base_dn,
661                                     expression="(name=Administrators)",
662                                     controls=[control2])
663
664         self.assertEqual(res[0].get("member;range=1-1"), None)
665         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
666
667         res = self.ldb_admin.search(self.base_dn,
668                                     expression="(name=Administrators)",
669                                     controls=[control1])
670
671         self.assertEqual(res[0].get("member;range=1-1"), None)
672         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
673
674     def test_dirsync_extended_dn(self):
675         """Check that dirsync works together with the extended_dn control"""
676         # Let's search for members
677         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
678         res = self.ldb_simple.search(self.base_dn,
679                                      expression="(name=Administrators)",
680                                      controls=["dirsync:1:1:1"])
681
682         self.assertTrue(len(res[0].get("member")) > 0)
683         size = len(res[0].get("member"))
684
685         resEX1 = self.ldb_simple.search(self.base_dn,
686                                         expression="(name=Administrators)",
687                                         controls=["dirsync:1:1:1","extended_dn:1:1"])
688         self.assertTrue(len(resEX1[0].get("member")) > 0)
689         sizeEX1 = len(resEX1[0].get("member"))
690         self.assertEqual(sizeEX1, size)
691         self.assertIn(res[0]["member"][0], resEX1[0]["member"][0])
692         self.assertIn(b"<GUID=", resEX1[0]["member"][0])
693         self.assertIn(b">;<SID=S-1-5-21-", resEX1[0]["member"][0])
694
695         resEX0 = self.ldb_simple.search(self.base_dn,
696                                         expression="(name=Administrators)",
697                                         controls=["dirsync:1:1:1","extended_dn:1:0"])
698         self.assertTrue(len(resEX0[0].get("member")) > 0)
699         sizeEX0 = len(resEX0[0].get("member"))
700         self.assertEqual(sizeEX0, size)
701         self.assertIn(res[0]["member"][0], resEX0[0]["member"][0])
702         self.assertIn(b"<GUID=", resEX0[0]["member"][0])
703         self.assertIn(b">;<SID=010500000000000515", resEX0[0]["member"][0])
704
705     def test_dirsync_deleted_items_OBJECT_SECURITY(self):
706         """Check that dirsync returned deleted objects too"""
707         # Let's create an OU
708         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
709         ouname = "OU=testou3,%s" % self.base_dn
710         self.ouname = ouname
711         self.ldb_admin.create_ou(ouname)
712
713         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
714         res = self.ldb_simple.search(self.base_dn,
715                                      expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
716                                      controls=["dirsync:1:1:1"])
717
718         guid = None
719         for e in res:
720             if str(e["name"]) == "testou3":
721                 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
722
723         self.assertTrue(guid is not None)
724         ctl = str(res.controls[0]).split(":")
725         ctl[1] = "1"
726         ctl[2] = "1"
727         ctl[3] = "10000"
728         control1 = str(":".join(ctl))
729
730         # So now delete the object and check that
731         # we can see the object but deleted when admin
732         # we just see the objectGUID when simple user
733         delete_force(self.ldb_admin, ouname)
734
735         res = self.ldb_simple.search(self.base_dn,
736                                      expression="(objectClass=organizationalUnit)",
737                                      controls=[control1])
738         self.assertEqual(len(res), 1)
739         guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
740         self.assertEqual(guid2, guid)
741         self.assertEqual(str(res[0].dn), "")
742
743 class SpecialDirsyncTests(DirsyncBaseTests):
744
745     def setUp(self):
746         super().setUp()
747
748         self.schema_dn = self.ldb_admin.get_schema_basedn()
749
750         # the tests work by setting the 'Confidential' or 'RODC Filtered' bit in the searchFlags
751         # for an existing schema attribute. This only works against Windows if
752         # the systemFlags does not have FLAG_SCHEMA_BASE_OBJECT set for the
753         # schema attribute being modified. There are only a few attributes that
754         # meet this criteria (most of which only apply to 'user' objects)
755         self.conf_attr = "homePostalAddress"
756         attr_cn = "CN=Address-Home"
757         # schemaIdGuid for homePostalAddress (used for ACE tests)
758         self.attr_dn = f"{attr_cn},{self.schema_dn}"
759
760         userou = "OU=conf-attr-test"
761         self.ou = "{0},{1}".format(userou, self.base_dn)
762         samba.tests.delete_force(self.ldb_admin, self.ou, controls=['tree_delete:1'])
763         self.ldb_admin.create_ou(self.ou)
764         self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou, controls=['tree_delete:1'])
765
766         # add a test object with this attribute set
767         self.conf_value = "abcdef"
768         self.conf_user = "conf-user"
769         self.ldb_admin.newuser(self.conf_user, self.user_pass, userou=userou)
770         self.conf_dn = self.get_user_dn(self.conf_user)
771         self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
772
773         # sanity-check the flag is not already set (this'll cause problems if
774         # previous test run didn't clean up properly)
775
776         search_flags = int(self.get_attr_search_flags(self.attr_dn))
777         if search_flags & SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE:
778             self.set_attr_search_flags(self.attr_dn, str(search_flags &~ (SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE)))
779         search_flags = int(self.get_attr_search_flags(self.attr_dn))
780         self.assertEqual(0, search_flags & (SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE),
781                          f"{self.conf_attr} searchFlags did not reset to omit SEARCH_FLAG_CONFIDENTIAL and SEARCH_FLAG_RODC_ATTRIBUTE ({search_flags})")
782
783         # work out the original 'searchFlags' value before we overwrite it
784         old_value = self.get_attr_search_flags(self.attr_dn)
785
786         self.set_attr_search_flags(self.attr_dn, str(self.flag_under_test))
787
788         # reset the value after the test completes
789         self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)
790
791     def add_attr(self, dn, attr, value):
792         m = Message()
793         m.dn = dn
794         m[attr] = MessageElement(value, FLAG_MOD_ADD, attr)
795         self.ldb_admin.modify(m)
796
797     def set_attr_search_flags(self, attr_dn, flags):
798         """Modifies the searchFlags for an object in the schema"""
799         m = Message()
800         m.dn = Dn(self.ldb_admin, attr_dn)
801         m['searchFlags'] = MessageElement(flags, FLAG_MOD_REPLACE,
802                                           'searchFlags')
803         self.ldb_admin.modify(m)
804
805         # note we have to update the schema for this change to take effect (on
806         # Windows, at least)
807         self.ldb_admin.set_schema_update_now()
808
809     def get_attr_search_flags(self, attr_dn):
810         res = self.ldb_admin.search(attr_dn, scope=SCOPE_BASE,
811                                     attrs=['searchFlags'])
812         return res[0]['searchFlags'][0]
813
814     def find_under_current_ou(self, res):
815         for msg in res:
816             if msg.dn == self.conf_dn:
817                 return msg
818         self.fail(f"Failed to find object {self.conf_dn} in {len(res)} results")
819
820
821 class ConfidentialDirsyncTests(SpecialDirsyncTests):
822
823     def setUp(self):
824         self.flag_under_test = SEARCH_FLAG_CONFIDENTIAL
825         super().setUp()
826
827     def test_unicodePwd_normal(self):
828         res = self.ldb_admin.search(self.base_dn,
829                                     attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
830                                     expression=f"(samAccountName={self.conf_user})")
831
832         msg = res[0]
833
834         self.assertTrue("samAccountName" in msg)
835         # This form ensures this is a case insensitive comparison
836         self.assertTrue(msg.get("samAccountName"))
837         self.assertTrue(msg.get("unicodePwd") is None)
838         self.assertTrue(msg.get("supplementalCredentials") is None)
839
840     def _test_dirsync_unicodePwd(self, ldb_conn, control=None, insist_on_empty_element=False):
841         res = ldb_conn.search(self.base_dn,
842                          attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
843                          expression=f"(samAccountName={self.conf_user})",
844                          controls=[control])
845
846         msg = self.find_under_current_ou(res)
847
848         self.assertTrue("samAccountName" in msg)
849         # This form ensures this is a case insensitive comparison
850         self.assertTrue(msg.get("samAccountName"))
851         if insist_on_empty_element:
852             self.assertTrue(msg.get("unicodePwd") is not None)
853             self.assertEqual(len(msg.get("unicodePwd")), 0)
854             self.assertTrue(msg.get("supplementalCredentials") is not None)
855             self.assertEqual(len(msg.get("supplementalCredentials")), 0)
856         else:
857             self.assertTrue(msg.get("unicodePwd") is None
858                             or len(msg.get("unicodePwd")) == 0)
859             self.assertTrue(msg.get("supplementalCredentials") is None
860                             or len(msg.get("supplementalCredentials")) == 0)
861
862     def test_dirsync_unicodePwd_OBJ_SEC(self):
863         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
864         self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0")
865
866     def test_dirsync_unicodePwd_OBJ_SEC_insist_on_empty_element(self):
867         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
868         self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0", insist_on_empty_element=True)
869
870     def test_dirsync_unicodePwd_with_GET_CHANGES_OBJ_SEC(self):
871         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
872         self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0")
873
874     def test_dirsync_unicodePwd_with_GET_CHANGES_OBJ_SEC_insist_on_empty_element(self):
875         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
876         self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0", insist_on_empty_element=True)
877
878     def test_dirsync_unicodePwd_with_GET_CHANGES(self):
879         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
880         self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:0:0")
881
882     def test_dirsync_unicodePwd_with_GET_CHANGES_insist_on_empty_element(self):
883         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
884         self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:0:0", insist_on_empty_element=True)
885
886     def test_normal(self):
887         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
888         res = ldb_conn.search(self.base_dn,
889                          attrs=[self.conf_attr, "samAccountName"],
890                          expression=f"(samAccountName={self.conf_user})")
891
892         msg = res[0]
893         self.assertTrue("samAccountName" in msg)
894         # This form ensures this is a case insensitive comparison
895         self.assertTrue(msg.get("samAccountName"))
896         self.assertTrue(msg.get(self.conf_attr) is None)
897
898     def _test_dirsync_OBJECT_SECURITY(self, ldb_conn, insist_on_empty_element=False):
899         res = ldb_conn.search(self.base_dn,
900                               attrs=[self.conf_attr, "samAccountName"],
901                               expression=f"(samAccountName={self.conf_user})",
902                               controls=["dirsync:1:1:0"])
903
904         msg = self.find_under_current_ou(res)
905         self.assertTrue("samAccountName" in msg)
906         # This form ensures this is a case insensitive comparison
907         self.assertTrue(msg.get("samAccountName"))
908         if insist_on_empty_element:
909             self.assertTrue(msg.get(self.conf_attr) is not None)
910             self.assertEqual(len(msg.get(self.conf_attr)), 0)
911         else:
912             self.assertTrue(msg.get(self.conf_attr) is None
913                             or len(msg.get(self.conf_attr)) == 0)
914
915     def test_dirsync_OBJECT_SECURITY(self):
916         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
917         self._test_dirsync_OBJECT_SECURITY(ldb_conn)
918
919     def test_dirsync_OBJECT_SECURITY_insist_on_empty_element(self):
920         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
921         self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
922
923     def test_dirsync_with_GET_CHANGES(self):
924         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
925         res = ldb_conn.search(self.base_dn,
926                          attrs=[self.conf_attr, "samAccountName"],
927                          expression=f"(samAccountName={self.conf_user})",
928                          controls=["dirsync:1:0:0"])
929
930         msg = self.find_under_current_ou(res)
931         # This form ensures this is a case insensitive comparison
932         self.assertTrue(msg.get("samAccountName"))
933         self.assertTrue(msg.get(self.conf_attr))
934         self.assertEqual(len(msg.get(self.conf_attr)), 1)
935
936     def test_dirsync_with_GET_CHANGES_OBJECT_SECURITY(self):
937         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
938         self._test_dirsync_OBJECT_SECURITY(ldb_conn)
939
940     def test_dirsync_with_GET_CHANGES_OBJECT_SECURITY_insist_on_empty_element(self):
941         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
942         self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
943
944 class FilteredDirsyncTests(SpecialDirsyncTests):
945
946     def setUp(self):
947         self.flag_under_test = SEARCH_FLAG_RODC_ATTRIBUTE
948         super().setUp()
949
950     def test_attr(self):
951         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
952         res = ldb_conn.search(self.base_dn,
953                          attrs=[self.conf_attr, "samAccountName"],
954                          expression=f"(samAccountName={self.conf_user})")
955
956         msg = res[0]
957         self.assertTrue("samAccountName" in msg)
958         # This form ensures this is a case insensitive comparison
959         self.assertTrue(msg.get("samAccountName"))
960         self.assertTrue(msg.get(self.conf_attr))
961         self.assertEqual(len(msg.get(self.conf_attr)), 1)
962
963     def _test_dirsync_OBJECT_SECURITY(self, ldb_conn):
964         res = ldb_conn.search(self.base_dn,
965                          attrs=[self.conf_attr, "samAccountName"],
966                          expression=f"(samAccountName={self.conf_user})",
967                          controls=["dirsync:1:1:0"])
968
969         msg = self.find_under_current_ou(res)
970         self.assertTrue("samAccountName" in msg)
971         # This form ensures this is a case insensitive comparison
972         self.assertTrue(msg.get("samAccountName"))
973         self.assertTrue(msg.get(self.conf_attr))
974         self.assertEqual(len(msg.get(self.conf_attr)), 1)
975
976     def test_dirsync_OBJECT_SECURITY(self):
977         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
978         self._test_dirsync_OBJECT_SECURITY(ldb_conn)
979
980     def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES(self):
981         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
982         self._test_dirsync_OBJECT_SECURITY(ldb_conn)
983
984     def _test_dirsync_with_GET_CHANGES(self, insist_on_empty_element=False):
985         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
986         res = ldb_conn.search(self.base_dn,
987                          expression=f"(samAccountName={self.conf_user})",
988                          controls=["dirsync:1:0:0"])
989
990         msg = self.find_under_current_ou(res)
991         # This form ensures this is a case insensitive comparison
992         self.assertTrue(msg.get("samAccountName"))
993         if insist_on_empty_element:
994             self.assertTrue(msg.get(self.conf_attr) is not None)
995             self.assertEqual(len(msg.get(self.conf_attr)), 0)
996         else:
997             self.assertTrue(msg.get(self.conf_attr) is None
998                             or len(msg.get(self.conf_attr)) == 0)
999
1000     def test_dirsync_with_GET_CHANGES(self):
1001         self._test_dirsync_with_GET_CHANGES()
1002
1003     def test_dirsync_with_GET_CHANGES_insist_on_empty_element(self):
1004         self._test_dirsync_with_GET_CHANGES(insist_on_empty_element=True)
1005
1006     def test_dirsync_with_GET_CHANGES_attr(self):
1007         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1008         try:
1009             res = ldb_conn.search(self.base_dn,
1010                                   attrs=[self.conf_attr, "samAccountName"],
1011                                   expression=f"(samAccountName={self.conf_user})",
1012                                   controls=["dirsync:1:0:0"])
1013             self.fail("ldb.search() should have failed with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1014         except ldb.LdbError as e:
1015             (errno, errstr) = e.args
1016             self.assertEqual(errno, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
1017
1018 class ConfidentialFilteredDirsyncTests(SpecialDirsyncTests):
1019
1020     def setUp(self):
1021         self.flag_under_test = SEARCH_FLAG_RODC_ATTRIBUTE|SEARCH_FLAG_CONFIDENTIAL
1022         super().setUp()
1023
1024     def test_attr(self):
1025         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
1026         res = ldb_conn.search(self.base_dn,
1027                          attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
1028                          expression=f"(samAccountName={self.conf_user})")
1029
1030         msg = res[0]
1031         self.assertTrue(msg.get("samAccountName"))
1032         self.assertTrue(msg.get(self.conf_attr) is None)
1033
1034     def _test_dirsync_OBJECT_SECURITY(self, ldb_conn, insist_on_empty_element=False):
1035         res = ldb_conn.search(self.base_dn,
1036                               attrs=[self.conf_attr, "samAccountName"],
1037                               expression=f"(samAccountName={self.conf_user})",
1038                               controls=["dirsync:1:1:0"])
1039
1040         msg = self.find_under_current_ou(res)
1041         self.assertTrue("samAccountName" in msg)
1042         # This form ensures this is a case insensitive comparison
1043         self.assertTrue(msg.get("samAccountName"))
1044         if insist_on_empty_element:
1045             self.assertTrue(msg.get(self.conf_attr) is not None)
1046             self.assertEqual(len(msg.get(self.conf_attr)), 0)
1047         else:
1048             self.assertTrue(msg.get(self.conf_attr) is None
1049                             or len(msg.get(self.conf_attr)) == 0)
1050
1051     def test_dirsync_OBJECT_SECURITY(self):
1052         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
1053         self._test_dirsync_OBJECT_SECURITY(ldb_conn)
1054
1055     def test_dirsync_OBJECT_SECURITY_insist_on_empty_element(self):
1056         ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
1057         self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
1058
1059     def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES(self):
1060         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1061         self._test_dirsync_OBJECT_SECURITY(ldb_conn)
1062
1063     def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES_insist_on_empty_element(self):
1064         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1065         self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
1066
1067     def _test_dirsync_with_GET_CHANGES(self, insist_on_empty_element=False):
1068         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1069         res = ldb_conn.search(self.base_dn,
1070                          expression=f"(samAccountName={self.conf_user})",
1071                          controls=["dirsync:1:0:0"])
1072
1073         msg = self.find_under_current_ou(res)
1074         # This form ensures this is a case insensitive comparison
1075         self.assertTrue(msg.get("samAccountName"))
1076         if insist_on_empty_element:
1077             self.assertTrue(msg.get(self.conf_attr) is not None)
1078             self.assertEqual(len(msg.get(self.conf_attr)), 0)
1079         else:
1080             self.assertTrue(msg.get(self.conf_attr) is None
1081                             or len(msg.get(self.conf_attr)) == 0)
1082
1083     def test_dirsync_with_GET_CHANGES(self):
1084         self._test_dirsync_with_GET_CHANGES()
1085
1086     def test_dirsync_with_GET_CHANGES_insist_on_empty_element(self):
1087         self._test_dirsync_with_GET_CHANGES(insist_on_empty_element=True)
1088
1089     def test_dirsync_with_GET_CHANGES_attr(self):
1090         ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
1091         try:
1092             res = ldb_conn.search(self.base_dn,
1093                                   attrs=[self.conf_attr, "samAccountName"],
1094                                   expression=f"(samAccountName={self.conf_user})",
1095                                   controls=["dirsync:1:0:0"])
1096             self.fail("ldb.search() should have failed with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1097         except ldb.LdbError as e:
1098             (errno, errstr) = e.args
1099             self.assertEqual(errno, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
1100
1101
1102 if not getattr(opts, "listtests", False):
1103     lp = sambaopts.get_loadparm()
1104     samba.tests.cmdline_credentials = credopts.get_credentials(lp)
1105
1106
1107 TestProgram(module=__name__, opts=subunitopts)