238889a9d059ca217325d748fa4796802df3a44d
[samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
# This is a unit test suite for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9 sys.path.insert(0, "bin/python")
10 import samba
11 samba.ensure_external_module("testtools", "testtools")
12 samba.ensure_external_module("subunit", "subunit/python")
13
14 import samba.getopt as options
15 from samba.join import dc_join
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
24 from samba.dcerpc import security, drsuapi, misc
25
26 from samba.auth import system_session
27 from samba import gensec, sd_utils
28 from samba.samdb import SamDB
29 from samba.credentials import Credentials
30 import samba.tests
31 from samba.tests import delete_force
32 from subunit.run import SubunitTestRunner
33 import unittest
34 import samba.dsdb
35
# Command line handling: the script takes the usual Samba option groups plus
# one positional argument, the host (or full LDAP URL) of the server to test.
parser = optparse.OptionParser("acl.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# `ldaphost` is the URL used to open connections; `host` is the bare host part.
host = args[0]
if "://" not in host:
    ldaphost = "ldap://%s" % host
else:
    ldaphost = host
    # Keep only what follows the scheme separator.  str.lstrip() expects a
    # set of characters, not an offset, so calling it with an integer raised
    # TypeError whenever a full URL was given; slice instead.
    start = host.rindex("://")
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Request sealing on top of whatever gensec features are already set.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
61
62 #
63 # Tests start here
64 #
65
66 class AclTests(samba.tests.TestCase):
67
68     def setUp(self):
69         super(AclTests, self).setUp()
70         self.ldb_admin = ldb
71         self.base_dn = ldb.domain_dn()
72         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
73         self.user_pass = "samba123@"
74         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
75         self.sd_utils = sd_utils.SDUtils(ldb)
76         #used for anonymous login
77         self.creds_tmp = Credentials()
78         self.creds_tmp.set_username("")
79         self.creds_tmp.set_password("")
80         self.creds_tmp.set_domain(creds.get_domain())
81         self.creds_tmp.set_realm(creds.get_realm())
82         self.creds_tmp.set_workstation(creds.get_workstation())
83         print "baseDN: %s" % self.base_dn
84
85     def get_user_dn(self, name):
86         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
87
88     def get_ldb_connection(self, target_username, target_password):
89         creds_tmp = Credentials()
90         creds_tmp.set_username(target_username)
91         creds_tmp.set_password(target_password)
92         creds_tmp.set_domain(creds.get_domain())
93         creds_tmp.set_realm(creds.get_realm())
94         creds_tmp.set_workstation(creds.get_workstation())
95         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
96                                       | gensec.FEATURE_SEAL)
97         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
98         return ldb_target
99
100     # Test if we have any additional groups for users than default ones
101     def assert_user_no_group_member(self, username):
102         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
103         try:
104             self.assertEqual(res[0]["memberOf"][0], "")
105         except KeyError:
106             pass
107         else:
108             self.fail()
109
110 #tests on ldap add operations
111 class AclAddTests(AclTests):
112
113     def setUp(self):
114         super(AclAddTests, self).setUp()
115         # Domain admin that will be creator of OU parent-child structure
116         self.usr_admin_owner = "acl_add_user1"
117         # Second domain admin that will not be creator of OU parent-child structure
118         self.usr_admin_not_owner = "acl_add_user2"
119         # Regular user
120         self.regular_user = "acl_add_user3"
121         self.test_user1 = "test_add_user1"
122         self.test_group1 = "test_add_group1"
123         self.ou1 = "OU=test_add_ou1"
124         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
125         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
126         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
127         self.ldb_admin.newuser(self.regular_user, self.user_pass)
128
129         # add admins to the Domain Admins group
130         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
131                        add_members_operation=True)
132         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
133                        add_members_operation=True)
134
135         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
136         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
137         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
138
139     def tearDown(self):
140         super(AclAddTests, self).tearDown()
141         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
142                           (self.test_user1, self.ou2, self.base_dn))
143         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
144                           (self.test_group1, self.ou2, self.base_dn))
145         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
146         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
147         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
148         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
149         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
150         delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))
151
152     # Make sure top OU is deleted (and so everything under it)
153     def assert_top_ou_deleted(self):
154         res = self.ldb_admin.search(self.base_dn,
155             expression="(distinguishedName=%s,%s)" % (
156                 "OU=test_add_ou1", self.base_dn))
157         self.assertEqual(len(res), 0)
158
159     def test_add_u1(self):
160         """Testing OU with the rights of Doman Admin not creator of the OU """
161         self.assert_top_ou_deleted()
162         # Change descriptor for top level OU
163         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
164         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
165         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
166         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
167         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
168         # Test user and group creation with another domain admin's credentials
169         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
170         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
171                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
172         # Make sure we HAVE created the two objects -- user and group
173         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
174         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
175         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
176         self.assertTrue(len(res) > 0)
177         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
178         self.assertTrue(len(res) > 0)
179
180     def test_add_u2(self):
181         """Testing OU with the regular user that has no rights granted over the OU """
182         self.assert_top_ou_deleted()
183         # Create a parent-child OU structure with domain admin credentials
184         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
185         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
186         # Test user and group creation with regular user credentials
187         try:
188             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
189             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
190                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
191         except LdbError, (num, _):
192             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
193         else:
194             self.fail()
195         # Make sure we HAVEN'T created any of two objects -- user or group
196         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
197         self.assertEqual(len(res), 0)
198         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
199         self.assertEqual(len(res), 0)
200
201     def test_add_u3(self):
202         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
203         self.assert_top_ou_deleted()
204         # Change descriptor for top level OU
205         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
206         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
207         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
208         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
209         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
210         # Test user and group creation with granted user only to one of the objects
211         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
212         try:
213             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
214                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
215         except LdbError, (num, _):
216             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
217         else:
218             self.fail()
219         # Make sure we HAVE created the one of two objects -- user
220         res = self.ldb_admin.search(self.base_dn,
221                 expression="(distinguishedName=%s,%s)" %
222                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
223                     self.base_dn))
224         self.assertNotEqual(len(res), 0)
225         res = self.ldb_admin.search(self.base_dn,
226                 expression="(distinguishedName=%s,%s)" %
227                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
228                     self.base_dn) )
229         self.assertEqual(len(res), 0)
230
231     def test_add_u4(self):
232         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
233         self.assert_top_ou_deleted()
234         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
235         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
236         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
237         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
238                                  grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
239         # Make sure we have successfully created the two objects -- user and group
240         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
241         self.assertTrue(len(res) > 0)
242         res = self.ldb_admin.search(self.base_dn,
243                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
244         self.assertTrue(len(res) > 0)
245
246     def test_add_anonymous(self):
247         """Test add operation with anonymous user"""
248         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
249         try:
250             anonymous.newuser("test_add_anonymous", self.user_pass)
251         except LdbError, (num, _):
252             self.assertEquals(num, ERR_OPERATIONS_ERROR)
253         else:
254             self.fail()
255
256 #tests on ldap modify operations
257 class AclModifyTests(AclTests):
258
259     def setUp(self):
260         super(AclModifyTests, self).setUp()
261         self.user_with_wp = "acl_mod_user1"
262         self.user_with_sm = "acl_mod_user2"
263         self.user_with_group_sm = "acl_mod_user3"
264         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
265         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
266         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
267         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
268         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
269         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
270         self.user_sid = self.sd_utils.get_object_sid( self.get_user_dn(self.user_with_wp))
271         self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
272         self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
273         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
274
275     def tearDown(self):
276         super(AclModifyTests, self).tearDown()
277         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
278         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
279         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
280         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
281         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
282         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
283         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
284         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
285         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
286         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
287
288     def test_modify_u1(self):
289         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
290         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
291         # First test object -- User
292         print "Testing modify on User object"
293         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
294         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
295         ldif = """
296 dn: """ + self.get_user_dn("test_modify_user1") + """
297 changetype: modify
298 replace: displayName
299 displayName: test_changed"""
300         self.ldb_user.modify_ldif(ldif)
301         res = self.ldb_admin.search(self.base_dn,
302                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
303         self.assertEqual(res[0]["displayName"][0], "test_changed")
304         # Second test object -- Group
305         print "Testing modify on Group object"
306         self.ldb_admin.newgroup("test_modify_group1",
307                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
308         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
309         ldif = """
310 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
311 changetype: modify
312 replace: displayName
313 displayName: test_changed"""
314         self.ldb_user.modify_ldif(ldif)
315         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
316         self.assertEqual(res[0]["displayName"][0], "test_changed")
317         # Third test object -- Organizational Unit
318         print "Testing modify on OU object"
319         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
320         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
321         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
322         ldif = """
323 dn: OU=test_modify_ou1,""" + self.base_dn + """
324 changetype: modify
325 replace: displayName
326 displayName: test_changed"""
327         self.ldb_user.modify_ldif(ldif)
328         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
329         self.assertEqual(res[0]["displayName"][0], "test_changed")
330
331     def test_modify_u2(self):
332         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
333         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
334         # First test object -- User
335         print "Testing modify on User object"
336         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
337         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
338         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
339         # Modify on attribute you have rights for
340         ldif = """
341 dn: """ + self.get_user_dn("test_modify_user1") + """
342 changetype: modify
343 replace: displayName
344 displayName: test_changed"""
345         self.ldb_user.modify_ldif(ldif)
346         res = self.ldb_admin.search(self.base_dn,
347                 expression="(distinguishedName=%s)" %
348                 self.get_user_dn("test_modify_user1"))
349         self.assertEqual(res[0]["displayName"][0], "test_changed")
350         # Modify on attribute you do not have rights for granted
351         ldif = """
352 dn: """ + self.get_user_dn("test_modify_user1") + """
353 changetype: modify
354 replace: url
355 url: www.samba.org"""
356         try:
357             self.ldb_user.modify_ldif(ldif)
358         except LdbError, (num, _):
359             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
360         else:
361             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
362             self.fail()
363         # Second test object -- Group
364         print "Testing modify on Group object"
365         self.ldb_admin.newgroup("test_modify_group1",
366                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
367         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
368         ldif = """
369 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
370 changetype: modify
371 replace: displayName
372 displayName: test_changed"""
373         self.ldb_user.modify_ldif(ldif)
374         res = self.ldb_admin.search(self.base_dn,
375                 expression="(distinguishedName=%s)" %
376                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
377         self.assertEqual(res[0]["displayName"][0], "test_changed")
378         # Modify on attribute you do not have rights for granted
379         ldif = """
380 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
381 changetype: modify
382 replace: url
383 url: www.samba.org"""
384         try:
385             self.ldb_user.modify_ldif(ldif)
386         except LdbError, (num, _):
387             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
388         else:
389             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
390             self.fail()
391         # Second test object -- Organizational Unit
392         print "Testing modify on OU object"
393         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
394         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
395         ldif = """
396 dn: OU=test_modify_ou1,""" + self.base_dn + """
397 changetype: modify
398 replace: displayName
399 displayName: test_changed"""
400         self.ldb_user.modify_ldif(ldif)
401         res = self.ldb_admin.search(self.base_dn,
402                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
403                     + self.base_dn))
404         self.assertEqual(res[0]["displayName"][0], "test_changed")
405         # Modify on attribute you do not have rights for granted
406         ldif = """
407 dn: OU=test_modify_ou1,""" + self.base_dn + """
408 changetype: modify
409 replace: url
410 url: www.samba.org"""
411         try:
412             self.ldb_user.modify_ldif(ldif)
413         except LdbError, (num, _):
414             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
415         else:
416             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
417             self.fail()
418
419     def test_modify_u3(self):
420         """7 Modify one attribute as you have no what so ever rights granted"""
421         # First test object -- User
422         print "Testing modify on User object"
423         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
424         # Modify on attribute you do not have rights for granted
425         ldif = """
426 dn: """ + self.get_user_dn("test_modify_user1") + """
427 changetype: modify
428 replace: url
429 url: www.samba.org"""
430         try:
431             self.ldb_user.modify_ldif(ldif)
432         except LdbError, (num, _):
433             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
434         else:
435             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
436             self.fail()
437
438         # Second test object -- Group
439         print "Testing modify on Group object"
440         self.ldb_admin.newgroup("test_modify_group1",
441                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
442         # Modify on attribute you do not have rights for granted
443         ldif = """
444 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
445 changetype: modify
446 replace: url
447 url: www.samba.org"""
448         try:
449             self.ldb_user.modify_ldif(ldif)
450         except LdbError, (num, _):
451             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
452         else:
453             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
454             self.fail()
455
456         # Second test object -- Organizational Unit
457         print "Testing modify on OU object"
458         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
459         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
460         # Modify on attribute you do not have rights for granted
461         ldif = """
462 dn: OU=test_modify_ou1,""" + self.base_dn + """
463 changetype: modify
464 replace: url
465 url: www.samba.org"""
466         try:
467             self.ldb_user.modify_ldif(ldif)
468         except LdbError, (num, _):
469             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
470         else:
471             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
472             self.fail()
473
474
475     def test_modify_u4(self):
476         """11 Grant WP to PRINCIPAL_SELF and test modify"""
477         ldif = """
478 dn: """ + self.get_user_dn(self.user_with_wp) + """
479 changetype: modify
480 add: adminDescription
481 adminDescription: blah blah blah"""
482         try:
483             self.ldb_user.modify_ldif(ldif)
484         except LdbError, (num, _):
485             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
486         else:
487             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
488             self.fail()
489
490         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
491         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
492         # Modify on attribute you have rights for
493         self.ldb_user.modify_ldif(ldif)
494         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
495                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
496         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
497
498     def test_modify_u5(self):
499         """12 test self membership"""
500         ldif = """
501 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
502 changetype: modify
503 add: Member
504 Member: """ +  self.get_user_dn(self.user_with_sm)
505 #the user has no rights granted, this should fail
506         try:
507             self.ldb_user2.modify_ldif(ldif)
508         except LdbError, (num, _):
509             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
510         else:
511             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
512             self.fail()
513
514 #grant self-membership, should be able to add himself
515         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
516         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
517         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
518         self.ldb_user2.modify_ldif(ldif)
519         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
520                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
521         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
522 #but not other users
523         ldif = """
524 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
525 changetype: modify
526 add: Member
527 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
528         try:
529             self.ldb_user2.modify_ldif(ldif)
530         except LdbError, (num, _):
531             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
532         else:
533             self.fail()
534
535     def test_modify_u6(self):
536         """13 test self membership"""
537         ldif = """
538 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
539 changetype: modify
540 add: Member
541 Member: """ +  self.get_user_dn(self.user_with_sm) + """
542 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
543
544 #grant self-membership, should be able to add himself  but not others at the same time
545         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
546         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
547         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
548         try:
549             self.ldb_user2.modify_ldif(ldif)
550         except LdbError, (num, _):
551             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
552         else:
553             self.fail()
554
555     def test_modify_u7(self):
556         """13 User with WP modifying Member"""
557 #a second user is given write property permission
558         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
559         mod = "(A;;WP;;;%s)" % str(user_sid)
560         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
561         ldif = """
562 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
563 changetype: modify
564 add: Member
565 Member: """ +  self.get_user_dn(self.user_with_wp)
566         self.ldb_user.modify_ldif(ldif)
567         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
568                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
569         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
570         ldif = """
571 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
572 changetype: modify
573 delete: Member"""
574         self.ldb_user.modify_ldif(ldif)
575         ldif = """
576 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
577 changetype: modify
578 add: Member
579 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
580         self.ldb_user.modify_ldif(ldif)
581         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
582                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
583         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
584
585     def test_modify_anonymous(self):
586         """Test add operation with anonymous user"""
587         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
588         self.ldb_admin.newuser("test_anonymous", "samba123@")
589         m = Message()
590         m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
591
592         m["description"] = MessageElement("sambauser2",
593                                           FLAG_MOD_ADD,
594                                           "description")
595         try:
596             anonymous.modify(m)
597         except LdbError, (num, _):
598             self.assertEquals(num, ERR_OPERATIONS_ERROR)
599         else:
600             self.fail()
601
602 #enable these when we have search implemented
603 class AclSearchTests(AclTests):
604
605     def setUp(self):
606         super(AclSearchTests, self).setUp()
607         self.u1 = "search_u1"
608         self.u2 = "search_u2"
609         self.u3 = "search_u3"
610         self.group1 = "group1"
611         self.ldb_admin.newuser(self.u1, self.user_pass)
612         self.ldb_admin.newuser(self.u2, self.user_pass)
613         self.ldb_admin.newuser(self.u3, self.user_pass)
614         self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
615         self.ldb_admin.add_remove_group_members(self.group1, self.u2,
616                                                 add_members_operation=True)
617         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
618         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
619         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
620         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
621                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
622                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
623                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
624                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
625                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
626         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
627         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
628
629     def create_clean_ou(self, object_dn):
630         """ Base repeating setup for unittests to follow """
631         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
632                 expression="distinguishedName=%s" % object_dn)
633         # Make sure top testing OU has been deleted before starting the test
634         self.assertEqual(len(res), 0)
635         self.ldb_admin.create_ou(object_dn)
636         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
637         # Make sure there are inheritable ACEs initially
638         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
639         # Find and remove all inherit ACEs
640         res = re.findall("\(.*?\)", desc_sddl)
641         res = [x for x in res if ("CI" in x) or ("OI" in x)]
642         for x in res:
643             desc_sddl = desc_sddl.replace(x, "")
644         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
645         # can propagate from above
646         # remove SACL, we are not interested
647         desc_sddl = desc_sddl.replace(":AI", ":AIP")
648         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
649         # Verify all inheritable ACEs are gone
650         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
651         self.assertFalse("CI" in desc_sddl)
652         self.assertFalse("OI" in desc_sddl)
653
654     def tearDown(self):
655         super(AclSearchTests, self).tearDown()
656         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
657         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
658         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
659         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
660         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
661         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
662         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
663         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
664         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
665         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
666         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
667         delete_force(self.ldb_admin, self.get_user_dn("group1"))
668
669     def test_search_anonymous1(self):
670         """Verify access of rootDSE with the correct request"""
671         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
672         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
673         self.assertEquals(len(res), 1)
674         #verify some of the attributes
675         #dont care about values
676         self.assertTrue("ldapServiceName" in res[0])
677         self.assertTrue("namingContexts" in res[0])
678         self.assertTrue("isSynchronized" in res[0])
679         self.assertTrue("dsServiceName" in res[0])
680         self.assertTrue("supportedSASLMechanisms" in res[0])
681         self.assertTrue("isGlobalCatalogReady" in res[0])
682         self.assertTrue("domainControllerFunctionality" in res[0])
683         self.assertTrue("serverName" in res[0])
684
685     def test_search_anonymous2(self):
686         """Make sure we cannot access anything else"""
687         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
688         try:
689             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
690         except LdbError, (num, _):
691             self.assertEquals(num, ERR_OPERATIONS_ERROR)
692         else:
693             self.fail()
694         try:
695             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
696         except LdbError, (num, _):
697             self.assertEquals(num, ERR_OPERATIONS_ERROR)
698         else:
699             self.fail()
700         try:
701             res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
702                                         scope=SCOPE_SUBTREE)
703         except LdbError, (num, _):
704             self.assertEquals(num, ERR_OPERATIONS_ERROR)
705         else:
706             self.fail()
707
708     def test_search_anonymous3(self):
709         """Set dsHeuristics and repeat"""
710         self.ldb_admin.set_dsheuristics("0000002")
711         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
712         mod = "(A;CI;LC;;;AN)"
713         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
714         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
715         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
716         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
717                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
718         self.assertEquals(len(res), 1)
719         self.assertTrue("dn" in res[0])
720         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
721                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
722         res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
723                                scope=SCOPE_SUBTREE)
724         self.assertEquals(len(res), 1)
725         self.assertTrue("dn" in res[0])
726         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
727
728     def test_search1(self):
729         """Make sure users can see us if given LC to user and group"""
730         self.create_clean_ou("OU=ou1," + self.base_dn)
731         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
732         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
733         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
734                                                  self.domain_sid)
735         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
736         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
737         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
738         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
739         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
740
741         #regular users must see only ou1 and ou2
742         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
743                                     scope=SCOPE_SUBTREE)
744         self.assertEquals(len(res), 2)
745         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
746                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
747
748         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
749         self.assertEquals(sorted(res_list), sorted(ok_list))
750
751         #these users should see all ous
752         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
753                                     scope=SCOPE_SUBTREE)
754         self.assertEquals(len(res), 6)
755         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
756         self.assertEquals(sorted(res_list), sorted(self.full_list))
757
758         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
759                                     scope=SCOPE_SUBTREE)
760         self.assertEquals(len(res), 6)
761         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
762         self.assertEquals(sorted(res_list), sorted(self.full_list))
763
764     def test_search2(self):
765         """Make sure users can't see us if access is explicitly denied"""
766         self.create_clean_ou("OU=ou1," + self.base_dn)
767         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
768         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
769         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
770         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
771         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
772         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
773         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
774         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
775                                     scope=SCOPE_SUBTREE)
776         #this user should see all ous
777         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
778         self.assertEquals(sorted(res_list), sorted(self.full_list))
779
780         #these users should see ou1, 2, 5 and 6 but not 3 and 4
781         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
782                                     scope=SCOPE_SUBTREE)
783         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
784                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
785                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
786                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
787         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
788         self.assertEquals(sorted(res_list), sorted(ok_list))
789
790         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
791                                     scope=SCOPE_SUBTREE)
792         self.assertEquals(len(res), 4)
793         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
794         self.assertEquals(sorted(res_list), sorted(ok_list))
795
796     def test_search3(self):
797         """Make sure users can't see ous if access is explicitly denied - 2"""
798         self.create_clean_ou("OU=ou1," + self.base_dn)
799         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
800         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
801         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
802                                                  self.domain_sid)
803         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
804         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
805         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
806         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
807         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
808
809         print "Testing correct behavior on nonaccessible search base"
810         try:
811              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
812                                    scope=SCOPE_BASE)
813         except LdbError, (num, _):
814             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
815         else:
816             self.fail()
817
818         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
819         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
820
821         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
822                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
823
824         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
825                                     scope=SCOPE_SUBTREE)
826         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
827         self.assertEquals(sorted(res_list), sorted(ok_list))
828
829         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
830                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
831                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
832                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
833
834         #should not see ou3 and ou4, but should see ou5 and ou6
835         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
836                                     scope=SCOPE_SUBTREE)
837         self.assertEquals(len(res), 4)
838         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
839         self.assertEquals(sorted(res_list), sorted(ok_list))
840
841         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
842                                     scope=SCOPE_SUBTREE)
843         self.assertEquals(len(res), 4)
844         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
845         self.assertEquals(sorted(res_list), sorted(ok_list))
846
847     def test_search4(self):
848         """There is no difference in visibility if the user is also creator"""
849         self.create_clean_ou("OU=ou1," + self.base_dn)
850         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
851         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
852         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
853                                                  self.domain_sid)
854         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
855         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
856         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
857         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
858         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
859
860         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
861                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
862         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
863                                     scope=SCOPE_SUBTREE)
864         self.assertEquals(len(res), 2)
865         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
866         self.assertEquals(sorted(res_list), sorted(ok_list))
867
868         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
869                                     scope=SCOPE_SUBTREE)
870         self.assertEquals(len(res), 2)
871         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
872         self.assertEquals(sorted(res_list), sorted(ok_list))
873
874     def test_search5(self):
875         """Make sure users can see only attributes they are allowed to see"""
876         self.create_clean_ou("OU=ou1," + self.base_dn)
877         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
878         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
879         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
880                                                  self.domain_sid)
881         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
882         # assert user can only see dn
883         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
884                                     scope=SCOPE_SUBTREE)
885         ok_list = ['dn']
886         self.assertEquals(len(res), 1)
887         res_list = res[0].keys()
888         self.assertEquals(res_list, ok_list)
889
890         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
891                                     scope=SCOPE_BASE, attrs=["ou"])
892
893         self.assertEquals(len(res), 1)
894         res_list = res[0].keys()
895         self.assertEquals(res_list, ok_list)
896
897         #give read property on ou and assert user can only see dn and ou
898         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
899         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
900         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
901         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
902                                     scope=SCOPE_SUBTREE)
903         ok_list = ['dn', 'ou']
904         self.assertEquals(len(res), 1)
905         res_list = res[0].keys()
906         self.assertEquals(sorted(res_list), sorted(ok_list))
907
908         #give read property on Public Information and assert user can see ou and other members
909         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
910         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
911         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
912         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
913                                     scope=SCOPE_SUBTREE)
914
915         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
916         res_list = res[0].keys()
917         self.assertEquals(sorted(res_list), sorted(ok_list))
918
919     def test_search6(self):
920         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
921         self.create_clean_ou("OU=ou1," + self.base_dn)
922         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
923         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
924         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
925                                                  self.domain_sid)
926         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
927         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
928
929         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
930                                     scope=SCOPE_SUBTREE)
931         #nothing should be returned as ou is not accessible
932         self.assertEquals(len(res), 0)
933
934         #give read property on ou and assert user can only see dn and ou
935         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
936         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
937         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
938                                     scope=SCOPE_SUBTREE)
939         self.assertEquals(len(res), 1)
940         ok_list = ['dn', 'ou']
941         res_list = res[0].keys()
942         self.assertEquals(sorted(res_list), sorted(ok_list))
943
944         #give read property on Public Information and assert user can see ou and other members
945         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
946         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
947         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
948                                    scope=SCOPE_SUBTREE)
949         self.assertEquals(len(res), 1)
950         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
951         res_list = res[0].keys()
952         self.assertEquals(sorted(res_list), sorted(ok_list))
953
# Tests on LDAP delete operations
class AclDeleteTests(AclTests):
    """Access-check tests for LDAP delete operations."""

    def setUp(self):
        """Create the regular user and open a connection as that user."""
        super(AclDeleteTests, self).setUp()
        self.regular_user = "acl_delete_user1"
        # Create regular user
        self.ldb_admin.newuser(self.regular_user, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Remove every account the delete tests may have left behind."""
        super(AclDeleteTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
        delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))

    def test_delete_u1(self):
        """User is prohibited by default to delete another User object"""
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Here delete User object should ALWAYS throw an exception
        try:
            self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

    def test_delete_u2(self):
        """User's group has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Grant SD (delete) on the target to AU
        mod = "(A;;SD;;;AU)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_u3(self):
        """User identified by SID has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Grant SD (delete) on the target to the regular user's own SID
        mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_anonymous(self):
        """Test delete operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self.ldb_admin.newuser("test_anonymous", "samba123@")

        try:
            anonymous.delete(self.get_user_dn("test_anonymous"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail()
1019
# Tests on LDAP rename operations
1021 class AclRenameTests(AclTests):
1022
1023     def setUp(self):
1024         super(AclRenameTests, self).setUp()
1025         self.regular_user = "acl_rename_user1"
1026         self.ou1 = "OU=test_rename_ou1"
1027         self.ou2 = "OU=test_rename_ou2"
1028         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1029         self.testuser1 = "test_rename_user1"
1030         self.testuser2 = "test_rename_user2"
1031         self.testuser3 = "test_rename_user3"
1032         self.testuser4 = "test_rename_user4"
1033         self.testuser5 = "test_rename_user5"
1034         # Create regular user
1035         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1036         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1037
1038     def tearDown(self):
1039         super(AclRenameTests, self).tearDown()
1040         # Rename OU3
1041         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1042         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1043         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1044         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1045         # Rename OU2
1046         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1047         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1048         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1049         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1050         # Rename OU1
1051         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1052         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1053         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1054         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1055         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1056         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1057
1058     def test_rename_u1(self):
1059         """Regular user fails to rename 'User object' within single OU"""
1060         # Create OU structure
1061         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1062         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1063         try:
1064             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1065                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1066         except LdbError, (num, _):
1067             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1068         else:
1069             self.fail()
1070
1071     def test_rename_u2(self):
1072         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1073         ou_dn = "OU=test_rename_ou1," + self.base_dn
1074         user_dn = "CN=test_rename_user1," + ou_dn
1075         rename_user_dn = "CN=test_rename_user5," + ou_dn
1076         # Create OU structure
1077         self.ldb_admin.create_ou(ou_dn)
1078         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1079         mod = "(A;;WP;;;AU)"
1080         self.sd_utils.dacl_add_ace(user_dn, mod)
1081         # Rename 'User object' having WP to AU
1082         self.ldb_user.rename(user_dn, rename_user_dn)
1083         res = self.ldb_admin.search(self.base_dn,
1084                 expression="(distinguishedName=%s)" % user_dn)
1085         self.assertEqual(len(res), 0)
1086         res = self.ldb_admin.search(self.base_dn,
1087                 expression="(distinguishedName=%s)" % rename_user_dn)
1088         self.assertNotEqual(len(res), 0)
1089
1090     def test_rename_u3(self):
1091         """Test rename with rights granted to 'User object' SID"""
1092         ou_dn = "OU=test_rename_ou1," + self.base_dn
1093         user_dn = "CN=test_rename_user1," + ou_dn
1094         rename_user_dn = "CN=test_rename_user5," + ou_dn
1095         # Create OU structure
1096         self.ldb_admin.create_ou(ou_dn)
1097         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1098         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1099         mod = "(A;;WP;;;%s)" % str(sid)
1100         self.sd_utils.dacl_add_ace(user_dn, mod)
1101         # Rename 'User object' having WP to AU
1102         self.ldb_user.rename(user_dn, rename_user_dn)
1103         res = self.ldb_admin.search(self.base_dn,
1104                 expression="(distinguishedName=%s)" % user_dn)
1105         self.assertEqual(len(res), 0)
1106         res = self.ldb_admin.search(self.base_dn,
1107                 expression="(distinguishedName=%s)" % rename_user_dn)
1108         self.assertNotEqual(len(res), 0)
1109
1110     def test_rename_u4(self):
1111         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1112         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1113         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1114         user_dn = "CN=test_rename_user2," + ou1_dn
1115         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1116         # Create OU structure
1117         self.ldb_admin.create_ou(ou1_dn)
1118         self.ldb_admin.create_ou(ou2_dn)
1119         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1120         mod = "(A;;WPSD;;;AU)"
1121         self.sd_utils.dacl_add_ace(user_dn, mod)
1122         mod = "(A;;CC;;;AU)"
1123         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1124         # Rename 'User object' having SD and CC to AU
1125         self.ldb_user.rename(user_dn, rename_user_dn)
1126         res = self.ldb_admin.search(self.base_dn,
1127                 expression="(distinguishedName=%s)" % user_dn)
1128         self.assertEqual(len(res), 0)
1129         res = self.ldb_admin.search(self.base_dn,
1130                 expression="(distinguishedName=%s)" % rename_user_dn)
1131         self.assertNotEqual(len(res), 0)
1132
1133     def test_rename_u5(self):
1134         """Test rename with rights granted to 'User object' SID"""
1135         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1136         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1137         user_dn = "CN=test_rename_user2," + ou1_dn
1138         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1139         # Create OU structure
1140         self.ldb_admin.create_ou(ou1_dn)
1141         self.ldb_admin.create_ou(ou2_dn)
1142         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1143         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1144         mod = "(A;;WPSD;;;%s)" % str(sid)
1145         self.sd_utils.dacl_add_ace(user_dn, mod)
1146         mod = "(A;;CC;;;%s)" % str(sid)
1147         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1148         # Rename 'User object' having SD and CC to AU
1149         self.ldb_user.rename(user_dn, rename_user_dn)
1150         res = self.ldb_admin.search(self.base_dn,
1151                 expression="(distinguishedName=%s)" % user_dn)
1152         self.assertEqual(len(res), 0)
1153         res = self.ldb_admin.search(self.base_dn,
1154                 expression="(distinguishedName=%s)" % rename_user_dn)
1155         self.assertNotEqual(len(res), 0)
1156
1157     def test_rename_u6(self):
1158         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1159         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1160         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1161         user_dn = "CN=test_rename_user2," + ou1_dn
1162         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1163         # Create OU structure
1164         self.ldb_admin.create_ou(ou1_dn)
1165         self.ldb_admin.create_ou(ou2_dn)
1166         #mod = "(A;CI;DCWP;;;AU)"
1167         mod = "(A;;DC;;;AU)"
1168         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1169         mod = "(A;;CC;;;AU)"
1170         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1171         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1172         mod = "(A;;WP;;;AU)"
1173         self.sd_utils.dacl_add_ace(user_dn, mod)
1174         # Rename 'User object' having SD and CC to AU
1175         self.ldb_user.rename(user_dn, rename_user_dn)
1176         res = self.ldb_admin.search(self.base_dn,
1177                 expression="(distinguishedName=%s)" % user_dn)
1178         self.assertEqual(len(res), 0)
1179         res = self.ldb_admin.search(self.base_dn,
1180                 expression="(distinguishedName=%s)" % rename_user_dn)
1181         self.assertNotEqual(len(res), 0)
1182
1183     def test_rename_u7(self):
1184         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1185         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1186         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1187         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1188         user_dn = "CN=test_rename_user2," + ou1_dn
1189         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1190         # Create OU structure
1191         self.ldb_admin.create_ou(ou1_dn)
1192         self.ldb_admin.create_ou(ou2_dn)
1193         self.ldb_admin.create_ou(ou3_dn)
1194         mod = "(A;CI;WPDC;;;AU)"
1195         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1196         mod = "(A;;CC;;;AU)"
1197         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1198         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1199         # Rename 'User object' having SD and CC to AU
1200         self.ldb_user.rename(user_dn, rename_user_dn)
1201         res = self.ldb_admin.search(self.base_dn,
1202                 expression="(distinguishedName=%s)" % user_dn)
1203         self.assertEqual(len(res), 0)
1204         res = self.ldb_admin.search(self.base_dn,
1205                 expression="(distinguishedName=%s)" % rename_user_dn)
1206         self.assertNotEqual(len(res), 0)
1207
1208     def test_rename_u8(self):
1209         """Test rename on an object with and without modify access on the RDN attribute"""
1210         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1211         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1212         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1213         # Create OU structure
1214         self.ldb_admin.create_ou(ou1_dn)
1215         self.ldb_admin.create_ou(ou2_dn)
1216         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1217         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1218         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1219         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1220         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1221         try:
1222             self.ldb_user.rename(ou2_dn, ou3_dn)
1223         except LdbError, (num, _):
1224             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1225         else:
1226             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1227             self.fail()
1228         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1229         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1230         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1231         self.ldb_user.rename(ou2_dn, ou3_dn)
1232         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1233         self.assertEqual(len(res), 0)
1234         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1235         self.assertNotEqual(len(res), 0)
1236
1237 #tests on Control Access Rights
class AclCARTests(AclTests):
    """Password change / reset tests around control access rights (CARs).

    setUp() creates two users and opens one LDAP connection for each:
    self.ldb_user (user_with_wp) and self.ldb_user2 (user_with_pc).
    All password modifications are sent as LDIF through modify_ldif().
    """

    # ACEs stripped from a user's SDDL by the negative "change password"
    # tests (users have change password by default).
    _DEFAULT_CHANGE_PWD_ACES = (
        "(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)",
        "(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)",
    )
    # CAR ACE that the reset tests grant to PS before a reset may succeed.
    _RESET_PWD_ACE = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"

    def setUp(self):
        super(AclCARTests, self).setUp()
        self.user_with_wp = "acl_car_user1"
        self.user_with_pc = "acl_car_user2"
        self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
        self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)

    def tearDown(self):
        super(AclCARTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))

    # ---- private helpers -------------------------------------------------

    @staticmethod
    def _modify_ldif(dn, *lines):
        """Return the LDIF text of a 'changetype: modify' record for dn.

        Every entry of lines becomes one LDIF line.  The text starts with an
        empty line and ends with a newline, exactly like the triple-quoted
        literals these tests were originally written with.
        """
        return ("\ndn: " + dn + "\nchangetype: modify\n" +
                "".join([line + "\n" for line in lines]))

    @staticmethod
    def _unicode_pwd_b64(password):
        """Return password wrapped in double quotes, UTF-16-LE encoded, as base64."""
        return base64.b64encode(('"' + password + '"').encode('utf-16-le'))

    def _change_unicode_pwd_ldif(self, dn, old, new):
        """LDIF that deletes the old and adds the new unicodePwd value."""
        return self._modify_ldif(
            dn,
            "delete: unicodePwd",
            "unicodePwd:: " + self._unicode_pwd_b64(old),
            "add: unicodePwd",
            "unicodePwd:: " + self._unicode_pwd_b64(new))

    def _reset_unicode_pwd_ldif(self, dn, new):
        """LDIF that replaces unicodePwd."""
        return self._modify_ldif(
            dn,
            "replace: unicodePwd",
            "unicodePwd:: " + self._unicode_pwd_b64(new))

    def _reset_user_password_ldif(self, dn, new):
        """LDIF that replaces userPassword with a clear-text value."""
        return self._modify_ldif(
            dn,
            "replace: userPassword",
            "userPassword: " + new)

    def _uneven_user_password_ldif(self, dn):
        """LDIF with two deletes but only one add of userPassword."""
        return self._modify_ldif(
            dn,
            "delete: userPassword",
            "userPassword: thatsAcomplPASS1",
            "delete: userPassword",
            "userPassword: thatsAcomplPASS1",
            "add: userPassword",
            "userPassword: thatsAcomplPASS2")

    def _strip_default_change_pwd_aces(self, dn):
        """Remove the _DEFAULT_CHANGE_PWD_ACES from the security descriptor of dn."""
        desc = self.sd_utils.read_sd_on_dn(dn)
        sddl = desc.as_sddl(self.domain_sid)
        for ace in self._DEFAULT_CHANGE_PWD_ACES:
            sddl = sddl.replace(ace, "")
        self.sd_utils.modify_sd_on_dn(dn, sddl)

    def _assert_modify_fails(self, conn, ldif, *codes):
        """Send ldif through conn; it must raise LdbError with one of codes."""
        try:
            conn.modify_ldif(ldif)
        except LdbError as e:
            (num, _) = e.args
            self.assertTrue(num in codes,
                            "unexpected LDB error code %s" % num)
        else:
            self.fail("modify unexpectedly succeeded")

    def _modify_tolerating(self, conn, ldif, code):
        """Send ldif through conn; success is fine, an LdbError must carry code."""
        try:
            conn.modify_ldif(ldif)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, code)

    # ---- password change -------------------------------------------------

    def test_change_password1(self):
        """Try a password change operation without any CARs given"""
        dn = self.get_user_dn(self.user_with_wp)
        #users have change password by default - remove for negative testing
        self._strip_default_change_pwd_aces(dn)
        # for some reason we get constraint violation instead of insufficient access error
        self._assert_modify_fails(
            self.ldb_user,
            self._change_unicode_pwd_ldif(dn, "samba123@", "thatsAcomplPASS2"),
            ERR_CONSTRAINT_VIOLATION)

    def test_change_password2(self):
        """Make sure WP has no influence (WP granted, change CARs removed)"""
        dn = self.get_user_dn(self.user_with_wp)
        self._strip_default_change_pwd_aces(dn)
        self.sd_utils.dacl_add_ace(dn, "(A;;WP;;;PS)")
        # for some reason we get constraint violation instead of insufficient access error
        self._assert_modify_fails(
            self.ldb_user,
            self._change_unicode_pwd_ldif(dn, "samba123@", "thatsAcomplPASS2"),
            ERR_CONSTRAINT_VIOLATION)

    def test_change_password3(self):
        """Make sure WP has no influence (WP denied, change CARs left in place)"""
        dn = self.get_user_dn(self.user_with_wp)
        self.sd_utils.dacl_add_ace(dn, "(D;;WP;;;PS)")
        # No exception is expected here.
        self.ldb_user.modify_ldif(
            self._change_unicode_pwd_ldif(dn, "samba123@", "thatsAcomplPASS2"))

    def test_change_password5(self):
        """Make sure rights have no influence on dBCSPwd"""
        dn = self.get_user_dn(self.user_with_wp)
        self._strip_default_change_pwd_aces(dn)
        self.sd_utils.dacl_add_ace(dn, "(D;;WP;;;PS)")
        self._assert_modify_fails(
            self.ldb_user,
            self._modify_ldif(dn,
                              "delete: dBCSPwd",
                              "dBCSPwd: XXXXXXXXXXXXXXXX",
                              "add: dBCSPwd",
                              "dBCSPwd: YYYYYYYYYYYYYYYY"),
            ERR_UNWILLING_TO_PERFORM)

    def test_change_password6(self):
        """Test uneven delete/adds"""
        dn = self.get_user_dn(self.user_with_wp)
        self._assert_modify_fails(
            self.ldb_user,
            self._uneven_user_password_ldif(dn),
            ERR_INSUFFICIENT_ACCESS_RIGHTS)
        self.sd_utils.dacl_add_ace(dn, self._RESET_PWD_ACE)
        # This fails on Windows 2000 domain level with constraint violation
        self._assert_modify_fails(
            self.ldb_user,
            self._uneven_user_password_ldif(dn),
            ERR_CONSTRAINT_VIOLATION, ERR_UNWILLING_TO_PERFORM)

    def test_change_password7(self):
        """Change the own password, then another user's, with the default SD"""
        wp_dn = self.get_user_dn(self.user_with_wp)
        # NOTE(review): the SD is read and written back without any textual
        # change; it is unclear whether the round trip itself is needed, so
        # it is kept as it was - confirm.
        desc = self.sd_utils.read_sd_on_dn(wp_dn)
        sddl = desc.as_sddl(self.domain_sid)
        self.sd_utils.modify_sd_on_dn(wp_dn, sddl)
        #first change our own password
        self.ldb_user2.modify_ldif(
            self._change_unicode_pwd_ldif(self.get_user_dn(self.user_with_pc),
                                          "samba123@", "thatsAcomplPASS1"))
        #then someone else's
        self.ldb_user2.modify_ldif(
            self._change_unicode_pwd_ldif(wp_dn,
                                          "samba123@", "thatsAcomplPASS2"))

    # ---- password reset --------------------------------------------------

    def test_reset_password1(self):
        """Try a user password reset operation (unicodePwd) before and after granting CAR"""
        dn = self.get_user_dn(self.user_with_wp)
        ldif = self._reset_unicode_pwd_ldif(dn, "thatsAcomplPASS1")
        self._assert_modify_fails(self.ldb_user, ldif,
                                  ERR_INSUFFICIENT_ACCESS_RIGHTS)
        self.sd_utils.dacl_add_ace(dn, self._RESET_PWD_ACE)
        self.ldb_user.modify_ldif(ldif)

    def test_reset_password2(self):
        """Try a user password reset operation (userPassword) before and after granting CAR"""
        dn = self.get_user_dn(self.user_with_wp)
        ldif = self._reset_user_password_ldif(dn, "thatsAcomplPASS1")
        self._assert_modify_fails(self.ldb_user, ldif,
                                  ERR_INSUFFICIENT_ACCESS_RIGHTS)
        self.sd_utils.dacl_add_ace(dn, self._RESET_PWD_ACE)
        # This fails on Windows 2000 domain level with constraint violation
        self._modify_tolerating(self.ldb_user, ldif, ERR_CONSTRAINT_VIOLATION)

    def test_reset_password3(self):
        """Grant WP and see what happens (unicodePwd)"""
        dn = self.get_user_dn(self.user_with_wp)
        self.sd_utils.dacl_add_ace(dn, "(A;;WP;;;PS)")
        self._assert_modify_fails(
            self.ldb_user,
            self._reset_unicode_pwd_ldif(dn, "thatsAcomplPASS1"),
            ERR_INSUFFICIENT_ACCESS_RIGHTS)

    def test_reset_password4(self):
        """Grant WP and see what happens (userPassword)"""
        dn = self.get_user_dn(self.user_with_wp)
        self.sd_utils.dacl_add_ace(dn, "(A;;WP;;;PS)")
        self._assert_modify_fails(
            self.ldb_user,
            self._reset_user_password_ldif(dn, "thatsAcomplPASS1"),
            ERR_INSUFFICIENT_ACCESS_RIGHTS)

    def test_reset_password5(self):
        """Explicitly deny WP but grant CAR (unicodePwd)"""
        dn = self.get_user_dn(self.user_with_wp)
        self.sd_utils.dacl_add_ace(dn, "(D;;WP;;;PS)" + self._RESET_PWD_ACE)
        self.ldb_user.modify_ldif(
            self._reset_unicode_pwd_ldif(dn, "thatsAcomplPASS1"))

    def test_reset_password6(self):
        """Explicitly deny WP but grant CAR (userPassword)"""
        dn = self.get_user_dn(self.user_with_wp)
        self.sd_utils.dacl_add_ace(dn, "(D;;WP;;;PS)" + self._RESET_PWD_ACE)
        # This fails on Windows 2000 domain level with constraint violation
        self._modify_tolerating(
            self.ldb_user,
            self._reset_user_password_ldif(dn, "thatsAcomplPASS1"),
            ERR_CONSTRAINT_VIOLATION)
