PEP8: fix E241: multiple spaces after ','
[amitay/samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 from __future__ import print_function
6 import optparse
7 import sys
8 import base64
9 import re
10 sys.path.insert(0, "bin/python")
11 import samba
12
13 from samba.tests.subunitrun import SubunitOptions, TestProgram
14
15 import samba.getopt as options
16 from samba.join import DCJoinContext
17
18 from ldb import (
19     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
20     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
21 from ldb import ERR_CONSTRAINT_VIOLATION
22 from ldb import ERR_OPERATIONS_ERROR
23 from ldb import Message, MessageElement, Dn
24 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
25 from samba.dcerpc import security, drsuapi, misc
26
27 from samba.auth import system_session
28 from samba import gensec, sd_utils
29 from samba.samdb import SamDB
30 from samba.credentials import Credentials, DONT_USE_KERBEROS
31 import samba.tests
32 from samba.tests import delete_force
33 import samba.dsdb
34 from samba.tests.password_test import PasswordCommon
35
36 parser = optparse.OptionParser("acl.py [options] <host>")
37 sambaopts = options.SambaOptions(parser)
38 parser.add_option_group(sambaopts)
39 parser.add_option_group(options.VersionOptions(parser))
40
41 # use command line creds if available
42 credopts = options.CredentialsOptions(parser)
43 parser.add_option_group(credopts)
44 subunitopts = SubunitOptions(parser)
45 parser.add_option_group(subunitopts)
46
47 opts, args = parser.parse_args()
48
49 if len(args) < 1:
50     parser.print_usage()
51     sys.exit(1)
52
53 host = args[0]
54 if not "://" in host:
55     ldaphost = "ldap://%s" % host
56 else:
57     ldaphost = host
58     start = host.rindex("://")
59     host = host.lstrip(start + 3)
60
61 lp = sambaopts.get_loadparm()
62 creds = credopts.get_credentials(lp)
63 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
64
65 #
66 # Tests start here
67 #
68
69 class AclTests(samba.tests.TestCase):
70
71     def setUp(self):
72         super(AclTests, self).setUp()
73         self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
74         self.base_dn = self.ldb_admin.domain_dn()
75         self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
76         self.user_pass = "samba123@"
77         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
78         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
79         self.addCleanup(self.delete_admin_connection)
80         #used for anonymous login
81         self.creds_tmp = Credentials()
82         self.creds_tmp.set_username("")
83         self.creds_tmp.set_password("")
84         self.creds_tmp.set_domain(creds.get_domain())
85         self.creds_tmp.set_realm(creds.get_realm())
86         self.creds_tmp.set_workstation(creds.get_workstation())
87         print("baseDN: %s" % self.base_dn)
88
89     def get_user_dn(self, name):
90         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
91
92     def get_ldb_connection(self, target_username, target_password):
93         creds_tmp = Credentials()
94         creds_tmp.set_username(target_username)
95         creds_tmp.set_password(target_password)
96         creds_tmp.set_domain(creds.get_domain())
97         creds_tmp.set_realm(creds.get_realm())
98         creds_tmp.set_workstation(creds.get_workstation())
99         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
100                                       | gensec.FEATURE_SEAL)
101         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
102         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
103         return ldb_target
104
105     # Test if we have any additional groups for users than default ones
106     def assert_user_no_group_member(self, username):
107         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
108         try:
109             self.assertEqual(res[0]["memberOf"][0], "")
110         except KeyError:
111             pass
112         else:
113             self.fail()
114
115     def delete_admin_connection(self):
116         del self.sd_utils
117         del self.ldb_admin
118
119 #tests on ldap add operations
120 class AclAddTests(AclTests):
121
122     def setUp(self):
123         super(AclAddTests, self).setUp()
124         # Domain admin that will be creator of OU parent-child structure
125         self.usr_admin_owner = "acl_add_user1"
126         # Second domain admin that will not be creator of OU parent-child structure
127         self.usr_admin_not_owner = "acl_add_user2"
128         # Regular user
129         self.regular_user = "acl_add_user3"
130         self.test_user1 = "test_add_user1"
131         self.test_group1 = "test_add_group1"
132         self.ou1 = "OU=test_add_ou1"
133         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
134         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
135         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
136         self.ldb_admin.newuser(self.regular_user, self.user_pass)
137
138         # add admins to the Domain Admins group
139         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner],
140                                                 add_members_operation=True)
141         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner],
142                                                 add_members_operation=True)
143
144         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
145         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
146         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
147
148     def tearDown(self):
149         super(AclAddTests, self).tearDown()
150         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
151                      (self.test_user1, self.ou2, self.base_dn))
152         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
153                      (self.test_group1, self.ou2, self.base_dn))
154         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
155         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
156         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
157         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
158         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
159         delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))
160
161         del self.ldb_notowner
162         del self.ldb_owner
163         del self.ldb_user
164
165     # Make sure top OU is deleted (and so everything under it)
166     def assert_top_ou_deleted(self):
167         res = self.ldb_admin.search(self.base_dn,
168                                     expression="(distinguishedName=%s,%s)" % (
169                 "OU=test_add_ou1", self.base_dn))
170         self.assertEqual(len(res), 0)
171
172     def test_add_u1(self):
173         """Testing OU with the rights of Doman Admin not creator of the OU """
174         self.assert_top_ou_deleted()
175         # Change descriptor for top level OU
176         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
177         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
178         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
179         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
180         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
181         # Test user and group creation with another domain admin's credentials
182         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
183         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
184                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
185         # Make sure we HAVE created the two objects -- user and group
186         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
187         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
188         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
189         self.assertTrue(len(res) > 0)
190         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
191         self.assertTrue(len(res) > 0)
192
193     def test_add_u2(self):
194         """Testing OU with the regular user that has no rights granted over the OU """
195         self.assert_top_ou_deleted()
196         # Create a parent-child OU structure with domain admin credentials
197         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
198         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
199         # Test user and group creation with regular user credentials
200         try:
201             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
202             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
203                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
204         except LdbError as e:
205             (num, _) = e.args
206             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
207         else:
208             self.fail()
209         # Make sure we HAVEN'T created any of two objects -- user or group
210         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
211         self.assertEqual(len(res), 0)
212         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
213         self.assertEqual(len(res), 0)
214
215     def test_add_u3(self):
216         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
217         self.assert_top_ou_deleted()
218         # Change descriptor for top level OU
219         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
220         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
221         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
222         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
223         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
224         # Test user and group creation with granted user only to one of the objects
225         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
226         try:
227             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
228                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
229         except LdbError as e1:
230             (num, _) = e1.args
231             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
232         else:
233             self.fail()
234         # Make sure we HAVE created the one of two objects -- user
235         res = self.ldb_admin.search(self.base_dn,
236                                     expression="(distinguishedName=%s,%s)" %
237                                     ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
238                                      self.base_dn))
239         self.assertNotEqual(len(res), 0)
240         res = self.ldb_admin.search(self.base_dn,
241                                     expression="(distinguishedName=%s,%s)" %
242                                     ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
243                                      self.base_dn))
244         self.assertEqual(len(res), 0)
245
246     def test_add_u4(self):
247         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
248         self.assert_top_ou_deleted()
249         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
250         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
251         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
252         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
253                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
254         # Make sure we have successfully created the two objects -- user and group
255         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
256         self.assertTrue(len(res) > 0)
257         res = self.ldb_admin.search(self.base_dn,
258                                     expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
259         self.assertTrue(len(res) > 0)
260
261     def test_add_anonymous(self):
262         """Test add operation with anonymous user"""
263         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
264         try:
265             anonymous.newuser("test_add_anonymous", self.user_pass)
266         except LdbError as e2:
267             (num, _) = e2.args
268             self.assertEquals(num, ERR_OPERATIONS_ERROR)
269         else:
270             self.fail()
271
272 #tests on ldap modify operations
273 class AclModifyTests(AclTests):
274
275     def setUp(self):
276         super(AclModifyTests, self).setUp()
277         self.user_with_wp = "acl_mod_user1"
278         self.user_with_sm = "acl_mod_user2"
279         self.user_with_group_sm = "acl_mod_user3"
280         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
281         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
282         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
283         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
284         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
285         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
286         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
287         self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
288         self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
289         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
290
291     def tearDown(self):
292         super(AclModifyTests, self).tearDown()
293         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
294         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
295         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
296         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
297         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
298         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
299         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
300         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
301         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
302         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
303
304         del self.ldb_user
305         del self.ldb_user2
306         del self.ldb_user3
307
308     def test_modify_u1(self):
309         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
310         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
311         # First test object -- User
312         print("Testing modify on User object")
313         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
314         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
315         ldif = """
316 dn: """ + self.get_user_dn("test_modify_user1") + """
317 changetype: modify
318 replace: displayName
319 displayName: test_changed"""
320         self.ldb_user.modify_ldif(ldif)
321         res = self.ldb_admin.search(self.base_dn,
322                                     expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
323         self.assertEqual(res[0]["displayName"][0], "test_changed")
324         # Second test object -- Group
325         print("Testing modify on Group object")
326         self.ldb_admin.newgroup("test_modify_group1",
327                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
328         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
329         ldif = """
330 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
331 changetype: modify
332 replace: displayName
333 displayName: test_changed"""
334         self.ldb_user.modify_ldif(ldif)
335         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
336         self.assertEqual(res[0]["displayName"][0], "test_changed")
337         # Third test object -- Organizational Unit
338         print("Testing modify on OU object")
339         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
340         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
341         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
342         ldif = """
343 dn: OU=test_modify_ou1,""" + self.base_dn + """
344 changetype: modify
345 replace: displayName
346 displayName: test_changed"""
347         self.ldb_user.modify_ldif(ldif)
348         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
349         self.assertEqual(res[0]["displayName"][0], "test_changed")
350
351     def test_modify_u2(self):
352         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
353         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
354         # First test object -- User
355         print("Testing modify on User object")
356         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
357         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
358         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
359         # Modify on attribute you have rights for
360         ldif = """
361 dn: """ + self.get_user_dn("test_modify_user1") + """
362 changetype: modify
363 replace: displayName
364 displayName: test_changed"""
365         self.ldb_user.modify_ldif(ldif)
366         res = self.ldb_admin.search(self.base_dn,
367                                     expression="(distinguishedName=%s)" %
368                                     self.get_user_dn("test_modify_user1"))
369         self.assertEqual(res[0]["displayName"][0], "test_changed")
370         # Modify on attribute you do not have rights for granted
371         ldif = """
372 dn: """ + self.get_user_dn("test_modify_user1") + """
373 changetype: modify
374 replace: url
375 url: www.samba.org"""
376         try:
377             self.ldb_user.modify_ldif(ldif)
378         except LdbError as e3:
379             (num, _) = e3.args
380             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
381         else:
382             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
383             self.fail()
384         # Second test object -- Group
385         print("Testing modify on Group object")
386         self.ldb_admin.newgroup("test_modify_group1",
387                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
388         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
389         ldif = """
390 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
391 changetype: modify
392 replace: displayName
393 displayName: test_changed"""
394         self.ldb_user.modify_ldif(ldif)
395         res = self.ldb_admin.search(self.base_dn,
396                                     expression="(distinguishedName=%s)" %
397                                     str("CN=test_modify_group1,CN=Users," + self.base_dn))
398         self.assertEqual(res[0]["displayName"][0], "test_changed")
399         # Modify on attribute you do not have rights for granted
400         ldif = """
401 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
402 changetype: modify
403 replace: url
404 url: www.samba.org"""
405         try:
406             self.ldb_user.modify_ldif(ldif)
407         except LdbError as e4:
408             (num, _) = e4.args
409             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
410         else:
411             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
412             self.fail()
413         # Modify on attribute you do not have rights for granted while also modifying something you do have rights for
414         ldif = """
415 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
416 changetype: modify
417 replace: url
418 url: www.samba.org
419 replace: displayName
420 displayName: test_changed"""
421         try:
422             self.ldb_user.modify_ldif(ldif)
423         except LdbError as e5:
424             (num, _) = e5.args
425             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
426         else:
427             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
428             self.fail()
429         # Second test object -- Organizational Unit
430         print("Testing modify on OU object")
431         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
432         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
433         ldif = """
434 dn: OU=test_modify_ou1,""" + self.base_dn + """
435 changetype: modify
436 replace: displayName
437 displayName: test_changed"""
438         self.ldb_user.modify_ldif(ldif)
439         res = self.ldb_admin.search(self.base_dn,
440                                     expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
441                                                                               + self.base_dn))
442         self.assertEqual(res[0]["displayName"][0], "test_changed")
443         # Modify on attribute you do not have rights for granted
444         ldif = """
445 dn: OU=test_modify_ou1,""" + self.base_dn + """
446 changetype: modify
447 replace: url
448 url: www.samba.org"""
449         try:
450             self.ldb_user.modify_ldif(ldif)
451         except LdbError as e6:
452             (num, _) = e6.args
453             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
454         else:
455             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
456             self.fail()
457
458     def test_modify_u3(self):
459         """7 Modify one attribute as you have no what so ever rights granted"""
460         # First test object -- User
461         print("Testing modify on User object")
462         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
463         # Modify on attribute you do not have rights for granted
464         ldif = """
465 dn: """ + self.get_user_dn("test_modify_user1") + """
466 changetype: modify
467 replace: url
468 url: www.samba.org"""
469         try:
470             self.ldb_user.modify_ldif(ldif)
471         except LdbError as e7:
472             (num, _) = e7.args
473             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
474         else:
475             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
476             self.fail()
477
478         # Second test object -- Group
479         print("Testing modify on Group object")
480         self.ldb_admin.newgroup("test_modify_group1",
481                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
482         # Modify on attribute you do not have rights for granted
483         ldif = """
484 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
485 changetype: modify
486 replace: url
487 url: www.samba.org"""
488         try:
489             self.ldb_user.modify_ldif(ldif)
490         except LdbError as e8:
491             (num, _) = e8.args
492             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
493         else:
494             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
495             self.fail()
496
497         # Second test object -- Organizational Unit
498         print("Testing modify on OU object")
499         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
500         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
501         # Modify on attribute you do not have rights for granted
502         ldif = """
503 dn: OU=test_modify_ou1,""" + self.base_dn + """
504 changetype: modify
505 replace: url
506 url: www.samba.org"""
507         try:
508             self.ldb_user.modify_ldif(ldif)
509         except LdbError as e9:
510             (num, _) = e9.args
511             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
512         else:
513             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
514             self.fail()
515
516
517     def test_modify_u4(self):
518         """11 Grant WP to PRINCIPAL_SELF and test modify"""
519         ldif = """
520 dn: """ + self.get_user_dn(self.user_with_wp) + """
521 changetype: modify
522 add: adminDescription
523 adminDescription: blah blah blah"""
524         try:
525             self.ldb_user.modify_ldif(ldif)
526         except LdbError as e10:
527             (num, _) = e10.args
528             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
529         else:
530             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
531             self.fail()
532
533         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
534         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
535         # Modify on attribute you have rights for
536         self.ldb_user.modify_ldif(ldif)
537         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
538                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"])
539         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
540
541     def test_modify_u5(self):
542         """12 test self membership"""
543         ldif = """
544 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
545 changetype: modify
546 add: Member
547 Member: """ + self.get_user_dn(self.user_with_sm)
548 #the user has no rights granted, this should fail
549         try:
550             self.ldb_user2.modify_ldif(ldif)
551         except LdbError as e11:
552             (num, _) = e11.args
553             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
554         else:
555             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
556             self.fail()
557
558 #grant self-membership, should be able to add himself
559         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
560         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
561         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
562         self.ldb_user2.modify_ldif(ldif)
563         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
564                                      % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
565         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
566 #but not other users
567         ldif = """
568 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
569 changetype: modify
570 add: Member
571 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
572         try:
573             self.ldb_user2.modify_ldif(ldif)
574         except LdbError as e12:
575             (num, _) = e12.args
576             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
577         else:
578             self.fail()
579
580     def test_modify_u6(self):
581         """13 test self membership"""
582         ldif = """
583 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
584 changetype: modify
585 add: Member
586 Member: """ + self.get_user_dn(self.user_with_sm) + """
587 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
588
589 #grant self-membership, should be able to add himself  but not others at the same time
590         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
591         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
592         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
593         try:
594             self.ldb_user2.modify_ldif(ldif)
595         except LdbError as e13:
596             (num, _) = e13.args
597             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
598         else:
599             self.fail()
600
601     def test_modify_u7(self):
602         """13 User with WP modifying Member"""
603 #a second user is given write property permission
604         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
605         mod = "(A;;WP;;;%s)" % str(user_sid)
606         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
607         ldif = """
608 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
609 changetype: modify
610 add: Member
611 Member: """ + self.get_user_dn(self.user_with_wp)
612         self.ldb_user.modify_ldif(ldif)
613         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
614                                      % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
615         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
616         ldif = """
617 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
618 changetype: modify
619 delete: Member"""
620         self.ldb_user.modify_ldif(ldif)
621         ldif = """
622 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
623 changetype: modify
624 add: Member
625 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
626         self.ldb_user.modify_ldif(ldif)
627         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
628                                      % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
629         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
630
631     def test_modify_anonymous(self):
632         """Test add operation with anonymous user"""
633         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
634         self.ldb_admin.newuser("test_anonymous", "samba123@")
635         m = Message()
636         m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
637
638         m["description"] = MessageElement("sambauser2",
639                                           FLAG_MOD_ADD,
640                                           "description")
641         try:
642             anonymous.modify(m)
643         except LdbError as e14:
644             (num, _) = e14.args
645             self.assertEquals(num, ERR_OPERATIONS_ERROR)
646         else:
647             self.fail()
648
649 #enable these when we have search implemented
650 class AclSearchTests(AclTests):
651
652     def setUp(self):
653         super(AclSearchTests, self).setUp()
654
655         # permit password changes during this test
656         PasswordCommon.allow_password_changes(self, self.ldb_admin)
657
658         self.u1 = "search_u1"
659         self.u2 = "search_u2"
660         self.u3 = "search_u3"
661         self.group1 = "group1"
662         self.ldb_admin.newuser(self.u1, self.user_pass)
663         self.ldb_admin.newuser(self.u2, self.user_pass)
664         self.ldb_admin.newuser(self.u3, self.user_pass)
665         self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
666         self.ldb_admin.add_remove_group_members(self.group1, [self.u2],
667                                                 add_members_operation=True)
668         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
669         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
670         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
671         self.full_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
672                           Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
673                           Dn(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
674                           Dn(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
675                           Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
676                           Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
677         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
678         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
679
680     def create_clean_ou(self, object_dn):
681         """ Base repeating setup for unittests to follow """
682         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
683                                     expression="distinguishedName=%s" % object_dn)
684         # Make sure top testing OU has been deleted before starting the test
685         self.assertEqual(len(res), 0)
686         self.ldb_admin.create_ou(object_dn)
687         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
688         # Make sure there are inheritable ACEs initially
689         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
690         # Find and remove all inherit ACEs
691         res = re.findall("\(.*?\)", desc_sddl)
692         res = [x for x in res if ("CI" in x) or ("OI" in x)]
693         for x in res:
694             desc_sddl = desc_sddl.replace(x, "")
695         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
696         # can propagate from above
697         # remove SACL, we are not interested
698         desc_sddl = desc_sddl.replace(":AI", ":AIP")
699         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
700         # Verify all inheritable ACEs are gone
701         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
702         self.assertFalse("CI" in desc_sddl)
703         self.assertFalse("OI" in desc_sddl)
704
705     def tearDown(self):
706         super(AclSearchTests, self).tearDown()
707         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
708         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
709         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
710         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
711         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
712         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
713         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
714         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
715         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
716         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
717         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
718         delete_force(self.ldb_admin, self.get_user_dn("group1"))
719
720         del self.ldb_user
721         del self.ldb_user2
722         del self.ldb_user3
723
724     def test_search_anonymous1(self):
725         """Verify access of rootDSE with the correct request"""
726         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
727         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
728         self.assertEquals(len(res), 1)
729         #verify some of the attributes
730         #don't care about values
731         self.assertTrue("ldapServiceName" in res[0])
732         self.assertTrue("namingContexts" in res[0])
733         self.assertTrue("isSynchronized" in res[0])
734         self.assertTrue("dsServiceName" in res[0])
735         self.assertTrue("supportedSASLMechanisms" in res[0])
736         self.assertTrue("isGlobalCatalogReady" in res[0])
737         self.assertTrue("domainControllerFunctionality" in res[0])
738         self.assertTrue("serverName" in res[0])
739
740     def test_search_anonymous2(self):
741         """Make sure we cannot access anything else"""
742         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
743         try:
744             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
745         except LdbError as e15:
746             (num, _) = e15.args
747             self.assertEquals(num, ERR_OPERATIONS_ERROR)
748         else:
749             self.fail()
750         try:
751             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
752         except LdbError as e16:
753             (num, _) = e16.args
754             self.assertEquals(num, ERR_OPERATIONS_ERROR)
755         else:
756             self.fail()
757         try:
758             res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
759                                    scope=SCOPE_SUBTREE)
760         except LdbError as e17:
761             (num, _) = e17.args
762             self.assertEquals(num, ERR_OPERATIONS_ERROR)
763         else:
764             self.fail()
765
766     def test_search_anonymous3(self):
767         """Set dsHeuristics and repeat"""
768         self.ldb_admin.set_dsheuristics("0000002")
769         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
770         mod = "(A;CI;LC;;;AN)"
771         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
772         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
773         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
774         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
775                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
776         self.assertEquals(len(res), 1)
777         self.assertTrue("dn" in res[0])
778         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
779                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
780         res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
781                                scope=SCOPE_SUBTREE)
782         self.assertEquals(len(res), 1)
783         self.assertTrue("dn" in res[0])
784         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
785
786     def test_search1(self):
787         """Make sure users can see us if given LC to user and group"""
788         self.create_clean_ou("OU=ou1," + self.base_dn)
789         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
790         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
791         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
792                                                  self.domain_sid)
793         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
794         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
795         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
796         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
797         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
798
799         #regular users must see only ou1 and ou2
800         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
801                                     scope=SCOPE_SUBTREE)
802         self.assertEquals(len(res), 2)
803         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
804                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
805
806         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
807         self.assertEquals(sorted(res_list), sorted(ok_list))
808
809         #these users should see all ous
810         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
811                                    scope=SCOPE_SUBTREE)
812         self.assertEquals(len(res), 6)
813         res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
814         self.assertEquals(sorted(res_list), sorted(self.full_list))
815
816         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
817                                     scope=SCOPE_SUBTREE)
818         self.assertEquals(len(res), 6)
819         res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
820         self.assertEquals(sorted(res_list), sorted(self.full_list))
821
822     def test_search2(self):
823         """Make sure users can't see us if access is explicitly denied"""
824         self.create_clean_ou("OU=ou1," + self.base_dn)
825         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
826         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
827         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
828         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
829         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
830         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
831         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
832         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
833                                     scope=SCOPE_SUBTREE)
834         #this user should see all ous
835         res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
836         self.assertEquals(sorted(res_list), sorted(self.full_list))
837
838         #these users should see ou1, 2, 5 and 6 but not 3 and 4
839         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
840                                    scope=SCOPE_SUBTREE)
841         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
842                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
843                    Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
844                    Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
845         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
846         self.assertEquals(sorted(res_list), sorted(ok_list))
847
848         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
849                                     scope=SCOPE_SUBTREE)
850         self.assertEquals(len(res), 4)
851         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
852         self.assertEquals(sorted(res_list), sorted(ok_list))
853
854     def test_search3(self):
855         """Make sure users can't see ous if access is explicitly denied - 2"""
856         self.create_clean_ou("OU=ou1," + self.base_dn)
857         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
858         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
859         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
860                                                  self.domain_sid)
861         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
862         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
863         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
864         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
865         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
866
867         print("Testing correct behavior on nonaccessible search base")
868         try:
869             self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
870                                   scope=SCOPE_BASE)
871         except LdbError as e18:
872             (num, _) = e18.args
873             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
874         else:
875             self.fail()
876
877         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
878         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
879
880         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
881                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
882
883         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
884                                     scope=SCOPE_SUBTREE)
885         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
886         self.assertEquals(sorted(res_list), sorted(ok_list))
887
888         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
889                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
890                    Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
891                    Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
892
893         #should not see ou3 and ou4, but should see ou5 and ou6
894         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
895                                    scope=SCOPE_SUBTREE)
896         self.assertEquals(len(res), 4)
897         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
898         self.assertEquals(sorted(res_list), sorted(ok_list))
899
900         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
901                                     scope=SCOPE_SUBTREE)
902         self.assertEquals(len(res), 4)
903         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
904         self.assertEquals(sorted(res_list), sorted(ok_list))
905
906     def test_search4(self):
907         """There is no difference in visibility if the user is also creator"""
908         self.create_clean_ou("OU=ou1," + self.base_dn)
909         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
910         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
911         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
912                                                  self.domain_sid)
913         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
914         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
915         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
916         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
917         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
918
919         ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
920                    Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
921         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
922                                     scope=SCOPE_SUBTREE)
923         self.assertEquals(len(res), 2)
924         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
925         self.assertEquals(sorted(res_list), sorted(ok_list))
926
927         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
928                                    scope=SCOPE_SUBTREE)
929         self.assertEquals(len(res), 2)
930         res_list = [x["dn"] for x in res if x["dn"] in ok_list]
931         self.assertEquals(sorted(res_list), sorted(ok_list))
932
933     def test_search5(self):
934         """Make sure users can see only attributes they are allowed to see"""
935         self.create_clean_ou("OU=ou1," + self.base_dn)
936         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
937         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
938         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
939                                                  self.domain_sid)
940         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
941         # assert user can only see dn
942         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
943                                    scope=SCOPE_SUBTREE)
944         ok_list = ['dn']
945         self.assertEquals(len(res), 1)
946         res_list = list(res[0].keys())
947         self.assertEquals(res_list, ok_list)
948
949         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
950                                    scope=SCOPE_BASE, attrs=["ou"])
951
952         self.assertEquals(len(res), 1)
953         res_list = list(res[0].keys())
954         self.assertEquals(res_list, ok_list)
955
956         #give read property on ou and assert user can only see dn and ou
957         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
958         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
959         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
960         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
961                                    scope=SCOPE_SUBTREE)
962         ok_list = ['dn', 'ou']
963         self.assertEquals(len(res), 1)
964         res_list = list(res[0].keys())
965         self.assertEquals(sorted(res_list), sorted(ok_list))
966
967         #give read property on Public Information and assert user can see ou and other members
968         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
969         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
970         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
971         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
972                                    scope=SCOPE_SUBTREE)
973
974         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
975         res_list = list(res[0].keys())
976         self.assertEquals(sorted(res_list), sorted(ok_list))
977
978     def test_search6(self):
979         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
980         self.create_clean_ou("OU=ou1," + self.base_dn)
981         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
982         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
983         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
984                                                  self.domain_sid)
985         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
986         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
987
988         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
989                                    scope=SCOPE_SUBTREE)
990         #nothing should be returned as ou is not accessible
991         self.assertEquals(len(res), 0)
992
993         #give read property on ou and assert user can only see dn and ou
994         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
995         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
996         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
997                                    scope=SCOPE_SUBTREE)
998         self.assertEquals(len(res), 1)
999         ok_list = ['dn', 'ou']
1000         res_list = list(res[0].keys())
1001         self.assertEquals(sorted(res_list), sorted(ok_list))
1002
1003         #give read property on Public Information and assert user can see ou and other members
1004         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
1005         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
1006         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
1007                                    scope=SCOPE_SUBTREE)
1008         self.assertEquals(len(res), 1)
1009         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
1010         res_list = list(res[0].keys())
1011         self.assertEquals(sorted(res_list), sorted(ok_list))
1012
1013     def assert_search_on_attr(self, dn, samdb, attr, expected_list):
1014
1015         expected_num = len(expected_list)
1016         res = samdb.search(dn, expression="(%s=*)" % attr, scope=SCOPE_SUBTREE)
1017         self.assertEquals(len(res), expected_num)
1018
1019         res_list = [ x["dn"] for x in res if x["dn"] in expected_list ]
1020         self.assertEquals(sorted(res_list), sorted(expected_list))
1021
1022     def test_search7(self):
1023         """Checks object search visibility when users don't have full rights"""
1024         self.create_clean_ou("OU=ou1," + self.base_dn)
1025         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid),
1026                                             str(self.group_sid))
1027         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
1028         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
1029                                                  self.domain_sid)
1030         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
1031         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
1032                                  sd=tmp_desc)
1033         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
1034                                  sd=tmp_desc)
1035         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
1036                                  sd=tmp_desc)
1037         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
1038                                  sd=tmp_desc)
1039
1040         ou2_dn = Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn)
1041         ou1_dn = Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)
1042
1043         # even though unprivileged users can't read these attributes for OU2,
1044         # the object should still be visible in searches, because they have
1045         # 'List Contents' rights still. This isn't really disclosive because
1046         # ALL objects have these attributes
1047         visible_attrs = ["objectClass", "distinguishedName", "name",
1048                          "objectGUID"]
1049         two_objects = [ou2_dn, ou1_dn]
1050
1051         for attr in visible_attrs:
1052             # a regular user should just see the 2 objects
1053             self.assert_search_on_attr(str(ou1_dn), self.ldb_user3, attr,
1054                                        expected_list=two_objects)
1055
1056             # whereas the following users have LC rights for all the objects,
1057             # so they should see them all
1058             self.assert_search_on_attr(str(ou1_dn), self.ldb_user, attr,
1059                                        expected_list=self.full_list)
1060             self.assert_search_on_attr(str(ou1_dn), self.ldb_user2, attr,
1061                                        expected_list=self.full_list)
1062
1063         # however when searching on the following attributes, objects will not
1064         # be visible unless the user has Read Property rights
1065         hidden_attrs = ["objectCategory", "instanceType", "ou", "uSNChanged",
1066                         "uSNCreated", "whenCreated"]
1067         one_object = [ou1_dn]
1068
1069         for attr in hidden_attrs:
1070             self.assert_search_on_attr(str(ou1_dn), self.ldb_user3, attr,
1071                                        expected_list=one_object)
1072             self.assert_search_on_attr(str(ou1_dn), self.ldb_user, attr,
1073                                        expected_list=one_object)
1074             self.assert_search_on_attr(str(ou1_dn), self.ldb_user2, attr,
1075                                        expected_list=one_object)
1076
1077             # admin has RP rights so can still see all the objects
1078             self.assert_search_on_attr(str(ou1_dn), self.ldb_admin, attr,
1079                                        expected_list=self.full_list)
1080
1081 #tests on ldap delete operations
1082 class AclDeleteTests(AclTests):
1083
1084     def setUp(self):
1085         super(AclDeleteTests, self).setUp()
1086         self.regular_user = "acl_delete_user1"
1087         # Create regular user
1088         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1089         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1090
1091     def tearDown(self):
1092         super(AclDeleteTests, self).tearDown()
1093         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
1094         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1095         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
1096
1097         del self.ldb_user
1098
1099     def test_delete_u1(self):
1100         """User is prohibited by default to delete another User object"""
1101         # Create user that we try to delete
1102         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1103         # Here delete User object should ALWAYS through exception
1104         try:
1105             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
1106         except LdbError as e19:
1107             (num, _) = e19.args
1108             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1109         else:
1110             self.fail()
1111
1112     def test_delete_u2(self):
1113         """User's group has RIGHT_DELETE to another User object"""
1114         user_dn = self.get_user_dn("test_delete_user1")
1115         # Create user that we try to delete
1116         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1117         mod = "(A;;SD;;;AU)"
1118         self.sd_utils.dacl_add_ace(user_dn, mod)
1119         # Try to delete User object
1120         self.ldb_user.delete(user_dn)
1121         res = self.ldb_admin.search(self.base_dn,
1122                                     expression="(distinguishedName=%s)" % user_dn)
1123         self.assertEqual(len(res), 0)
1124
1125     def test_delete_u3(self):
1126         """User indentified by SID has RIGHT_DELETE to another User object"""
1127         user_dn = self.get_user_dn("test_delete_user1")
1128         # Create user that we try to delete
1129         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1130         mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1131         self.sd_utils.dacl_add_ace(user_dn, mod)
1132         # Try to delete User object
1133         self.ldb_user.delete(user_dn)
1134         res = self.ldb_admin.search(self.base_dn,
1135                                     expression="(distinguishedName=%s)" % user_dn)
1136         self.assertEqual(len(res), 0)
1137
1138     def test_delete_anonymous(self):
1139         """Test add operation with anonymous user"""
1140         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
1141         self.ldb_admin.newuser("test_anonymous", "samba123@")
1142
1143         try:
1144             anonymous.delete(self.get_user_dn("test_anonymous"))
1145         except LdbError as e20:
1146             (num, _) = e20.args
1147             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1148         else:
1149             self.fail()
1150
1151 #tests on ldap rename operations
1152 class AclRenameTests(AclTests):
1153
1154     def setUp(self):
1155         super(AclRenameTests, self).setUp()
1156         self.regular_user = "acl_rename_user1"
1157         self.ou1 = "OU=test_rename_ou1"
1158         self.ou2 = "OU=test_rename_ou2"
1159         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1160         self.testuser1 = "test_rename_user1"
1161         self.testuser2 = "test_rename_user2"
1162         self.testuser3 = "test_rename_user3"
1163         self.testuser4 = "test_rename_user4"
1164         self.testuser5 = "test_rename_user5"
1165         # Create regular user
1166         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1167         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1168
1169     def tearDown(self):
1170         super(AclRenameTests, self).tearDown()
1171         # Rename OU3
1172         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1173         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1174         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1175         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1176         # Rename OU2
1177         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1178         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1179         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1180         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1181         # Rename OU1
1182         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1183         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1184         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1185         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1186         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1187         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1188
1189         del self.ldb_user
1190
1191     def test_rename_u1(self):
1192         """Regular user fails to rename 'User object' within single OU"""
1193         # Create OU structure
1194         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1195         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1196         try:
1197             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1198                                  "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1199         except LdbError as e21:
1200             (num, _) = e21.args
1201             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1202         else:
1203             self.fail()
1204
1205     def test_rename_u2(self):
1206         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1207         ou_dn = "OU=test_rename_ou1," + self.base_dn
1208         user_dn = "CN=test_rename_user1," + ou_dn
1209         rename_user_dn = "CN=test_rename_user5," + ou_dn
1210         # Create OU structure
1211         self.ldb_admin.create_ou(ou_dn)
1212         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1213         mod = "(A;;WP;;;AU)"
1214         self.sd_utils.dacl_add_ace(user_dn, mod)
1215         # Rename 'User object' having WP to AU
1216         self.ldb_user.rename(user_dn, rename_user_dn)
1217         res = self.ldb_admin.search(self.base_dn,
1218                                     expression="(distinguishedName=%s)" % user_dn)
1219         self.assertEqual(len(res), 0)
1220         res = self.ldb_admin.search(self.base_dn,
1221                                     expression="(distinguishedName=%s)" % rename_user_dn)
1222         self.assertNotEqual(len(res), 0)
1223
1224     def test_rename_u3(self):
1225         """Test rename with rights granted to 'User object' SID"""
1226         ou_dn = "OU=test_rename_ou1," + self.base_dn
1227         user_dn = "CN=test_rename_user1," + ou_dn
1228         rename_user_dn = "CN=test_rename_user5," + ou_dn
1229         # Create OU structure
1230         self.ldb_admin.create_ou(ou_dn)
1231         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1232         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1233         mod = "(A;;WP;;;%s)" % str(sid)
1234         self.sd_utils.dacl_add_ace(user_dn, mod)
1235         # Rename 'User object' having WP to AU
1236         self.ldb_user.rename(user_dn, rename_user_dn)
1237         res = self.ldb_admin.search(self.base_dn,
1238                                     expression="(distinguishedName=%s)" % user_dn)
1239         self.assertEqual(len(res), 0)
1240         res = self.ldb_admin.search(self.base_dn,
1241                                     expression="(distinguishedName=%s)" % rename_user_dn)
1242         self.assertNotEqual(len(res), 0)
1243
1244     def test_rename_u4(self):
1245         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1246         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1247         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1248         user_dn = "CN=test_rename_user2," + ou1_dn
1249         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1250         # Create OU structure
1251         self.ldb_admin.create_ou(ou1_dn)
1252         self.ldb_admin.create_ou(ou2_dn)
1253         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1254         mod = "(A;;WPSD;;;AU)"
1255         self.sd_utils.dacl_add_ace(user_dn, mod)
1256         mod = "(A;;CC;;;AU)"
1257         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1258         # Rename 'User object' having SD and CC to AU
1259         self.ldb_user.rename(user_dn, rename_user_dn)
1260         res = self.ldb_admin.search(self.base_dn,
1261                                     expression="(distinguishedName=%s)" % user_dn)
1262         self.assertEqual(len(res), 0)
1263         res = self.ldb_admin.search(self.base_dn,
1264                                     expression="(distinguishedName=%s)" % rename_user_dn)
1265         self.assertNotEqual(len(res), 0)
1266
1267     def test_rename_u5(self):
1268         """Test rename with rights granted to 'User object' SID"""
1269         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1270         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1271         user_dn = "CN=test_rename_user2," + ou1_dn
1272         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1273         # Create OU structure
1274         self.ldb_admin.create_ou(ou1_dn)
1275         self.ldb_admin.create_ou(ou2_dn)
1276         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1277         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1278         mod = "(A;;WPSD;;;%s)" % str(sid)
1279         self.sd_utils.dacl_add_ace(user_dn, mod)
1280         mod = "(A;;CC;;;%s)" % str(sid)
1281         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1282         # Rename 'User object' having SD and CC to AU
1283         self.ldb_user.rename(user_dn, rename_user_dn)
1284         res = self.ldb_admin.search(self.base_dn,
1285                                     expression="(distinguishedName=%s)" % user_dn)
1286         self.assertEqual(len(res), 0)
1287         res = self.ldb_admin.search(self.base_dn,
1288                                     expression="(distinguishedName=%s)" % rename_user_dn)
1289         self.assertNotEqual(len(res), 0)
1290
1291     def test_rename_u6(self):
1292         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1293         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1294         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1295         user_dn = "CN=test_rename_user2," + ou1_dn
1296         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1297         # Create OU structure
1298         self.ldb_admin.create_ou(ou1_dn)
1299         self.ldb_admin.create_ou(ou2_dn)
1300         #mod = "(A;CI;DCWP;;;AU)"
1301         mod = "(A;;DC;;;AU)"
1302         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1303         mod = "(A;;CC;;;AU)"
1304         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1305         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1306         mod = "(A;;WP;;;AU)"
1307         self.sd_utils.dacl_add_ace(user_dn, mod)
1308         # Rename 'User object' having SD and CC to AU
1309         self.ldb_user.rename(user_dn, rename_user_dn)
1310         res = self.ldb_admin.search(self.base_dn,
1311                                     expression="(distinguishedName=%s)" % user_dn)
1312         self.assertEqual(len(res), 0)
1313         res = self.ldb_admin.search(self.base_dn,
1314                                     expression="(distinguishedName=%s)" % rename_user_dn)
1315         self.assertNotEqual(len(res), 0)
1316
1317     def test_rename_u7(self):
1318         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1319         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1320         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1321         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1322         user_dn = "CN=test_rename_user2," + ou1_dn
1323         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1324         # Create OU structure
1325         self.ldb_admin.create_ou(ou1_dn)
1326         self.ldb_admin.create_ou(ou2_dn)
1327         self.ldb_admin.create_ou(ou3_dn)
1328         mod = "(A;CI;WPDC;;;AU)"
1329         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1330         mod = "(A;;CC;;;AU)"
1331         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1332         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1333         # Rename 'User object' having SD and CC to AU
1334         self.ldb_user.rename(user_dn, rename_user_dn)
1335         res = self.ldb_admin.search(self.base_dn,
1336                                     expression="(distinguishedName=%s)" % user_dn)
1337         self.assertEqual(len(res), 0)
1338         res = self.ldb_admin.search(self.base_dn,
1339                                     expression="(distinguishedName=%s)" % rename_user_dn)
1340         self.assertNotEqual(len(res), 0)
1341
1342     def test_rename_u8(self):
1343         """Test rename on an object with and without modify access on the RDN attribute"""
1344         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1345         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1346         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1347         # Create OU structure
1348         self.ldb_admin.create_ou(ou1_dn)
1349         self.ldb_admin.create_ou(ou2_dn)
1350         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1351         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1352         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1353         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1354         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1355         try:
1356             self.ldb_user.rename(ou2_dn, ou3_dn)
1357         except LdbError as e22:
1358             (num, _) = e22.args
1359             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1360         else:
1361             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1362             self.fail()
1363         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1364         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1365         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1366         self.ldb_user.rename(ou2_dn, ou3_dn)
1367         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1368         self.assertEqual(len(res), 0)
1369         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1370         self.assertNotEqual(len(res), 0)
1371
1372     def test_rename_u9(self):
1373         """Rename 'User object' cross OU, with explicit deny on sd and dc"""
1374         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1375         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1376         user_dn = "CN=test_rename_user2," + ou1_dn
1377         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1378         # Create OU structure
1379         self.ldb_admin.create_ou(ou1_dn)
1380         self.ldb_admin.create_ou(ou2_dn)
1381         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1382         mod = "(D;;SD;;;DA)"
1383         self.sd_utils.dacl_add_ace(user_dn, mod)
1384         mod = "(D;;DC;;;DA)"
1385         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1386         # Rename 'User object' having SD and CC to AU
1387         try:
1388             self.ldb_admin.rename(user_dn, rename_user_dn)
1389         except LdbError as e23:
1390             (num, _) = e23.args
1391             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1392         else:
1393             self.fail()
1394         #add an allow ace so we can delete this ou
1395         mod = "(A;;DC;;;DA)"
1396         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1397
1398
1399 #tests on Control Access Rights
1400 class AclCARTests(AclTests):
1401
1402     def setUp(self):
1403         super(AclCARTests, self).setUp()
1404
1405         # Get the old "dSHeuristics" if it was set
1406         dsheuristics = self.ldb_admin.get_dsheuristics()
1407         # Reset the "dSHeuristics" as they were before
1408         self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
1409         # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
1410         self.ldb_admin.set_dsheuristics("000000001")
1411         # Get the old "minPwdAge"
1412         minPwdAge = self.ldb_admin.get_minPwdAge()
1413         # Reset the "minPwdAge" as it was before
1414         self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
1415         # Set it temporarely to "0"
1416         self.ldb_admin.set_minPwdAge("0")
1417
1418         self.user_with_wp = "acl_car_user1"
1419         self.user_with_pc = "acl_car_user2"
1420         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1421         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
1422         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1423         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1424
1425     def tearDown(self):
1426         super(AclCARTests, self).tearDown()
1427         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1428         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1429
1430         del self.ldb_user
1431         del self.ldb_user2
1432
1433     def test_change_password1(self):
1434         """Try a password change operation without any CARs given"""
1435         #users have change password by default - remove for negative testing
1436         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1437         sddl = desc.as_sddl(self.domain_sid)
1438         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1439         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1440         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1441         try:
1442             self.ldb_user.modify_ldif("""
1443 dn: """ + self.get_user_dn(self.user_with_wp) + """
1444 changetype: modify
1445 delete: unicodePwd
1446 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
1447 add: unicodePwd
1448 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
1449 """)
1450         except LdbError as e24:
1451             (num, _) = e24.args
1452             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1453         else:
1454             # for some reason we get constraint violation instead of insufficient access error
1455             self.fail()
1456
1457     def test_change_password2(self):
1458         """Make sure WP has no influence"""
1459         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1460         sddl = desc.as_sddl(self.domain_sid)
1461         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1462         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1463         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1464         mod = "(A;;WP;;;PS)"
1465         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1466         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1467         sddl = desc.as_sddl(self.domain_sid)
1468         try:
1469             self.ldb_user.modify_ldif("""
1470 dn: """ + self.get_user_dn(self.user_with_wp) + """
1471 changetype: modify
1472 delete: unicodePwd
1473 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
1474 add: unicodePwd
1475 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
1476 """)
1477         except LdbError as e25:
1478             (num, _) = e25.args
1479             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1480         else:
1481             # for some reason we get constraint violation instead of insufficient access error
1482             self.fail()
1483
1484     def test_change_password3(self):
1485         """Make sure WP has no influence"""
1486         mod = "(D;;WP;;;PS)"
1487         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1488         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1489         sddl = desc.as_sddl(self.domain_sid)
1490         self.ldb_user.modify_ldif("""
1491 dn: """ + self.get_user_dn(self.user_with_wp) + """
1492 changetype: modify
1493 delete: unicodePwd
1494 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
1495 add: unicodePwd
1496 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
1497 """)
1498
1499     def test_change_password5(self):
1500         """Make sure rights have no influence on dBCSPwd"""
1501         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1502         sddl = desc.as_sddl(self.domain_sid)
1503         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1504         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1505         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1506         mod = "(D;;WP;;;PS)"
1507         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1508         try:
1509             self.ldb_user.modify_ldif("""
1510 dn: """ + self.get_user_dn(self.user_with_wp) + """
1511 changetype: modify
1512 delete: dBCSPwd
1513 dBCSPwd: XXXXXXXXXXXXXXXX
1514 add: dBCSPwd
1515 dBCSPwd: YYYYYYYYYYYYYYYY
1516 """)
1517         except LdbError as e26:
1518             (num, _) = e26.args
1519             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1520         else:
1521             self.fail()
1522
1523     def test_change_password6(self):
1524         """Test uneven delete/adds"""
1525         try:
1526             self.ldb_user.modify_ldif("""
1527 dn: """ + self.get_user_dn(self.user_with_wp) + """
1528 changetype: modify
1529 delete: userPassword
1530 userPassword: thatsAcomplPASS1
1531 delete: userPassword
1532 userPassword: thatsAcomplPASS1
1533 add: userPassword
1534 userPassword: thatsAcomplPASS2
1535 """)
1536         except LdbError as e27:
1537             (num, _) = e27.args
1538             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1539         else:
1540             self.fail()
1541         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1542         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1543         try:
1544             self.ldb_user.modify_ldif("""
1545 dn: """ + self.get_user_dn(self.user_with_wp) + """
1546 changetype: modify
1547 delete: userPassword
1548 userPassword: thatsAcomplPASS1
1549 delete: userPassword
1550 userPassword: thatsAcomplPASS1
1551 add: userPassword
1552 userPassword: thatsAcomplPASS2
1553 """)
1554             # This fails on Windows 2000 domain level with constraint violation
1555         except LdbError as e28:
1556             (num, _) = e28.args
1557             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1558                             num == ERR_UNWILLING_TO_PERFORM)
1559         else:
1560             self.fail()
1561
1562
1563     def test_change_password7(self):
1564         """Try a password change operation without any CARs given"""
1565         #users have change password by default - remove for negative testing
1566         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1567         sddl = desc.as_sddl(self.domain_sid)
1568         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1569         #first change our own password
1570         self.ldb_user2.modify_ldif("""
1571 dn: """ + self.get_user_dn(self.user_with_pc) + """
1572 changetype: modify
1573 delete: unicodePwd
1574 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
1575 add: unicodePwd
1576 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
1577 """)
1578         #then someone else's
1579         self.ldb_user2.modify_ldif("""
1580 dn: """ + self.get_user_dn(self.user_with_wp) + """
1581 changetype: modify
1582 delete: unicodePwd
1583 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')).decode('utf8') + """
1584 add: unicodePwd
1585 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')).decode('utf8') + """
1586 """)
1587
1588     def test_reset_password1(self):
1589         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1590         try:
1591             self.ldb_user.modify_ldif("""
1592 dn: """ + self.get_user_dn(self.user_with_wp) + """
1593 changetype: modify
1594 replace: unicodePwd
1595 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
1596 """)
1597         except LdbError as e29:
1598             (num, _) = e29.args
1599             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1600         else:
1601             self.fail()
1602         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1603         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1604         self.ldb_user.modify_ldif("""
1605 dn: """ + self.get_user_dn(self.user_with_wp) + """
1606 changetype: modify
1607 replace: unicodePwd
1608 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
1609 """)
1610
1611     def test_reset_password2(self):
1612         """Try a user password reset operation (userPassword) before and after granting CAR"""
1613         try:
1614             self.ldb_user.modify_ldif("""
1615 dn: """ + self.get_user_dn(self.user_with_wp) + """
1616 changetype: modify
1617 replace: userPassword
1618 userPassword: thatsAcomplPASS1
1619 """)
1620         except LdbError as e30:
1621             (num, _) = e30.args
1622             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1623         else:
1624             self.fail()
1625         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1626         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1627         try:
1628             self.ldb_user.modify_ldif("""
1629 dn: """ + self.get_user_dn(self.user_with_wp) + """
1630 changetype: modify
1631 replace: userPassword
1632 userPassword: thatsAcomplPASS1
1633 """)
1634             # This fails on Windows 2000 domain level with constraint violation
1635         except LdbError as e31:
1636             (num, _) = e31.args
1637             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1638
1639     def test_reset_password3(self):
1640         """Grant WP and see what happens (unicodePwd)"""
1641         mod = "(A;;WP;;;PS)"
1642         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1643         try:
1644             self.ldb_user.modify_ldif("""
1645 dn: """ + self.get_user_dn(self.user_with_wp) + """
1646 changetype: modify
1647 replace: unicodePwd
1648 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
1649 """)
1650         except LdbError as e32:
1651             (num, _) = e32.args
1652             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1653         else:
1654             self.fail()
1655
1656     def test_reset_password4(self):
1657         """Grant WP and see what happens (userPassword)"""
1658         mod = "(A;;WP;;;PS)"
1659         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1660         try:
1661             self.ldb_user.modify_ldif("""
1662 dn: """ + self.get_user_dn(self.user_with_wp) + """
1663 changetype: modify
1664 replace: userPassword
1665 userPassword: thatsAcomplPASS1
1666 """)
1667         except LdbError as e33:
1668             (num, _) = e33.args
1669             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1670         else:
1671             self.fail()
1672
1673     def test_reset_password5(self):
1674         """Explicitly deny WP but grant CAR (unicodePwd)"""
1675         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1676         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1677         self.ldb_user.modify_ldif("""
1678 dn: """ + self.get_user_dn(self.user_with_wp) + """
1679 changetype: modify
1680 replace: unicodePwd
1681 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
1682 """)
1683
1684     def test_reset_password6(self):
1685         """Explicitly deny WP but grant CAR (userPassword)"""
1686         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1687         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1688         try:
1689             self.ldb_user.modify_ldif("""
1690 dn: """ + self.get_user_dn(self.user_with_wp) + """
1691 changetype: modify
1692 replace: userPassword
1693 userPassword: thatsAcomplPASS1
1694 """)
1695             # This fails on Windows 2000 domain level with constraint violation
1696         except LdbError as e34:
1697             (num, _) = e34.args
1698             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1699
1700 class AclExtendedTests(AclTests):
1701
1702     def setUp(self):
1703         super(AclExtendedTests, self).setUp()
1704         #regular user, will be the creator
1705         self.u1 = "ext_u1"
1706         #regular user
1707         self.u2 = "ext_u2"
1708         #admin user
1709         self.u3 = "ext_u3"
1710         self.ldb_admin.newuser(self.u1, self.user_pass)
1711         self.ldb_admin.newuser(self.u2, self.user_pass)
1712         self.ldb_admin.newuser(self.u3, self.user_pass)
1713         self.ldb_admin.add_remove_group_members("Domain Admins", [self.u3],
1714                                                 add_members_operation=True)
1715         self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
1716         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
1717         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
1718         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
1719         self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))
1720
1721     def tearDown(self):
1722         super(AclExtendedTests, self).tearDown()
1723         delete_force(self.ldb_admin, self.get_user_dn(self.u1))
1724         delete_force(self.ldb_admin, self.get_user_dn(self.u2))
1725         delete_force(self.ldb_admin, self.get_user_dn(self.u3))
1726         delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
1727         delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)
1728
1729         del self.ldb_user1
1730         del self.ldb_user2
1731         del self.ldb_user3
1732
1733     def test_ntSecurityDescriptor(self):
1734         #create empty ou
1735         self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
1736         #give u1 Create children access
1737         mod = "(A;;CC;;;%s)" % str(self.user_sid1)
1738         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1739         mod = "(A;;LC;;;%s)" % str(self.user_sid2)
1740         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1741         #create a group under that, grant RP to u2
1742         self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1",
1743                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1744         mod = "(A;;RP;;;%s)" % str(self.user_sid2)
1745         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1746         #u2 must not read the descriptor
1747         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1748                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1749         self.assertNotEqual(len(res), 0)
1750         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1751         #grant RC to u2 - still no access
1752         mod = "(A;;RC;;;%s)" % str(self.user_sid2)
1753         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1754         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1755                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1756         self.assertNotEqual(len(res), 0)
1757         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1758         #u3 is member of administrators group, should be able to read sd
1759         res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1760                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1761         self.assertEqual(len(res), 1)
1762         self.assertTrue("nTSecurityDescriptor" in res[0].keys())
1763
1764 class AclUndeleteTests(AclTests):
1765
1766     def setUp(self):
1767         super(AclUndeleteTests, self).setUp()
1768         self.regular_user = "undeleter1"
1769         self.ou1 = "OU=undeleted_ou,"
1770         self.testuser1 = "to_be_undeleted1"
1771         self.testuser2 = "to_be_undeleted2"
1772         self.testuser3 = "to_be_undeleted3"
1773         self.testuser4 = "to_be_undeleted4"
1774         self.testuser5 = "to_be_undeleted5"
1775         self.testuser6 = "to_be_undeleted6"
1776
1777         self.new_dn_ou = "CN=" + self.testuser4 + "," + self.ou1 + self.base_dn
1778
1779         # Create regular user
1780         self.testuser1_dn = self.get_user_dn(self.testuser1)
1781         self.testuser2_dn = self.get_user_dn(self.testuser2)
1782         self.testuser3_dn = self.get_user_dn(self.testuser3)
1783         self.testuser4_dn = self.get_user_dn(self.testuser4)
1784         self.testuser5_dn = self.get_user_dn(self.testuser5)
1785         self.deleted_dn1 = self.create_delete_user(self.testuser1)
1786         self.deleted_dn2 = self.create_delete_user(self.testuser2)
1787         self.deleted_dn3 = self.create_delete_user(self.testuser3)
1788         self.deleted_dn4 = self.create_delete_user(self.testuser4)
1789         self.deleted_dn5 = self.create_delete_user(self.testuser5)
1790
1791         self.ldb_admin.create_ou(self.ou1 + self.base_dn)
1792
1793         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1794         self.ldb_admin.add_remove_group_members("Domain Admins", [self.regular_user],
1795                                                 add_members_operation=True)
1796         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1797         self.sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1798
1799     def tearDown(self):
1800         super(AclUndeleteTests, self).tearDown()
1801         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1802         delete_force(self.ldb_admin, self.get_user_dn(self.testuser1))
1803         delete_force(self.ldb_admin, self.get_user_dn(self.testuser2))
1804         delete_force(self.ldb_admin, self.get_user_dn(self.testuser3))
1805         delete_force(self.ldb_admin, self.get_user_dn(self.testuser4))
1806         delete_force(self.ldb_admin, self.get_user_dn(self.testuser5))
1807         delete_force(self.ldb_admin, self.new_dn_ou)
1808         delete_force(self.ldb_admin, self.ou1 + self.base_dn)
1809
1810         del self.ldb_user
1811
1812     def GUID_string(self, guid):
1813         return ldb.schema_format_value("objectGUID", guid)
1814
1815     def create_delete_user(self, new_user):
1816         self.ldb_admin.newuser(new_user, self.user_pass)
1817
1818         res = self.ldb_admin.search(expression="(objectClass=*)",
1819                                     base=self.get_user_dn(new_user),
1820                                     scope=SCOPE_BASE,
1821                                     controls=["show_deleted:1"])
1822         guid = res[0]["objectGUID"][0]
1823         self.ldb_admin.delete(self.get_user_dn(new_user))
1824         res = self.ldb_admin.search(base="<GUID=%s>" % self.GUID_string(guid),
1825                                     scope=SCOPE_BASE, controls=["show_deleted:1"])
1826         self.assertEquals(len(res), 1)
1827         return str(res[0].dn)
1828
1829     def undelete_deleted(self, olddn, newdn):
1830         msg = Message()
1831         msg.dn = Dn(self.ldb_user, olddn)
1832         msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
1833         msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
1834         res = self.ldb_user.modify(msg, ["show_recycled:1"])
1835
1836     def undelete_deleted_with_mod(self, olddn, newdn):
1837         msg = Message()
1838         msg.dn = Dn(ldb, olddn)
1839         msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
1840         msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
1841         msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
1842         res = self.ldb_user.modify(msg, ["show_deleted:1"])
1843
1844     def test_undelete(self):
1845         # it appears the user has to have LC on the old parent to be able to move the object
1846         # otherwise we get no such object. Since only System can modify the SD on deleted object
1847         # we cannot grant this permission via LDAP, and this leaves us with "negative" tests at the moment
1848
1849         # deny write property on rdn, should fail
1850         mod = "(OD;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
1851         self.sd_utils.dacl_add_ace(self.deleted_dn1, mod)
1852         try:
1853             self.undelete_deleted(self.deleted_dn1, self.testuser1_dn)
1854             self.fail()
1855         except LdbError as e35:
1856             (num, _) = e35.args
1857             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1858
1859         # seems that permissions on isDeleted and distinguishedName are irrelevant
1860         mod = "(OD;;WP;bf96798f-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
1861         self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
1862         mod = "(OD;;WP;bf9679e4-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
1863         self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
1864         self.undelete_deleted(self.deleted_dn2, self.testuser2_dn)
1865
1866         # attempt undelete with simultanious addition of url, WP to which is denied
1867         mod = "(OD;;WP;9a9a0221-4a5b-11d1-a9c3-0000f80367c1;;%s)" % str(self.sid)
1868         self.sd_utils.dacl_add_ace(self.deleted_dn3, mod)
1869         try:
1870             self.undelete_deleted_with_mod(self.deleted_dn3, self.testuser3_dn)
1871             self.fail()
1872         except LdbError as e36:
1873             (num, _) = e36.args
1874             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1875
1876         # undelete in an ou, in which we have no right to create children
1877         mod = "(D;;CC;;;%s)" % str(self.sid)
1878         self.sd_utils.dacl_add_ace(self.ou1 + self.base_dn, mod)
1879         try:
1880             self.undelete_deleted(self.deleted_dn4, self.new_dn_ou)
1881             self.fail()
1882         except LdbError as e37:
1883             (num, _) = e37.args
1884             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1885
1886         # delete is not required
1887         mod = "(D;;SD;;;%s)" % str(self.sid)
1888         self.sd_utils.dacl_add_ace(self.deleted_dn5, mod)
1889         self.undelete_deleted(self.deleted_dn5, self.testuser5_dn)
1890
1891         # deny Reanimate-Tombstone, should fail
1892         mod = "(OD;;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;%s)" % str(self.sid)
1893         self.sd_utils.dacl_add_ace(self.base_dn, mod)
1894         try:
1895             self.undelete_deleted(self.deleted_dn4, self.testuser4_dn)
1896             self.fail()
1897         except LdbError as e38:
1898             (num, _) = e38.args
1899             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1900
1901 class AclSPNTests(AclTests):
1902
1903     def setUp(self):
1904         super(AclSPNTests, self).setUp()
1905         self.dcname = "TESTSRV8"
1906         self.rodcname = "TESTRODC8"
1907         self.computername = "testcomp8"
1908         self.test_user = "spn_test_user8"
1909         self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
1910         self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
1911         self.site = "Default-First-Site-Name"
1912         self.rodcctx = DCJoinContext(server=host, creds=creds, lp=lp,
1913                                      site=self.site, netbios_name=self.rodcname,
1914                                      targetdir=None, domain=None)
1915         self.dcctx = DCJoinContext(server=host, creds=creds, lp=lp,
1916                                    site=self.site, netbios_name=self.dcname,
1917                                    targetdir=None, domain=None)
1918         self.ldb_admin.newuser(self.test_user, self.user_pass)
1919         self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
1920         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
1921         self.create_computer(self.computername, self.dcctx.dnsdomain)
1922         self.create_rodc(self.rodcctx)
1923         self.create_dc(self.dcctx)
1924
1925     def tearDown(self):
1926         super(AclSPNTests, self).tearDown()
1927         self.rodcctx.cleanup_old_join()
1928         self.dcctx.cleanup_old_join()
1929         delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
1930         delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
1931
1932         del self.ldb_user1
1933
1934     def replace_spn(self, _ldb, dn, spn):
1935         print("Setting spn %s on %s" % (spn, dn))
1936         res = self.ldb_admin.search(dn, expression="(objectClass=*)",
1937                                     scope=SCOPE_BASE, attrs=["servicePrincipalName"])
1938         if "servicePrincipalName" in res[0].keys():
1939             flag = FLAG_MOD_REPLACE
1940         else:
1941             flag = FLAG_MOD_ADD
1942
1943         msg = Message()
1944         msg.dn = Dn(self.ldb_admin, dn)
1945         msg["servicePrincipalName"] = MessageElement(spn, flag,
1946                                                      "servicePrincipalName")
1947         _ldb.modify(msg)
1948
1949     def create_computer(self, computername, domainname):
1950         dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
1951         samaccountname = computername + "$"
1952         dnshostname = "%s.%s" % (computername, domainname)
1953         self.ldb_admin.add({
1954             "dn": dn,
1955             "objectclass": "computer",
1956             "sAMAccountName": samaccountname,
1957             "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
1958             "dNSHostName": dnshostname})
1959
1960     # same as for join_RODC, but do not set any SPNs
1961     def create_rodc(self, ctx):
1962         ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
1963         ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
1964         ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
1965
1966         ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
1967                                  "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
1968                                  "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
1969                                  "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
1970                                  "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
1971         ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
1972
1973         mysid = ctx.get_mysid()
1974         admin_dn = "<SID=%s>" % mysid
1975         ctx.managedby = admin_dn
1976
1977         ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
1978                                   samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1979                                   samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
1980
1981         ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
1982         ctx.secure_channel_type = misc.SEC_CHAN_RODC
1983         ctx.RODC = True
1984         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
1985                              drsuapi.DRSUAPI_DRS_PER_SYNC |
1986                              drsuapi.DRSUAPI_DRS_GET_ANC |
1987                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
1988                              drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
1989
1990         ctx.join_add_objects()
1991
1992     def create_dc(self, ctx):
1993         ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
1994         ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
1995         ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
1996         ctx.secure_channel_type = misc.SEC_CHAN_BDC
1997         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
1998                              drsuapi.DRSUAPI_DRS_INIT_SYNC |
1999                              drsuapi.DRSUAPI_DRS_PER_SYNC |
2000                              drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
2001                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
2002
2003         ctx.join_add_objects()
2004
2005     def dc_spn_test(self, ctx):
2006         netbiosdomain = self.dcctx.get_domain_name()
2007         try:
2008             self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
2009         except LdbError as e39:
2010             (num, _) = e39.args
2011             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
2012
2013         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
2014         self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
2015         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
2016         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
2017         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
2018                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
2019         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
2020         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
2021                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
2022         self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
2023                          (ctx.myname, ctx.dnsdomain, ctx.dnsforest))
2024         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
2025         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
2026                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
2027         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
2028         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
2029         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
2030                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
2031         self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
2032         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
2033                          (ctx.myname, ctx.dnsdomain))
2034         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
2035                          (ctx.myname))
2036         self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
2037                          (ctx.myname, ctx.dnsdomain))
2038         self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
2039                          (ctx.myname, ctx.dnsdomain))
2040         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
2041                          (ctx.ntds_guid, ctx.dnsdomain))
2042
2043         #the following spns do not match the restrictions and should fail
2044         try:
2045             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
2046                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
2047         except LdbError as e40:
2048             (num, _) = e40.args
2049             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2050         try:
2051             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
2052                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
2053         except LdbError as e41:
2054             (num, _) = e41.args
2055             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2056         try:
2057             self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
2058         except LdbError as e42:
2059             (num, _) = e42.args
2060             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2061         try:
2062             self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
2063                              (ctx.myname, ctx.dnsdomain, netbiosdomain))
2064         except LdbError as e43:
2065             (num, _) = e43.args
2066             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2067         try:
2068             self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
2069                              (ctx.ntds_guid, ctx.dnsdomain))
2070         except LdbError as e44:
2071             (num, _) = e44.args
2072             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2073
2074     def test_computer_spn(self):
2075         # with WP, any value can be set
2076         netbiosdomain = self.dcctx.get_domain_name()
2077         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
2078                          (self.computername, netbiosdomain))
2079         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
2080         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
2081                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
2082         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
2083                          (self.computername, self.dcctx.dnsdomain))
2084         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
2085                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2086         self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
2087                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
2088         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
2089         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
2090                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2091         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
2092                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2093         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
2094                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
2095         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
2096         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
2097                          (self.computername, self.dcctx.dnsdomain))
2098         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
2099                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2100         self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
2101                          (self.computername, self.dcctx.dnsdomain))
2102         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
2103                          (self.computername, self.dcctx.dnsdomain))
2104         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
2105                          (self.computername))
2106         self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
2107                          (self.computername, self.dcctx.dnsdomain))
2108         self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
2109                          (self.computername, self.dcctx.dnsdomain))
2110         self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
2111
2112         #user has neither WP nor Validated-SPN, access denied expected
2113         try:
2114             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
2115         except LdbError as e45:
2116             (num, _) = e45.args
2117             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
2118
2119         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
2120         self.sd_utils.dacl_add_ace(self.computerdn, mod)
2121         #grant Validated-SPN and check which values are accepted
2122         #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
2123
2124         # for regular computer objects we shouldalways get constraint violation
2125
2126         # This does not pass against Windows, although it should according to docs
2127         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
2128         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
2129                          (self.computername, self.dcctx.dnsdomain))
2130
2131         try:
2132             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
2133         except LdbError as e46:
2134             (num, _) = e46.args
2135             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2136         try:
2137             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
2138                              (self.computername, self.dcctx.dnsdomain, netbiosdomain))
2139         except LdbError as e47:
2140             (num, _) = e47.args
2141             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2142         try:
2143             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
2144                              (self.computername, self.dcctx.dnsdomain))
2145         except LdbError as e48:
2146             (num, _) = e48.args
2147             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2148         try:
2149             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
2150                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2151         except LdbError as e49:
2152             (num, _) = e49.args
2153             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2154         try:
2155             self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
2156                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
2157         except LdbError as e50:
2158             (num, _) = e50.args
2159             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2160         try:
2161             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
2162         except LdbError as e51:
2163             (num, _) = e51.args
2164             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2165         try:
2166             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
2167                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2168         except LdbError as e52:
2169             (num, _) = e52.args
2170             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2171
2172     def test_spn_rwdc(self):
2173         self.dc_spn_test(self.dcctx)
2174
2175     def test_spn_rodc(self):
2176         self.dc_spn_test(self.rodcctx)
2177
2178
2179 # Important unit running information
2180
2181 ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
2182
2183 TestProgram(module=__name__, opts=subunitopts)