CVE-2020-25720: s4-acl: Adjusted some tests to work with the new behavior
[samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
# This unit contains tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9 sys.path.insert(0, "bin/python")
10 import samba
11
12 from samba.tests import DynamicTestCase
13 from samba.tests.subunitrun import SubunitOptions, TestProgram
14 from samba.common import get_string
15
16 import samba.getopt as options
17 from samba.join import DCJoinContext
18
19 from ldb import (
20     SCOPE_BASE, SCOPE_ONELEVEL, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
21     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
22 from ldb import ERR_CONSTRAINT_VIOLATION
23 from ldb import ERR_OPERATIONS_ERROR
24 from ldb import Message, MessageElement, Dn
25 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
26 from samba.dcerpc import security, drsuapi, misc
27
28 from samba.auth import system_session
29 from samba import gensec, sd_utils, werror
30 from samba.samdb import SamDB
31 from samba.credentials import Credentials, DONT_USE_KERBEROS
32 import samba.tests
33 from samba.tests import delete_force
34 import samba.dsdb
35 from samba.tests.password_test import PasswordCommon
36 from samba.ndr import ndr_pack
37
# Command-line parser: options plus one positional <host> argument.
parser = optparse.OptionParser("acl.py [options] <host>")
# Keep a handle on the Samba option group; it is used later to obtain the
# loadparm context.
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
# Register the SubunitOptions group as well.
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

# `args` holds the positional arguments left after option parsing.
opts, args = parser.parse_args()
50
# A host (or a full URL) is mandatory: print the usage text and exit otherwise.
if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args[0]
if "://" not in host:
    # Bare host name: connect to it over LDAP.
    ldaphost = "ldap://%s" % host
else:
    # A full URL was given: use it verbatim for the connection and keep only
    # the part after the scheme separator in `host`.
    ldaphost = host
    start = host.rindex("://")
    # str.lstrip() expects a set of characters, not an offset, and raises
    # TypeError when handed an int; slice off the "<scheme>://" prefix instead.
    host = host[start + 3:]
62
# Obtain the loadparm context and the credentials from the parsed options.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Add FEATURE_SEAL to whatever gensec features the credentials already carry.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
66
67 #
68 # Tests start here
69 #
70
71
class AclTests(samba.tests.TestCase):
    """Shared fixture for the LDAP access-check tests.

    setUp opens a SamDB connection to ``ldaphost`` with the command-line
    credentials and stores on the instance the domain DN, domain SID,
    configuration DN, a default user password and an SDUtils helper.
    """

    def setUp(self):
        """Open the admin connection and prepare the shared test state."""
        super().setUp()

        # STRICT_CHECKING may be missing from the environment; default to "1".
        env_value = samba.tests.env_get_var_value('STRICT_CHECKING',
                                                  allow_missing=True)
        self.strict_checking = bool(int('1' if env_value is None else env_value))

        self.ldb_admin = SamDB(ldaphost,
                               credentials=creds,
                               session_info=system_session(lp),
                               lp=lp)
        admin = self.ldb_admin
        self.base_dn = admin.domain_dn()
        self.domain_sid = security.dom_sid(admin.get_domain_sid())
        self.user_pass = "samba123@"
        self.configuration_dn = admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(admin)
        self.addCleanup(self.delete_admin_connection)

        # used for anonymous login: empty user name and password, with the
        # domain, realm and workstation copied from the command-line creds
        self.creds_tmp = anonymous = Credentials()
        anonymous.set_username("")
        anonymous.set_password("")
        anonymous.set_domain(creds.get_domain())
        anonymous.set_realm(creds.get_realm())
        anonymous.set_workstation(creds.get_workstation())
        print("baseDN: %s" % self.base_dn)

        # set AttributeAuthorizationOnLDAPAdd and BlockOwnerImplicitRights
        self.set_heuristic(samba.dsdb.DS_HR_ATTR_AUTHZ_ON_LDAP_ADD, b'11')

    def set_heuristic(self, index, values):
        """Overwrite dSHeuristics starting at 1-based position ``index``.

        ``values`` (bytes) replaces ``len(values)`` characters of the current
        setting. The value that was in place before the call is restored by a
        registered cleanup.
        """
        self.assertGreater(index, 0)
        self.assertLess(index, 30)
        self.assertIsInstance(values, bytes)

        # Remember the old "dSHeuristics" (possibly None) and restore it
        # when the test finishes.
        previous = self.ldb_admin.get_dsheuristics()
        self.addCleanup(self.ldb_admin.set_dsheuristics, previous)

        # Pad whatever is currently set with the tail of the defaults, so
        # that the string is long enough to be patched at ``index``.
        default_heuristics = b"000000000100000000020000000003"
        current = b"" if previous is None else previous
        current += default_heuristics[len(current):]

        offset = index - 1
        patched = current[:offset] + values + current[offset + len(values):]
        self.ldb_admin.set_dsheuristics(patched)

    def get_user_dn(self, name):
        """Return the DN of ``name`` inside CN=Users of the test domain."""
        template = "CN=%s,CN=Users,%s"
        return template % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Return a SamDB connection to ``ldaphost`` as the given user."""
        user_creds = Credentials()
        user_creds.set_username(target_username)
        user_creds.set_password(target_password)
        user_creds.set_domain(creds.get_domain())
        user_creds.set_realm(creds.get_realm())
        user_creds.set_workstation(creds.get_workstation())
        sealed = user_creds.get_gensec_features() | gensec.FEATURE_SEAL
        user_creds.set_gensec_features(sealed)
        # kinit is too expensive to use in a tight loop
        user_creds.set_kerberos_state(DONT_USE_KERBEROS)
        return SamDB(url=ldaphost, credentials=user_creds, lp=lp)

    # Test if we have any additional groups for users than default ones
    def assert_user_no_group_member(self, username):
        """Pass only when the user's entry carries no memberOf attribute."""
        user_filter = "(distinguishedName=%s)" % self.get_user_dn(username)
        found = self.ldb_admin.search(self.base_dn, expression=user_filter)
        try:
            first_group = found[0]["memberOf"][0]
        except KeyError:
            # No memberOf attribute at all: nothing beyond the defaults.
            return
        # A memberOf value exists, so the check fails either way.
        self.assertEqual(first_group, "")
        self.fail()

    def delete_admin_connection(self):
        """Drop the SDUtils helper and the admin connection."""
        del self.sd_utils, self.ldb_admin
149
150 # tests on ldap add operations
151
152
153 class AclAddTests(AclTests):
154
155     def setUp(self):
156         super(AclAddTests, self).setUp()
157         # Domain admin that will be creator of OU parent-child structure
158         self.usr_admin_owner = "acl_add_user1"
159         # Second domain admin that will not be creator of OU parent-child structure
160         self.usr_admin_not_owner = "acl_add_user2"
161         # Regular user
162         self.regular_user = "acl_add_user3"
163         self.regular_user2 = "acl_add_user4"
164         self.regular_user3 = "acl_add_user5"
165         self.test_user1 = "test_add_user1"
166         self.test_user2 = "test_add_user2"
167         self.test_user3 = "test_add_user3"
168         self.test_user4 = "test_add_user4"
169         self.test_group1 = "test_add_group1"
170         self.ou1 = "OU=test_add_ou1"
171         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
172         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
173         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
174         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
175         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user2))
176         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
177         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
178         self.ldb_admin.newuser(self.regular_user, self.user_pass)
179         self.ldb_admin.newuser(self.regular_user2, self.user_pass)
180
181         # add admins to the Domain Admins group
182         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner],
183                                                 add_members_operation=True)
184         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner],
185                                                 add_members_operation=True)
186
187         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
188         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
189         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
190         self.ldb_user2 = self.get_ldb_connection(self.regular_user2, self.user_pass)
191
192     def tearDown(self):
193         super(AclAddTests, self).tearDown()
194         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
195                      (self.test_user1, self.ou2, self.base_dn))
196         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
197                      (self.test_user1, self.ou1, self.base_dn))
198         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
199                      (self.test_user2, self.ou1, self.base_dn))
200         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
201                      (self.test_user3, self.ou1, self.base_dn))
202         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
203                      (self.test_user4, self.ou1, self.base_dn))
204         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
205                      (self.test_group1, self.ou2, self.base_dn))
206         delete_force(self.ldb_admin, "CN=test_computer2,%s,%s" %
207                      (self.ou1, self.base_dn))
208         delete_force(self.ldb_admin, "CN=test_computer1,%s,%s" %
209                      (self.ou1, self.base_dn))
210         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
211         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
212         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
213         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
214         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
215         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user2))
216         delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))
217
218         del self.ldb_notowner
219         del self.ldb_owner
220         del self.ldb_user
221         del self.ldb_user2
222
223     # Make sure top OU is deleted (and so everything under it)
224     def assert_top_ou_deleted(self):
225         res = self.ldb_admin.search(self.base_dn,
226                                     expression="(distinguishedName=%s,%s)" % (
227                                         "OU=test_add_ou1", self.base_dn))
228         self.assertEqual(len(res), 0)
229
230     def test_add_u1(self):
231         """Testing OU with the rights of Doman Admin not creator of the OU """
232         self.assert_top_ou_deleted()
233         # Change descriptor for top level OU
234         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
235         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
236         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
237         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
238         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
239         # Test user and group creation with another domain admin's credentials
240         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
241         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
242                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
243         # Make sure we HAVE created the two objects -- user and group
244         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
245         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
246         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
247         self.assertTrue(len(res) > 0)
248         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
249         self.assertTrue(len(res) > 0)
250
251     def test_add_u2(self):
252         """Testing OU with the regular user that has no rights granted over the OU """
253         self.assert_top_ou_deleted()
254         # Create a parent-child OU structure with domain admin credentials
255         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
256         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
257         # Test user and group creation with regular user credentials
258         try:
259             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
260             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
261                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
262         except LdbError as e:
263             (num, _) = e.args
264             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
265         else:
266             self.fail()
267         # Make sure we HAVEN'T created any of two objects -- user or group
268         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
269         self.assertEqual(len(res), 0)
270         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
271         self.assertEqual(len(res), 0)
272
273     def test_add_u3(self):
274         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
275         self.assert_top_ou_deleted()
276         # Change descriptor for top level OU
277         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
278         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
279         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
280         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
281         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
282         # Test user and group creation with granted user only to one of the objects
283         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
284         try:
285             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
286                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
287         except LdbError as e1:
288             (num, _) = e1.args
289             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
290         else:
291             self.fail()
292         # Make sure we HAVE created the one of two objects -- user
293         res = self.ldb_admin.search(self.base_dn,
294                                     expression="(distinguishedName=%s,%s)" %
295                                     ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
296                                      self.base_dn))
297         self.assertNotEqual(len(res), 0)
298         res = self.ldb_admin.search(self.base_dn,
299                                     expression="(distinguishedName=%s,%s)" %
300                                     ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
301                                      self.base_dn))
302         self.assertEqual(len(res), 0)
303
304     def test_add_u4(self):
305         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
306         self.assert_top_ou_deleted()
307         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
308         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
309         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
310         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
311                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
312         # Make sure we have successfully created the two objects -- user and group
313         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
314         self.assertTrue(len(res) > 0)
315         res = self.ldb_admin.search(self.base_dn,
316                                     expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
317         self.assertTrue(len(res) > 0)
318
319     def test_add_c1(self):
320         """Testing adding a computer object with the rights of regular user granted the right 'Create Computer child objects' """
321         self.assert_top_ou_deleted()
322         # Change descriptor for top level OU
323         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
324         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
325         mod = f"(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})"
326         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
327         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;{user_sid})"
328         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
329
330         # Add a computer object, specifying an explicit SD to grant WP to the creator
331         print("Test adding a user with explicit nTSecurityDescriptor")
332         wp_ace = "(A;;WP;;;%s)" % str(user_sid)
333         tmp_desc = security.descriptor.from_sddl("D:%s" % wp_ace, self.domain_sid)
334         dn = "CN=%s,OU=test_add_ou1,%s" % (self.test_user1, self.base_dn)
335         samaccountname = self.test_user1 + "$"
336         # This should fail, the user has no WD or WO
337         try:
338             self.ldb_user.add({
339                 "dn": dn,
340                 "objectclass": "computer",
341                 "sAMAccountName": samaccountname,
342                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
343                 "servicePrincipalName": "host/" + self.test_user1,
344                 "nTSecurityDescriptor": ndr_pack(tmp_desc)})
345         except LdbError as e3:
346             (num, _) = e3.args
347             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
348         else:
349             self.fail()
350
351     def test_add_c2(self):
352         """Testing adding a computer object with the rights of regular user granted the right 'Create User child objects' and WO"""
353         self.assert_top_ou_deleted()
354         # Change descriptor for top level OU
355         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
356         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
357         mod = f"(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})"
358         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
359         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;{user_sid})"
360         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
361         # Grant WO, we should still not be able to specify a DACL
362         mod = "(A;CI;WO;;;%s)" % str(user_sid)
363         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
364         # Add a computer object, specifying an explicit SD to grant WP to the creator
365         print("Test adding a user with explicit nTSecurityDescriptor")
366         wp_ace = "(A;;WP;;;%s)" % str(user_sid)
367         tmp_desc = security.descriptor.from_sddl("D:%s" % wp_ace, self.domain_sid)
368         dn = "CN=%s,OU=test_add_ou1,%s" % (self.test_user1, self.base_dn)
369         samaccountname = self.test_user1 + "$"
370         # This should fail, the user has no WD
371         try:
372             self.ldb_user.add({
373                 "dn": dn,
374                 "objectclass": "computer",
375                 "sAMAccountName": samaccountname,
376                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
377                 "servicePrincipalName": "host/" + self.test_user1,
378                 "nTSecurityDescriptor": ndr_pack(tmp_desc)})
379         except LdbError as e3:
380             (num, _) = e3.args
381             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
382         else:
383             self.fail()
384
385         # We still cannot modify the owner or group
386         sd_sddl = f"O:{user_sid}G:{user_sid}"
387         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
388         try:
389             self.ldb_user.add({
390                 "dn": dn,
391                 "objectclass": "computer",
392                 "sAMAccountName": samaccountname,
393                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
394                 "servicePrincipalName": "host/" + self.test_user1,
395                 "nTSecurityDescriptor": ndr_pack(tmp_desc)})
396         except LdbError as e3:
397             (num, _) = e3.args
398             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
399         else:
400             self.fail()
401
402     def test_add_c3(self):
403         """Testing adding a computer object with the rights of regular user granted the right 'Create Computer child objects' and WD"""
404         self.assert_top_ou_deleted()
405         # Change descriptor for top level OU
406         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
407         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
408         mod = f"(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})"
409         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
410         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;{user_sid})"
411         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
412         # Grant WD, we should still not be able to specify a DACL
413         mod = "(A;CI;WD;;;%s)" % str(user_sid)
414         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
415         # Add a computer object, specifying an explicit SD to grant WP to the creator
416         print("Test adding a user with explicit nTSecurityDescriptor")
417         wp_ace = "(A;;WP;;;%s)" % str(user_sid)
418         sd_sddl = f"O:{user_sid}G:BA"
419         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
420         dn = "CN=%s,OU=test_add_ou1,%s" % (self.test_user1, self.base_dn)
421         samaccountname = self.test_user1 + "$"
422         # The user has no WO, but this succeeds, because WD means we skip further per-attribute checks
423         try:
424             self.ldb_user.add({
425                 "dn": dn,
426                 "objectclass": "computer",
427                 "sAMAccountName": samaccountname,
428                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
429                 "servicePrincipalName": "host/" + self.test_user1,
430                 "nTSecurityDescriptor": ndr_pack(tmp_desc)})
431         except LdbError as e3:
432             self.fail(str(e3))
433
434         # we should be able to modify the DACL
435         tmp_desc = security.descriptor.from_sddl("D:%s" % wp_ace, self.domain_sid)
436         dn = "CN=%s,OU=test_add_ou1,%s" % (self.test_user2, self.base_dn)
437         samaccountname = self.test_user2 + "$"
438         try:
439             self.ldb_user.add({
440                 "dn": dn,
441                 "objectclass": "computer",
442                 "sAMAccountName": samaccountname,
443                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
444                 "servicePrincipalName": "host/" + self.test_user2,
445                 "nTSecurityDescriptor": ndr_pack(tmp_desc)})
446         except LdbError as e3:
447             self.fail(str(e3))
448
449         # verify the ace is present
450         new_sd = self.sd_utils.get_sd_as_sddl("CN=test_add_user2,OU=test_add_ou1,%s" %
451                                               self.base_dn)
452         self.assertIn(wp_ace, new_sd)
453
454     def test_add_c4(self):
455         """Testing adding a computer object with the rights of regular user granted the right 'Create User child objects' and WDWO"""
456         self.assert_top_ou_deleted()
457         # Change descriptor for top level OU
458         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
459         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
460         mod = f"(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})"
461         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
462         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;{user_sid})"
463         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
464         # Grant WD and WO, we should be able to update the SD
465         mod = "(A;CI;WDWO;;;%s)" % str(user_sid)
466         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
467         # Add a computer object, specifying an explicit SD to grant WP to the creator
468         print("Test adding a user with explicit nTSecurityDescriptor")
469         wp_ace = "(A;;WP;;;%s)" % str(user_sid)
470         sd_sddl = "O:%sG:BAD:(A;;WP;;;%s)" % (str(user_sid), str(user_sid))
471         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
472         dn = "CN=%s,OU=test_add_ou1,%s" % (self.test_user1, self.base_dn)
473         samaccountname = self.test_user1 + "$"
474         try:
475             self.ldb_user.add({
476                 "dn": dn,
477                 "objectclass": "computer",
478                 "sAMAccountName": samaccountname,
479                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
480                 "servicePrincipalName": "host/" + self.test_user1,
481                 "nTSecurityDescriptor": ndr_pack(tmp_desc)})
482         except LdbError as e3:
483             self.fail(str(e3))
484
485         # verify the owner and group is present
486         new_sd = self.sd_utils.get_sd_as_sddl("CN=test_add_user1,OU=test_add_ou1,%s" %
487                                               self.base_dn)
488         self.assertIn(f"O:{user_sid}G:BA", new_sd)
489         self.assertIn(wp_ace, new_sd)
490
491     def test_add_c5(self):
492         """Testing adding a computer with an optional attribute """
493         self.assert_top_ou_deleted()
494         # Change descriptor for top level OU
495         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
496         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
497         mod = f"(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})"
498         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
499         # servicePrincipalName
500         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;{user_sid})"
501         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
502         dn = "CN=%s,OU=test_add_ou1,%s" % (self.test_user3, self.base_dn)
503         samaccountname = self.test_user3 + "$"
504         try:
505             self.ldb_user.add({
506                 "dn": dn,
507                 "objectclass": "computer",
508                 "sAMAccountName": samaccountname,
509                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
510                 "servicePrincipalName": "host/" + self.test_user3,
511                 "department": "Ministry of Silly Walks"})
512         except LdbError as e3:
513             (num, _) = e3.args
514             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
515         else:
516             self.fail()
517
518         # grant WP for that attribute and try again
519         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_DEPARTMENT};;{user_sid})"
520         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
521         try:
522             self.ldb_user.add({
523                 "dn": dn,
524                 "objectclass": "computer",
525                 "sAMAccountName": samaccountname,
526                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
527                 "servicePrincipalName": "host/" + self.test_user3,
528                 "department": "Ministry of Silly Walks"})
529         except LdbError as e3:
530             self.fail(str(e3))
531
532     def test_add_c6(self):
533         """Test creating a computer with a mandatory attribute(sAMAccountName)"""
534         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
535         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
536         mod = f"(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})"
537         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
538         # servicePrincipalName
539         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;{user_sid})"
540         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
541         # userAccountControl
542         mod = f"(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_USER_ACCOUNT_CONTROL};;{user_sid})"
543         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
544         dn = "CN=%s,OU=test_add_ou1,%s" % (self.test_user4, self.base_dn)
545         samaccountname = self.test_user4 + "$"
546         try:
547             self.ldb_user.add({
548                 "dn": dn,
549                 "objectclass": "computer",
550                 "sAMAccountName": samaccountname,
551                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
552                 "servicePrincipalName": "host/" + self.test_user4})
553         except LdbError as e3:
554             self.fail(str(e3))
555
556     def test_add_computer1(self):
557         """Testing Computer with the rights of regular user granted the right 'Create Computer child objects' """
558         self.assert_top_ou_deleted()
559         # Change descriptor for top level OU
560         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
561         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
562         mod = f"(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})"
563         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
564         mod = f"(OA;CI;SW;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;CO)"
565         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
566
567         # add a Computer object with servicePrincipalName
568         # Creator-Owner has SW from the default SD
569         dn = "CN=test_computer1,OU=test_add_ou1,%s" % (self.base_dn)
570         samaccountname = "test_computer1$"
571         try:
572             self.ldb_user.add({
573                 "dn": dn,
574                 "objectclass": "computer",
575                 "sAMAccountName": samaccountname,
576                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
577                 "servicePrincipalName": "nosuchservice/abcd/abcd"})
578         except LdbError as e3:
579             (num, _) = e3.args
580             if self.strict_checking:
581                 self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
582             else:
583                 self.assertIn(num, (ERR_INSUFFICIENT_ACCESS_RIGHTS,
584                                     ERR_CONSTRAINT_VIOLATION))
585         else:
586             self.fail()
587
588         # Inherited Deny from the parent will not work, because of ordering rules
589         mod = f"(OD;CI;SW;{samba.dsdb.DS_GUID_SCHEMA_ATTR_SERVICE_PRINCIPAL_NAME};;{user_sid})"
590         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
591         try:
592             self.ldb_user.add({
593                 "dn": dn,
594                 "objectclass": "computer",
595                 "sAMAccountName": samaccountname,
596                 "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
597                 "servicePrincipalName": "nosuchservice/abcd/abcd"})
598         except LdbError as e3:
599             (num, _) = e3.args
600             if self.strict_checking:
601                 self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
602             else:
603                 self.assertIn(num, (ERR_INSUFFICIENT_ACCESS_RIGHTS,
604                                     ERR_CONSTRAINT_VIOLATION))
605         else:
606             self.fail()
607
608     def test_add_optional_attr(self):
609         '''Show that adding a computer object with an optional attribute is disallowed'''
610
611         self.assert_top_ou_deleted()
612         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
613
614         user_sid = self.sd_utils.get_object_sid(
615             self.get_user_dn(self.regular_user))
616         self.sd_utils.dacl_add_ace(
617             f'{self.ou1},{self.base_dn}',
618             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
619         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
620         account_name = f'{self.test_user1}$'
621         try:
622             self.ldb_user.add({
623                 'dn': dn,
624                 'objectclass': 'computer',
625                 'sAMAccountName': account_name,
626                 'msSFU30Name': 'foo',
627             })
628         except LdbError as err:
629             num, estr = err.args
630             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
631             if self.strict_checking:
632                 self.assertIn('000021CC', estr)
633         else:
634             self.fail('expected to fail')
635
636     def test_add_domain_admins(self):
637         '''Show that adding a computer object with an optional attribute is allowed if the user is a Domain Administrator'''
638
639         self.assert_top_ou_deleted()
640         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
641
642         self.ldb_admin.add_remove_group_members('Domain Admins',
643                                                 [self.regular_user],
644                                                 add_members_operation=True)
645         ldb_domain_admin = self.get_ldb_connection(self.regular_user,
646                                                    self.user_pass)
647
648         user_sid = self.sd_utils.get_object_sid(
649             self.get_user_dn(self.regular_user))
650         self.sd_utils.dacl_add_ace(
651             f'{self.ou1},{self.base_dn}',
652             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
653         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
654         account_name = f'{self.test_user1}$'
655         try:
656             ldb_domain_admin.add({
657                 'dn': dn,
658                 'objectclass': 'computer',
659                 'sAMAccountName': account_name,
660                 'msSFU30Name': 'foo',
661             })
662         except LdbError as err:
663             self.fail(err)
664
665     def test_add_enterprise_admins(self):
666         '''Show that adding a computer object with an optional attribute is allowed if the user is an Enterprise Administrator'''
667
668         self.assert_top_ou_deleted()
669         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
670
671         self.ldb_admin.add_remove_group_members('Enterprise Admins',
672                                                 [self.regular_user],
673                                                 add_members_operation=True)
674         ldb_enterprise_admin = self.get_ldb_connection(self.regular_user,
675                                                        self.user_pass)
676
677         user_sid = self.sd_utils.get_object_sid(
678             self.get_user_dn(self.regular_user))
679         self.sd_utils.dacl_add_ace(
680             f'{self.ou1},{self.base_dn}',
681             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
682         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
683         account_name = f'{self.test_user1}$'
684         try:
685             ldb_enterprise_admin.add({
686                 'dn': dn,
687                 'objectclass': 'computer',
688                 'sAMAccountName': account_name,
689                 'msSFU30Name': 'foo',
690             })
691         except LdbError as err:
692             self.fail(err)
693
694     def test_add_non_computer(self):
695         '''Show that adding a non-computer object with an optional attribute is allowed'''
696
697         self.assert_top_ou_deleted()
698         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
699
700         user_sid = self.sd_utils.get_object_sid(
701             self.get_user_dn(self.regular_user))
702         self.sd_utils.dacl_add_ace(
703             f'{self.ou1},{self.base_dn}',
704             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_USER};;{user_sid})')
705         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
706         account_name = self.test_user1
707         try:
708             self.ldb_user.add({
709                 'dn': dn,
710                 'objectclass': 'user',
711                 'sAMAccountName': account_name,
712                 'msSFU30Name': 'foo',
713             })
714         except LdbError as err:
715             self.fail(err)
716
717     def test_add_derived_computer(self):
718         '''Show that adding an object derived from computer with an optional attribute is disallowed'''
719
720         self.assert_top_ou_deleted()
721         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
722
723         user_sid = self.sd_utils.get_object_sid(
724             self.get_user_dn(self.regular_user))
725         self.sd_utils.dacl_add_ace(
726             f'{self.ou1},{self.base_dn}',
727             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_MANAGED_SERVICE_ACCOUNT};;{user_sid})')
728         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
729         account_name = f'{self.test_user1}$'
730         try:
731             self.ldb_user.add({
732                 'dn': dn,
733                 'objectclass': 'msDS-ManagedServiceAccount',
734                 'sAMAccountName': account_name,
735                 'msSFU30Name': 'foo',
736             })
737         except LdbError as err:
738             num, estr = err.args
739             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
740             if self.strict_checking:
741                 self.assertIn('000021CC', estr)
742         else:
743             self.fail('expected to fail')
744
745     def test_add_write_dac(self):
746         '''Show that adding a computer object with an optional attribute is allowed if the security descriptor gives WRITE_DAC access'''
747
748         self.assert_top_ou_deleted()
749         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
750
751         user_sid = self.sd_utils.get_object_sid(
752             self.get_user_dn(self.regular_user))
753         self.sd_utils.dacl_add_ace(
754             f'{self.ou1},{self.base_dn}',
755             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
756         self.sd_utils.dacl_add_ace(
757             f'{self.ou1},{self.base_dn}',
758             f'(A;CI;WD;;;{user_sid})')
759         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
760         account_name = f'{self.test_user1}$'
761         try:
762             self.ldb_user.add({
763                 'dn': dn,
764                 'objectclass': 'computer',
765                 'sAMAccountName': account_name,
766                 'msSFU30Name': 'foo',
767             })
768         except LdbError as err:
769             self.fail(err)
770
771     def test_add_system_must_contain(self):
772         '''Show that adding a computer object with only systemMustContain attributes is allowed'''
773
774         self.assert_top_ou_deleted()
775         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
776
777         user_sid = self.sd_utils.get_object_sid(
778             self.get_user_dn(self.regular_user))
779         self.sd_utils.dacl_add_ace(
780             f'{self.ou1},{self.base_dn}',
781             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
782         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
783         account_name = f'{self.test_user1}$'
784         try:
785             self.ldb_user.add({
786                 'dn': dn,
787                 'objectclass': 'computer',
788                 'sAMAccountName': account_name,
789                 'instanceType': '4',
790             })
791         except LdbError as err:
792             self.fail(err)
793
794     def test_add_system_must_contain_denied(self):
795         '''Show that adding a computer object with only systemMustContain attributes is allowed, even when explicitly denied'''
796
797         self.assert_top_ou_deleted()
798         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
799
800         user_sid = self.sd_utils.get_object_sid(
801             self.get_user_dn(self.regular_user))
802         self.sd_utils.dacl_add_ace(
803             f'{self.ou1},{self.base_dn}',
804             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
805         self.sd_utils.dacl_add_ace(
806             f'{self.ou1},{self.base_dn}',
807             f'(D;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_INSTANCE_TYPE};;{user_sid})')
808         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
809         account_name = f'{self.test_user1}$'
810         try:
811             self.ldb_user.add({
812                 'dn': dn,
813                 'objectclass': 'computer',
814                 'sAMAccountName': account_name,
815                 'instanceType': '4',
816             })
817         except LdbError as err:
818             self.fail(err)
819
820     def test_add_unicode_pwd(self):
821         '''Show that adding a computer object with a unicodePwd is allowed'''
822
823         self.assert_top_ou_deleted()
824         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
825
826         user_sid = self.sd_utils.get_object_sid(
827             self.get_user_dn(self.regular_user))
828         self.sd_utils.dacl_add_ace(
829             f'{self.ou1},{self.base_dn}',
830             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
831         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
832         account_name = f'{self.test_user1}$'
833         password = 'Secret007'
834         utf16pw = f'"{password}"'.encode('utf-16-le')
835         try:
836             self.ldb_user.add({
837                 'dn': dn,
838                 'objectclass': 'computer',
839                 'sAMAccountName': account_name,
840                 'unicodePwd': utf16pw,
841             })
842         except LdbError as err:
843             self.fail(err)
844
845     def test_add_user_password(self):
846         '''Show that adding a computer object with a userPassword is allowed'''
847
848         self.assert_top_ou_deleted()
849         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
850
851         user_sid = self.sd_utils.get_object_sid(
852             self.get_user_dn(self.regular_user))
853         self.sd_utils.dacl_add_ace(
854             f'{self.ou1},{self.base_dn}',
855             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
856         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
857         account_name = f'{self.test_user1}$'
858         password = 'Secret007'
859         try:
860             self.ldb_user.add({
861                 'dn': dn,
862                 'objectclass': 'computer',
863                 'sAMAccountName': account_name,
864                 'userPassword': password,
865             })
866         except LdbError as err:
867             self.fail(err)
868
869     def test_add_user_password_denied(self):
870         '''Show that adding a computer object with a userPassword is allowed, even when explicitly denied'''
871
872         self.assert_top_ou_deleted()
873         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
874
875         user_sid = self.sd_utils.get_object_sid(
876             self.get_user_dn(self.regular_user))
877         self.sd_utils.dacl_add_ace(
878             f'{self.ou1},{self.base_dn}',
879             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
880         self.sd_utils.dacl_add_ace(
881             f'{self.ou1},{self.base_dn}',
882             f'(D;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_USER_PASSWORD};;{user_sid})')
883         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
884         account_name = f'{self.test_user1}$'
885         password = 'Secret007'
886         try:
887             self.ldb_user.add({
888                 'dn': dn,
889                 'objectclass': 'computer',
890                 'sAMAccountName': account_name,
891                 'userPassword': password,
892             })
893         except LdbError as err:
894             self.fail(err)
895
896     def test_add_clear_text_password(self):
897         '''Show that adding a computer object with a clearTextPassword is allowed
898
899 Note: this does not work on Windows.'''
900
901         self.assert_top_ou_deleted()
902         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
903
904         user_sid = self.sd_utils.get_object_sid(
905             self.get_user_dn(self.regular_user))
906         self.sd_utils.dacl_add_ace(
907             f'{self.ou1},{self.base_dn}',
908             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
909         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
910         account_name = f'{self.test_user1}$'
911         password = 'Secret007'.encode('utf-16-le')
912         try:
913             self.ldb_user.add({
914                 'dn': dn,
915                 'objectclass': 'computer',
916                 'sAMAccountName': account_name,
917                 'clearTextPassword': password,
918             })
919         except LdbError as err:
920             self.fail(err)
921
922     def test_add_disallowed_attr(self):
923         '''Show that adding a computer object with a denied attribute is disallowed'''
924
925         self.assert_top_ou_deleted()
926         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
927
928         user_sid = self.sd_utils.get_object_sid(
929             self.get_user_dn(self.regular_user))
930         self.sd_utils.dacl_add_ace(
931             f'{self.ou1},{self.base_dn}',
932             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
933         self.sd_utils.dacl_add_ace(
934             f'{self.ou1},{self.base_dn}',
935             f'(D;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_MS_SFU_30};;{user_sid})')
936         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
937         account_name = f'{self.test_user1}$'
938         try:
939             self.ldb_user.add({
940                 'dn': dn,
941                 'objectclass': 'computer',
942                 'sAMAccountName': account_name,
943                 'msSFU30Name': 'foo',
944             })
945         except LdbError as err:
946             num, estr = err.args
947             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
948             if self.strict_checking:
949                 self.assertIn('000021CC', estr)
950         else:
951             self.fail('expected to fail')
952
953     def test_add_allowed_attr(self):
954         '''Show that adding a computer object with an allowed attribute is allowed'''
955
956         self.assert_top_ou_deleted()
957         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
958
959         user_sid = self.sd_utils.get_object_sid(
960             self.get_user_dn(self.regular_user))
961         self.sd_utils.dacl_add_ace(
962             f'{self.ou1},{self.base_dn}',
963             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
964         self.sd_utils.dacl_add_ace(
965             f'{self.ou1},{self.base_dn}',
966             f'(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_MS_SFU_30};;{user_sid})')
967         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
968         account_name = f'{self.test_user1}$'
969         try:
970             self.ldb_user.add({
971                 'dn': dn,
972                 'objectclass': 'computer',
973                 'sAMAccountName': account_name,
974                 'msSFU30Name': 'foo',
975             })
976         except LdbError as err:
977             self.fail(err)
978
979     def test_add_optional_attr_heuristic_0(self):
980         '''Show that adding a computer object with an optional attribute is allowed when AttributeAuthorizationOnLDAPAdd == 0'''
981
982         self.assert_top_ou_deleted()
983         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
984
985         self.set_heuristic(samba.dsdb.DS_HR_ATTR_AUTHZ_ON_LDAP_ADD, b'0')
986
987         user_sid = self.sd_utils.get_object_sid(
988             self.get_user_dn(self.regular_user))
989         self.sd_utils.dacl_add_ace(
990             f'{self.ou1},{self.base_dn}',
991             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
992         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
993         account_name = f'{self.test_user1}$'
994         try:
995             self.ldb_user.add({
996                 'dn': dn,
997                 'objectclass': 'computer',
998                 'sAMAccountName': account_name,
999                 'msSFU30Name': 'foo',
1000             })
1001         except LdbError as err:
1002             self.fail(err)
1003
1004     def test_add_optional_attr_heuristic_2(self):
1005         '''Show that adding a computer object with an optional attribute is allowed when AttributeAuthorizationOnLDAPAdd == 2'''
1006
1007         self.assert_top_ou_deleted()
1008         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1009
1010         self.set_heuristic(samba.dsdb.DS_HR_ATTR_AUTHZ_ON_LDAP_ADD, b'2')
1011
1012         user_sid = self.sd_utils.get_object_sid(
1013             self.get_user_dn(self.regular_user))
1014         self.sd_utils.dacl_add_ace(
1015             f'{self.ou1},{self.base_dn}',
1016             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1017         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1018         account_name = f'{self.test_user1}$'
1019         try:
1020             self.ldb_user.add({
1021                 'dn': dn,
1022                 'objectclass': 'computer',
1023                 'sAMAccountName': account_name,
1024                 'msSFU30Name': 'foo',
1025             })
1026         except LdbError as err:
1027             self.fail(err)
1028
1029     def test_add_security_descriptor_implicit_right(self):
1030         '''Show that adding a computer object with a security descriptor is allowed when BlockOwnerImplicitRights != 1'''
1031
1032         self.assert_top_ou_deleted()
1033         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1034
1035         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'0')
1036
1037         user_sid = self.sd_utils.get_object_sid(
1038             self.get_user_dn(self.regular_user))
1039         self.sd_utils.dacl_add_ace(
1040             f'{self.ou1},{self.base_dn}',
1041             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1042         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1043         account_name = f'{self.test_user1}$'
1044         sd_sddl = f'O:{user_sid}G:{user_sid}'
1045         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1046         try:
1047             self.ldb_user.add({
1048                 'dn': dn,
1049                 'objectclass': 'computer',
1050                 'sAMAccountName': account_name,
1051                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1052             })
1053         except LdbError as err:
1054             self.fail(err)
1055
1056     def test_add_security_descriptor_implicit_right_optional_attr(self):
1057         '''Show that adding a computer object with a security descriptor and an optional attribute is disallowed when BlockOwnerImplicitRights != 1'''
1058
1059         self.assert_top_ou_deleted()
1060         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1061
1062         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'0')
1063
1064         user_sid = self.sd_utils.get_object_sid(
1065             self.get_user_dn(self.regular_user))
1066         self.sd_utils.dacl_add_ace(
1067             f'{self.ou1},{self.base_dn}',
1068             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1069         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1070         account_name = f'{self.test_user1}$'
1071         sd_sddl = f'O:{user_sid}G:{user_sid}'
1072         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1073         try:
1074             self.ldb_user.add({
1075                 'dn': dn,
1076                 'objectclass': 'computer',
1077                 'sAMAccountName': account_name,
1078                 'msSFU30Name': 'foo',
1079                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1080             })
1081         except LdbError as err:
1082             num, estr = err.args
1083             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1084             if self.strict_checking:
1085                 self.assertIn('000021CC', estr)
1086         else:
1087             self.fail('expected to fail')
1088
1089     def test_add_security_descriptor_explicit_right(self):
1090         '''Show that a computer object with a security descriptor can be added if BlockOwnerImplicitRights == 1 and WRITE_DAC is granted'''
1091
1092         self.assert_top_ou_deleted()
1093         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1094
1095         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1096
1097         user_sid = self.sd_utils.get_object_sid(
1098             self.get_user_dn(self.regular_user))
1099         self.sd_utils.dacl_add_ace(
1100             f'{self.ou1},{self.base_dn}',
1101             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1102         self.sd_utils.dacl_add_ace(
1103             f'{self.ou1},{self.base_dn}',
1104             f'(A;CI;WD;;;{user_sid})')
1105         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1106         account_name = f'{self.test_user1}$'
1107         sd_sddl = (f'O:{user_sid}G:{user_sid}'
1108                    f'D:(A;;WP;;;{user_sid})')
1109         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1110         try:
1111             self.ldb_user.add({
1112                 'dn': dn,
1113                 'objectclass': 'computer',
1114                 'sAMAccountName': account_name,
1115                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1116             })
1117         except LdbError as err:
1118             self.fail(err)
1119
1120     def test_add_security_descriptor_explicit_right_no_owner_disallow(self):
1121         '''Show that a computer object with a security descriptor can be added if BlockOwnerImplicitRights == 1, WRITE_DAC is granted, and WRITE_OWNER is denied'''
1122
1123         self.assert_top_ou_deleted()
1124         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1125
1126         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1127
1128         user_sid = self.sd_utils.get_object_sid(
1129             self.get_user_dn(self.regular_user))
1130         self.sd_utils.dacl_add_ace(
1131             f'{self.ou1},{self.base_dn}',
1132             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1133         self.sd_utils.dacl_add_ace(
1134             f'{self.ou1},{self.base_dn}',
1135             f'(A;CI;WD;;;{user_sid})')
1136         self.sd_utils.dacl_add_ace(
1137             f'{self.ou1},{self.base_dn}',
1138             f'(D;CI;WO;;;{user_sid})')
1139         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1140         account_name = f'{self.test_user1}$'
1141         sd_sddl = f'D:(A;;WP;;;{user_sid})'
1142         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1143         try:
1144             self.ldb_user.add({
1145                 'dn': dn,
1146                 'objectclass': 'computer',
1147                 'sAMAccountName': account_name,
1148                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1149             })
1150         except LdbError as err:
1151             self.fail(err)
1152
1153     def test_add_security_descriptor_explicit_right_owner_disallow(self):
1154         '''Show that a computer object with a security descriptor containing an owner and group can be added if BlockOwnerImplicitRights == 1, WRITE_DAC is granted, and WRITE_OWNER is denied'''
1155
1156         self.assert_top_ou_deleted()
1157         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1158
1159         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1160
1161         user_sid = self.sd_utils.get_object_sid(
1162             self.get_user_dn(self.regular_user))
1163         self.sd_utils.dacl_add_ace(
1164             f'{self.ou1},{self.base_dn}',
1165             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1166         self.sd_utils.dacl_add_ace(
1167             f'{self.ou1},{self.base_dn}',
1168             f'(A;CI;WD;;;{user_sid})')
1169         self.sd_utils.dacl_add_ace(
1170             f'{self.ou1},{self.base_dn}',
1171             f'(D;CI;WO;;;{user_sid})')
1172         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1173         account_name = f'{self.test_user1}$'
1174         sd_sddl = (f'O:{user_sid}G:{user_sid}'
1175                    f'D:(A;;WP;;;{user_sid})')
1176         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1177         try:
1178             self.ldb_user.add({
1179                 'dn': dn,
1180                 'objectclass': 'computer',
1181                 'sAMAccountName': account_name,
1182                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1183             })
1184         except LdbError as err:
1185             self.fail(err)
1186
1187     def test_add_security_descriptor_explicit_right_sacl(self):
1188         '''Show that adding a computer object with a security descriptor containing a SACL is disallowed if BlockOwnerImplicitRights == 1 and WRITE_DAC is granted'''
1189
1190         self.assert_top_ou_deleted()
1191         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1192
1193         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1194
1195         user_sid = self.sd_utils.get_object_sid(
1196             self.get_user_dn(self.regular_user))
1197         self.sd_utils.dacl_add_ace(
1198             f'{self.ou1},{self.base_dn}',
1199             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1200         self.sd_utils.dacl_add_ace(
1201             f'{self.ou1},{self.base_dn}',
1202             f'(A;CI;WD;;;{user_sid})')
1203         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1204         account_name = f'{self.test_user1}$'
1205         sd_sddl = (f'O:{user_sid}G:{user_sid}'
1206                    f'D:(A;;WP;;;{user_sid})S:(A;;WP;;;{user_sid})')
1207         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1208         try:
1209             self.ldb_user.add({
1210                 'dn': dn,
1211                 'objectclass': 'computer',
1212                 'sAMAccountName': account_name,
1213                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1214             })
1215         except LdbError as err:
1216             num, estr = err.args
1217             if self.strict_checking:
1218                 self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
1219                 self.assertIn(f'{werror.WERR_PRIVILEGE_NOT_HELD:08X}', estr)
1220             else:
1221                 self.assertIn(num, (ERR_CONSTRAINT_VIOLATION,
1222                                     ERR_INSUFFICIENT_ACCESS_RIGHTS))
1223         else:
1224             self.fail('expected to fail')
1225
1226     def test_add_security_descriptor_explicit_right_owner_not_us(self):
1227         '''Show that adding a computer object with a security descriptor owned by another is disallowed if BlockOwnerImplicitRights == 1 and WRITE_DAC is granted'''
1228
1229         self.assert_top_ou_deleted()
1230         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1231
1232         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1233
1234         user_sid = self.sd_utils.get_object_sid(
1235             self.get_user_dn(self.regular_user))
1236         self.sd_utils.dacl_add_ace(
1237             f'{self.ou1},{self.base_dn}',
1238             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1239         self.sd_utils.dacl_add_ace(
1240             f'{self.ou1},{self.base_dn}',
1241             f'(A;CI;WD;;;{user_sid})')
1242         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1243         account_name = f'{self.test_user1}$'
1244         sd_sddl = 'O:BA'
1245         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1246         try:
1247             self.ldb_user.add({
1248                 'dn': dn,
1249                 'objectclass': 'computer',
1250                 'sAMAccountName': account_name,
1251                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1252             })
1253         except LdbError as err:
1254             num, estr = err.args
1255             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
1256             self.assertIn(f'{werror.WERR_INVALID_OWNER:08X}', estr)
1257         else:
1258             self.fail('expected to fail')
1259
1260     def test_add_security_descriptor_explicit_right_owner_not_us_admin(self):
1261         '''Show that adding a computer object with a security descriptor owned by another is allowed if BlockOwnerImplicitRights == 1, WRITE_DAC is granted, and we are in Domain Admins'''
1262
1263         self.assert_top_ou_deleted()
1264         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1265
1266         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1267
1268         user_sid = self.sd_utils.get_object_sid(
1269             self.get_user_dn(self.regular_user))
1270         self.sd_utils.dacl_add_ace(
1271             f'{self.ou1},{self.base_dn}',
1272             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1273         self.sd_utils.dacl_add_ace(
1274             f'{self.ou1},{self.base_dn}',
1275             f'(A;CI;WD;;;{user_sid})')
1276         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1277         account_name = f'{self.test_user1}$'
1278         sd_sddl = 'O:BA'
1279         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1280         try:
1281             self.ldb_admin.add({
1282                 'dn': dn,
1283                 'objectclass': 'computer',
1284                 'sAMAccountName': account_name,
1285                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1286             })
1287         except LdbError as err:
1288             self.fail(err)
1289
1290     def test_add_no_implicit_right(self):
1291         '''Show that adding a computer object without a security descriptor is allowed when BlockOwnerImplicitRights == 1'''
1292
1293         self.assert_top_ou_deleted()
1294         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1295
1296         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1297
1298         user_sid = self.sd_utils.get_object_sid(
1299             self.get_user_dn(self.regular_user))
1300         self.sd_utils.dacl_add_ace(
1301             f'{self.ou1},{self.base_dn}',
1302             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1303         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1304         account_name = f'{self.test_user1}$'
1305         try:
1306             self.ldb_user.add({
1307                 'dn': dn,
1308                 'objectclass': 'computer',
1309                 'sAMAccountName': account_name,
1310             })
1311         except LdbError as err:
1312             self.fail(err)
1313
1314     def test_add_security_descriptor_owner(self):
1315         '''Show that adding a computer object with a security descriptor containing an owner is disallowed when BlockOwnerImplicitRights == 1'''
1316
1317         self.assert_top_ou_deleted()
1318         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1319
1320         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1321
1322         user_sid = self.sd_utils.get_object_sid(
1323             self.get_user_dn(self.regular_user))
1324         self.sd_utils.dacl_add_ace(
1325             f'{self.ou1},{self.base_dn}',
1326             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1327         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1328         account_name = f'{self.test_user1}$'
1329         sd_sddl = f'O:{user_sid}'
1330         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1331         try:
1332             self.ldb_user.add({
1333                 'dn': dn,
1334                 'objectclass': 'computer',
1335                 'sAMAccountName': account_name,
1336                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1337             })
1338         except LdbError as err:
1339             num, estr = err.args
1340             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1341             if self.strict_checking:
1342                 self.assertIn('000021CC', estr)
1343         else:
1344             self.fail('expected to fail')
1345
1346     def test_add_security_descriptor_owner_implicit(self):
1347         '''Show that adding a computer object with a security descriptor containing an owner is disallowed when BlockOwnerImplicitRights == 1, even when we are the owner of the OU security descriptor'''
1348
1349         self.assert_top_ou_deleted()
1350         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1351
1352         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1353
1354         user_sid = self.sd_utils.get_object_sid(
1355             self.get_user_dn(self.regular_user))
1356
1357         ou_controls = [
1358             f'sd_flags:1:{security.SECINFO_OWNER|security.SECINFO_DACL}']
1359         ou_sddl = (f'O:{user_sid}'
1360                    f'D:(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1361         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
1362         self.sd_utils.modify_sd_on_dn(f'{self.ou1},{self.base_dn}', ou_desc,
1363                                       controls=ou_controls)
1364
1365         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1366         account_name = f'{self.test_user1}$'
1367         sd_sddl = f'O:{user_sid}'
1368         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1369         try:
1370             self.ldb_user.add({
1371                 'dn': dn,
1372                 'objectclass': 'computer',
1373                 'sAMAccountName': account_name,
1374                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1375             })
1376         except LdbError as err:
1377             num, estr = err.args
1378             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1379             if self.strict_checking:
1380                 self.assertIn('000021CC', estr)
1381         else:
1382             self.fail('expected to fail')
1383
1384     def test_add_security_descriptor_owner_explicit_right(self):
1385         '''Show that adding a computer object with a security descriptor containing an owner is disallowed when BlockOwnerImplicitRights == 1, even with WO'''
1386
1387         self.assert_top_ou_deleted()
1388         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1389
1390         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1391
1392         user_sid = self.sd_utils.get_object_sid(
1393             self.get_user_dn(self.regular_user))
1394         self.sd_utils.dacl_add_ace(
1395             f'{self.ou1},{self.base_dn}',
1396             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1397         self.sd_utils.dacl_add_ace(
1398             f'{self.ou1},{self.base_dn}',
1399             f'(A;CI;WO;;;{user_sid})')
1400         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1401         account_name = f'{self.test_user1}$'
1402         sd_sddl = f'O:{user_sid}'
1403         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1404         try:
1405             self.ldb_user.add({
1406                 'dn': dn,
1407                 'objectclass': 'computer',
1408                 'sAMAccountName': account_name,
1409                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1410             })
1411         except LdbError as err:
1412             num, estr = err.args
1413             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1414             if self.strict_checking:
1415                 self.assertIn('000021CC', estr)
1416         else:
1417             self.fail('expected to fail')
1418
1419     def test_add_security_descriptor_group(self):
1420         '''Show that adding a computer object with a security descriptor containing an group is disallowed when BlockOwnerImplicitRights == 1'''
1421
1422         self.assert_top_ou_deleted()
1423         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1424
1425         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1426
1427         user_sid = self.sd_utils.get_object_sid(
1428             self.get_user_dn(self.regular_user))
1429         self.sd_utils.dacl_add_ace(
1430             f'{self.ou1},{self.base_dn}',
1431             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1432         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1433         account_name = f'{self.test_user1}$'
1434         sd_sddl = f'G:{user_sid}'
1435         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1436         try:
1437             self.ldb_user.add({
1438                 'dn': dn,
1439                 'objectclass': 'computer',
1440                 'sAMAccountName': account_name,
1441                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1442             })
1443         except LdbError as err:
1444             num, estr = err.args
1445             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1446             if self.strict_checking:
1447                 self.assertIn('000021CC', estr)
1448         else:
1449             self.fail('expected to fail')
1450
1451     def test_add_security_descriptor_group_explicit_right(self):
1452         '''Show that adding a computer object with a security descriptor containing an group is disallowed when BlockOwnerImplicitRights == 1, even with WO'''
1453
1454         self.assert_top_ou_deleted()
1455         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1456
1457         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1458
1459         user_sid = self.sd_utils.get_object_sid(
1460             self.get_user_dn(self.regular_user))
1461         self.sd_utils.dacl_add_ace(
1462             f'{self.ou1},{self.base_dn}',
1463             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1464         self.sd_utils.dacl_add_ace(
1465             f'{self.ou1},{self.base_dn}',
1466             f'(A;CI;WO;;;{user_sid})')
1467         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1468         account_name = f'{self.test_user1}$'
1469         sd_sddl = f'G:{user_sid}'
1470         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1471         try:
1472             self.ldb_user.add({
1473                 'dn': dn,
1474                 'objectclass': 'computer',
1475                 'sAMAccountName': account_name,
1476                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1477             })
1478         except LdbError as err:
1479             num, estr = err.args
1480             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1481             if self.strict_checking:
1482                 self.assertIn('000021CC', estr)
1483         else:
1484             self.fail('expected to fail')
1485
1486     def test_add_security_descriptor_group_implicit(self):
1487         '''Show that adding a computer object with a security descriptor containing an group is disallowed when BlockOwnerImplicitRights == 1, even when we are the owner of the OU security descriptor'''
1488
1489         self.assert_top_ou_deleted()
1490         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1491
1492         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1493
1494         user_sid = self.sd_utils.get_object_sid(
1495             self.get_user_dn(self.regular_user))
1496
1497         ou_controls = [
1498             f'sd_flags:1:{security.SECINFO_OWNER|security.SECINFO_DACL}']
1499         ou_sddl = (f'O:{user_sid}'
1500                    f'D:(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1501         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
1502         self.sd_utils.modify_sd_on_dn(f'{self.ou1},{self.base_dn}', ou_desc,
1503                                       controls=ou_controls)
1504
1505         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1506         account_name = f'{self.test_user1}$'
1507         sd_sddl = f'G:{user_sid}'
1508         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1509         try:
1510             self.ldb_user.add({
1511                 'dn': dn,
1512                 'objectclass': 'computer',
1513                 'sAMAccountName': account_name,
1514                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1515             })
1516         except LdbError as err:
1517             num, estr = err.args
1518             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1519             if self.strict_checking:
1520                 self.assertIn('000021CC', estr)
1521         else:
1522             self.fail('expected to fail')
1523
1524     def test_add_security_descriptor_dacl(self):
1525         '''Show that adding a computer object with a security descriptor containing a DACL is disallowed when BlockOwnerImplicitRights == 1'''
1526
1527         self.assert_top_ou_deleted()
1528         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1529
1530         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1531
1532         user_sid = self.sd_utils.get_object_sid(
1533             self.get_user_dn(self.regular_user))
1534         self.sd_utils.dacl_add_ace(
1535             f'{self.ou1},{self.base_dn}',
1536             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1537         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1538         account_name = f'{self.test_user1}$'
1539         sd_sddl = f'D:(A;;WP;;;{user_sid})'
1540         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1541         try:
1542             self.ldb_user.add({
1543                 'dn': dn,
1544                 'objectclass': 'computer',
1545                 'sAMAccountName': account_name,
1546                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1547             })
1548         except LdbError as err:
1549             num, estr = err.args
1550             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1551             if self.strict_checking:
1552                 self.assertIn('000021CC', estr)
1553         else:
1554             self.fail('expected to fail')
1555
1556     def test_add_security_descriptor_dacl_implicit(self):
1557         '''Show that adding a computer object with a security descriptor containing a DACL is disallowed when BlockOwnerImplicitRights == 1, even when we are the owner of the OU security descriptor'''
1558
1559         self.assert_top_ou_deleted()
1560         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1561
1562         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1563
1564         user_sid = self.sd_utils.get_object_sid(
1565             self.get_user_dn(self.regular_user))
1566
1567         ou_controls = [
1568             f'sd_flags:1:{security.SECINFO_OWNER|security.SECINFO_DACL}']
1569         ou_sddl = (f'O:{user_sid}'
1570                    f'D:(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1571         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
1572         self.sd_utils.modify_sd_on_dn(f'{self.ou1},{self.base_dn}', ou_desc,
1573                                       controls=ou_controls)
1574
1575         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1576         account_name = f'{self.test_user1}$'
1577         sd_sddl = f'D:(A;;WP;;;{user_sid})'
1578         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1579         try:
1580             self.ldb_user.add({
1581                 'dn': dn,
1582                 'objectclass': 'computer',
1583                 'sAMAccountName': account_name,
1584                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1585             })
1586         except LdbError as err:
1587             num, estr = err.args
1588             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1589             if self.strict_checking:
1590                 self.assertIn('000021CC', estr)
1591         else:
1592             self.fail('expected to fail')
1593
1594     def test_add_security_descriptor_sacl(self):
1595         '''Show that adding a computer object with a security descriptor containing a SACL is disallowed when BlockOwnerImplicitRights == 1'''
1596
1597         self.assert_top_ou_deleted()
1598         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1599
1600         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1601
1602         user_sid = self.sd_utils.get_object_sid(
1603             self.get_user_dn(self.regular_user))
1604         self.sd_utils.dacl_add_ace(
1605             f'{self.ou1},{self.base_dn}',
1606             f'(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1607         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1608         account_name = f'{self.test_user1}$'
1609         sd_sddl = f'S:(A;;WP;;;{user_sid})'
1610         tmp_desc = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
1611         try:
1612             self.ldb_user.add({
1613                 'dn': dn,
1614                 'objectclass': 'computer',
1615                 'sAMAccountName': account_name,
1616                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1617             })
1618         except LdbError as err:
1619             num, estr = err.args
1620             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1621             if self.strict_checking:
1622                 self.assertIn('000021CC', estr)
1623         else:
1624             self.fail('expected to fail')
1625
1626     def test_add_security_descriptor_empty(self):
1627         '''Show that adding a computer object with an empty security descriptor is disallowed when BlockOwnerImplicitRights == 1, even when we are the owner of the OU security descriptor'''
1628
1629         self.assert_top_ou_deleted()
1630         self.ldb_owner.create_ou(f'{self.ou1},{self.base_dn}')
1631
1632         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'1')
1633
1634         user_sid = self.sd_utils.get_object_sid(
1635             self.get_user_dn(self.regular_user))
1636
1637         ou_controls = [
1638             f'sd_flags:1:{security.SECINFO_OWNER|security.SECINFO_DACL}']
1639         ou_sddl = (f'O:{user_sid}'
1640                    f'D:(OA;CI;CC;{samba.dsdb.DS_GUID_SCHEMA_CLASS_COMPUTER};;{user_sid})')
1641         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
1642         self.sd_utils.modify_sd_on_dn(f'{self.ou1},{self.base_dn}', ou_desc,
1643                                       controls=ou_controls)
1644
1645         dn = f'CN={self.test_user1},{self.ou1},{self.base_dn}'
1646         account_name = f'{self.test_user1}$'
1647         tmp_desc = security.descriptor.from_sddl('', self.domain_sid)
1648         try:
1649             self.ldb_user.add({
1650                 'dn': dn,
1651                 'objectclass': 'computer',
1652                 'sAMAccountName': account_name,
1653                 'ntSecurityDescriptor': ndr_pack(tmp_desc),
1654             })
1655         except LdbError as err:
1656             num, estr = err.args
1657             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1658             if self.strict_checking:
1659                 self.assertIn('000021CC', estr)
1660         else:
1661             self.fail('expected to fail')
1662
1663     def test_add_anonymous(self):
1664         """Test add operation with anonymous user"""
1665         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
1666         try:
1667             anonymous.newuser("test_add_anonymous", self.user_pass)
1668         except LdbError as e2:
1669             (num, _) = e2.args
1670             self.assertEqual(num, ERR_OPERATIONS_ERROR)
1671         else:
1672             self.fail()
1673
1674 # tests on ldap modify operations
1675
1676
1677 class AclModifyTests(AclTests):
1678
1679     def setUp(self):
1680         super(AclModifyTests, self).setUp()
1681         self.user_with_wp = "acl_mod_user1"
1682         self.user_with_sm = "acl_mod_user2"
1683         self.user_with_group_sm = "acl_mod_user3"
1684         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1685         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
1686         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
1687         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1688         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
1689         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
1690         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
1691         self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1692         self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1693         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
1694
1695     def tearDown(self):
1696         super(AclModifyTests, self).tearDown()
1697         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
1698         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
1699         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
1700         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
1701         delete_force(self.ldb_admin, "CN=test_mod_hostname,OU=test_modify_ou1," + self.base_dn)
1702         delete_force(self.ldb_admin, "CN=test_modify_ou1_user,OU=test_modify_ou1," + self.base_dn)
1703         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
1704         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1705         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
1706         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
1707         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
1708         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
1709
1710         del self.ldb_user
1711         del self.ldb_user2
1712         del self.ldb_user3
1713
1714     def get_sd_rights_effective(self, samdb, dn):
1715         res = samdb.search(dn,
1716                            scope=SCOPE_BASE,
1717                            attrs=['sDRightsEffective'])
1718         sd_rights = res[0].get('sDRightsEffective', idx=0)
1719         if sd_rights is not None:
1720             sd_rights = int(sd_rights)
1721
1722         return sd_rights
1723
1724     def test_modify_u1(self):
1725         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
1726         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
1727         # First test object -- User
1728         print("Testing modify on User object")
1729         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
1730         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
1731         ldif = """
1732 dn: """ + self.get_user_dn("test_modify_user1") + """
1733 changetype: modify
1734 replace: displayName
1735 displayName: test_changed"""
1736         self.ldb_user.modify_ldif(ldif)
1737         res = self.ldb_admin.search(self.base_dn,
1738                                     expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
1739         self.assertEqual(str(res[0]["displayName"][0]), "test_changed")
1740         # Second test object -- Group
1741         print("Testing modify on Group object")
1742         self.ldb_admin.newgroup("test_modify_group1",
1743                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1744         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
1745         ldif = """
1746 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
1747 changetype: modify
1748 replace: displayName
1749 displayName: test_changed"""
1750         self.ldb_user.modify_ldif(ldif)
1751         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
1752         self.assertEqual(str(res[0]["displayName"][0]), "test_changed")
1753         # Third test object -- Organizational Unit
1754         print("Testing modify on OU object")
1755         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
1756         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
1757         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
1758         ldif = """
1759 dn: OU=test_modify_ou1,""" + self.base_dn + """
1760 changetype: modify
1761 replace: displayName
1762 displayName: test_changed"""
1763         self.ldb_user.modify_ldif(ldif)
1764         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
1765         self.assertEqual(str(res[0]["displayName"][0]), "test_changed")
1766
1767     def test_modify_u2(self):
1768         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
1769         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
1770         # First test object -- User
1771         print("Testing modify on User object")
1772         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
1773         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
1774         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
1775         # Modify on attribute you have rights for
1776         ldif = """
1777 dn: """ + self.get_user_dn("test_modify_user1") + """
1778 changetype: modify
1779 replace: displayName
1780 displayName: test_changed"""
1781         self.ldb_user.modify_ldif(ldif)
1782         res = self.ldb_admin.search(self.base_dn,
1783                                     expression="(distinguishedName=%s)" %
1784                                     self.get_user_dn("test_modify_user1"))
1785         self.assertEqual(str(res[0]["displayName"][0]), "test_changed")
1786         # Modify on attribute you do not have rights for granted
1787         ldif = """
1788 dn: """ + self.get_user_dn("test_modify_user1") + """
1789 changetype: modify
1790 replace: url
1791 url: www.samba.org"""
1792         try:
1793             self.ldb_user.modify_ldif(ldif)
1794         except LdbError as e3:
1795             (num, _) = e3.args
1796             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1797         else:
1798             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1799             self.fail()
1800         # Second test object -- Group
1801         print("Testing modify on Group object")
1802         self.ldb_admin.newgroup("test_modify_group1",
1803                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1804         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
1805         ldif = """
1806 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
1807 changetype: modify
1808 replace: displayName
1809 displayName: test_changed"""
1810         self.ldb_user.modify_ldif(ldif)
1811         res = self.ldb_admin.search(self.base_dn,
1812                                     expression="(distinguishedName=%s)" %
1813                                     str("CN=test_modify_group1,CN=Users," + self.base_dn))
1814         self.assertEqual(str(res[0]["displayName"][0]), "test_changed")
1815         # Modify on attribute you do not have rights for granted
1816         ldif = """
1817 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
1818 changetype: modify
1819 replace: url
1820 url: www.samba.org"""
1821         try:
1822             self.ldb_user.modify_ldif(ldif)
1823         except LdbError as e4:
1824             (num, _) = e4.args
1825             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1826         else:
1827             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1828             self.fail()
1829         # Modify on attribute you do not have rights for granted while also modifying something you do have rights for
1830         ldif = """
1831 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
1832 changetype: modify
1833 replace: url
1834 url: www.samba.org
1835 replace: displayName
1836 displayName: test_changed"""
1837         try:
1838             self.ldb_user.modify_ldif(ldif)
1839         except LdbError as e5:
1840             (num, _) = e5.args
1841             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1842         else:
1843             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1844             self.fail()
1845         # Second test object -- Organizational Unit
1846         print("Testing modify on OU object")
1847         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
1848         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
1849         ldif = """
1850 dn: OU=test_modify_ou1,""" + self.base_dn + """
1851 changetype: modify
1852 replace: displayName
1853 displayName: test_changed"""
1854         self.ldb_user.modify_ldif(ldif)
1855         res = self.ldb_admin.search(self.base_dn,
1856                                     expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
1857                                                                               + self.base_dn))
1858         self.assertEqual(str(res[0]["displayName"][0]), "test_changed")
1859         # Modify on attribute you do not have rights for granted
1860         ldif = """
1861 dn: OU=test_modify_ou1,""" + self.base_dn + """
1862 changetype: modify
1863 replace: url
1864 url: www.samba.org"""
1865         try:
1866             self.ldb_user.modify_ldif(ldif)
1867         except LdbError as e6:
1868             (num, _) = e6.args
1869             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1870         else:
1871             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1872             self.fail()
1873
1874     def test_modify_u3(self):
1875         """7 Modify one attribute as you have no what so ever rights granted"""
1876         # First test object -- User
1877         print("Testing modify on User object")
1878         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
1879         # Modify on attribute you do not have rights for granted
1880         ldif = """
1881 dn: """ + self.get_user_dn("test_modify_user1") + """
1882 changetype: modify
1883 replace: url
1884 url: www.samba.org"""
1885         try:
1886             self.ldb_user.modify_ldif(ldif)
1887         except LdbError as e7:
1888             (num, _) = e7.args
1889             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1890         else:
1891             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1892             self.fail()
1893
1894         # Second test object -- Group
1895         print("Testing modify on Group object")
1896         self.ldb_admin.newgroup("test_modify_group1",
1897                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1898         # Modify on attribute you do not have rights for granted
1899         ldif = """
1900 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
1901 changetype: modify
1902 replace: url
1903 url: www.samba.org"""
1904         try:
1905             self.ldb_user.modify_ldif(ldif)
1906         except LdbError as e8:
1907             (num, _) = e8.args
1908             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1909         else:
1910             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1911             self.fail()
1912
1913         # Second test object -- Organizational Unit
1914         print("Testing modify on OU object")
1915         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
1916         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
1917         # Modify on attribute you do not have rights for granted
1918         ldif = """
1919 dn: OU=test_modify_ou1,""" + self.base_dn + """
1920 changetype: modify
1921 replace: url
1922 url: www.samba.org"""
1923         try:
1924             self.ldb_user.modify_ldif(ldif)
1925         except LdbError as e9:
1926             (num, _) = e9.args
1927             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1928         else:
1929             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1930             self.fail()
1931
1932     def test_modify_u4(self):
1933         """11 Grant WP to PRINCIPAL_SELF and test modify"""
1934         ldif = """
1935 dn: """ + self.get_user_dn(self.user_with_wp) + """
1936 changetype: modify
1937 add: adminDescription
1938 adminDescription: blah blah blah"""
1939         try:
1940             self.ldb_user.modify_ldif(ldif)
1941         except LdbError as e10:
1942             (num, _) = e10.args
1943             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1944         else:
1945             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1946             self.fail()
1947
1948         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
1949         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1950         # Modify on attribute you have rights for
1951         self.ldb_user.modify_ldif(ldif)
1952         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)"
1953                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"])
1954         self.assertEqual(str(res[0]["adminDescription"][0]), "blah blah blah")
1955
1956     def test_modify_u5(self):
1957         """12 test self membership"""
1958         ldif = """
1959 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
1960 changetype: modify
1961 add: Member
1962 Member: """ + self.get_user_dn(self.user_with_sm)
1963 # the user has no rights granted, this should fail
1964         try:
1965             self.ldb_user2.modify_ldif(ldif)
1966         except LdbError as e11:
1967             (num, _) = e11.args
1968             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1969         else:
1970             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1971             self.fail()
1972
1973 # grant self-membership, should be able to add himself
1974         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
1975         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
1976         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
1977         self.ldb_user2.modify_ldif(ldif)
1978         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)"
1979                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
1980         self.assertEqual(str(res[0]["Member"][0]), self.get_user_dn(self.user_with_sm))
1981 # but not other users
1982         ldif = """
1983 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
1984 changetype: modify
1985 add: Member
1986 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
1987         try:
1988             self.ldb_user2.modify_ldif(ldif)
1989         except LdbError as e12:
1990             (num, _) = e12.args
1991             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1992         else:
1993             self.fail()
1994
1995     def test_modify_u6(self):
1996         """13 test self membership"""
1997         ldif = """
1998 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
1999 changetype: modify
2000 add: Member
2001 Member: """ + self.get_user_dn(self.user_with_sm) + """
2002 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
2003
2004 # grant self-membership, should be able to add himself  but not others at the same time
2005         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
2006         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
2007         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
2008         try:
2009             self.ldb_user2.modify_ldif(ldif)
2010         except LdbError as e13:
2011             (num, _) = e13.args
2012             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
2013         else:
2014             self.fail()
2015
2016     def test_modify_u7(self):
2017         """13 User with WP modifying Member"""
2018 # a second user is given write property permission
2019         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
2020         mod = "(A;;WP;;;%s)" % str(user_sid)
2021         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
2022         ldif = """
2023 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
2024 changetype: modify
2025 add: Member
2026 Member: """ + self.get_user_dn(self.user_with_wp)
2027         self.ldb_user.modify_ldif(ldif)
2028         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)"
2029                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
2030         self.assertEqual(str(res[0]["Member"][0]), self.get_user_dn(self.user_with_wp))
2031         ldif = """
2032 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
2033 changetype: modify
2034 delete: Member"""
2035         self.ldb_user.modify_ldif(ldif)
2036         ldif = """
2037 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
2038 changetype: modify
2039 add: Member
2040 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
2041         self.ldb_user.modify_ldif(ldif)
2042         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)"
2043                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
2044         self.assertEqual(str(res[0]["Member"][0]), "CN=test_modify_user2,CN=Users," + self.base_dn)
2045
2046     def test_modify_dacl_explicit_user(self):
2047         '''Modify the DACL of a user's security descriptor when we have RIGHT_WRITE_DAC'''
2048
2049         ou_name = 'test_modify_ou1'
2050         ou_dn = f'OU={ou_name},{self.base_dn}'
2051
2052         username = 'test_modify_ou1_user'
2053         user_dn = Dn(self.ldb_admin, f'CN={username},{ou_dn}')
2054
2055         sd_sddl = 'D:(A;;WP;;;BA)'
2056         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2057
2058         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2059         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2060         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2061
2062         self.ldb_admin.newuser(username, self.user_pass,
2063                                userou=f'OU={ou_name}',
2064                                sd=descriptor)
2065
2066         new_sddl = f'D:(A;;WP;;;{self.user_sid})'
2067         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2068
2069         controls = [f'sd_flags:1:{security.SECINFO_DACL}']
2070
2071         # Check our effective rights.
2072         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2073         expected_rights = 0
2074         self.assertEqual(expected_rights, effective_rights)
2075
2076         # The user should not be able to modify the DACL.
2077         message = Message(user_dn)
2078         message['nTSecurityDescriptor'] = MessageElement(
2079             ndr_pack(new_desc),
2080             FLAG_MOD_REPLACE,
2081             'nTSecurityDescriptor')
2082
2083         # The update fails since we don't have WRITE_DAC.
2084         try:
2085             self.ldb_user.modify(message, controls=controls)
2086         except LdbError as err:
2087             num, estr = err.args
2088             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2089             if self.strict_checking:
2090                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2091         else:
2092             self.fail()
2093
2094         # Grant ourselves WRITE_DAC.
2095         write_dac_sddl = f'(A;CI;WD;;;{self.user_sid})'
2096         self.sd_utils.dacl_add_ace(ou_dn, write_dac_sddl)
2097
2098         # Check our effective rights.
2099         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2100         expected_rights = security.SECINFO_DACL
2101         self.assertEqual(expected_rights, effective_rights)
2102
2103         # The update fails if we don't specify the controls.
2104         try:
2105             self.ldb_user.modify(message)
2106         except LdbError as err:
2107             if self.strict_checking:
2108                 num, estr = err.args
2109                 self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2110                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2111         else:
2112             self.fail()
2113
2114         # The update succeeds when specifying the controls.
2115         self.ldb_user.modify(message, controls=controls)
2116
2117     def test_modify_dacl_explicit_computer(self):
2118         '''Modify the DACL of a computer's security descriptor when we have RIGHT_WRITE_DAC'''
2119
2120         ou_name = 'test_modify_ou1'
2121         ou_dn = f'OU={ou_name},{self.base_dn}'
2122
2123         account_name = 'test_mod_hostname'
2124         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2125
2126         sd_sddl = 'D:(A;;WP;;;BA)'
2127         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2128
2129         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2130         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2131         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2132
2133         # Create the account.
2134         self.ldb_admin.add({
2135             'dn': dn,
2136             'objectClass': 'computer',
2137             'sAMAccountName': f'{account_name}$',
2138             'nTSecurityDescriptor': ndr_pack(descriptor),
2139         })
2140
2141         new_sddl = f'D:(A;;WP;;;{self.user_sid})'
2142         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2143
2144         controls = [f'sd_flags:1:{security.SECINFO_DACL}']
2145
2146         # Check our effective rights.
2147         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2148         expected_rights = None
2149         self.assertEqual(expected_rights, effective_rights)
2150
2151         # The user should not be able to modify the DACL.
2152         message = Message(dn)
2153         message['nTSecurityDescriptor'] = MessageElement(
2154             ndr_pack(new_desc),
2155             FLAG_MOD_REPLACE,
2156             'nTSecurityDescriptor')
2157
2158         # The update fails since we don't have WRITE_DAC.
2159         try:
2160             self.ldb_user.modify(message, controls=controls)
2161         except LdbError as err:
2162             num, estr = err.args
2163             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2164             if self.strict_checking:
2165                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2166         else:
2167             self.fail()
2168
2169         # Grant ourselves WRITE_DAC.
2170         write_dac_sddl = f'(A;CI;WD;;;{self.user_sid})'
2171         self.sd_utils.dacl_add_ace(ou_dn, write_dac_sddl)
2172
2173         # Check our effective rights.
2174         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2175         expected_rights = None
2176         self.assertEqual(expected_rights, effective_rights)
2177
2178         # The update fails if we don't specify the controls.
2179         try:
2180             self.ldb_user.modify(message)
2181         except LdbError as err:
2182             if self.strict_checking:
2183                 num, estr = err.args
2184                 self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2185                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2186         else:
2187             self.fail()
2188
2189         # The update succeeds when specifying the controls.
2190         self.ldb_user.modify(message, controls=controls)
2191
2192     def test_modify_dacl_owner_user(self):
2193         '''Modify the DACL of a user's security descriptor when we are its owner'''
2194
2195         ou_name = 'test_modify_ou1'
2196         ou_dn = f'OU={ou_name},{self.base_dn}'
2197
2198         username = 'test_modify_ou1_user'
2199         user_dn = Dn(self.ldb_admin, f'CN={username},{ou_dn}')
2200
2201         sd_sddl = 'O:BA'
2202         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2203
2204         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2205         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2206         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2207
2208         self.ldb_admin.newuser(username, self.user_pass,
2209                                userou=f'OU={ou_name}',
2210                                sd=descriptor)
2211
2212         new_sddl = f'D:(A;;WP;;;{self.user_sid})'
2213         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2214
2215         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2216         dacl_controls = [f'sd_flags:1:{security.SECINFO_DACL}']
2217
2218         # Check our effective rights.
2219         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2220         expected_rights = 0
2221         self.assertEqual(expected_rights, effective_rights)
2222
2223         # The user should not be able to modify the DACL.
2224         message = Message(user_dn)
2225         message['nTSecurityDescriptor'] = MessageElement(
2226             ndr_pack(new_desc),
2227             FLAG_MOD_REPLACE,
2228             'nTSecurityDescriptor')
2229
2230         # The update fails since we are not the owner.
2231         try:
2232             self.ldb_user.modify(message, controls=dacl_controls)
2233         except LdbError as err:
2234             num, estr = err.args
2235             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2236             if self.strict_checking:
2237                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2238         else:
2239             self.fail()
2240
2241         # Make ourselves the owner of the security descriptor.
2242         owner_sddl = f'O:{self.user_sid}'
2243         owner_desc = security.descriptor.from_sddl(owner_sddl, self.domain_sid)
2244         self.sd_utils.modify_sd_on_dn(user_dn, owner_desc,
2245                                       controls=owner_controls)
2246
2247         # Check our effective rights.
2248         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2249         expected_rights = security.SECINFO_DACL
2250         self.assertEqual(expected_rights, effective_rights)
2251
2252         # The update fails if we don't specify the controls.
2253         try:
2254             self.ldb_user.modify(message)
2255         except LdbError as err:
2256             if self.strict_checking:
2257                 num, estr = err.args
2258                 self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2259                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2260         else:
2261             self.fail()
2262
2263         # The update succeeds when specifying the controls.
2264         self.ldb_user.modify(message, controls=dacl_controls)
2265
2266     def test_modify_dacl_owner_computer_implicit_right_blocked(self):
2267         '''Show that we cannot modify the DACL of a computer's security descriptor when we are its owner and BlockOwnerImplicitRights == 1'''
2268
2269         ou_name = 'test_modify_ou1'
2270         ou_dn = f'OU={ou_name},{self.base_dn}'
2271
2272         account_name = 'test_mod_hostname'
2273         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2274
2275         sd_sddl = 'O:BA'
2276         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2277
2278         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2279         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2280         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2281
2282         # Create the account.
2283         self.ldb_admin.add({
2284             'dn': dn,
2285             'objectClass': 'computer',
2286             'sAMAccountName': f'{account_name}$',
2287             'nTSecurityDescriptor': ndr_pack(descriptor),
2288         })
2289
2290         new_sddl = f'D:(A;;WP;;;{self.user_sid})'
2291         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2292
2293         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2294         dacl_controls = [f'sd_flags:1:{security.SECINFO_DACL}']
2295
2296         # Check our effective rights.
2297         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2298         expected_rights = None
2299         self.assertEqual(expected_rights, effective_rights)
2300
2301         # The user should not be able to modify the DACL.
2302         message = Message(dn)
2303         message['nTSecurityDescriptor'] = MessageElement(
2304             ndr_pack(new_desc),
2305             FLAG_MOD_REPLACE,
2306             'nTSecurityDescriptor')
2307
2308         # The update fails since we are not the owner.
2309         try:
2310             self.ldb_user.modify(message, controls=dacl_controls)
2311         except LdbError as err:
2312             num, estr = err.args
2313             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2314             if self.strict_checking:
2315                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2316         else:
2317             self.fail()
2318
2319         # Make ourselves the owner of the security descriptor.
2320         owner_sddl = f'O:{self.user_sid}'
2321         owner_desc = security.descriptor.from_sddl(owner_sddl, self.domain_sid)
2322         self.sd_utils.modify_sd_on_dn(dn, owner_desc,
2323                                       controls=owner_controls)
2324
2325         # Check our effective rights.
2326         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2327         expected_rights = None
2328         self.assertEqual(expected_rights, effective_rights)
2329
2330         # The update fails even when specifying the controls.
2331         try:
2332             self.ldb_user.modify(message, controls=dacl_controls)
2333         except LdbError as err:
2334             if self.strict_checking:
2335                 num, estr = err.args
2336                 self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2337                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2338         else:
2339             self.fail()
2340
2341     def test_modify_dacl_owner_computer_implicit_right_allowed(self):
2342         '''Modify the DACL of a computer's security descriptor when we are its owner and BlockOwnerImplicitRights != 1'''
2343
2344         self.set_heuristic(samba.dsdb.DS_HR_BLOCK_OWNER_IMPLICIT_RIGHTS, b'0')
2345
2346         ou_name = 'test_modify_ou1'
2347         ou_dn = f'OU={ou_name},{self.base_dn}'
2348
2349         account_name = 'test_mod_hostname'
2350         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2351
2352         sd_sddl = 'O:BA'
2353         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2354
2355         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2356         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2357         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2358
2359         # Create the account.
2360         self.ldb_admin.add({
2361             'dn': dn,
2362             'objectClass': 'computer',
2363             'sAMAccountName': f'{account_name}$',
2364             'nTSecurityDescriptor': ndr_pack(descriptor),
2365         })
2366
2367         new_sddl = f'D:(A;;WP;;;{self.user_sid})'
2368         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2369
2370         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2371         dacl_controls = [f'sd_flags:1:{security.SECINFO_DACL}']
2372
2373         # Check our effective rights.
2374         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2375         expected_rights = None
2376         self.assertEqual(expected_rights, effective_rights)
2377
2378         # The user should not be able to modify the DACL.
2379         message = Message(dn)
2380         message['nTSecurityDescriptor'] = MessageElement(
2381             ndr_pack(new_desc),
2382             FLAG_MOD_REPLACE,
2383             'nTSecurityDescriptor')
2384
2385         # The update fails since we are not the owner.
2386         try:
2387             self.ldb_user.modify(message, controls=dacl_controls)
2388         except LdbError as err:
2389             num, estr = err.args
2390             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2391             if self.strict_checking:
2392                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2393         else:
2394             self.fail()
2395
2396         # Make ourselves the owner of the security descriptor.
2397         owner_sddl = f'O:{self.user_sid}'
2398         owner_desc = security.descriptor.from_sddl(owner_sddl, self.domain_sid)
2399         self.sd_utils.modify_sd_on_dn(dn, owner_desc,
2400                                       controls=owner_controls)
2401
2402         # Check our effective rights.
2403         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2404         expected_rights = None
2405         self.assertEqual(expected_rights, effective_rights)
2406
2407         # The update fails if we don't specify the controls.
2408         try:
2409             self.ldb_user.modify(message)
2410         except LdbError as err:
2411             if self.strict_checking:
2412                 num, estr = err.args
2413                 self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2414                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2415         else:
2416             self.fail()
2417
2418         # The update succeeds when specifying the controls.
2419         self.ldb_user.modify(message, controls=dacl_controls)
2420
2421     def test_modify_owner_explicit_user(self):
2422         '''Modify the owner of a user's security descriptor when we have RIGHT_WRITE_OWNER'''
2423
2424         ou_name = 'test_modify_ou1'
2425         ou_dn = f'OU={ou_name},{self.base_dn}'
2426
2427         username = 'test_modify_ou1_user'
2428         user_dn = Dn(self.ldb_admin, f'CN={username},{ou_dn}')
2429
2430         sd_sddl = 'O:BA'
2431         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2432
2433         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2434         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2435         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2436
2437         self.ldb_admin.newuser(username, self.user_pass,
2438                                userou=f'OU={ou_name}',
2439                                sd=descriptor)
2440
2441         # Try to modify the owner to ourselves.
2442         new_sddl = f'O:{self.user_sid}'
2443         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2444
2445         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2446
2447         # Check our effective rights.
2448         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2449         expected_rights = 0
2450         self.assertEqual(expected_rights, effective_rights)
2451
2452         # The user should not be able to modify the owner.
2453         message = Message(user_dn)
2454         message['nTSecurityDescriptor'] = MessageElement(
2455             ndr_pack(new_desc),
2456             FLAG_MOD_REPLACE,
2457             'nTSecurityDescriptor')
2458
2459         # The update fails since we don't have WRITE_OWNER.
2460         try:
2461             self.ldb_user.modify(message, controls=owner_controls)
2462         except LdbError as err:
2463             num, estr = err.args
2464             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2465             if self.strict_checking:
2466                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2467         else:
2468             self.fail()
2469
2470         # Grant ourselves WRITE_OWNER.
2471         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2472         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2473
2474         # Check our effective rights.
2475         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2476         expected_rights = security.SECINFO_OWNER | security.SECINFO_GROUP
2477         self.assertEqual(expected_rights, effective_rights)
2478
2479         # The update fails if we don't specify the controls.
2480         try:
2481             self.ldb_user.modify(message)
2482         except LdbError as err:
2483             num, estr = err.args
2484             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2485             if self.strict_checking:
2486                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2487         else:
2488             self.fail()
2489
2490         # The update succeeds when specifying the controls.
2491         self.ldb_user.modify(message, controls=owner_controls)
2492
2493     def test_modify_owner_explicit_computer(self):
2494         '''Modify the owner of a computer's security descriptor when we have RIGHT_WRITE_OWNER'''
2495
2496         ou_name = 'test_modify_ou1'
2497         ou_dn = f'OU={ou_name},{self.base_dn}'
2498
2499         account_name = 'test_mod_hostname'
2500         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2501
2502         sd_sddl = 'O:BA'
2503         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2504
2505         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2506         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2507         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2508
2509         # Create the account.
2510         self.ldb_admin.add({
2511             'dn': dn,
2512             'objectClass': 'computer',
2513             'sAMAccountName': f'{account_name}$',
2514             'nTSecurityDescriptor': ndr_pack(descriptor),
2515         })
2516
2517         # Try to modify the owner to ourselves.
2518         new_sddl = f'O:{self.user_sid}'
2519         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2520
2521         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2522
2523         # Check our effective rights.
2524         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2525         expected_rights = None
2526         self.assertEqual(expected_rights, effective_rights)
2527
2528         # The user should not be able to modify the owner.
2529         message = Message(dn)
2530         message['nTSecurityDescriptor'] = MessageElement(
2531             ndr_pack(new_desc),
2532             FLAG_MOD_REPLACE,
2533             'nTSecurityDescriptor')
2534
2535         # The update fails since we don't have WRITE_OWNER.
2536         try:
2537             self.ldb_user.modify(message, controls=owner_controls)
2538         except LdbError as err:
2539             num, estr = err.args
2540             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2541             if self.strict_checking:
2542                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2543         else:
2544             self.fail()
2545
2546         # Grant ourselves WRITE_OWNER.
2547         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2548         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2549
2550         # Check our effective rights.
2551         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2552         expected_rights = None
2553         self.assertEqual(expected_rights, effective_rights)
2554
2555         # The update fails if we don't specify the controls.
2556         try:
2557             self.ldb_user.modify(message)
2558         except LdbError as err:
2559             num, estr = err.args
2560             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2561             if self.strict_checking:
2562                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2563         else:
2564             self.fail()
2565
2566         # The update succeeds when specifying the controls.
2567         self.ldb_user.modify(message, controls=owner_controls)
2568
2569     def test_modify_group_explicit_user(self):
2570         '''Modify the group of a user's security descriptor when we have RIGHT_WRITE_OWNER'''
2571
2572         ou_name = 'test_modify_ou1'
2573         ou_dn = f'OU={ou_name},{self.base_dn}'
2574
2575         username = 'test_modify_ou1_user'
2576         user_dn = Dn(self.ldb_admin, f'CN={username},{ou_dn}')
2577
2578         sd_sddl = 'O:BA'
2579         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2580
2581         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2582         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2583         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2584
2585         self.ldb_admin.newuser(username, self.user_pass,
2586                                userou=f'OU={ou_name}',
2587                                sd=descriptor)
2588
2589         # Try to modify the group to ourselves.
2590         new_sddl = f'G:{self.user_sid}'
2591         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2592
2593         group_controls = [f'sd_flags:1:{security.SECINFO_GROUP}']
2594
2595         # Check our effective rights.
2596         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2597         expected_rights = 0
2598         self.assertEqual(expected_rights, effective_rights)
2599
2600         # The user should not be able to modify the group.
2601         message = Message(user_dn)
2602         message['nTSecurityDescriptor'] = MessageElement(
2603             ndr_pack(new_desc),
2604             FLAG_MOD_REPLACE,
2605             'nTSecurityDescriptor')
2606
2607         # The update fails since we don't have WRITE_OWNER.
2608         try:
2609             self.ldb_user.modify(message, controls=group_controls)
2610         except LdbError as err:
2611             num, estr = err.args
2612             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2613             if self.strict_checking:
2614                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2615         else:
2616             self.fail()
2617
2618         # Grant ourselves WRITE_OWNER.
2619         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2620         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2621
2622         # Check our effective rights.
2623         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2624         expected_rights = security.SECINFO_OWNER | security.SECINFO_GROUP
2625         self.assertEqual(expected_rights, effective_rights)
2626
2627         # The update fails if we don't specify the controls.
2628         try:
2629             self.ldb_user.modify(message)
2630         except LdbError as err:
2631             if self.strict_checking:
2632                 num, estr = err.args
2633                 self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2634                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2635         else:
2636             self.fail()
2637
2638         # The update succeeds when specifying the controls.
2639         self.ldb_user.modify(message, controls=group_controls)
2640
2641     def test_modify_group_explicit_computer(self):
2642         '''Modify the group of a computer's security descriptor when we have RIGHT_WRITE_OWNER'''
2643
2644         ou_name = 'test_modify_ou1'
2645         ou_dn = f'OU={ou_name},{self.base_dn}'
2646
2647         account_name = 'test_mod_hostname'
2648         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2649
2650         sd_sddl = 'O:BA'
2651         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2652
2653         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2654         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2655         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2656
2657         # Create the account.
2658         self.ldb_admin.add({
2659             'dn': dn,
2660             'objectClass': 'computer',
2661             'sAMAccountName': f'{account_name}$',
2662             'nTSecurityDescriptor': ndr_pack(descriptor),
2663         })
2664
2665         # Try to modify the group to ourselves.
2666         new_sddl = f'G:{self.user_sid}'
2667         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2668
2669         group_controls = [f'sd_flags:1:{security.SECINFO_GROUP}']
2670
2671         # Check our effective rights.
2672         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2673         expected_rights = None
2674         self.assertEqual(expected_rights, effective_rights)
2675
2676         # The user should not be able to modify the group.
2677         message = Message(dn)
2678         message['nTSecurityDescriptor'] = MessageElement(
2679             ndr_pack(new_desc),
2680             FLAG_MOD_REPLACE,
2681             'nTSecurityDescriptor')
2682
2683         # The update fails since we don't have WRITE_OWNER.
2684         try:
2685             self.ldb_user.modify(message, controls=group_controls)
2686         except LdbError as err:
2687             num, estr = err.args
2688             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2689             if self.strict_checking:
2690                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2691         else:
2692             self.fail()
2693
2694         # Grant ourselves WRITE_OWNER.
2695         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2696         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2697
2698         # Check our effective rights.
2699         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2700         expected_rights = None
2701         self.assertEqual(expected_rights, effective_rights)
2702
2703         # The update fails if we don't specify the controls.
2704         try:
2705             self.ldb_user.modify(message)
2706         except LdbError as err:
2707             if self.strict_checking:
2708                 num, estr = err.args
2709                 self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
2710                 self.assertIn(f'{werror.WERR_ACCESS_DENIED:08X}', estr)
2711         else:
2712             self.fail()
2713
2714         # The update succeeds when specifying the controls.
2715         self.ldb_user.modify(message, controls=group_controls)
2716
2717     def test_modify_owner_other_user(self):
2718         '''Show we cannot set the owner of a user's security descriptor to another SID'''
2719
2720         ou_name = 'test_modify_ou1'
2721         ou_dn = f'OU={ou_name},{self.base_dn}'
2722
2723         username = 'test_modify_ou1_user'
2724         user_dn = Dn(self.ldb_admin, f'CN={username},{ou_dn}')
2725
2726         sd_sddl = 'O:BA'
2727         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2728
2729         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2730         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2731         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2732
2733         self.ldb_admin.newuser(username, self.user_pass,
2734                                userou=f'OU={ou_name}',
2735                                sd=descriptor)
2736
2737         # Try to modify the owner to someone other than ourselves.
2738         new_sddl = f'O:BA'
2739         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2740
2741         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2742
2743         # Check our effective rights.
2744         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2745         expected_rights = 0
2746         self.assertEqual(expected_rights, effective_rights)
2747
2748         # The user should not be able to modify the owner.
2749         message = Message(user_dn)
2750         message['nTSecurityDescriptor'] = MessageElement(
2751             ndr_pack(new_desc),
2752             FLAG_MOD_REPLACE,
2753             'nTSecurityDescriptor')
2754
2755         # Grant ourselves WRITE_OWNER.
2756         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2757         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2758
2759         # Check our effective rights.
2760         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2761         expected_rights = security.SECINFO_OWNER | security.SECINFO_GROUP
2762         self.assertEqual(expected_rights, effective_rights)
2763
2764         # The update fails when trying to specify another user.
2765         try:
2766             self.ldb_user.modify(message, controls=owner_controls)
2767         except LdbError as err:
2768             num, estr = err.args
2769             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
2770             if self.strict_checking:
2771                 self.assertIn(f'{werror.WERR_INVALID_OWNER:08X}', estr)
2772         else:
2773             self.fail('expected an error')
2774
2775     def test_modify_owner_other_computer(self):
2776         '''Show we cannot set the owner of a computer's security descriptor to another SID'''
2777
2778         ou_name = 'test_modify_ou1'
2779         ou_dn = f'OU={ou_name},{self.base_dn}'
2780
2781         account_name = 'test_mod_hostname'
2782         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2783
2784         sd_sddl = 'O:BA'
2785         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2786
2787         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2788         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2789         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2790
2791         # Create the account.
2792         self.ldb_admin.add({
2793             'dn': dn,
2794             'objectClass': 'computer',
2795             'sAMAccountName': f'{account_name}$',
2796             'nTSecurityDescriptor': ndr_pack(descriptor),
2797         })
2798
2799         # Try to modify the owner to someone other than ourselves.
2800         new_sddl = f'O:BA'
2801         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2802
2803         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2804
2805         # Check our effective rights.
2806         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2807         expected_rights = None
2808         self.assertEqual(expected_rights, effective_rights)
2809
2810         # The user should not be able to modify the owner.
2811         message = Message(dn)
2812         message['nTSecurityDescriptor'] = MessageElement(
2813             ndr_pack(new_desc),
2814             FLAG_MOD_REPLACE,
2815             'nTSecurityDescriptor')
2816
2817         # Grant ourselves WRITE_OWNER.
2818         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2819         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2820
2821         # Check our effective rights.
2822         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2823         expected_rights = None
2824         self.assertEqual(expected_rights, effective_rights)
2825
2826         # The update fails when trying to specify another user.
2827         try:
2828             self.ldb_user.modify(message, controls=owner_controls)
2829         except LdbError as err:
2830             num, estr = err.args
2831             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
2832             if self.strict_checking:
2833                 self.assertIn(f'{werror.WERR_INVALID_OWNER:08X}', estr)
2834         else:
2835             self.fail('expected an error')
2836
2837     def test_modify_owner_other_admin_user(self):
2838         '''Show a domain admin cannot set the owner of a user's security descriptor to another SID'''
2839
2840         ou_name = 'test_modify_ou1'
2841         ou_dn = f'OU={ou_name},{self.base_dn}'
2842
2843         username = 'test_modify_ou1_user'
2844         user_dn = Dn(self.ldb_admin, f'CN={username},{ou_dn}')
2845
2846         sd_sddl = 'O:BA'
2847         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2848
2849         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2850         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2851         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2852
2853         self.ldb_admin.newuser(username, self.user_pass,
2854                                userou=f'OU={ou_name}',
2855                                sd=descriptor)
2856
2857         # Try to modify the owner to someone other than ourselves.
2858         new_sddl = f'O:BA'
2859         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2860
2861         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2862
2863         # Check our effective rights.
2864         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2865         expected_rights = 0
2866         self.assertEqual(expected_rights, effective_rights)
2867
2868         # The user should not be able to modify the owner.
2869         message = Message(user_dn)
2870         message['nTSecurityDescriptor'] = MessageElement(
2871             ndr_pack(new_desc),
2872             FLAG_MOD_REPLACE,
2873             'nTSecurityDescriptor')
2874
2875         # Grant ourselves WRITE_OWNER.
2876         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2877         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2878
2879         # Check our effective rights.
2880         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2881         expected_rights = security.SECINFO_OWNER | security.SECINFO_GROUP
2882         self.assertEqual(expected_rights, effective_rights)
2883
2884         # The update succeeds as admin when trying to specify another user.
2885         self.ldb_admin.modify(message, controls=owner_controls)
2886
2887     def test_modify_owner_other_admin_computer(self):
2888         '''Show a domain admin cannot set the owner of a computer's security descriptor to another SID'''
2889
2890         ou_name = 'test_modify_ou1'
2891         ou_dn = f'OU={ou_name},{self.base_dn}'
2892
2893         account_name = 'test_mod_hostname'
2894         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2895
2896         sd_sddl = 'O:BA'
2897         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2898
2899         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2900         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2901         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2902
2903         # Create the account.
2904         self.ldb_admin.add({
2905             'dn': dn,
2906             'objectClass': 'computer',
2907             'sAMAccountName': f'{account_name}$',
2908             'nTSecurityDescriptor': ndr_pack(descriptor),
2909         })
2910
2911         # Try to modify the owner to someone other than ourselves.
2912         new_sddl = f'O:BA'
2913         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2914
2915         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2916
2917         # Check our effective rights.
2918         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2919         expected_rights = None
2920         self.assertEqual(expected_rights, effective_rights)
2921
2922         # The user should not be able to modify the owner.
2923         message = Message(dn)
2924         message['nTSecurityDescriptor'] = MessageElement(
2925             ndr_pack(new_desc),
2926             FLAG_MOD_REPLACE,
2927             'nTSecurityDescriptor')
2928
2929         # Grant ourselves WRITE_OWNER.
2930         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2931         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2932
2933         # Check our effective rights.
2934         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
2935         expected_rights = None
2936         self.assertEqual(expected_rights, effective_rights)
2937
2938         # The update succeeds as admin when trying to specify another user.
2939         self.ldb_admin.modify(message, controls=owner_controls)
2940
2941     def test_modify_owner_admin_user(self):
2942         '''Show a domain admin can set the owner of a user's security descriptor to Domain Admins'''
2943
2944         ou_name = 'test_modify_ou1'
2945         ou_dn = f'OU={ou_name},{self.base_dn}'
2946
2947         username = 'test_modify_ou1_user'
2948         user_dn = Dn(self.ldb_admin, f'CN={username},{ou_dn}')
2949
2950         sd_sddl = 'O:BA'
2951         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
2952
2953         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
2954         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
2955         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
2956
2957         self.ldb_admin.newuser(username, self.user_pass,
2958                                userou=f'OU={ou_name}',
2959                                sd=descriptor)
2960
2961         # Try to modify the owner to Domain Admins.
2962         new_sddl = f'O:DA'
2963         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
2964
2965         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
2966
2967         # Check our effective rights.
2968         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2969         expected_rights = 0
2970         self.assertEqual(expected_rights, effective_rights)
2971
2972         # The user should not be able to modify the owner.
2973         message = Message(user_dn)
2974         message['nTSecurityDescriptor'] = MessageElement(
2975             ndr_pack(new_desc),
2976             FLAG_MOD_REPLACE,
2977             'nTSecurityDescriptor')
2978
2979         # Grant ourselves WRITE_OWNER.
2980         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
2981         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
2982
2983         # Check our effective rights.
2984         effective_rights = self.get_sd_rights_effective(self.ldb_user, user_dn)
2985         expected_rights = security.SECINFO_OWNER | security.SECINFO_GROUP
2986         self.assertEqual(expected_rights, effective_rights)
2987
2988         # The update succeeds as admin when specifying Domain Admins.
2989         self.ldb_admin.modify(message, controls=owner_controls)
2990
2991     def test_modify_owner_admin_computer(self):
2992         '''Show a domain admin can set the owner of a computer's security descriptor to Domain Admins'''
2993
2994         ou_name = 'test_modify_ou1'
2995         ou_dn = f'OU={ou_name},{self.base_dn}'
2996
2997         account_name = 'test_mod_hostname'
2998         dn = Dn(self.ldb_admin, f'CN={account_name},{ou_dn}')
2999
3000         sd_sddl = 'O:BA'
3001         descriptor = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
3002
3003         ou_sddl = f'D:(OA;CI;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_NT_SECURITY_DESCRIPTOR};;{self.user_sid})'
3004         ou_desc = security.descriptor.from_sddl(ou_sddl, self.domain_sid)
3005         self.ldb_admin.create_ou(ou_dn, name=ou_name, sd=ou_desc)
3006
3007         # Create the account.
3008         self.ldb_admin.add({
3009             'dn': dn,
3010             'objectClass': 'computer',
3011             'sAMAccountName': f'{account_name}$',
3012             'nTSecurityDescriptor': ndr_pack(descriptor),
3013         })
3014
3015         # Try to modify the owner to Domain Admins.
3016         new_sddl = f'O:DA'
3017         new_desc = security.descriptor.from_sddl(new_sddl, self.domain_sid)
3018
3019         owner_controls = [f'sd_flags:1:{security.SECINFO_OWNER}']
3020
3021         # Check our effective rights.
3022         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
3023         expected_rights = None
3024         self.assertEqual(expected_rights, effective_rights)
3025
3026         # The user should not be able to modify the owner.
3027         message = Message(dn)
3028         message['nTSecurityDescriptor'] = MessageElement(
3029             ndr_pack(new_desc),
3030             FLAG_MOD_REPLACE,
3031             'nTSecurityDescriptor')
3032
3033         # Grant ourselves WRITE_OWNER.
3034         owner_sddl = f'(A;CI;WO;;;{self.user_sid})'
3035         self.sd_utils.dacl_add_ace(ou_dn, owner_sddl)
3036
3037         # Check our effective rights.
3038         effective_rights = self.get_sd_rights_effective(self.ldb_user, dn)
3039         expected_rights = None
3040         self.assertEqual(expected_rights, effective_rights)
3041
3042         # The update succeeds as admin when specifying Domain Admins.
3043         self.ldb_admin.modify(message, controls=owner_controls)
3044
3045     def test_modify_anonymous(self):
3046         """Test add operation with anonymous user"""
3047         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
3048         self.ldb_admin.newuser("test_anonymous", "samba123@")
3049         m = Message()
3050         m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
3051
3052         m["description"] = MessageElement("sambauser2",
3053                                           FLAG_MOD_ADD,
3054                                           "description")
3055         try:
3056             anonymous.modify(m)
3057         except LdbError as e14:
3058             (num, _) = e14.args
3059             self.assertEqual(num, ERR_OPERATIONS_ERROR)
3060         else:
3061             self.fail()
3062
3063     def test_modify_dns_host_name(self):
3064         '''Test modifying dNSHostName with validated write'''
3065
3066         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3067
3068         account_name = 'test_mod_hostname'
3069         dn = f'CN={account_name},{ou_dn}'
3070
3071         self.ldb_admin.create_ou(ou_dn)
3072
3073         # Grant Validated Write.
3074         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3075                f'{self.user_sid})')
3076         self.sd_utils.dacl_add_ace(ou_dn, mod)
3077
3078         # Create the account.
3079         self.ldb_admin.add({
3080             'dn': dn,
3081             'objectClass': 'computer',
3082             'sAMAccountName': f'{account_name}$',
3083         })
3084
3085         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3086
3087         m = Message(Dn(self.ldb_user, dn))
3088         m['dNSHostName'] = MessageElement(host_name,
3089                                           FLAG_MOD_REPLACE,
3090                                           'dNSHostName')
3091         try:
3092             self.ldb_user.modify(m)
3093         except LdbError:
3094             self.fail()
3095
3096     def test_modify_dns_host_name_no_validated_write(self):
3097         '''Test modifying dNSHostName without validated write'''
3098
3099         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3100
3101         account_name = 'test_mod_hostname'
3102         dn = f'CN={account_name},{ou_dn}'
3103
3104         self.ldb_admin.create_ou(ou_dn)
3105
3106         # Create the account.
3107         self.ldb_admin.add({
3108             'dn': dn,
3109             'objectClass': 'computer',
3110             'sAMAccountName': f'{account_name}$',
3111         })
3112
3113         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3114
3115         m = Message(Dn(self.ldb_user, dn))
3116         m['dNSHostName'] = MessageElement(host_name,
3117                                           FLAG_MOD_REPLACE,
3118                                           'dNSHostName')
3119         try:
3120             self.ldb_user.modify(m)
3121         except LdbError as err:
3122             num, estr = err.args
3123             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
3124         else:
3125             self.fail()
3126
3127     def test_modify_dns_host_name_invalid(self):
3128         '''Test modifying dNSHostName to an invalid value'''
3129
3130         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3131
3132         account_name = 'test_mod_hostname'
3133         dn = f'CN={account_name},{ou_dn}'
3134
3135         self.ldb_admin.create_ou(ou_dn)
3136
3137         # Grant Validated Write.
3138         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3139                f'{self.user_sid})')
3140         self.sd_utils.dacl_add_ace(ou_dn, mod)
3141
3142         # Create the account.
3143         self.ldb_admin.add({
3144             'dn': dn,
3145             'objectClass': 'computer',
3146             'sAMAccountName': f'{account_name}$',
3147         })
3148
3149         host_name = 'invalid'
3150
3151         m = Message(Dn(self.ldb_user, dn))
3152         m['dNSHostName'] = MessageElement(host_name,
3153                                           FLAG_MOD_REPLACE,
3154                                           'dNSHostName')
3155         try:
3156             self.ldb_user.modify(m)
3157         except LdbError as err:
3158             num, estr = err.args
3159             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3160         else:
3161             self.fail()
3162
3163     def test_modify_dns_host_name_invalid_wp(self):
3164         '''Test modifying dNSHostName to an invalid value when we have WP'''
3165
3166         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3167
3168         account_name = 'test_mod_hostname'
3169         dn = f'CN={account_name},{ou_dn}'
3170
3171         self.ldb_admin.create_ou(ou_dn)
3172
3173         # Grant Write Property.
3174         mod = (f'(OA;CI;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
3175                f'{self.user_sid})')
3176         self.sd_utils.dacl_add_ace(ou_dn, mod)
3177
3178         # Create the account.
3179         self.ldb_admin.add({
3180             'dn': dn,
3181             'objectClass': 'computer',
3182             'sAMAccountName': f'{account_name}$',
3183         })
3184
3185         host_name = 'invalid'
3186
3187         m = Message(Dn(self.ldb_user, dn))
3188         m['dNSHostName'] = MessageElement(host_name,
3189                                           FLAG_MOD_REPLACE,
3190                                           'dNSHostName')
3191         try:
3192             self.ldb_user.modify(m)
3193         except LdbError:
3194             self.fail()
3195
3196     def test_modify_dns_host_name_invalid_non_computer(self):
3197         '''Test modifying dNSHostName to an invalid value on a non-computer'''
3198
3199         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3200
3201         account_name = 'test_mod_hostname'
3202         dn = f'CN={account_name},{ou_dn}'
3203
3204         self.ldb_admin.create_ou(ou_dn)
3205
3206         # Grant Validated Write.
3207         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3208                f'{self.user_sid})')
3209         self.sd_utils.dacl_add_ace(ou_dn, mod)
3210
3211         # Create the account.
3212         self.ldb_admin.add({
3213             'dn': dn,
3214             'objectClass': 'user',
3215             'sAMAccountName': f'{account_name}',
3216         })
3217
3218         host_name = 'invalid'
3219
3220         m = Message(Dn(self.ldb_user, dn))
3221         m['dNSHostName'] = MessageElement(host_name,
3222                                           FLAG_MOD_REPLACE,
3223                                           'dNSHostName')
3224         try:
3225             self.ldb_user.modify(m)
3226         except LdbError as err:
3227             num, estr = err.args
3228             self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, num)
3229         else:
3230             self.fail()
3231
3232     def test_modify_dns_host_name_no_value(self):
3233         '''Test modifying dNSHostName with validated write with no value'''
3234
3235         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3236
3237         account_name = 'test_mod_hostname'
3238         dn = f'CN={account_name},{ou_dn}'
3239
3240         self.ldb_admin.create_ou(ou_dn)
3241
3242         # Grant Validated Write.
3243         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3244                f'{self.user_sid})')
3245         self.sd_utils.dacl_add_ace(ou_dn, mod)
3246
3247         # Create the account.
3248         self.ldb_admin.add({
3249             'dn': dn,
3250             'objectClass': 'computer',
3251             'sAMAccountName': f'{account_name}$',
3252         })
3253
3254         m = Message(Dn(self.ldb_user, dn))
3255         m['dNSHostName'] = MessageElement([],
3256                                           FLAG_MOD_REPLACE,
3257                                           'dNSHostName')
3258         try:
3259             self.ldb_user.modify(m)
3260         except LdbError as err:
3261             num, estr = err.args
3262             self.assertEqual(ERR_OPERATIONS_ERROR, num)
3263         else:
3264             # Windows accepts this.
3265             pass
3266
3267     def test_modify_dns_host_name_empty_string(self):
3268         '''Test modifying dNSHostName with validated write of an empty string'''
3269
3270         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3271
3272         account_name = 'test_mod_hostname'
3273         dn = f'CN={account_name},{ou_dn}'
3274
3275         self.ldb_admin.create_ou(ou_dn)
3276
3277         # Grant Validated Write.
3278         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3279                f'{self.user_sid})')
3280         self.sd_utils.dacl_add_ace(ou_dn, mod)
3281
3282         # Create the account.
3283         self.ldb_admin.add({
3284             'dn': dn,
3285             'objectClass': 'computer',
3286             'sAMAccountName': f'{account_name}$',
3287         })
3288
3289         m = Message(Dn(self.ldb_user, dn))
3290         m['dNSHostName'] = MessageElement('\0',
3291                                           FLAG_MOD_REPLACE,
3292                                           'dNSHostName')
3293         try:
3294             self.ldb_user.modify(m)
3295         except LdbError as err:
3296             num, estr = err.args
3297             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3298         else:
3299             self.fail()
3300
3301     def test_modify_dns_host_name_dollar(self):
3302         '''Test modifying dNSHostName with validated write of a value including a dollar'''
3303
3304         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3305
3306         account_name = 'test_mod_hostname'
3307         dn = f'CN={account_name},{ou_dn}'
3308
3309         self.ldb_admin.create_ou(ou_dn)
3310
3311         # Grant Validated Write.
3312         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3313                f'{self.user_sid})')
3314         self.sd_utils.dacl_add_ace(ou_dn, mod)
3315
3316         # Create the account.
3317         self.ldb_admin.add({
3318             'dn': dn,
3319             'objectClass': 'computer',
3320             'sAMAccountName': f'{account_name}$',
3321         })
3322
3323         host_name = f'{account_name}$.{self.ldb_user.domain_dns_name()}'
3324
3325         m = Message(Dn(self.ldb_user, dn))
3326         m['dNSHostName'] = MessageElement(host_name,
3327                                           FLAG_MOD_REPLACE,
3328                                           'dNSHostName')
3329         try:
3330             self.ldb_user.modify(m)
3331         except LdbError as err:
3332             num, estr = err.args
3333             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3334         else:
3335             self.fail()
3336
3337     def test_modify_dns_host_name_account_no_dollar(self):
3338         '''Test modifying dNSHostName with validated write with no dollar in sAMAccountName'''
3339
3340         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3341
3342         account_name = 'test_mod_hostname'
3343         dn = f'CN={account_name},{ou_dn}'
3344
3345         self.ldb_admin.create_ou(ou_dn)
3346
3347         # Grant Validated Write.
3348         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3349                f'{self.user_sid})')
3350         self.sd_utils.dacl_add_ace(ou_dn, mod)
3351
3352         # Create the account.
3353         self.ldb_admin.add({
3354             'dn': dn,
3355             'objectClass': 'computer',
3356             'sAMAccountName': f'{account_name}',
3357         })
3358
3359         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3360
3361         m = Message(Dn(self.ldb_user, dn))
3362         m['dNSHostName'] = MessageElement(host_name,
3363                                           FLAG_MOD_REPLACE,
3364                                           'dNSHostName')
3365         try:
3366             self.ldb_user.modify(m)
3367         except LdbError:
3368             self.fail()
3369
3370     def test_modify_dns_host_name_no_suffix(self):
3371         '''Test modifying dNSHostName with validated write of a value missing the suffix'''
3372
3373         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3374
3375         account_name = 'test_mod_hostname'
3376         dn = f'CN={account_name},{ou_dn}'
3377
3378         self.ldb_admin.create_ou(ou_dn)
3379
3380         # Grant Validated Write.
3381         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3382                f'{self.user_sid})')
3383         self.sd_utils.dacl_add_ace(ou_dn, mod)
3384
3385         # Create the account.
3386         self.ldb_admin.add({
3387             'dn': dn,
3388             'objectClass': 'computer',
3389             'sAMAccountName': f'{account_name}$',
3390         })
3391
3392         host_name = f'{account_name}'
3393
3394         m = Message(Dn(self.ldb_user, dn))
3395         m['dNSHostName'] = MessageElement(host_name,
3396                                           FLAG_MOD_REPLACE,
3397                                           'dNSHostName')
3398         try:
3399             self.ldb_user.modify(m)
3400         except LdbError as err:
3401             num, estr = err.args
3402             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3403         else:
3404             self.fail()
3405
3406     def test_modify_dns_host_name_wrong_prefix(self):
3407         '''Test modifying dNSHostName with validated write of a value with the wrong prefix'''
3408
3409         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3410
3411         account_name = 'test_mod_hostname'
3412         dn = f'CN={account_name},{ou_dn}'
3413
3414         self.ldb_admin.create_ou(ou_dn)
3415
3416         # Grant Validated Write.
3417         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3418                f'{self.user_sid})')
3419         self.sd_utils.dacl_add_ace(ou_dn, mod)
3420
3421         # Create the account.
3422         self.ldb_admin.add({
3423             'dn': dn,
3424             'objectClass': 'computer',
3425             'sAMAccountName': f'{account_name}$',
3426         })
3427
3428         host_name = f'invalid.{self.ldb_user.domain_dns_name()}'
3429
3430         m = Message(Dn(self.ldb_user, dn))
3431         m['dNSHostName'] = MessageElement(host_name,
3432                                           FLAG_MOD_REPLACE,
3433                                           'dNSHostName')
3434         try:
3435             self.ldb_user.modify(m)
3436         except LdbError as err:
3437             num, estr = err.args
3438             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3439         else:
3440             self.fail()
3441
3442     def test_modify_dns_host_name_wrong_suffix(self):
3443         '''Test modifying dNSHostName with validated write of a value with the wrong suffix'''
3444
3445         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3446
3447         account_name = 'test_mod_hostname'
3448         dn = f'CN={account_name},{ou_dn}'
3449
3450         self.ldb_admin.create_ou(ou_dn)
3451
3452         # Grant Validated Write.
3453         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3454                f'{self.user_sid})')
3455         self.sd_utils.dacl_add_ace(ou_dn, mod)
3456
3457         # Create the account.
3458         self.ldb_admin.add({
3459             'dn': dn,
3460             'objectClass': 'computer',
3461             'sAMAccountName': f'{account_name}$',
3462         })
3463
3464         host_name = f'{account_name}.invalid.example.com'
3465
3466         m = Message(Dn(self.ldb_user, dn))
3467         m['dNSHostName'] = MessageElement(host_name,
3468                                           FLAG_MOD_REPLACE,
3469                                           'dNSHostName')
3470         try:
3471             self.ldb_user.modify(m)
3472         except LdbError as err:
3473             num, estr = err.args
3474             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3475         else:
3476             self.fail()
3477
3478     def test_modify_dns_host_name_case(self):
3479         '''Test modifying dNSHostName with validated write of a value with irregular case'''
3480
3481         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3482
3483         account_name = 'test_mod_hostname'
3484         dn = f'CN={account_name},{ou_dn}'
3485
3486         self.ldb_admin.create_ou(ou_dn)
3487
3488         # Grant Validated Write.
3489         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3490                f'{self.user_sid})')
3491         self.sd_utils.dacl_add_ace(ou_dn, mod)
3492
3493         # Create the account.
3494         self.ldb_admin.add({
3495             'dn': dn,
3496             'objectClass': 'computer',
3497             'sAMAccountName': f'{account_name}$',
3498         })
3499
3500         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3501         host_name = host_name.capitalize()
3502
3503         m = Message(Dn(self.ldb_user, dn))
3504         m['dNSHostName'] = MessageElement(host_name,
3505                                           FLAG_MOD_REPLACE,
3506                                           'dNSHostName')
3507         try:
3508             self.ldb_user.modify(m)
3509         except LdbError:
3510             self.fail()
3511
3512     def test_modify_dns_host_name_allowed_suffixes(self):
3513         '''Test modifying dNSHostName with validated write and an allowed suffix'''
3514
3515         allowed_suffix = 'suffix.that.is.allowed'
3516
3517         # Add the allowed suffix.
3518
3519         res = self.ldb_admin.search(self.base_dn,
3520                                     scope=SCOPE_BASE,
3521                                     attrs=['msDS-AllowedDNSSuffixes'])
3522         self.assertEqual(1, len(res))
3523         old_allowed_suffixes = res[0].get('msDS-AllowedDNSSuffixes')
3524
3525         def modify_allowed_suffixes(suffixes):
3526             if suffixes is None:
3527                 suffixes = []
3528                 flag = FLAG_MOD_DELETE
3529             else:
3530                 flag = FLAG_MOD_REPLACE
3531
3532             m = Message(Dn(self.ldb_admin, self.base_dn))
3533             m['msDS-AllowedDNSSuffixes'] = MessageElement(
3534                 suffixes,
3535                 flag,
3536                 'msDS-AllowedDNSSuffixes')
3537             self.ldb_admin.modify(m)
3538
3539         self.addCleanup(modify_allowed_suffixes, old_allowed_suffixes)
3540
3541         if old_allowed_suffixes is None:
3542             allowed_suffixes = []
3543         else:
3544             allowed_suffixes = list(old_allowed_suffixes)
3545
3546         if (allowed_suffix not in allowed_suffixes and
3547             allowed_suffix.encode('utf-8') not in allowed_suffixes):
3548                 allowed_suffixes.append(allowed_suffix)
3549
3550         modify_allowed_suffixes(allowed_suffixes)
3551
3552         # Create the account and run the test.
3553
3554         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3555
3556         account_name = 'test_mod_hostname'
3557         dn = f'CN={account_name},{ou_dn}'
3558
3559         self.ldb_admin.create_ou(ou_dn)
3560
3561         # Grant Validated Write.
3562         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3563                f'{self.user_sid})')
3564         self.sd_utils.dacl_add_ace(ou_dn, mod)
3565
3566         # Create the account.
3567         self.ldb_admin.add({
3568             'dn': dn,
3569             'objectClass': 'computer',
3570             'sAMAccountName': f'{account_name}$',
3571         })
3572
3573         host_name = f'{account_name}.{allowed_suffix}'
3574
3575         m = Message(Dn(self.ldb_user, dn))
3576         m['dNSHostName'] = MessageElement(host_name,
3577                                           FLAG_MOD_REPLACE,
3578                                           'dNSHostName')
3579         try:
3580             self.ldb_user.modify(m)
3581         except LdbError:
3582             self.fail()
3583
3584     def test_modify_dns_host_name_spn(self):
3585         '''Test modifying dNSHostName and SPN with validated write'''
3586
3587         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3588
3589         account_name = 'test_mod_hostname'
3590         dn = f'CN={account_name},{ou_dn}'
3591
3592         self.ldb_admin.create_ou(ou_dn)
3593
3594         # Grant Validated Write.
3595         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3596                f'{self.user_sid})')
3597         self.sd_utils.dacl_add_ace(ou_dn, mod)
3598         mod = (f'(OA;CI;SW;{security.GUID_DRS_VALIDATE_SPN};;'
3599                f'{self.user_sid})')
3600         self.sd_utils.dacl_add_ace(ou_dn, mod)
3601
3602         # Create the account.
3603         self.ldb_admin.add({
3604             'dn': dn,
3605             'objectClass': 'computer',
3606             'sAMAccountName': f'{account_name}$',
3607         })
3608
3609         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3610         spn = f'host/{host_name}'
3611
3612         m = Message(Dn(self.ldb_user, dn))
3613         m['0'] = MessageElement(host_name,
3614                                 FLAG_MOD_REPLACE,
3615                                 'dNSHostName')
3616         m['1'] = MessageElement(spn,
3617                                 FLAG_MOD_ADD,
3618                                 'servicePrincipalName')
3619         try:
3620             self.ldb_user.modify(m)
3621         except LdbError:
3622             self.fail()
3623
3624     def test_modify_spn_matching_dns_host_name_invalid(self):
3625         '''Test modifying SPN with validated write, matching a valid dNSHostName '''
3626
3627         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3628
3629         account_name = 'test_mod_hostname'
3630         dn = f'CN={account_name},{ou_dn}'
3631
3632         self.ldb_admin.create_ou(ou_dn)
3633
3634         # Grant Write Property.
3635         mod = (f'(OA;CI;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
3636                f'{self.user_sid})')
3637         self.sd_utils.dacl_add_ace(ou_dn, mod)
3638         # Grant Validated Write.
3639         mod = (f'(OA;CI;SW;{security.GUID_DRS_VALIDATE_SPN};;'
3640                f'{self.user_sid})')
3641         self.sd_utils.dacl_add_ace(ou_dn, mod)
3642
3643         # Create the account.
3644         self.ldb_admin.add({
3645             'dn': dn,
3646             'objectClass': 'computer',
3647             'sAMAccountName': f'{account_name}$',
3648         })
3649
3650         invalid_host_name = 'invalid'
3651
3652         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3653         spn = f'host/{host_name}'
3654
3655         m = Message(Dn(self.ldb_user, dn))
3656         m['0'] = MessageElement(invalid_host_name,
3657                                 FLAG_MOD_REPLACE,
3658                                 'dNSHostName')
3659         m['1'] = MessageElement(spn,
3660                                 FLAG_MOD_ADD,
3661                                 'servicePrincipalName')
3662         m['2'] = MessageElement(host_name,
3663                                 FLAG_MOD_REPLACE,
3664                                 'dNSHostName')
3665         try:
3666             self.ldb_user.modify(m)
3667         except LdbError:
3668             self.fail()
3669
3670     def test_modify_spn_matching_dns_host_name_original(self):
3671         '''Test modifying SPN with validated write, matching the original dNSHostName '''
3672
3673         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3674
3675         account_name = 'test_mod_hostname'
3676         dn = f'CN={account_name},{ou_dn}'
3677
3678         self.ldb_admin.create_ou(ou_dn)
3679
3680         # Grant Validated Write.
3681         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3682                f'{self.user_sid})')
3683         self.sd_utils.dacl_add_ace(ou_dn, mod)
3684         mod = (f'(OA;CI;SW;{security.GUID_DRS_VALIDATE_SPN};;'
3685                f'{self.user_sid})')
3686         self.sd_utils.dacl_add_ace(ou_dn, mod)
3687
3688         original_host_name = 'invalid_host_name'
3689         original_spn = 'host/{original_host_name}'
3690
3691         # Create the account.
3692         self.ldb_admin.add({
3693             'dn': dn,
3694             'objectClass': 'computer',
3695             'sAMAccountName': f'{account_name}$',
3696             'dNSHostName': original_host_name,
3697         })
3698
3699         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3700
3701         m = Message(Dn(self.ldb_user, dn))
3702         m['0'] = MessageElement(original_spn,
3703                                 FLAG_MOD_ADD,
3704                                 'servicePrincipalName')
3705         m['1'] = MessageElement(host_name,
3706                                 FLAG_MOD_REPLACE,
3707                                 'dNSHostName')
3708         try:
3709             self.ldb_user.modify(m)
3710         except LdbError as err:
3711             num, estr = err.args
3712             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3713         else:
3714             self.fail()
3715
3716     def test_modify_dns_host_name_spn_matching_account_name_original(self):
3717         '''Test modifying dNSHostName and SPN with validated write, matching the original sAMAccountName'''
3718
3719         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3720
3721         account_name = 'test_mod_hostname'
3722         dn = f'CN={account_name},{ou_dn}'
3723
3724         self.ldb_admin.create_ou(ou_dn)
3725
3726         sam_account_name = '3e0abfd0-126a-11d0-a060-00aa006c33ed'
3727
3728         # Grant Write Property.
3729         mod = (f'(OA;CI;WP;{sam_account_name};;'
3730                f'{self.user_sid})')
3731         self.sd_utils.dacl_add_ace(ou_dn, mod)
3732         # Grant Validated Write.
3733         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3734                f'{self.user_sid})')
3735         self.sd_utils.dacl_add_ace(ou_dn, mod)
3736         mod = (f'(OA;CI;SW;{security.GUID_DRS_VALIDATE_SPN};;'
3737                f'{self.user_sid})')
3738         self.sd_utils.dacl_add_ace(ou_dn, mod)
3739
3740         # Create the account.
3741         self.ldb_admin.add({
3742             'dn': dn,
3743             'objectClass': 'computer',
3744             'sAMAccountName': f'{account_name}$',
3745         })
3746
3747         new_account_name = 'test_mod_hostname2'
3748         host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'
3749         spn = f'host/{host_name}'
3750
3751         m = Message(Dn(self.ldb_user, dn))
3752         m['0'] = MessageElement(host_name,
3753                                 FLAG_MOD_REPLACE,
3754                                 'dNSHostName')
3755         m['1'] = MessageElement(spn,
3756                                 FLAG_MOD_ADD,
3757                                 'servicePrincipalName')
3758         m['2'] = MessageElement(f'{new_account_name}$',
3759                                 FLAG_MOD_REPLACE,
3760                                 'sAMAccountName')
3761         try:
3762             self.ldb_user.modify(m)
3763         except LdbError as err:
3764             num, estr = err.args
3765             self.assertEqual(ERR_CONSTRAINT_VIOLATION, num)
3766         else:
3767             self.fail()
3768
3769     def test_modify_dns_host_name_spn_matching_account_name_new(self):
3770         '''Test modifying dNSHostName and SPN with validated write, matching the new sAMAccountName'''
3771
3772         ou_dn = f'OU=test_modify_ou1,{self.base_dn}'
3773
3774         account_name = 'test_mod_hostname'
3775         dn = f'CN={account_name},{ou_dn}'
3776
3777         self.ldb_admin.create_ou(ou_dn)
3778
3779         sam_account_name = '3e0abfd0-126a-11d0-a060-00aa006c33ed'
3780
3781         # Grant Write Property.
3782         mod = (f'(OA;CI;WP;{sam_account_name};;'
3783                f'{self.user_sid})')
3784         self.sd_utils.dacl_add_ace(ou_dn, mod)
3785         # Grant Validated Write.
3786         mod = (f'(OA;CI;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
3787                f'{self.user_sid})')
3788         self.sd_utils.dacl_add_ace(ou_dn, mod)
3789         mod = (f'(OA;CI;SW;{security.GUID_DRS_VALIDATE_SPN};;'
3790                f'{self.user_sid})')
3791         self.sd_utils.dacl_add_ace(ou_dn, mod)
3792
3793         # Create the account.
3794         self.ldb_admin.add({
3795             'dn': dn,
3796             'objectClass': 'computer',
3797             'sAMAccountName': f'{account_name}$',
3798         })
3799
3800         new_account_name = 'test_mod_hostname2'
3801         new_host_name = f'{new_account_name}.{self.ldb_user.domain_dns_name()}'
3802         new_spn = f'host/{new_host_name}'
3803
3804         m = Message(Dn(self.ldb_user, dn))
3805         m['0'] = MessageElement(new_spn,
3806                                 FLAG_MOD_ADD,
3807                                 'servicePrincipalName')
3808         m['1'] = MessageElement(new_host_name,
3809                                 FLAG_MOD_REPLACE,
3810                                 'dNSHostName')
3811         m['2'] = MessageElement(f'{new_account_name}$',
3812                                 FLAG_MOD_REPLACE,
3813                                 'sAMAccountName')
3814         try:
3815             self.ldb_user.modify(m)
3816         except LdbError:
3817             self.fail()
3818
3819 # enable these when we have search implemented
3820
3821
3822 class AclSearchTests(AclTests):
3823
3824     def setUp(self):
3825         super(AclSearchTests, self).setUp()
3826
3827         # permit password changes during this test
3828         PasswordCommon.allow_password_changes(self, self.ldb_admin)
3829
3830         self.u1 = "search_u1"
3831         self.u2 = "search_u2"
3832         self.u3 = "search_u3"
3833         self.group1 = "group1"
3834         self.ldb_admin.newuser(self.u1, self.user_pass)
3835         self.ldb_admin.newuser(self.u2, self.user_pass)
3836         self.ldb_admin.newuser(self.u3, self.user_pass)
3837         self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
3838         self.ldb_admin.add_remove_group_members(self.group1, [self.u2],
3839                                                 add_members_operation=True)
3840         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
3841         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
3842         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
3843         self.full_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
3844                           Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
3845                           Dn(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
3846                           Dn(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
3847                           Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
3848                           Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
3849         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
3850         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
3851
3852     def create_clean_ou(self, object_dn):
3853         """ Base repeating setup for unittests to follow """
3854         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE,
3855                                     expression="distinguishedName=%s" % object_dn)
3856         # Make sure top testing OU has been deleted before starting the test
3857         self.assertEqual(len(res), 0)
3858         self.ldb_admin.create_ou(object_dn)
3859         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
3860         # Make sure there are inheritable ACEs initially
3861         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
3862         # Find and remove all inherit ACEs
3863         res = re.findall(r"\(.*?\)", desc_sddl)
3864         res = [x for x in res if ("CI" in x) or ("OI" in x)]
3865         for x in res:
3866             desc_sddl = desc_sddl.replace(x, "")
3867         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
3868         # can propagate from above
3869         # remove SACL, we are not interested
3870         desc_sddl = desc_sddl.replace(":AI", ":AIP")
3871         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
3872         # Verify all inheritable ACEs are gone
3873         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
3874         self.assertFalse("CI" in desc_sddl)
3875         self.assertFalse("OI" in desc_sddl)
3876
3877     def tearDown(self):
3878         super(AclSearchTests, self).tearDown()
3879         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
3880         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
3881         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
3882         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
3883         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
3884         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
3885         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
3886         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
3887         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
3888         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
3889         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
3890         delete_force(self.ldb_admin, self.get_user_dn("group1"))
3891
3892         del self.ldb_user
3893         del self.ldb_user2
3894         del self.ldb_user3
3895
3896     def test_search_anonymous1(self):
3897         """Verify access of rootDSE with the correct request"""
3898         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
3899         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
3900         self.assertEqual(len(res), 1)
3901         # verify some of the attributes
3902         # don't care about values
3903         self.assertTrue("ldapServiceName" in res[0])
3904         self.assertTrue("namingContexts" in res[0])
3905         self.assertTrue("isSynchronized" in res[0])
3906         self.assertTrue("dsServiceName" in res[0])
3907         self.assertTrue("supportedSASLMechanisms" in res[0])
3908         self.assertTrue("isGlobalCatalogReady" in res[0])
3909         self.assertTrue("domainControllerFunctionality" in res[0])
3910         self.assertTrue("serverName" in res[0])
3911
3912     def test_search_anonymous2(self):
3913         """Make sure we cannot access anything else"""
3914         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
3915         try:
3916             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
3917         except LdbError as e15:
3918             (num, _) = e15.args
3919             self.assertEqual(num, ERR_OPERATIONS_ERROR)
3920         else:
3921             self.fail()
3922         try:
3923             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
3924         except LdbError as e16:
3925             (num, _) = e16.args
3926             self.assertEqual(num, ERR_OPERATIONS_ERROR)
3927         else:
3928             self.fail()
3929         try:
3930             res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
3931                                    scope=SCOPE_SUBTREE)
3932         except LdbError as e17:
3933             (num, _) = e17.args
3934             self.assertEqual(num, ERR_OPERATIONS_ERROR)
3935         else:
3936             self.fail()
3937
3938     def test_search_anonymous3(self):
3939         """Set dsHeuristics and repeat"""
3940         self.ldb_admin.set_dsheuristics("0000002")
3941         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
3942         mod = "(A;CI;LC;;;AN)"
3943         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
3944         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
3945         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
3946         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
3947                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
3948         self.assertEqual(len(res), 1)
3949         self.assertTrue("dn" in res[0])
3950         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
3951                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
3952         res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
3953                                scope=SCOPE_SUBTREE)
3954         self.assertEqual(len(res), 1)
3955         self.assertTrue("dn" in res[0])
3956         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
3957
3958     def test_search1(self):
3959         """Make sure users can see us if given LC to user and group"""
3960         self.create_clean_ou("OU=ou1," + self.base_dn)
3961         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
3962         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
3963         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
3964                                                  self.domain_sid)
3965         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
3966         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
3967         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
3968         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
3969         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
3970
3971         # regular users must see only ou1 and ou2
3972         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
3973                                     scope=SCOPE_SUBTREE)
3974         self.assertEqual(len(res), 2)
3975         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
3976                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
3977
3978         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
3979         self.assertEqual(sorted(res_list), sorted(ok_list))
3980
3981         # these users should see all ous
3982         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
3983                                    scope=SCOPE_SUBTREE)
3984         self.assertEqual(len(res), 6)
3985         res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
3986         self.assertEqual(sorted(res_list), sorted(self.full_list))
3987
3988         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
3989                                     scope=SCOPE_SUBTREE)
3990         self.assertEqual(len(res), 6)
3991         res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
3992         self.assertEqual(sorted(res_list), sorted(self.full_list))
3993
3994     def test_search2(self):
3995         """Make sure users can't see us if access is explicitly denied"""
3996         self.create_clean_ou("OU=ou1," + self.base_dn)
3997         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
3998         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
3999         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
4000         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
4001         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
4002         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
4003         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
4004         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4005                                     scope=SCOPE_SUBTREE)
4006         # this user should see all ous
4007         res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
4008         self.assertEqual(sorted(res_list), sorted(self.full_list))
4009
4010         # these users should see ou1, 2, 5 and 6 but not 3 and 4
4011         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4012                                    scope=SCOPE_SUBTREE)
4013         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
4014                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
4015                    Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
4016                    Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
4017         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
4018         self.assertEqual(sorted(res_list), sorted(ok_list))
4019
4020         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4021                                     scope=SCOPE_SUBTREE)
4022         self.assertEqual(len(res), 4)
4023         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
4024         self.assertEqual(sorted(res_list), sorted(ok_list))
4025
4026     def test_search3(self):
4027         """Make sure users can't see ous if access is explicitly denied - 2"""
4028         self.create_clean_ou("OU=ou1," + self.base_dn)
4029         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
4030         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
4031         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
4032                                                  self.domain_sid)
4033         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4034         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4035         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4036         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4037         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4038
4039         print("Testing correct behavior on nonaccessible search base")
4040         try:
4041             self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
4042                                   scope=SCOPE_BASE)
4043         except LdbError as e18:
4044             (num, _) = e18.args
4045             self.assertEqual(num, ERR_NO_SUCH_OBJECT)
4046         else:
4047             self.fail()
4048
4049         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
4050         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
4051
4052         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
4053                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
4054
4055         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4056                                     scope=SCOPE_SUBTREE)
4057         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
4058         self.assertEqual(sorted(res_list), sorted(ok_list))
4059
4060         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
4061                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
4062                    Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
4063                    Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
4064
4065         # should not see ou3 and ou4, but should see ou5 and ou6
4066         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4067                                    scope=SCOPE_SUBTREE)
4068         self.assertEqual(len(res), 4)
4069         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
4070         self.assertEqual(sorted(res_list), sorted(ok_list))
4071
4072         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4073                                     scope=SCOPE_SUBTREE)
4074         self.assertEqual(len(res), 4)
4075         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
4076         self.assertEqual(sorted(res_list), sorted(ok_list))
4077
4078     def test_search4(self):
4079         """There is no difference in visibility if the user is also creator"""
4080         self.create_clean_ou("OU=ou1," + self.base_dn)
4081         mod = "(A;CI;CCWD;;;%s)" % (str(self.user_sid))
4082         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
4083         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
4084                                                  self.domain_sid)
4085         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4086         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4087         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4088         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4089         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4090
4091         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
4092                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
4093         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4094                                     scope=SCOPE_SUBTREE)
4095         self.assertEqual(len(res), 2)
4096         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
4097         self.assertEqual(sorted(res_list), sorted(ok_list))
4098
4099         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
4100                                    scope=SCOPE_SUBTREE)
4101         self.assertEqual(len(res), 2)
4102         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
4103         self.assertEqual(sorted(res_list), sorted(ok_list))
4104
4105     def test_search5(self):
4106         """Make sure users can see only attributes they are allowed to see"""
4107         self.create_clean_ou("OU=ou1," + self.base_dn)
4108         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
4109         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
4110         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
4111                                                  self.domain_sid)
4112         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4113         # assert user can only see dn
4114         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
4115                                    scope=SCOPE_SUBTREE)
4116         ok_list = ['dn']
4117         self.assertEqual(len(res), 1)
4118         res_list = list(res[0].keys())
4119         self.assertEqual(res_list, ok_list)
4120
4121         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
4122                                    scope=SCOPE_BASE, attrs=["ou"])
4123
4124         self.assertEqual(len(res), 1)
4125         res_list = list(res[0].keys())
4126         self.assertEqual(res_list, ok_list)
4127
4128         # give read property on ou and assert user can only see dn and ou
4129         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
4130         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
4131         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
4132         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
4133                                    scope=SCOPE_SUBTREE)
4134         ok_list = ['dn', 'ou']
4135         self.assertEqual(len(res), 1)
4136         res_list = list(res[0].keys())
4137         self.assertEqual(sorted(res_list), sorted(ok_list))
4138
4139         # give read property on Public Information and assert user can see ou and other members
4140         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
4141         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
4142         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
4143         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
4144                                    scope=SCOPE_SUBTREE)
4145
4146         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
4147         res_list = list(res[0].keys())
4148         self.assertEqual(sorted(res_list), sorted(ok_list))
4149
4150     def test_search6(self):
4151         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
4152         self.create_clean_ou("OU=ou1," + self.base_dn)
4153         mod = "(A;CI;LCCCWD;;;%s)" % (str(self.user_sid))
4154         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
4155         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
4156                                                  self.domain_sid)
4157         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4158         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4159
4160         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
4161                                    scope=SCOPE_SUBTREE)
4162         # nothing should be returned as ou is not accessible
4163         self.assertEqual(len(res), 0)
4164
4165         # give read property on ou and assert user can only see dn and ou
4166         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
4167         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
4168         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
4169                                    scope=SCOPE_SUBTREE)
4170         self.assertEqual(len(res), 1)
4171         ok_list = ['dn', 'ou']
4172         res_list = list(res[0].keys())
4173         self.assertEqual(sorted(res_list), sorted(ok_list))
4174
4175         # give read property on Public Information and assert user can see ou and other members
4176         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
4177         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
4178         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
4179                                    scope=SCOPE_SUBTREE)
4180         self.assertEqual(len(res), 1)
4181         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
4182         res_list = list(res[0].keys())
4183         self.assertEqual(sorted(res_list), sorted(ok_list))
4184
4185     def assert_search_on_attr(self, dn, samdb, attr, expected_list):
4186
4187         expected_num = len(expected_list)
4188         res = samdb.search(dn, expression="(%s=*)" % attr, scope=SCOPE_SUBTREE)
4189         self.assertEqual(len(res), expected_num)
4190
4191         res_list = [ x["dn"] for x in res if x["dn"] in expected_list ]
4192         self.assertEqual(sorted(res_list), sorted(expected_list))
4193
4194     def test_search7(self):
4195         """Checks object search visibility when users don't have full rights"""
4196         self.create_clean_ou("OU=ou1," + self.base_dn)
4197         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid),
4198                                             str(self.group_sid))
4199         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
4200         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
4201                                                  self.domain_sid)
4202         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
4203         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
4204                                  sd=tmp_desc)
4205         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
4206                                  sd=tmp_desc)
4207         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
4208                                  sd=tmp_desc)
4209         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
4210                                  sd=tmp_desc)
4211
4212         ou2_dn = Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn)
4213         ou1_dn = Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)
4214
4215         # even though unprivileged users can't read these attributes for OU2,
4216         # the object should still be visible in searches, because they have
4217         # 'List Contents' rights still. This isn't really disclosive because
4218         # ALL objects have these attributes
4219         visible_attrs = ["objectClass", "distinguishedName", "name",
4220                          "objectGUID"]
4221         two_objects = [ou2_dn, ou1_dn]
4222
4223         for attr in visible_attrs:
4224             # a regular user should just see the 2 objects
4225             self.assert_search_on_attr(str(ou1_dn), self.ldb_user3, attr,
4226                                        expected_list=two_objects)
4227
4228             # whereas the following users have LC rights for all the objects,
4229             # so they should see them all
4230             self.assert_search_on_attr(str(ou1_dn), self.ldb_user, attr,
4231                                        expected_list=self.full_list)
4232             self.assert_search_on_attr(str(ou1_dn), self.ldb_user2, attr,
4233                                        expected_list=self.full_list)
4234
4235         # however when searching on the following attributes, objects will not
4236         # be visible unless the user has Read Property rights
4237         hidden_attrs = ["objectCategory", "instanceType", "ou", "uSNChanged",
4238                         "uSNCreated", "whenCreated"]
4239         one_object = [ou1_dn]
4240
4241         for attr in hidden_attrs:
4242             self.assert_search_on_attr(str(ou1_dn), self.ldb_user3, attr,
4243                                        expected_list=one_object)
4244             self.assert_search_on_attr(str(ou1_dn), self.ldb_user, attr,
4245                                        expected_list=one_object)
4246             self.assert_search_on_attr(str(ou1_dn), self.ldb_user2, attr,
4247                                        expected_list=one_object)
4248
4249             # admin has RP rights so can still see all the objects
4250             self.assert_search_on_attr(str(ou1_dn), self.ldb_admin, attr,
4251                                        expected_list=self.full_list)
4252
4253
4254 # tests on ldap delete operations
4255
4256
class AclDeleteTests(AclTests):
    """Access checks applied to LDAP delete operations."""

    def setUp(self):
        """Create the unprivileged user and open a connection as that user"""
        super(AclDeleteTests, self).setUp()
        self.regular_user = "acl_delete_user1"
        # Create regular user
        self.ldb_admin.newuser(self.regular_user, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Remove every user the tests of this class may have created"""
        super(AclDeleteTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
        delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))

        del self.ldb_user

    def test_delete_u1(self):
        """User is prohibited by default to delete another User object"""
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Deleting the User object here must ALWAYS throw an exception
        with self.assertRaises(LdbError) as raised:
            self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
        (num, _) = raised.exception.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)

    def test_delete_u2(self):
        """User's group has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        mod = "(A;;SD;;;AU)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_u3(self):
        """User identified by SID has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_anonymous(self):
        """Test delete operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self.ldb_admin.newuser("test_anonymous", "samba123@")

        with self.assertRaises(LdbError) as raised:
            anonymous.delete(self.get_user_dn("test_anonymous"))
        (num, _) = raised.exception.args
        self.assertEqual(num, ERR_OPERATIONS_ERROR)
4325
4326 # tests on ldap rename operations
4327
4328
class AclRenameTests(AclTests):
    """Access checks applied to LDAP rename operations."""

    def setUp(self):
        """Define the OU/user names used by the tests and create the unprivileged user"""
        super(AclRenameTests, self).setUp()
        self.regular_user = "acl_rename_user1"
        self.ou1 = "OU=test_rename_ou1"
        self.ou2 = "OU=test_rename_ou2"
        self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
        self.testuser1 = "test_rename_user1"
        self.testuser2 = "test_rename_user2"
        self.testuser3 = "test_rename_user3"
        self.testuser4 = "test_rename_user4"
        self.testuser5 = "test_rename_user5"
        # Create regular user
        self.ldb_admin.newuser(self.regular_user, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Remove every object the rename tests may have left behind

        Children are always deleted before the container holding them.
        """
        super(AclRenameTests, self).tearDown()
        renamed_users = (self.testuser1, self.testuser2, self.testuser5)
        # OU3 nested below OU2, and the users possibly moved into it
        for user in renamed_users:
            delete_force(self.ldb_admin, "CN=%s,%s,%s" % (user, self.ou3, self.base_dn))
        delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
        # OU2 and the users possibly moved into it
        for user in renamed_users:
            delete_force(self.ldb_admin, "CN=%s,%s,%s" % (user, self.ou2, self.base_dn))
        delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
        # OU1, its users and the OUs test_rename_u8 nests below it
        for user in renamed_users:
            delete_force(self.ldb_admin, "CN=%s,%s,%s" % (user, self.ou1, self.base_dn))
        # test_rename_u8 creates OU2 below OU1 and only renames it to OU3 at
        # the very end; if it fails earlier the OU2 child is still there.
        delete_force(self.ldb_admin, "%s,%s,%s" % (self.ou2, self.ou1, self.base_dn))
        delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
        delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))

        del self.ldb_user

    def test_rename_u1(self):
        """Regular user fails to rename 'User object' within single OU"""
        # Create OU structure
        self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
        self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
        with self.assertRaises(LdbError) as raised:
            self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn),
                                 "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
        (num, _) = raised.exception.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)

    def test_rename_u2(self):
        """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
        ou_dn = "OU=test_rename_ou1," + self.base_dn
        user_dn = "CN=test_rename_user1," + ou_dn
        rename_user_dn = "CN=test_rename_user5," + ou_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou_dn)
        self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
        mod = "(A;;WP;;;AU)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Rename 'User object' having WP to AU
        self.ldb_user.rename(user_dn, rename_user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % rename_user_dn)
        self.assertNotEqual(len(res), 0)

    def test_rename_u3(self):
        """Test rename with rights granted to 'User object' SID"""
        ou_dn = "OU=test_rename_ou1," + self.base_dn
        user_dn = "CN=test_rename_user1," + ou_dn
        rename_user_dn = "CN=test_rename_user5," + ou_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou_dn)
        self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
        sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        mod = "(A;;WP;;;%s)" % str(sid)
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Rename 'User object' having WP granted to the regular user's SID
        self.ldb_user.rename(user_dn, rename_user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % rename_user_dn)
        self.assertNotEqual(len(res), 0)

    def test_rename_u4(self):
        """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
        ou1_dn = "OU=test_rename_ou1," + self.base_dn
        ou2_dn = "OU=test_rename_ou2," + self.base_dn
        user_dn = "CN=test_rename_user2," + ou1_dn
        rename_user_dn = "CN=test_rename_user5," + ou2_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou1_dn)
        self.ldb_admin.create_ou(ou2_dn)
        self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
        mod = "(A;;WPSD;;;AU)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        mod = "(A;;CC;;;AU)"
        self.sd_utils.dacl_add_ace(ou2_dn, mod)
        # Rename 'User object' having WP+SD, with CC on the target OU, to AU
        self.ldb_user.rename(user_dn, rename_user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % rename_user_dn)
        self.assertNotEqual(len(res), 0)

    def test_rename_u5(self):
        """Test cross OU rename with rights granted to 'User object' SID"""
        ou1_dn = "OU=test_rename_ou1," + self.base_dn
        ou2_dn = "OU=test_rename_ou2," + self.base_dn
        user_dn = "CN=test_rename_user2," + ou1_dn
        rename_user_dn = "CN=test_rename_user5," + ou2_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou1_dn)
        self.ldb_admin.create_ou(ou2_dn)
        self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
        sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        mod = "(A;;WPSD;;;%s)" % str(sid)
        self.sd_utils.dacl_add_ace(user_dn, mod)
        mod = "(A;;CC;;;%s)" % str(sid)
        self.sd_utils.dacl_add_ace(ou2_dn, mod)
        # Rename 'User object' having WP+SD, with CC on the target OU,
        # all granted to the regular user's SID
        self.ldb_user.rename(user_dn, rename_user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % rename_user_dn)
        self.assertNotEqual(len(res), 0)

    def test_rename_u6(self):
        """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
        ou1_dn = "OU=test_rename_ou1," + self.base_dn
        ou2_dn = "OU=test_rename_ou2," + self.base_dn
        user_dn = "CN=test_rename_user2," + ou1_dn
        rename_user_dn = "CN=test_rename_user2," + ou2_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou1_dn)
        self.ldb_admin.create_ou(ou2_dn)
        mod = "(A;;DC;;;AU)"
        self.sd_utils.dacl_add_ace(ou1_dn, mod)
        mod = "(A;;CC;;;AU)"
        self.sd_utils.dacl_add_ace(ou2_dn, mod)
        self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
        mod = "(A;;WP;;;AU)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Rename 'User object' with DC on the source OU, CC on the target OU
        # and WP on the user itself, all granted to AU
        self.ldb_user.rename(user_dn, rename_user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % rename_user_dn)
        self.assertNotEqual(len(res), 0)

    def test_rename_u7(self):
        """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
        ou1_dn = "OU=test_rename_ou1," + self.base_dn
        ou2_dn = "OU=test_rename_ou2," + self.base_dn
        ou3_dn = "OU=test_rename_ou3," + ou2_dn
        user_dn = "CN=test_rename_user2," + ou1_dn
        rename_user_dn = "CN=test_rename_user5," + ou3_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou1_dn)
        self.ldb_admin.create_ou(ou2_dn)
        self.ldb_admin.create_ou(ou3_dn)
        mod = "(A;CI;WPDC;;;AU)"
        self.sd_utils.dacl_add_ace(ou1_dn, mod)
        mod = "(A;;CC;;;AU)"
        self.sd_utils.dacl_add_ace(ou3_dn, mod)
        # The user is created after the CI ACE was put on its parent OU
        self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
        # Rename 'User object' with WP+DC (container-inherit) on the source OU
        # and CC on the second-level target OU, all granted to AU
        self.ldb_user.rename(user_dn, rename_user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % rename_user_dn)
        self.assertNotEqual(len(res), 0)

    def test_rename_u8(self):
        """Test rename on an object with and without modify access on the RDN attribute"""
        ou1_dn = "OU=test_rename_ou1," + self.base_dn
        ou2_dn = "OU=test_rename_ou2," + ou1_dn
        ou3_dn = "OU=test_rename_ou3," + ou1_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou1_dn)
        self.ldb_admin.create_ou(ou2_dn)
        sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        # NOTE(review): bf967a0e-... is presumably the "name" attribute and
        # bf9679f0-... the "ou" attribute (the OU's RDN) - confirm against
        # the schema.
        mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
        self.sd_utils.dacl_add_ace(ou2_dn, mod)
        mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
        self.sd_utils.dacl_add_ace(ou2_dn, mod)
        # With WP denied on the second attribute this rename operation
        # should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
        with self.assertRaises(LdbError) as raised:
            self.ldb_user.rename(ou2_dn, ou3_dn)
        (num, _) = raised.exception.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        # Add an allow ACE for that attribute; the same rename must now work
        mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
        self.sd_utils.dacl_add_ace(ou2_dn, mod)
        self.ldb_user.rename(ou2_dn, ou3_dn)
        res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
        self.assertEqual(len(res), 0)
        res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
        self.assertNotEqual(len(res), 0)

    def test_rename_u9(self):
        """Rename 'User object' cross OU, with explicit deny on sd and dc"""
        ou1_dn = "OU=test_rename_ou1," + self.base_dn
        ou2_dn = "OU=test_rename_ou2," + self.base_dn
        user_dn = "CN=test_rename_user2," + ou1_dn
        rename_user_dn = "CN=test_rename_user5," + ou2_dn
        # Create OU structure
        self.ldb_admin.create_ou(ou1_dn)
        self.ldb_admin.create_ou(ou2_dn)
        self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
        mod = "(D;;SD;;;DA)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        mod = "(D;;DC;;;DA)"
        self.sd_utils.dacl_add_ace(ou1_dn, mod)
        # Rename as admin with SD denied on the user and DC denied on the
        # source OU, both for DA
        with self.assertRaises(LdbError) as raised:
            self.ldb_admin.rename(user_dn, rename_user_dn)
        (num, _) = raised.exception.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        # add an allow ace so we can delete this ou
        mod = "(A;;DC;;;DA)"
        self.sd_utils.dacl_add_ace(ou1_dn, mod)
4574
4575
4576 # tests on Control Access Rights
4577 class AclCARTests(AclTests):
4578
4579     def setUp(self):
4580         super(AclCARTests, self).setUp()
4581
4582         # Get the old "dSHeuristics" if it was set
4583         dsheuristics = self.ldb_admin.get_dsheuristics()
4584         # Reset the "dSHeuristics" as they were before
4585         self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
4586         # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
4587         self.ldb_admin.set_dsheuristics("000000001")
4588         # Get the old "minPwdAge"
4589         minPwdAge = self.ldb_admin.get_minPwdAge()
4590         # Reset the "minPwdAge" as it was before
4591         self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
4592         # Set it temporarely to "0"
4593         self.ldb_admin.set_minPwdAge("0")
4594
4595         self.user_with_wp = "acl_car_user1"
4596         self.user_with_pc = "acl_car_user2"
4597         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
4598         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
4599         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
4600         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
4601
4602     def tearDown(self):
4603         super(AclCARTests, self).tearDown()
4604         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
4605         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
4606
4607         del self.ldb_user
4608         del self.ldb_user2
4609
4610     def test_change_password1(self):
4611         """Try a password change operation without any CARs given"""
4612         # users have change password by default - remove for negative testing
4613         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
4614         sddl = desc.as_sddl(self.domain_sid)
4615         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
4616         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
4617         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
4618         try:
4619             self.ldb_user.modify_ldif("""
4620 dn: """ + self.get_user_dn(self.user_with_wp) + """
4621 changetype: modify
4622 delete: unicodePwd
4623 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
4624 add: unicodePwd
4625 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
4626 """)
4627         except LdbError as e24:
4628             (num, _) = e24.args
4629             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
4630         else:
4631             # for some reason we get constraint violation instead of insufficient access error
4632             self.fail()
4633
4634     def test_change_password2(self):
4635         """Make sure WP has no influence"""
4636         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
4637         sddl = desc.as_sddl(self.domain_sid)
4638         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
4639         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
4640         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
4641         mod = "(A;;WP;;;PS)"
4642         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4643         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
4644         sddl = desc.as_sddl(self.domain_sid)
4645         try:
4646             self.ldb_user.modify_ldif("""
4647 dn: """ + self.get_user_dn(self.user_with_wp) + """
4648 changetype: modify
4649 delete: unicodePwd
4650 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
4651 add: unicodePwd
4652 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
4653 """)
4654         except LdbError as e25:
4655             (num, _) = e25.args
4656             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
4657         else:
4658             # for some reason we get constraint violation instead of insufficient access error
4659             self.fail()
4660
4661     def test_change_password3(self):
4662         """Make sure WP has no influence"""
4663         mod = "(D;;WP;;;PS)"
4664         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4665         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
4666         sddl = desc.as_sddl(self.domain_sid)
4667         self.ldb_user.modify_ldif("""
4668 dn: """ + self.get_user_dn(self.user_with_wp) + """
4669 changetype: modify
4670 delete: unicodePwd
4671 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
4672 add: unicodePwd
4673 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
4674 """)
4675
4676     def test_change_password5(self):
4677         """Make sure rights have no influence on dBCSPwd"""
4678         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
4679         sddl = desc.as_sddl(self.domain_sid)
4680         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
4681         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
4682         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
4683         mod = "(D;;WP;;;PS)"
4684         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4685         try:
4686             self.ldb_user.modify_ldif("""
4687 dn: """ + self.get_user_dn(self.user_with_wp) + """
4688 changetype: modify
4689 delete: dBCSPwd
4690 dBCSPwd: XXXXXXXXXXXXXXXX
4691 add: dBCSPwd
4692 dBCSPwd: YYYYYYYYYYYYYYYY
4693 """)
4694         except LdbError as e26:
4695             (num, _) = e26.args
4696             self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
4697         else:
4698             self.fail()
4699
4700     def test_change_password6(self):
4701         """Test uneven delete/adds"""
4702         try:
4703             self.ldb_user.modify_ldif("""
4704 dn: """ + self.get_user_dn(self.user_with_wp) + """
4705 changetype: modify
4706 delete: userPassword
4707 userPassword: thatsAcomplPASS1
4708 delete: userPassword
4709 userPassword: thatsAcomplPASS1
4710 add: userPassword
4711 userPassword: thatsAcomplPASS2
4712 """)
4713         except LdbError as e27:
4714             (num, _) = e27.args
4715             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
4716         else:
4717             self.fail()
4718         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
4719         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4720         try:
4721             self.ldb_user.modify_ldif("""
4722 dn: """ + self.get_user_dn(self.user_with_wp) + """
4723 changetype: modify
4724 delete: userPassword
4725 userPassword: thatsAcomplPASS1
4726 delete: userPassword
4727 userPassword: thatsAcomplPASS1
4728 add: userPassword
4729 userPassword: thatsAcomplPASS2
4730 """)
4731             # This fails on Windows 2000 domain level with constraint violation
4732         except LdbError as e28:
4733             (num, _) = e28.args
4734             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
4735                             num == ERR_UNWILLING_TO_PERFORM)
4736         else:
4737             self.fail()
4738
4739     def test_change_password7(self):
4740         """Try a password change operation without any CARs given"""
4741         # users have change password by default - remove for negative testing
4742         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
4743         sddl = desc.as_sddl(self.domain_sid)
4744         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
4745         # first change our own password
4746         self.ldb_user2.modify_ldif("""
4747 dn: """ + self.get_user_dn(self.user_with_pc) + """
4748 changetype: modify
4749 delete: unicodePwd
4750 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
4751 add: unicodePwd
4752 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
4753 """)
4754         # then someone else's
4755         self.ldb_user2.modify_ldif("""
4756 dn: """ + self.get_user_dn(self.user_with_wp) + """
4757 changetype: modify
4758 delete: unicodePwd
4759 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
4760 add: unicodePwd
4761 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
4762 """)
4763
4764     def test_reset_password1(self):
4765         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
4766         try:
4767             self.ldb_user.modify_ldif("""
4768 dn: """ + self.get_user_dn(self.user_with_wp) + """
4769 changetype: modify
4770 replace: unicodePwd
4771 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
4772 """)
4773         except LdbError as e29:
4774             (num, _) = e29.args
4775             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
4776         else:
4777             self.fail()
4778         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
4779         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4780         self.ldb_user.modify_ldif("""
4781 dn: """ + self.get_user_dn(self.user_with_wp) + """
4782 changetype: modify
4783 replace: unicodePwd
4784 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
4785 """)
4786
4787     def test_reset_password2(self):
4788         """Try a user password reset operation (userPassword) before and after granting CAR"""
4789         try:
4790             self.ldb_user.modify_ldif("""
4791 dn: """ + self.get_user_dn(self.user_with_wp) + """
4792 changetype: modify
4793 replace: userPassword
4794 userPassword: thatsAcomplPASS1
4795 """)
4796         except LdbError as e30:
4797             (num, _) = e30.args
4798             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
4799         else:
4800             self.fail()
4801         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
4802         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4803         try:
4804             self.ldb_user.modify_ldif("""
4805 dn: """ + self.get_user_dn(self.user_with_wp) + """
4806 changetype: modify
4807 replace: userPassword
4808 userPassword: thatsAcomplPASS1
4809 """)
4810             # This fails on Windows 2000 domain level with constraint violation
4811         except LdbError as e31:
4812             (num, _) = e31.args
4813             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
4814         else:
4815             pass # Not self.fail() as we normally want success.
4816
4817     def test_reset_password3(self):
4818         """Grant WP and see what happens (unicodePwd)"""
4819         mod = "(A;;WP;;;PS)"
4820         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4821         try:
4822             self.ldb_user.modify_ldif("""
4823 dn: """ + self.get_user_dn(self.user_with_wp) + """
4824 changetype: modify
4825 replace: unicodePwd
4826 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
4827 """)
4828         except LdbError as e32:
4829             (num, _) = e32.args
4830             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
4831         else:
4832             self.fail()
4833
4834     def test_reset_password4(self):
4835         """Grant WP and see what happens (userPassword)"""
4836         mod = "(A;;WP;;;PS)"
4837         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4838         try:
4839             self.ldb_user.modify_ldif("""
4840 dn: """ + self.get_user_dn(self.user_with_wp) + """
4841 changetype: modify
4842 replace: userPassword
4843 userPassword: thatsAcomplPASS1
4844 """)
4845         except LdbError as e33:
4846             (num, _) = e33.args
4847             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
4848         else:
4849             self.fail()
4850
4851     def test_reset_password5(self):
4852         """Explicitly deny WP but grant CAR (unicodePwd)"""
4853         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
4854         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4855         self.ldb_user.modify_ldif("""
4856 dn: """ + self.get_user_dn(self.user_with_wp) + """
4857 changetype: modify
4858 replace: unicodePwd
4859 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
4860 """)
4861
4862     def test_reset_password6(self):
4863         """Explicitly deny WP but grant CAR (userPassword)"""
4864         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
4865         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
4866         try:
4867             self.ldb_user.modify_ldif("""
4868 dn: """ + self.get_user_dn(self.user_with_wp) + """
4869 changetype: modify
4870 replace: userPassword
4871 userPassword: thatsAcomplPASS1
4872 """)
4873             # This fails on Windows 2000 domain level with constraint violation
4874         except LdbError as e34:
4875             (num, _) = e34.args
4876             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
4877         else:
4878             pass # Not self.fail() as we normally want success
4879
4880
class AclExtendedTests(AclTests):
    """Checks which users get nTSecurityDescriptor back when searching a group."""

    def setUp(self):
        """Create three test users, add u3 to Domain Admins, connect as each."""
        super().setUp()
        # u1: regular user, will be the creator
        # u2: regular user
        # u3: admin user
        self.u1, self.u2, self.u3 = "ext_u1", "ext_u2", "ext_u3"
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members("Domain Admins", [self.u3],
                                                add_members_operation=True)

        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)

        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        """Delete the users, the group and the OU, then drop the connections."""
        super().tearDown()
        leftovers = (
            self.get_user_dn(self.u1),
            self.get_user_dn(self.u2),
            self.get_user_dn(self.u3),
            "CN=ext_group1,OU=ext_ou1," + self.base_dn,
            "ou=ext_ou1," + self.base_dn,
        )
        for dn in leftovers:
            delete_force(self.ldb_admin, dn)

        del self.ldb_user1
        del self.ldb_user2
        del self.ldb_user3

    def test_ntSecurityDescriptor(self):
        """u2 must not see nTSecurityDescriptor with RP or RC; admin u3 must."""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1,OU=ext_ou1," + self.base_dn
        u2_sid = str(self.user_sid2)

        def search_sd(connection):
            # Base-scope search on the group asking only for the descriptor.
            return connection.search(group_dn, SCOPE_BASE, None,
                                     ["nTSecurityDescriptor"])

        # create empty ou
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        # give u1 Create children access
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % str(self.user_sid1))
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % u2_sid)

        # create a group under that (as u1), grant RP to u2
        self.ldb_user1.newgroup(
            "ext_group1", groupou="OU=ext_ou1",
            grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % u2_sid)

        # u2 must not read the descriptor
        found = search_sd(self.ldb_user2)
        self.assertNotEqual(len(found), 0)
        self.assertNotIn("nTSecurityDescriptor", found[0].keys())

        # grant RC to u2 - still no access
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % u2_sid)
        found = search_sd(self.ldb_user2)
        self.assertNotEqual(len(found), 0)
        self.assertNotIn("nTSecurityDescriptor", found[0].keys())

        # u3 is member of administrators group, should be able to read sd
        found = search_sd(self.ldb_user3)
        self.assertEqual(len(found), 1)
        self.assertIn("nTSecurityDescriptor", found[0].keys())
4944
4945
class AclUndeleteTests(AclTests):
    """Access checks applied when a deleted user object is undeleted.

    setUp creates five users, deletes them again and remembers the DNs of the
    resulting deleted objects; test_undelete then tries to restore them as
    self.regular_user under various deny ACEs.
    """

    def setUp(self):
        """Create and delete the test users, the target OU and the acting user."""
        super(AclUndeleteTests, self).setUp()
        self.regular_user = "undeleter1"
        self.ou1 = "OU=undeleted_ou,"
        self.testuser1 = "to_be_undeleted1"
        self.testuser2 = "to_be_undeleted2"
        self.testuser3 = "to_be_undeleted3"
        self.testuser4 = "to_be_undeleted4"
        self.testuser5 = "to_be_undeleted5"
        self.testuser6 = "to_be_undeleted6"

        # Alternative restore location for testuser4, inside the test OU.
        self.new_dn_ou = "CN=" + self.testuser4 + "," + self.ou1 + self.base_dn

        # DNs the users are restored to.
        self.testuser1_dn = self.get_user_dn(self.testuser1)
        self.testuser2_dn = self.get_user_dn(self.testuser2)
        self.testuser3_dn = self.get_user_dn(self.testuser3)
        self.testuser4_dn = self.get_user_dn(self.testuser4)
        self.testuser5_dn = self.get_user_dn(self.testuser5)

        # Create each user, delete it, and keep the DN of the deleted object.
        self.deleted_dn1 = self.create_delete_user(self.testuser1)
        self.deleted_dn2 = self.create_delete_user(self.testuser2)
        self.deleted_dn3 = self.create_delete_user(self.testuser3)
        self.deleted_dn4 = self.create_delete_user(self.testuser4)
        self.deleted_dn5 = self.create_delete_user(self.testuser5)

        self.ldb_admin.create_ou(self.ou1 + self.base_dn)

        # The acting user is a member of Domain Admins; the tests then take
        # individual rights away from it with deny ACEs.
        self.ldb_admin.newuser(self.regular_user, self.user_pass)
        self.ldb_admin.add_remove_group_members("Domain Admins", [self.regular_user],
                                                add_members_operation=True)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
        self.sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))

    def tearDown(self):
        """Remove every object the test may have (re)created."""
        super(AclUndeleteTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser1))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser2))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser3))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser4))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser5))
        delete_force(self.ldb_admin, self.new_dn_ou)
        delete_force(self.ldb_admin, self.ou1 + self.base_dn)

        del self.ldb_user

    def GUID_string(self, guid):
        """Return the objectGUID value *guid* formatted as a string.

        Uses the admin connection held by this test case rather than a
        module-level name, so the helper does not depend on a global binding.
        """
        return get_string(self.ldb_admin.schema_format_value("objectGUID", guid))

    def create_delete_user(self, new_user):
        """Create user *new_user*, delete it and return the deleted object's DN.

        The object is located after deletion through its objectGUID, searched
        with the show_deleted control.
        """
        self.ldb_admin.newuser(new_user, self.user_pass)

        res = self.ldb_admin.search(expression="(objectClass=*)",
                                    base=self.get_user_dn(new_user),
                                    scope=SCOPE_BASE,
                                    controls=["show_deleted:1"])
        guid = res[0]["objectGUID"][0]
        self.ldb_admin.delete(self.get_user_dn(new_user))
        res = self.ldb_admin.search(base="<GUID=%s>" % self.GUID_string(guid),
                                    scope=SCOPE_BASE, controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return str(res[0].dn)

    def undelete_deleted(self, olddn, newdn):
        """Undelete *olddn* to *newdn* as self.ldb_user.

        The modify removes isDeleted and replaces distinguishedName, and is
        sent with the show_recycled control.
        """
        msg = Message()
        msg.dn = Dn(self.ldb_user, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        self.ldb_user.modify(msg, ["show_recycled:1"])

    def undelete_deleted_with_mod(self, olddn, newdn):
        """Undelete *olddn* to *newdn* as self.ldb_user while also setting url.

        Same request as undelete_deleted plus a replace of the url attribute;
        sent with the show_deleted control.
        """
        msg = Message()
        # Bind the DN to the connection that performs the modify, as
        # undelete_deleted does.
        msg.dn = Dn(self.ldb_user, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
        self.ldb_user.modify(msg, ["show_deleted:1"])

    def _assert_undelete_denied(self, undelete, olddn, newdn):
        """Assert that undelete(olddn, newdn) fails with ERR_INSUFFICIENT_ACCESS_RIGHTS."""
        with self.assertRaises(LdbError) as caught:
            undelete(olddn, newdn)
        (num, _) = caught.exception.args
        self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)

    def test_undelete(self):
        """Undelete objects under various deny ACEs and check the outcome."""
        # it appears the user has to have LC on the old parent to be able to move the object
        # otherwise we get no such object. Since only System can modify the SD on deleted object
        # we cannot grant this permission via LDAP, and this leaves us with "negative" tests at the moment

        # deny write property on rdn, should fail
        mod = "(OD;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
        self.sd_utils.dacl_add_ace(self.deleted_dn1, mod)
        self._assert_undelete_denied(self.undelete_deleted,
                                     self.deleted_dn1, self.testuser1_dn)

        # seems that permissions on isDeleted and distinguishedName are irrelevant
        mod = "(OD;;WP;bf96798f-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
        self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
        mod = "(OD;;WP;bf9679e4-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
        self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
        self.undelete_deleted(self.deleted_dn2, self.testuser2_dn)

        # attempt undelete with simultaneous addition of url, WP to which is denied
        mod = "(OD;;WP;9a9a0221-4a5b-11d1-a9c3-0000f80367c1;;%s)" % str(self.sid)
        self.sd_utils.dacl_add_ace(self.deleted_dn3, mod)
        self._assert_undelete_denied(self.undelete_deleted_with_mod,
                                     self.deleted_dn3, self.testuser3_dn)

        # undelete in an ou, in which we have no right to create children
        mod = "(D;;CC;;;%s)" % str(self.sid)
        self.sd_utils.dacl_add_ace(self.ou1 + self.base_dn, mod)
        self._assert_undelete_denied(self.undelete_deleted,
                                     self.deleted_dn4, self.new_dn_ou)

        # delete is not required
        mod = "(D;;SD;;;%s)" % str(self.sid)
        self.sd_utils.dacl_add_ace(self.deleted_dn5, mod)
        self.undelete_deleted(self.deleted_dn5, self.testuser5_dn)

        # deny Reanimate-Tombstone, should fail
        # NOTE(review): this ACE is added on self.base_dn and nothing in this
        # class removes it again - confirm the base class restores the SD.
        mod = "(OD;;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;%s)" % str(self.sid)
        self.sd_utils.dacl_add_ace(self.base_dn, mod)
        self._assert_undelete_denied(self.undelete_deleted,
                                     self.deleted_dn4, self.testuser4_dn)
5082
5083
5084 class AclSPNTests(AclTests):
5085
5086     def setUp(self):
5087         super(AclSPNTests, self).setUp()
5088         self.dcname = "TESTSRV8"
5089         self.rodcname = "TESTRODC8"
5090         self.computername = "testcomp8"
5091         self.test_user = "spn_test_user8"
5092         self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
5093         self.user_object = "user_with_spn"
5094         self.user_object_dn = "CN=%s,CN=Users,%s" % (self.user_object, self.base_dn)
5095         self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
5096         self.site = "Default-First-Site-Name"
5097         self.rodcctx = DCJoinContext(server=host, creds=creds, lp=lp,
5098                                      site=self.site, netbios_name=self.rodcname,
5099                                      targetdir=None, domain=None)
5100         self.dcctx = DCJoinContext(server=host, creds=creds, lp=lp,
5101                                    site=self.site, netbios_name=self.dcname,
5102                                    targetdir=None, domain=None)
5103         self.ldb_admin.newuser(self.test_user, self.user_pass)
5104         self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
5105         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
5106         self.create_computer(self.computername, self.dcctx.dnsdomain)
5107         self.create_rodc(self.rodcctx)
5108         self.create_dc(self.dcctx)
5109
5110     def tearDown(self):
5111         super(AclSPNTests, self).tearDown()
5112         self.rodcctx.cleanup_old_join()
5113         self.dcctx.cleanup_old_join()
5114         delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
5115         delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
5116         delete_force(self.ldb_admin, self.user_object_dn)
5117
5118         del self.ldb_user1
5119
5120     def replace_spn(self, _ldb, dn, spn):
5121         print("Setting spn %s on %s" % (spn, dn))
5122         res = self.ldb_admin.search(dn, expression="(objectClass=*)",
5123                                     scope=SCOPE_BASE, attrs=["servicePrincipalName"])
5124         if "servicePrincipalName" in res[0].keys():
5125             flag = FLAG_MOD_REPLACE
5126         else:
5127             flag = FLAG_MOD_ADD
5128
5129         msg = Message()
5130         msg.dn = Dn(self.ldb_admin, dn)
5131         msg["servicePrincipalName"] = MessageElement(spn, flag,
5132                                                      "servicePrincipalName")
5133         _ldb.modify(msg)
5134
5135     def create_computer(self, computername, domainname):
5136         dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
5137         samaccountname = computername + "$"
5138         dnshostname = "%s.%s" % (computername, domainname)
5139         self.ldb_admin.add({
5140             "dn": dn,
5141             "objectclass": "computer",
5142             "sAMAccountName": samaccountname,
5143             "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
5144             "dNSHostName": dnshostname})
5145
5146     # same as for join_RODC, but do not set any SPNs
5147     def create_rodc(self, ctx):
5148         ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
5149         ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
5150         ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
5151
5152         ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
5153                                 "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
5154                                 "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
5155                                 "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
5156                                 "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
5157         ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
5158
5159         mysid = ctx.get_mysid()
5160         admin_dn = "<SID=%s>" % mysid
5161         ctx.managedby = admin_dn
5162
5163         ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
5164                                   samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
5165                                   samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
5166
5167         ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
5168         ctx.secure_channel_type = misc.SEC_CHAN_RODC
5169         ctx.RODC = True
5170         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
5171                              drsuapi.DRSUAPI_DRS_PER_SYNC |
5172                              drsuapi.DRSUAPI_DRS_GET_ANC |
5173                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
5174                              drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
5175
5176         ctx.join_add_objects()
5177
5178     def create_dc(self, ctx):
5179         ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
5180         ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
5181         ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
5182         ctx.secure_channel_type = misc.SEC_CHAN_BDC
5183         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
5184                              drsuapi.DRSUAPI_DRS_INIT_SYNC |
5185                              drsuapi.DRSUAPI_DRS_PER_SYNC |
5186                              drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
5187                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
5188
5189         ctx.join_add_objects()
5190
5191     def dc_spn_test(self, ctx):
5192         netbiosdomain = self.dcctx.get_domain_name()
5193         try:
5194             self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
5195         except LdbError as e39:
5196             (num, _) = e39.args
5197             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
5198         else:
5199             self.fail()
5200
5201         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
5202         self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
5203         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
5204         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
5205         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
5206                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
5207         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
5208         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
5209                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
5210         self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
5211                          (ctx.myname, ctx.dnsdomain, ctx.dnsforest))
5212         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
5213         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
5214                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
5215         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
5216         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
5217         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
5218                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
5219         self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
5220         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
5221                          (ctx.myname, ctx.dnsdomain))
5222         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
5223                          (ctx.myname))
5224         self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
5225                          (ctx.myname, ctx.dnsdomain))
5226         self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
5227                          (ctx.myname, ctx.dnsdomain))
5228         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
5229                          (ctx.ntds_guid, ctx.dnsdomain))
5230
5231         # the following spns do not match the restrictions and should fail
5232         try:
5233             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
5234                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
5235         except LdbError as e40:
5236             (num, _) = e40.args
5237             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5238         else:
5239             self.fail()
5240         try:
5241             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
5242                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
5243         except LdbError as e41:
5244             (num, _) = e41.args
5245             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5246         else:
5247             self.fail()
5248         try:
5249             self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
5250         except LdbError as e42:
5251             (num, _) = e42.args
5252             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5253         else:
5254             self.fail()
5255         try:
5256             self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
5257                              (ctx.myname, ctx.dnsdomain, netbiosdomain))
5258         except LdbError as e43:
5259             (num, _) = e43.args
5260             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5261         else:
5262             self.fail()
5263         try:
5264             self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
5265                              (ctx.ntds_guid, ctx.dnsdomain))
5266         except LdbError as e44:
5267             (num, _) = e44.args
5268             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5269         else:
5270             self.fail()
5271
5272     def test_computer_spn(self):
5273         # with WP, any value can be set
5274         netbiosdomain = self.dcctx.get_domain_name()
5275         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
5276                          (self.computername, netbiosdomain))
5277         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
5278         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
5279                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
5280         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
5281                          (self.computername, self.dcctx.dnsdomain))
5282         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
5283                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
5284         self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
5285                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
5286         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
5287         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
5288                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
5289         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
5290                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
5291         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
5292                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
5293         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
5294         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
5295                          (self.computername, self.dcctx.dnsdomain))
5296         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
5297                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
5298         self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
5299                          (self.computername, self.dcctx.dnsdomain))
5300         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
5301                          (self.computername, self.dcctx.dnsdomain))
5302         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
5303                          (self.computername))
5304         self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
5305                          (self.computername, self.dcctx.dnsdomain))
5306         self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
5307                          (self.computername, self.dcctx.dnsdomain))
5308         self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
5309
5310         # user has neither WP nor Validated-SPN, access denied expected
5311         try:
5312             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
5313         except LdbError as e45:
5314             (num, _) = e45.args
5315             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
5316         else:
5317             self.fail()
5318
5319         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
5320         self.sd_utils.dacl_add_ace(self.computerdn, mod)
5321         # grant Validated-SPN and check which values are accepted
5322         # see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
5323
5324         # for regular computer objects we shouldalways get constraint violation
5325
5326         # This does not pass against Windows, although it should according to docs
5327         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
5328         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
5329                          (self.computername, self.dcctx.dnsdomain))
5330
5331         try:
5332             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
5333         except LdbError as e46:
5334             (num, _) = e46.args
5335             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5336         else:
5337             self.fail()
5338         try:
5339             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
5340                              (self.computername, self.dcctx.dnsdomain, netbiosdomain))
5341         except LdbError as e47:
5342             (num, _) = e47.args
5343             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5344         else:
5345             self.fail()
5346         try:
5347             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
5348                              (self.computername, self.dcctx.dnsdomain))
5349         except LdbError as e48:
5350             (num, _) = e48.args
5351             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5352         else:
5353             self.fail()
5354         try:
5355             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
5356                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
5357         except LdbError as e49:
5358             (num, _) = e49.args
5359             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5360         else:
5361             self.fail()
5362         try:
5363             self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
5364                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
5365         except LdbError as e50:
5366             (num, _) = e50.args
5367             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5368         else:
5369             self.fail()
5370         try:
5371             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
5372         except LdbError as e51:
5373             (num, _) = e51.args
5374             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5375         else:
5376             self.fail()
5377         try:
5378             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
5379                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
5380         except LdbError as e52:
5381             (num, _) = e52.args
5382             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5383         else:
5384             self.fail()
5385
5386     def test_spn_rwdc(self):
5387         self.dc_spn_test(self.dcctx)
5388
5389     def test_spn_rodc(self):
5390         self.dc_spn_test(self.rodcctx)
5391
5392     def test_user_spn(self):
5393         #grant SW to a regular user and try to set the spn on a user object
5394         #should get  ERR_INSUFFICIENT_ACCESS_RIGHTS, since Validate-SPN only applies to computer
5395         self.ldb_admin.newuser(self.user_object, self.user_pass)
5396         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
5397         self.sd_utils.dacl_add_ace(self.user_object_dn, mod)
5398         try:
5399             self.replace_spn(self.ldb_user1, self.user_object_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
5400         except LdbError as e60:
5401             (num, _) = e60.args
5402             self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
5403         else:
5404             self.fail()
5405
5406     def test_delete_add_spn(self):
5407         # Grant Validated-SPN property.
5408         mod = f'(OA;;SW;{security.GUID_DRS_VALIDATE_SPN};;{self.user_sid1})'
5409         self.sd_utils.dacl_add_ace(self.computerdn, mod)
5410
5411         spn_base = f'HOST/{self.computername}'
5412
5413         allowed_spn = f'{spn_base}.{self.dcctx.dnsdomain}'
5414         not_allowed_spn = f'{spn_base}/{self.dcctx.get_domain_name()}'
5415
5416         # Ensure we are able to add an allowed SPN.
5417         msg = Message(Dn(self.ldb_user1, self.computerdn))
5418         msg['servicePrincipalName'] = MessageElement(allowed_spn,
5419                                                      FLAG_MOD_ADD,
5420                                                      'servicePrincipalName')
5421         self.ldb_user1.modify(msg)
5422
5423         # Ensure we are not able to add a disallowed SPN.
5424         msg = Message(Dn(self.ldb_user1, self.computerdn))
5425         msg['servicePrincipalName'] = MessageElement(not_allowed_spn,
5426                                                      FLAG_MOD_ADD,
5427                                                      'servicePrincipalName')
5428         try:
5429             self.ldb_user1.modify(msg)
5430         except LdbError as e:
5431             num, _ = e.args
5432             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5433         else:
5434             self.fail(f'able to add disallowed SPN {not_allowed_spn}')
5435
5436         # Ensure that deleting an existing SPN followed by adding a disallowed
5437         # SPN fails.
5438         msg = Message(Dn(self.ldb_user1, self.computerdn))
5439         msg['0'] = MessageElement([],
5440                                   FLAG_MOD_DELETE,
5441                                   'servicePrincipalName')
5442         msg['1'] = MessageElement(not_allowed_spn,
5443                                   FLAG_MOD_ADD,
5444                                   'servicePrincipalName')
5445         try:
5446             self.ldb_user1.modify(msg)
5447         except LdbError as e:
5448             num, _ = e.args
5449             self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
5450         else:
5451             self.fail(f'able to add disallowed SPN {not_allowed_spn}')
5452
5453     def test_delete_disallowed_spn(self):
5454         # Grant Validated-SPN property.
5455         mod = f'(OA;;SW;{security.GUID_DRS_VALIDATE_SPN};;{self.user_sid1})'
5456         self.sd_utils.dacl_add_ace(self.computerdn, mod)
5457
5458         spn_base = f'HOST/{self.computername}'
5459
5460         not_allowed_spn = f'{spn_base}/{self.dcctx.get_domain_name()}'
5461
5462         # Add a disallowed SPN as admin.
5463         msg = Message(Dn(self.ldb_admin, self.computerdn))
5464         msg['servicePrincipalName'] = MessageElement(not_allowed_spn,
5465                                                      FLAG_MOD_ADD,
5466                                                      'servicePrincipalName')
5467         self.ldb_admin.modify(msg)
5468
5469         # Ensure we are able to delete a disallowed SPN.
5470         msg = Message(Dn(self.ldb_user1, self.computerdn))
5471         msg['servicePrincipalName'] = MessageElement(not_allowed_spn,
5472                                                      FLAG_MOD_DELETE,
5473                                                      'servicePrincipalName')
5474         try:
5475             self.ldb_user1.modify(msg)
5476         except LdbError:
5477             self.fail(f'unable to delete disallowed SPN {not_allowed_spn}')
5478
5479
# Tests SEC_ADS_LIST vs. SEC_ADS_LIST_OBJECT
@DynamicTestCase
class AclVisibiltyTests(AclTests):
    """Search visibility through a chain of three nested OUs.

    A test user lives under OU=oul3 inside OU=oul2 inside
    OU=acl_visibility_oul1.  Each generated test gives that user some
    combination of the SEC_ADS_LIST ("LC") and SEC_ADS_LIST_OBJECT ("LO")
    rights on every OU, optionally denies the complementary rights, and then
    compares admin and user search results for every scope against a locally
    computed expectation.
    """

    # Test-name token -> fDoListObject argument of the generated test.
    envs = {
        "No": False,
        "Do": True,
    }
    # Test-name token -> modeDeny argument of the generated test.
    modes = {
        "Allow": False,
        "Deny": True,
    }
    # Test-name token -> access mask granted to the test user on one OU.
    perms = {
        "nn": 0,
        "Cn": security.SEC_ADS_LIST,
        "nO": security.SEC_ADS_LIST_OBJECT,
        "CO": security.SEC_ADS_LIST | security.SEC_ADS_LIST_OBJECT,
    }

    @classmethod
    def setUpDynamicTestCases(cls):
        """Register one "test_visibility" case per combination of settings.

        Every ``envs`` key is combined with every ``modes`` key and with
        every ordered triple of ``perms`` keys (one per OU level).  The keys,
        joined by underscores, form the test-name suffix, which is also
        passed to the test as its first argument.
        """
        for env_key, list_object in cls.envs.items():
            for mode_key, deny_mode in cls.modes.items():
                for l1_key, l1_rights in cls.perms.items():
                    for l2_key, l2_rights in cls.perms.items():
                        for l3_key, l3_rights in cls.perms.items():
                            suffix = "_".join(
                                (env_key, mode_key, l1_key, l2_key, l3_key))
                            cls.generate_dynamic_test("test_visibility",
                                                      suffix,
                                                      suffix,
                                                      list_object,
                                                      deny_mode,
                                                      l1_rights,
                                                      l2_rights,
                                                      l3_rights)

    def setUp(self):
        """Create the three nested OUs and the test user inside the last."""
        super(AclVisibiltyTests, self).setUp()

        # Remember the current "dSHeuristics" value and have it written back
        # once the test is over.
        self.dsheuristics = self.ldb_admin.get_dsheuristics()
        self.addCleanup(self.ldb_admin.set_dsheuristics, self.dsheuristics)

        # Domain Admins and SYSTEM get full access
        self.sddl_dacl = "D:PAI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
        self.set_dacl_control = ["sd_flags:1:%d" % security.SECINFO_DACL]

        # Indexes into the per-test "levels" lists: 1-3 are the OUs, 4 is the
        # user object (0 would be the base DN and is never searched).
        self.level_idxs = [1, 2, 3, 4]

        # Relative OU paths, outermost first.
        self.oul1 = "OU=acl_visibility_oul1"
        self.oul2 = "OU=oul2,%s" % self.oul1
        self.oul3 = "OU=oul3,%s" % self.oul2

        # The same OUs as full DN strings below the base DN.
        self.oul1_dn_str = "%s,%s" % (self.oul1, self.base_dn)
        self.oul2_dn_str = "%s,%s" % (self.oul2, self.base_dn)
        self.oul3_dn_str = "%s,%s" % (self.oul3, self.base_dn)

        self.user_name = "acl_visibility_user"
        self.user_dn_str = "CN=%s,%s" % (self.user_name, self.oul3_dn_str)

        # Clear leftovers from an earlier run, innermost object first.
        for stale_dn in (self.user_dn_str,
                         self.oul3_dn_str,
                         self.oul2_dn_str,
                         self.oul1_dn_str):
            delete_force(self.ldb_admin, stale_dn)

        # Create the OUs outermost first, setting the base DACL on each one
        # right after it has been created.
        for ou_dn in (self.oul1_dn_str, self.oul2_dn_str, self.oul3_dn_str):
            self.ldb_admin.create_ou(ou_dn)
            self.sd_utils.modify_sd_on_dn(ou_dn,
                                          self.sddl_dacl,
                                          controls=self.set_dacl_control)

        self.ldb_admin.newuser(self.user_name, self.user_pass, userou=self.oul3)
        self.user_sid = self.sd_utils.get_object_sid(self.user_dn_str)
        self.ldb_user = self.get_ldb_connection(self.user_name, self.user_pass)

    def tearDown(self):
        """Delete the test user and OUs and drop the user's connection."""
        super(AclVisibiltyTests, self).tearDown()
        for leftover_dn in (self.user_dn_str,
                            self.oul3_dn_str,
                            self.oul2_dn_str,
                            self.oul1_dn_str):
            delete_force(self.ldb_admin, leftover_dn)

        del self.ldb_user

    def _test_visibility_with_args(self,
                                   tname,
                                   fDoListObject,
                                   modeDeny,
                                   l1_allow,
                                   l2_allow,
                                   l3_allow):
        """Body of every generated test_visibility_* case.

        :param tname: test-name suffix (not used by the body)
        :param fDoListObject: set dSHeuristics to "001" when true, "000"
            otherwise; also feeds the expected-visibility computation
        :param modeDeny: when true, additionally deny the user every right
            that is not in the level's allow mask
        :param l1_allow: access mask allowed to the user on the first OU
        :param l2_allow: access mask allowed to the user on the second OU
        :param l3_allow: access mask allowed to the user on the third OU
        """
        list_right = security.SEC_ADS_LIST
        list_object_right = security.SEC_ADS_LIST_OBJECT
        read_right = security.SEC_ADS_READ_PROP

        if modeDeny:
            l1_deny, l2_deny, l3_deny = ~l1_allow, ~l2_allow, ~l3_allow
        else:
            l1_deny, l2_deny, l3_deny = 0, 0, 0

        print("Testing: fDoListObject=%s, modeDeny=%s, l1_allow=0x%02x, l2_allow=0x%02x, l3_allow=0x%02x)" % (
              fDoListObject, modeDeny, l1_allow, l2_allow, l3_allow))
        self.ldb_admin.set_dsheuristics("001" if fDoListObject else "000")

        def _rights_sddl(mask):
            # SDDL right codes for the two list rights present in `mask`.
            codes = ""
            if mask & list_right:
                codes += "LC"
            if mask & list_object_right:
                codes += "LO"
            return codes

        def _generate_dacl(allow, deny):
            # Base DACL plus an optional deny ACE followed by an optional
            # allow ACE for the test user.
            dacl = self.sddl_dacl
            denied = _rights_sddl(deny)
            if denied:
                dacl += "(D;;%s;;;%s)" % (denied, self.user_sid)
            allowed = _rights_sddl(allow)
            if allowed:
                dacl += "(A;;%s;;;%s)" % (allowed, self.user_sid)
            print("dacl: %s" % dacl)
            return dacl

        # Build all three DACLs first, then apply them outermost OU first.
        ou_dacls = [
            _generate_dacl(l1_allow, l1_deny),
            _generate_dacl(l2_allow, l2_deny),
            _generate_dacl(l3_allow, l3_deny),
        ]
        ou_dns = (self.oul1_dn_str, self.oul2_dn_str, self.oul3_dn_str)
        for ou_dn, ou_dacl in zip(ou_dns, ou_dacls):
            self.sd_utils.modify_sd_on_dn(ou_dn,
                                          ou_dacl,
                                          controls=self.set_dacl_control)

        def _level(dn, allow, deny):
            return {"dn": str(dn), "allow": allow, "deny": deny}

        def _generate_levels(ou1_rights, ou2_rights, ou3_rights):
            # Five levels: base DN, the three OUs and the user object.  The
            # base DN and the user object are modelled with list, list-object
            # and read-property rights and nothing denied; each OU takes the
            # given (allow, deny) pair.
            open_rights = list_right | list_object_right | read_right
            return [
                _level(self.base_dn, open_rights, 0),
                _level(self.oul1_dn_str, *ou1_rights),
                _level(self.oul2_dn_str, *ou2_rights),
                _level(self.oul3_dn_str, *ou3_rights),
                _level(self.user_dn_str, open_rights, 0),
            ]

        # The admin is modelled with list and read-property rights on every
        # OU; the user with exactly the rights under test.
        admin_ou_rights = (list_right | read_right, 0)
        admin_levels = _generate_levels(admin_ou_rights,
                                        admin_ou_rights,
                                        admin_ou_rights)
        user_levels = _generate_levels((l1_allow, l1_deny),
                                       (l2_allow, l2_deny),
                                       (l3_allow, l3_deny))

        def _msg_require_name(msg, idx, e):
            # The entry must carry exactly one "name" value.
            self.assertIn("name", msg)
            self.assertEqual(len(msg["name"]), 1)

        def _msg_no_name(msg, idx, e):
            # The entry must come back without a "name" attribute.
            self.assertNotIn("name", msg)

        def _has_right(allow, deny, bit):
            # Granted means: allowed and not denied at the same time.
            return bool(allow & bit) and not (deny & bit)

        def _is_visible(p_allow, p_deny, o_allow, o_deny):
            # LIST on the parent is enough.  Otherwise, and only when
            # fDoListObject is on, LIST_OBJECT is needed on both the parent
            # and the object itself.
            if _has_right(p_allow, p_deny, list_right):
                return True
            if not fDoListObject:
                return False
            return (_has_right(p_allow, p_deny, list_object_right) and
                    _has_right(o_allow, o_deny, list_object_right))

        def _generate_expected(scope, base_level, levels):
            # Map of DN -> {"idx", "check_msg_fn"} for the entries a search
            # from `base_level` with `scope` should return, or None when
            # NO_SUCH_OBJECT is expected instead.
            def _level_visible(level):
                parent = levels[level - 1]
                obj = levels[level]
                return _is_visible(parent["allow"], parent["deny"],
                                   obj["allow"], obj["deny"])

            base_visible = _level_visible(base_level)

            if scope == SCOPE_BASE:
                first, last = base_level, base_level
            elif scope == SCOPE_ONELEVEL:
                first, last = base_level + 1, base_level + 1
            else:
                first, last = base_level, len(levels)

            expected = {}
            position = 0
            for level in self.level_idxs:
                if level > last:
                    break
                if level < first or not _level_visible(level):
                    continue
                obj = levels[level]
                if _has_right(obj["allow"], obj["deny"], read_right):
                    checker = _msg_require_name
                else:
                    checker = _msg_no_name
                expected[obj["dn"]] = {
                    "idx": position,
                    "check_msg_fn": checker,
                }
                position += 1

            if not expected and not base_visible:
                # This means we're expecting NO_SUCH_OBJECT
                return None
            return expected

        def _verify_result_array(results,
                                 description,
                                 expected):
            # Print both sides, then check that `results` holds exactly the
            # expected DNs (in the expected order under strict checking) and
            # that every entry passes its per-entry check.
            print("%s Results: %d" % (description, len(results)))
            for msg in results:
                print("%s" % msg)
            self.assertIsNotNone(expected)
            print("%s Expected: %d" % (description, len(expected)))
            for expected_dn in expected:
                print("%s" % expected_dn)
            self.assertEqual(len(results), len(expected))

            found = {}
            for position, msg in enumerate(results):
                dn_str = str(msg.dn)
                self.assertIn(dn_str, expected)
                self.assertNotIn(dn_str, found)
                found[dn_str] = position
                entry = expected[dn_str]
                if self.strict_checking:
                    self.assertEqual(position, int(entry["idx"]))
                if "check_msg_fn" in entry:
                    entry["check_msg_fn"](msg, position, entry)

        for level in self.level_idxs:
            base_dn = admin_levels[level]["dn"]
            for scope in (SCOPE_BASE, SCOPE_ONELEVEL, SCOPE_SUBTREE):
                print("\nTesting SCOPE[%d] %s" % (scope, base_dn))
                admin_expected = _generate_expected(scope, level, admin_levels)
                admin_res = self.ldb_admin.search(base_dn, scope=scope, attrs=["name"])
                _verify_result_array(admin_res, "Admin", admin_expected)

                user_expected = _generate_expected(scope, level, user_levels)
                try:
                    user_res = self.ldb_user.search(base_dn, scope=scope, attrs=["name"])
                except LdbError as err:
                    code, _ = err.args
                    if user_expected is not None:
                        # An error is only acceptable when nothing at all
                        # was expected to be visible.
                        self.fail(err)
                    self.assertEqual(code, ERR_NO_SUCH_OBJECT)
                    print("User: NO_SUCH_OBJECT")
                else:
                    _verify_result_array(user_res, "User", user_expected)
5793
# Test-run entry point: everything below executes when the script is run.

# Module-level SamDB connection to the server named on the command line
# (ldaphost is built from args[0] near the top of the file).
# NOTE(review): creds and lp are not defined in the visible part of the file;
# presumably they are set up between the host parsing and the first test
# class -- confirm.
ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)

# Hand this module's tests to TestProgram together with the subunit options
# parsed from the command line.
TestProgram(module=__name__, opts=subunitopts)