1509
class AclExtendedTests(AclTests):
    """Checks who may read nTSecurityDescriptor of a freshly created group."""

    def setUp(self):
        super(AclExtendedTests, self).setUp()
        # u1: regular user, will be the creator
        # u2: regular user
        # u3: admin user (added to "Domain Admins" below)
        self.u1, self.u2, self.u3 = "ext_u1", "ext_u2", "ext_u3"
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members("Domain Admins", self.u3,
                                                add_members_operation=True)
        # One connection per user, opened in the same order as the accounts.
        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        super(AclExtendedTests, self).tearDown()
        # Users first, then the group, then the OU that contains the group.
        for account in (self.u1, self.u2, self.u3):
            delete_force(self.ldb_admin, self.get_user_dn(account))
        delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
        delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)

    def test_ntSecurityDescriptor(self):
        """u2 must not see nTSecurityDescriptor on u1's group, u3 must."""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1," + ou_dn
        sd_attr = "nTSecurityDescriptor"

        def read_sd(conn):
            # Base-scope search on the group asking only for the descriptor.
            return conn.search(group_dn, SCOPE_BASE, None, [sd_attr])

        #create empty ou
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        #give u1 Create children access, u2 gets LC
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % str(self.user_sid1))
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % str(self.user_sid2))
        #create a group under that (as u1), grant RP to u2
        self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1",
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % str(self.user_sid2))

        #u2 must not read the descriptor
        found = read_sd(self.ldb_user2)
        self.assertNotEqual(len(found), 0)
        self.assertFalse(sd_attr in found[0].keys())

        #grant RC to u2 - still no access
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % str(self.user_sid2))
        found = read_sd(self.ldb_user2)
        self.assertNotEqual(len(found), 0)
        self.assertFalse(sd_attr in found[0].keys())

        #u3 was added to Domain Admins in setUp, should be able to read sd
        found = read_sd(self.ldb_user3)
        self.assertEqual(len(found),1)
        self.assertTrue(sd_attr in found[0].keys())
1569
1570
class AclSPNTests(AclTests):
    """Access checks for writes to servicePrincipalName.

    setUp() creates a test user, a plain computer account and the account
    objects of one DC and one RODC (via dc_join contexts, without
    setting any SPNs).  The tests then try to set a series of SPN values as
    administrator and as the test user.
    """

    # ACE template used to grant Validated-SPN ("SW" on this GUID) to a SID.
    _VALIDATED_SPN_ACE = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)"

    def setUp(self):
        super(AclSPNTests, self).setUp()
        self.dcname = "TESTSRV8"
        self.rodcname = "TESTRODC8"
        self.computername = "testcomp8"
        self.test_user = "spn_test_user8"
        self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
        self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
        self.site = "Default-First-Site-Name"
        # host, creds and lp are the module-level command line settings.
        self.rodcctx = dc_join(server=host, creds=creds, lp=lp,
                               site=self.site, netbios_name=self.rodcname,
                               targetdir=None, domain=None)
        self.dcctx = dc_join(server=host, creds=creds, lp=lp,
                             site=self.site, netbios_name=self.dcname,
                             targetdir=None, domain=None)
        self.ldb_admin.newuser(self.test_user, self.user_pass)
        self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
        self.create_computer(self.computername, self.dcctx.dnsdomain)
        self.create_rodc(self.rodcctx)
        self.create_dc(self.dcctx)

    def tearDown(self):
        super(AclSPNTests, self).tearDown()
        self.rodcctx.cleanup_old_join()
        self.dcctx.cleanup_old_join()
        delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
        delete_force(self.ldb_admin, self.get_user_dn(self.test_user))

    def replace_spn(self, _ldb, dn, spn):
        """Set servicePrincipalName of dn to spn using connection _ldb.

        The current state is looked up with the admin connection: if the
        attribute already exists it is replaced, otherwise it is added.
        Any LdbError raised by the modify is passed on to the caller.
        """
        # Parenthesised single argument: same output with the Python 2
        # print statement and with the Python 3 print function.
        print("Setting spn %s on %s" % (spn, dn))
        res = self.ldb_admin.search(dn, expression="(objectClass=*)",
                                    scope=SCOPE_BASE, attrs=["servicePrincipalName"])
        if "servicePrincipalName" in res[0].keys():
            flag = FLAG_MOD_REPLACE
        else:
            flag = FLAG_MOD_ADD

        msg = Message()
        msg.dn = Dn(self.ldb_admin, dn)
        msg["servicePrincipalName"] = MessageElement(spn, flag,
                                                     "servicePrincipalName")
        _ldb.modify(msg)

    def _replace_spn_tolerant(self, _ldb, dn, spn, expected_error):
        """Try replace_spn(); an LdbError must carry expected_error.

        NOTE(review): a modify that succeeds is not reported as a failure
        (the tests never had an else/fail branch here) - confirm whether
        this leniency is intended.
        """
        try:
            self.replace_spn(_ldb, dn, spn)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected_error)

    def create_computer(self, computername, domainname):
        """Add a workstation trust account below CN=computers as administrator."""
        dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
        samaccountname = computername + "$"
        dnshostname = "%s.%s" % (computername, domainname)
        self.ldb_admin.add({
            "dn": dn,
            "objectclass": "computer",
            "sAMAccountName": samaccountname,
            "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
            "dNSHostName": dnshostname})

    # same as for join_RODC, but do not set any SPNs
    def create_rodc(self, ctx):
        """Fill the join context ctx for an RODC and add its objects."""
        ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)

        ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
                                 "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
                                 "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
                                 "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
                                 "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
        ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)

        mysid = ctx.get_mysid()
        admin_dn = "<SID=%s>" % mysid
        ctx.managedby = admin_dn

        ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
                                  samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
                                  samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)

        ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
        ctx.secure_channel_type = misc.SEC_CHAN_RODC
        ctx.RODC = True
        ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
                             drsuapi.DRSUAPI_DRS_PER_SYNC |
                             drsuapi.DRSUAPI_DRS_GET_ANC |
                             drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
                             drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)

        ctx.join_add_objects()

    def create_dc(self, ctx):
        """Fill the join context ctx for a writable DC and add its objects."""
        ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
        ctx.secure_channel_type = misc.SEC_CHAN_BDC
        ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
                             drsuapi.DRSUAPI_DRS_INIT_SYNC |
                             drsuapi.DRSUAPI_DRS_PER_SYNC |
                             drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
                             drsuapi.DRSUAPI_DRS_NEVER_SYNCED)

        ctx.join_add_objects()

    def dc_spn_test(self, ctx):
        """Run the SPN write checks as self.ldb_user1 against ctx.acct_dn.

        ctx is one of the join contexts built in setUp.  The sequence is:
        one attempt before any right is granted, then the Validated-SPN ACE
        is added, then a list of SPNs that must be accepted, and finally a
        list of SPNs that do not match the restrictions.
        """
        netbiosdomain = self.dcctx.get_domain_name()
        name = ctx.myname
        dns = ctx.dnsdomain
        acct = ctx.acct_dn

        self._replace_spn_tolerant(self.ldb_user1, acct,
                                   "HOST/%s/%s" % (name, netbiosdomain),
                                   ERR_INSUFFICIENT_ACCESS_RIGHTS)

        self.sd_utils.dacl_add_ace(acct, self._VALIDATED_SPN_ACE % str(self.user_sid1))

        # These must all succeed; the order is significant only in that it
        # is the order the values were always written in.
        accepted = [
            "HOST/%s/%s" % (name, netbiosdomain),
            "HOST/%s" % name,
            "HOST/%s.%s/%s" % (name, dns, netbiosdomain),
            "HOST/%s/%s" % (name, dns),
            "HOST/%s.%s/%s" % (name, dns, dns),
            "GC/%s.%s/%s" % (name, dns, ctx.dnsforest),
            "ldap/%s/%s" % (name, netbiosdomain),
            "ldap/%s.%s/%s" % (name, dns, netbiosdomain),
            "ldap/%s" % name,
            "ldap/%s/%s" % (name, dns),
            "ldap/%s.%s/%s" % (name, dns, dns),
            "DNS/%s/%s" % (name, dns),
            "RestrictedKrbHost/%s/%s" % (name, dns),
            "RestrictedKrbHost/%s" % name,
            "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" % (name, dns),
            "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" % (name, dns),
            "ldap/%s._msdcs.%s" % (ctx.ntds_guid, dns),
        ]
        for spn in accepted:
            self.replace_spn(self.ldb_user1, acct, spn)

        #the following spns do not match the restrictions and should fail
        rejected = [
            "ldap/%s.%s/ForestDnsZones.%s" % (name, dns, dns),
            "ldap/%s.%s/DomainDnsZones.%s" % (name, dns, dns),
            "nosuchservice/%s/%s" % ("abcd", "abcd"),
            "GC/%s.%s/%s" % (name, dns, netbiosdomain),
            "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" % (ctx.ntds_guid, dns),
        ]
        for spn in rejected:
            self._replace_spn_tolerant(self.ldb_user1, acct, spn,
                                       ERR_CONSTRAINT_VIOLATION)

    def test_computer_spn(self):
        """SPN writes on a plain computer account, as admin and as test user."""
        netbiosdomain = self.dcctx.get_domain_name()
        name = self.computername
        dns = self.dcctx.dnsdomain
        forest = self.dcctx.dnsforest

        # with WP, any value can be set
        admin_spns = [
            "HOST/%s/%s" % (name, netbiosdomain),
            "HOST/%s" % name,
            "HOST/%s.%s/%s" % (name, dns, netbiosdomain),
            "HOST/%s/%s" % (name, dns),
            "HOST/%s.%s/%s" % (name, dns, dns),
            "GC/%s.%s/%s" % (name, dns, forest),
            "ldap/%s/%s" % (name, netbiosdomain),
            "ldap/%s.%s/ForestDnsZones.%s" % (name, dns, dns),
            "ldap/%s.%s/DomainDnsZones.%s" % (name, dns, dns),
            "ldap/%s.%s/%s" % (name, dns, netbiosdomain),
            "ldap/%s" % name,
            "ldap/%s/%s" % (name, dns),
            "ldap/%s.%s/%s" % (name, dns, dns),
            "DNS/%s/%s" % (name, dns),
            "RestrictedKrbHost/%s/%s" % (name, dns),
            "RestrictedKrbHost/%s" % name,
            "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" % (name, dns),
            "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" % (name, dns),
            "nosuchservice/%s/%s" % ("abcd", "abcd"),
        ]
        for spn in admin_spns:
            self.replace_spn(self.ldb_admin, self.computerdn, spn)

        #user has neither WP nor Validated-SPN, access denied expected
        self._replace_spn_tolerant(self.ldb_user1, self.computerdn,
                                   "HOST/%s/%s" % (name, netbiosdomain),
                                   ERR_INSUFFICIENT_ACCESS_RIGHTS)

        #grant Validated-SPN and check which values are accepted
        #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
        self.sd_utils.dacl_add_ace(self.computerdn,
                                   self._VALIDATED_SPN_ACE % str(self.user_sid1))

        # This does not pass against Windows, although it should according to docs
        self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % name)
        self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" % (name, dns))

        # for regular computer objects we should always get constraint
        # violation on the remaining values
        rejected = [
            "HOST/%s/%s" % (name, netbiosdomain),
            "HOST/%s.%s/%s" % (name, dns, netbiosdomain),
            "HOST/%s/%s" % (name, dns),
            "HOST/%s.%s/%s" % (name, dns, dns),
            "GC/%s.%s/%s" % (name, dns, forest),
            "ldap/%s/%s" % (name, netbiosdomain),
            "ldap/%s.%s/ForestDnsZones.%s" % (name, dns, dns),
        ]
        for spn in rejected:
            self._replace_spn_tolerant(self.ldb_user1, self.computerdn, spn,
                                       ERR_CONSTRAINT_VIOLATION)

    def test_spn_rwdc(self):
        """Run dc_spn_test against the writable DC account."""
        self.dc_spn_test(self.dcctx)

    def test_spn_rodc(self):
        """Run dc_spn_test against the RODC account."""
        self.dc_spn_test(self.rodcctx)
1826
1827
# Test driver: run every ACL test suite against the server given on the
# command line.  The process exits with status 1 if any suite reported a
# failure and 0 otherwise.

ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)

runner = SubunitTestRunner()
rc = 0


def _run_suites(test_classes):
    """Run each TestCase class in order; return False if any suite failed.

    Every suite is run, even when an earlier one has already failed.
    """
    all_ok = True
    for test_class in test_classes:
        if not runner.run(unittest.makeSuite(test_class)).wasSuccessful():
            all_ok = False
    return all_ok


if not _run_suites((AclAddTests, AclModifyTests, AclDeleteTests, AclRenameTests)):
    rc = 1

# The next two suites need server-side settings changed.  try/finally makes
# sure the previous values are written back even if running the suites (or
# changing the second setting) raises, so an aborted run does not leave the
# directory modified.

# Get the old "dSHeuristics" if it was set
dsheuristics = ldb.get_dsheuristics()
# Set the "dSHeuristics" to activate the correct "userPassword" behaviour
ldb.set_dsheuristics("000000001")
try:
    # Get the old "minPwdAge"
    minPwdAge = ldb.get_minPwdAge()
    # Set it temporarily to "0"
    ldb.set_minPwdAge("0")
    try:
        if not _run_suites((AclCARTests, AclSearchTests)):
            rc = 1
    finally:
        # Reset the "minPwdAge" as it was before
        ldb.set_minPwdAge(minPwdAge)
finally:
    # Reset the "dSHeuristics" as they were before
    ldb.set_dsheuristics(dsheuristics)

if not _run_suites((AclExtendedTests, AclSPNTests)):
    rc = 1
sys.exit(rc)