s4-tests: Acl tests now use the get_dsheuristics and set_dsheuristics from SamDB.
[kai/samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9
10 sys.path.append("bin/python")
11 import samba
12 samba.ensure_external_module("testtools", "testtools")
13 samba.ensure_external_module("subunit", "subunit/python")
14
15 import samba.getopt as options
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
24 from samba.ndr import ndr_pack, ndr_unpack
25 from samba.dcerpc import security
26
27 from samba.auth import system_session
28 from samba import gensec
29 from samba.samdb import SamDB
30 from samba.credentials import Credentials
31 import samba.tests
32 from subunit.run import SubunitTestRunner
33 import unittest
34
35 parser = optparse.OptionParser("acl.py [options] <host>")
36 sambaopts = options.SambaOptions(parser)
37 parser.add_option_group(sambaopts)
38 parser.add_option_group(options.VersionOptions(parser))
39
40 # use command line creds if available
41 credopts = options.CredentialsOptions(parser)
42 parser.add_option_group(credopts)
43 opts, args = parser.parse_args()
44
45 if len(args) < 1:
46     parser.print_usage()
47     sys.exit(1)
48
49 host = args[0]
50
51 lp = sambaopts.get_loadparm()
52 creds = credopts.get_credentials(lp)
53 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
54
55 #
56 # Tests start here
57 #
58
59 class AclTests(samba.tests.TestCase):
60
61     def delete_force(self, ldb, dn):
62         try:
63             ldb.delete(dn)
64         except LdbError, (num, _):
65             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
66
67     def find_domain_sid(self, ldb):
68         res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
69         return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
70
71     def setUp(self):
72         super(AclTests, self).setUp()
73         self.ldb_admin = ldb
74         self.base_dn = ldb.domain_dn()
75         self.domain_sid = self.find_domain_sid(self.ldb_admin)
76         self.user_pass = "samba123@"
77         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
78         print "baseDN: %s" % self.base_dn
79
80     def get_user_dn(self, name):
81         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
82
83     def modify_desc(self, object_dn, desc):
84         """ Modify security descriptor using either SDDL string
85             or security.descriptor object
86         """
87         assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
88         mod = """
89 dn: """ + object_dn + """
90 changetype: modify
91 replace: nTSecurityDescriptor
92 """
93         if isinstance(desc, str):
94             mod += "nTSecurityDescriptor: %s" % desc
95         elif isinstance(desc, security.descriptor):
96             mod += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
97         self.ldb_admin.modify_ldif(mod)
98     
99     def create_ou(self, _ldb, ou_dn, desc=None):
100         ldif = """
101 dn: """ + ou_dn + """
102 ou: """ + ou_dn.split(",")[0][3:] + """
103 objectClass: organizationalUnit
104 url: www.example.com
105 """
106         if desc:
107             assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
108             if isinstance(desc, str):
109                 ldif += "nTSecurityDescriptor: %s" % desc
110             elif isinstance(desc, security.descriptor):
111                 ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
112         _ldb.add_ldif(ldif)
113
114     def create_active_user(self, _ldb, user_dn):
115         ldif = """
116 dn: """ + user_dn + """
117 sAMAccountName: """ + user_dn.split(",")[0][3:] + """
118 objectClass: user
119 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
120 url: www.example.com
121 """
122         _ldb.add_ldif(ldif)
123
124     def create_test_user(self, _ldb, user_dn, desc=None):
125         ldif = """
126 dn: """ + user_dn + """
127 sAMAccountName: """ + user_dn.split(",")[0][3:] + """
128 objectClass: user
129 userPassword: """ + self.user_pass + """
130 url: www.example.com
131 """
132         if desc:
133             assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
134             if isinstance(desc, str):
135                 ldif += "nTSecurityDescriptor: %s" % desc
136             elif isinstance(desc, security.descriptor):
137                 ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
138         _ldb.add_ldif(ldif)
139
140     def create_group(self, _ldb, group_dn, desc=None):
141         ldif = """
142 dn: """ + group_dn + """
143 objectClass: group
144 sAMAccountName: """ + group_dn.split(",")[0][3:] + """
145 groupType: 4
146 url: www.example.com
147 """
148         if desc:
149             assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
150             if isinstance(desc, str):
151                 ldif += "nTSecurityDescriptor: %s" % desc
152             elif isinstance(desc, security.descriptor):
153                 ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
154         _ldb.add_ldif(ldif)
155
156     def create_security_group(self, _ldb, group_dn, desc=None):
157         ldif = """
158 dn: """ + group_dn + """
159 objectClass: group
160 sAMAccountName: """ + group_dn.split(",")[0][3:] + """
161 groupType: -2147483646
162 url: www.example.com
163 """
164         if desc:
165             assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
166             if isinstance(desc, str):
167                 ldif += "nTSecurityDescriptor: %s" % desc
168             elif isinstance(desc, security.descriptor):
169                 ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
170         _ldb.add_ldif(ldif)
171
172     def read_desc(self, object_dn):
173         res = self.ldb_admin.search(object_dn, SCOPE_BASE, None, ["nTSecurityDescriptor"])
174         desc = res[0]["nTSecurityDescriptor"][0]
175         return ndr_unpack(security.descriptor, desc)
176
177     def get_ldb_connection(self, target_username, target_password):
178         creds_tmp = Credentials()
179         creds_tmp.set_username(target_username)
180         creds_tmp.set_password(target_password)
181         creds_tmp.set_domain(creds.get_domain())
182         creds_tmp.set_realm(creds.get_realm())
183         creds_tmp.set_workstation(creds.get_workstation())
184         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
185                                       | gensec.FEATURE_SEAL)
186         ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
187         return ldb_target
188
189     def get_object_sid(self, object_dn):
190         res = self.ldb_admin.search(object_dn)
191         return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
192
193     def dacl_add_ace(self, object_dn, ace):
194         desc = self.read_desc(object_dn)
195         desc_sddl = desc.as_sddl(self.domain_sid)
196         if ace in desc_sddl:
197             return
198         if desc_sddl.find("(") >= 0:
199             desc_sddl = desc_sddl[:desc_sddl.index("(")] + ace + desc_sddl[desc_sddl.index("("):]
200         else:
201             desc_sddl = desc_sddl + ace
202         self.modify_desc(object_dn, desc_sddl)
203
204     def get_desc_sddl(self, object_dn):
205         """ Return object nTSecutiryDescriptor in SDDL format
206         """
207         desc = self.read_desc(object_dn)
208         return desc.as_sddl(self.domain_sid)
209
210     # Test if we have any additional groups for users than default ones
211     def assert_user_no_group_member(self, username):
212         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
213         try:
214             self.assertEqual(res[0]["memberOf"][0], "")
215         except KeyError:
216             pass
217         else:
218             self.fail()
219     
220     def create_enable_user(self, username):
221         self.create_active_user(self.ldb_admin, self.get_user_dn(username))
222         self.ldb_admin.enable_account("(sAMAccountName=" + username + ")")
223
224 #tests on ldap add operations
225 class AclAddTests(AclTests):
226
227     def setUp(self):
228         super(AclAddTests, self).setUp()
229         # Domain admin that will be creator of OU parent-child structure
230         self.usr_admin_owner = "acl_add_user1"
231         # Second domain admin that will not be creator of OU parent-child structure
232         self.usr_admin_not_owner = "acl_add_user2"
233         # Regular user
234         self.regular_user = "acl_add_user3"
235         self.create_enable_user(self.usr_admin_owner)
236         self.create_enable_user(self.usr_admin_not_owner)
237         self.create_enable_user(self.regular_user)
238
239         # add admins to the Domain Admins group
240         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
241                        add_members_operation=True)
242         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
243                        add_members_operation=True)
244
245         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
246         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
247         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
248
249     def tearDown(self):
250         super(AclAddTests, self).tearDown()
251         self.delete_force(self.ldb_admin, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
252         self.delete_force(self.ldb_admin, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
253         self.delete_force(self.ldb_admin, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
254         self.delete_force(self.ldb_admin, "OU=test_add_ou1," + self.base_dn)
255         self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
256         self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
257         self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
258
259     # Make sure top OU is deleted (and so everything under it)
260     def assert_top_ou_deleted(self):
261         res = self.ldb_admin.search(self.base_dn,
262             expression="(distinguishedName=%s,%s)" % (
263                 "OU=test_add_ou1", self.base_dn))
264         self.assertEqual(res, [])
265
266     def test_add_u1(self):
267         """Testing OU with the rights of Doman Admin not creator of the OU """
268         self.assert_top_ou_deleted()
269         # Change descriptor for top level OU
270         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
271         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
272         user_sid = self.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
273         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
274         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
275         # Test user and group creation with another domain admin's credentials
276         self.create_test_user(self.ldb_notowner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
277         self.create_group(self.ldb_notowner, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
278         # Make sure we HAVE created the two objects -- user and group
279         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
280         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
281         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
282         self.assertTrue(len(res) > 0)
283         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
284         self.assertTrue(len(res) > 0)
285
286     def test_add_u2(self):
287         """Testing OU with the regular user that has no rights granted over the OU """
288         self.assert_top_ou_deleted()
289         # Create a parent-child OU structure with domain admin credentials
290         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
291         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
292         # Test user and group creation with regular user credentials
293         try:
294             self.create_test_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
295             self.create_group(self.ldb_user, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
296         except LdbError, (num, _):
297             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
298         else:
299             self.fail()
300         # Make sure we HAVEN'T created any of two objects -- user or group
301         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
302         self.assertEqual(res, [])
303         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
304         self.assertEqual(res, [])
305
306     def test_add_u3(self):
307         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
308         self.assert_top_ou_deleted()
309         # Change descriptor for top level OU
310         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
311         user_sid = self.get_object_sid(self.get_user_dn(self.regular_user))
312         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
313         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
314         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
315         # Test user and group creation with granted user only to one of the objects
316         self.create_test_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
317         try:
318             self.create_group(self.ldb_user, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
319         except LdbError, (num, _):
320             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
321         else:
322             self.fail()
323         # Make sure we HAVE created the one of two objects -- user
324         res = self.ldb_admin.search(self.base_dn,
325                 expression="(distinguishedName=%s,%s)" %
326                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
327                     self.base_dn))
328         self.assertNotEqual(len(res), 0)
329         res = self.ldb_admin.search(self.base_dn,
330                 expression="(distinguishedName=%s,%s)" %
331                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
332                     self.base_dn) )
333         self.assertEqual(res, [])
334
335     def test_add_u4(self):
336         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
337         self.assert_top_ou_deleted()
338         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
339         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
340         self.create_test_user(self.ldb_owner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
341         self.create_group(self.ldb_owner, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
342         # Make sure we have successfully created the two objects -- user and group
343         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
344         self.assertTrue(len(res) > 0)
345         res = self.ldb_admin.search(self.base_dn,
346                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
347         self.assertTrue(len(res) > 0)
348
349 #tests on ldap modify operations
350 class AclModifyTests(AclTests):
351
352     def setUp(self):
353         super(AclModifyTests, self).setUp()
354         self.user_with_wp = "acl_mod_user1"
355         self.user_with_sm = "acl_mod_user2"
356         self.user_with_group_sm = "acl_mod_user3"
357         self.create_enable_user(self.user_with_wp)
358         self.create_enable_user(self.user_with_sm)
359         self.create_enable_user(self.user_with_group_sm)
360         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
361         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
362         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
363         self.user_sid = self.get_object_sid( self.get_user_dn(self.user_with_wp))
364         self.create_group(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
365         self.create_group(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
366         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user2"))
367
368     def tearDown(self):
369         super(AclModifyTests, self).tearDown()
370         self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
371         self.delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
372         self.delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
373         self.delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
374         self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
375         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
376         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
377         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
378         self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
379
380     def test_modify_u1(self):
381         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
382         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
383         # First test object -- User
384         print "Testing modify on User object"
385         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))
386         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
387         ldif = """
388 dn: """ + self.get_user_dn("test_modify_user1") + """
389 changetype: modify
390 replace: displayName
391 displayName: test_changed"""
392         self.ldb_user.modify_ldif(ldif)
393         res = self.ldb_admin.search(self.base_dn,
394                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
395         self.assertEqual(res[0]["displayName"][0], "test_changed")
396         # Second test object -- Group
397         print "Testing modify on Group object"
398         self.create_group(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
399         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
400         ldif = """
401 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
402 changetype: modify
403 replace: displayName
404 displayName: test_changed"""
405         self.ldb_user.modify_ldif(ldif)
406         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
407         self.assertEqual(res[0]["displayName"][0], "test_changed")
408         # Third test object -- Organizational Unit
409         print "Testing modify on OU object"
410         #self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
411         self.create_ou(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
412         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
413         ldif = """
414 dn: OU=test_modify_ou1,""" + self.base_dn + """
415 changetype: modify
416 replace: displayName
417 displayName: test_changed"""
418         self.ldb_user.modify_ldif(ldif)
419         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
420         self.assertEqual(res[0]["displayName"][0], "test_changed")
421
422     def test_modify_u2(self):
423         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
424         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
425         # First test object -- User
426         print "Testing modify on User object"
427         #self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
428         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))
429         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
430         # Modify on attribute you have rights for
431         ldif = """
432 dn: """ + self.get_user_dn("test_modify_user1") + """
433 changetype: modify
434 replace: displayName
435 displayName: test_changed"""
436         self.ldb_user.modify_ldif(ldif)
437         res = self.ldb_admin.search(self.base_dn,
438                 expression="(distinguishedName=%s)" %
439                 self.get_user_dn("test_modify_user1"))
440         self.assertEqual(res[0]["displayName"][0], "test_changed")
441         # Modify on attribute you do not have rights for granted
442         ldif = """
443 dn: """ + self.get_user_dn("test_modify_user1") + """
444 changetype: modify
445 replace: url
446 url: www.samba.org"""
447         try:
448             self.ldb_user.modify_ldif(ldif)
449         except LdbError, (num, _):
450             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
451         else:
452             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
453             self.fail()
454         # Second test object -- Group
455         print "Testing modify on Group object"
456         self.create_group(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
457         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
458         ldif = """
459 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
460 changetype: modify
461 replace: displayName
462 displayName: test_changed"""
463         self.ldb_user.modify_ldif(ldif)
464         res = self.ldb_admin.search(self.base_dn,
465                 expression="(distinguishedName=%s)" %
466                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
467         self.assertEqual(res[0]["displayName"][0], "test_changed")
468         # Modify on attribute you do not have rights for granted
469         ldif = """
470 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
471 changetype: modify
472 replace: url
473 url: www.samba.org"""
474         try:
475             self.ldb_user.modify_ldif(ldif)
476         except LdbError, (num, _):
477             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
478         else:
479             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
480             self.fail()
481         # Second test object -- Organizational Unit
482         print "Testing modify on OU object"
483         self.create_ou(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
484         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
485         ldif = """
486 dn: OU=test_modify_ou1,""" + self.base_dn + """
487 changetype: modify
488 replace: displayName
489 displayName: test_changed"""
490         self.ldb_user.modify_ldif(ldif)
491         res = self.ldb_admin.search(self.base_dn,
492                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
493                     + self.base_dn))
494         self.assertEqual(res[0]["displayName"][0], "test_changed")
495         # Modify on attribute you do not have rights for granted
496         ldif = """
497 dn: OU=test_modify_ou1,""" + self.base_dn + """
498 changetype: modify
499 replace: url
500 url: www.samba.org"""
501         try:
502             self.ldb_user.modify_ldif(ldif)
503         except LdbError, (num, _):
504             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
505         else:
506             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
507             self.fail()
508
509     def test_modify_u3(self):
510         """7 Modify one attribute as you have no what so ever rights granted"""
511         # First test object -- User
512         print "Testing modify on User object"
513         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))
514         # Modify on attribute you do not have rights for granted
515         ldif = """
516 dn: """ + self.get_user_dn("test_modify_user1") + """
517 changetype: modify
518 replace: url
519 url: www.samba.org"""
520         try:
521             self.ldb_user.modify_ldif(ldif)
522         except LdbError, (num, _):
523             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
524         else:
525             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
526             self.fail()
527
528         # Second test object -- Group
529         print "Testing modify on Group object"
530         self.create_group(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
531         # Modify on attribute you do not have rights for granted
532         ldif = """
533 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
534 changetype: modify
535 replace: url
536 url: www.samba.org"""
537         try:
538             self.ldb_user.modify_ldif(ldif)
539         except LdbError, (num, _):
540             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
541         else:
542             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
543             self.fail()
544
545         # Second test object -- Organizational Unit
546         print "Testing modify on OU object"
547         #self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
548         self.create_ou(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
549         # Modify on attribute you do not have rights for granted
550         ldif = """
551 dn: OU=test_modify_ou1,""" + self.base_dn + """
552 changetype: modify
553 replace: url
554 url: www.samba.org"""
555         try:
556             self.ldb_user.modify_ldif(ldif)
557         except LdbError, (num, _):
558             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
559         else:
560             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
561             self.fail()
562
563
564     def test_modify_u4(self):
565         """11 Grant WP to PRINCIPAL_SELF and test modify"""
566         ldif = """
567 dn: """ + self.get_user_dn(self.user_with_wp) + """
568 changetype: modify
569 add: adminDescription
570 adminDescription: blah blah blah"""
571         try:
572             self.ldb_user.modify_ldif(ldif)
573         except LdbError, (num, _):
574             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
575         else:
576             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
577             self.fail()
578
579         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
580         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
581         # Modify on attribute you have rights for
582         self.ldb_user.modify_ldif(ldif)
583         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
584                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
585         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
586
587     def test_modify_u5(self):
588         """12 test self membership"""
589         ldif = """
590 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
591 changetype: modify
592 add: Member
593 Member: """ +  self.get_user_dn(self.user_with_sm)
594 #the user has no rights granted, this should fail
595         try:
596             self.ldb_user2.modify_ldif(ldif)
597         except LdbError, (num, _):
598             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
599         else:
600             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
601             self.fail()
602
603 #grant self-membership, should be able to add himself
604         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
605         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
606         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
607         self.ldb_user2.modify_ldif(ldif)
608         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
609                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
610         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
611 #but not other users
612         ldif = """
613 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
614 changetype: modify
615 add: Member
616 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
617         try:
618             self.ldb_user2.modify_ldif(ldif)
619         except LdbError, (num, _):
620             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
621         else:
622             self.fail()
623
624     def test_modify_u6(self):
625         """13 test self membership"""
626         ldif = """
627 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
628 changetype: modify
629 add: Member
630 Member: """ +  self.get_user_dn(self.user_with_sm) + """
631 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
632
633 #grant self-membership, should be able to add himself  but not others at the same time
634         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
635         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
636         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
637         try:
638             self.ldb_user2.modify_ldif(ldif)
639         except LdbError, (num, _):
640             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
641         else:
642             self.fail()
643
644     def test_modify_u7(self):
645         """13 User with WP modifying Member"""
646 #a second user is given write property permission
647         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_wp))
648         mod = "(A;;WP;;;%s)" % str(user_sid)
649         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
650         ldif = """
651 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
652 changetype: modify
653 add: Member
654 Member: """ +  self.get_user_dn(self.user_with_wp)
655         self.ldb_user.modify_ldif(ldif)
656         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
657                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
658         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
659         ldif = """
660 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
661 changetype: modify
662 delete: Member"""
663         self.ldb_user.modify_ldif(ldif)
664         ldif = """
665 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
666 changetype: modify
667 add: Member
668 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
669         self.ldb_user.modify_ldif(ldif)
670         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
671                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
672         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
673
674 #enable these when we have search implemented
675 class AclSearchTests(AclTests):
676
677     def setUp(self):
678         super(AclSearchTests, self).setUp()
679         self.u1 = "search_u1"
680         self.u2 = "search_u2"
681         self.u3 = "search_u3"
682         self.group1 = "group1"
683         self.creds_tmp = Credentials()
684         self.creds_tmp.set_username("")
685         self.creds_tmp.set_password("")
686         self.creds_tmp.set_domain(creds.get_domain())
687         self.creds_tmp.set_realm(creds.get_realm())
688         self.creds_tmp.set_workstation(creds.get_workstation())
689         self.anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
690         self.dsheuristics = self.ldb_admin.get_dsheuristics()
691         self.create_enable_user(self.u1)
692         self.create_enable_user(self.u2)
693         self.create_enable_user(self.u3)
694         self.create_security_group(self.ldb_admin, self.get_user_dn(self.group1))
695         self.ldb_admin.add_remove_group_members(self.group1, self.u2,
696                                                 add_members_operation=True)
697         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
698         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
699         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
700         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
701                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
702                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
703                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
704                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
705                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
706         self.user_sid = self.get_object_sid(self.get_user_dn(self.u1))
707         self.group_sid = self.get_object_sid(self.get_user_dn(self.group1))
708
709     def create_clean_ou(self, object_dn):
710         """ Base repeating setup for unittests to follow """
711         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
712                 expression="distinguishedName=%s" % object_dn)
713         # Make sure top testing OU has been deleted before starting the test
714         self.assertEqual(res, [])
715         self.create_ou(self.ldb_admin, object_dn)
716         desc_sddl = self.get_desc_sddl(object_dn)
717         # Make sure there are inheritable ACEs initially
718         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
719         # Find and remove all inherit ACEs
720         res = re.findall("\(.*?\)", desc_sddl)
721         res = [x for x in res if ("CI" in x) or ("OI" in x)]
722         for x in res:
723             desc_sddl = desc_sddl.replace(x, "")
724         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
725         # can propagate from above
726         # remove SACL, we are not interested
727         desc_sddl = desc_sddl.replace(":AI", ":AIP")
728         self.modify_desc(object_dn, desc_sddl)
729         # Verify all inheritable ACEs are gone
730         desc_sddl = self.get_desc_sddl(object_dn)
731         self.assertFalse("CI" in desc_sddl)
732         self.assertFalse("OI" in desc_sddl)
733
734     def tearDown(self):
735         super(AclSearchTests, self).tearDown()
736         self.delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
737         self.delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
738         self.delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
739         self.delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
740         self.delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
741         self.delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
742         self.delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
743         self.delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
744         self.delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
745         self.delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
746         self.delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
747         self.delete_force(self.ldb_admin, self.get_user_dn("group1"))
748
749     def test_search_anonymous1(self):
750         """Verify access of rootDSE with the correct request"""
751         res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
752         self.assertEquals(len(res), 1)
753         #verify some of the attributes
754         #dont care about values
755         self.assertTrue("ldapServiceName" in res[0])
756         self.assertTrue("namingContexts" in res[0])
757         self.assertTrue("isSynchronized" in res[0])
758         self.assertTrue("dsServiceName" in res[0])
759         self.assertTrue("supportedSASLMechanisms" in res[0])
760         self.assertTrue("isGlobalCatalogReady" in res[0])
761         self.assertTrue("domainControllerFunctionality" in res[0])
762         self.assertTrue("serverName" in res[0])
763
764     def test_search_anonymous2(self):
765         """Make sure we cannot access anything else"""
766         try:
767             res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
768         except LdbError, (num, _):
769             self.assertEquals(num, ERR_OPERATIONS_ERROR)
770         else:
771             self.fail()
772         try:
773             res = self.anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
774         except LdbError, (num, _):
775             self.assertEquals(num, ERR_OPERATIONS_ERROR)
776         else:
777             self.fail()
778         try:
779             res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
780                                         scope=SCOPE_SUBTREE)
781         except LdbError, (num, _):
782             self.assertEquals(num, ERR_OPERATIONS_ERROR)
783         else:
784             self.fail()
785
786     def test_search_anonymous3(self):
787         """Set dsHeuristics and repeat"""
788         self.ldb_admin.set_dsheuristics("0000002")
789         self.create_ou(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
790         mod = "(A;CI;LC;;;AN)"
791         self.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
792         self.create_ou(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
793         res = self.anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
794                                     expression="(objectClass=*)", scope=SCOPE_SUBTREE)
795         self.assertEquals(len(res), 1)
796         self.assertTrue("dn" in res[0])
797         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
798                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
799         res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
800                                     scope=SCOPE_SUBTREE)
801         self.assertEquals(len(res), 1)
802         self.assertTrue("dn" in res[0])
803         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
804         self.ldb_admin.set_dsheuristics(self.dsheuristics)
805
806     def test_search1(self):
807         """Make sure users can see us if given LC to user and group"""
808         self.create_clean_ou("OU=ou1," + self.base_dn)
809         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
810         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
811         self.create_ou(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn,
812                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
813         self.create_ou(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
814                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
815         self.create_ou(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
816                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
817         self.create_ou(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
818                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
819         self.create_ou(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
820                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
821
822         #regular users must see only ou1 and ou2
823         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
824                                     scope=SCOPE_SUBTREE)
825         self.assertEquals(len(res), 2)
826         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
827                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
828
829         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
830         self.assertEquals(sorted(res_list), sorted(ok_list))
831
832         #these users should see all ous
833         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
834                                     scope=SCOPE_SUBTREE)
835         self.assertEquals(len(res), 6)
836         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
837         self.assertEquals(sorted(res_list), sorted(self.full_list))
838
839         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
840                                     scope=SCOPE_SUBTREE)
841         self.assertEquals(len(res), 6)
842         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
843         self.assertEquals(sorted(res_list), sorted(self.full_list))
844
845     def test_search2(self):
846         """Make sure users can't see us if access is explicitly denied"""
847         self.create_clean_ou("OU=ou1," + self.base_dn)
848         self.create_ou(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
849         self.create_ou(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
850         self.create_ou(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
851         self.create_ou(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
852         self.create_ou(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
853         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
854         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
855         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
856                                     scope=SCOPE_SUBTREE)
857         #this user should see all ous
858         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
859         self.assertEquals(sorted(res_list), sorted(self.full_list))
860
861         #these users should see ou1, 2, 5 and 6 but not 3 and 4
862         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
863                                     scope=SCOPE_SUBTREE)
864         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
865                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
866                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
867                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
868         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
869         self.assertEquals(sorted(res_list), sorted(ok_list))
870
871         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
872                                     scope=SCOPE_SUBTREE)
873         self.assertEquals(len(res), 4)
874         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
875         self.assertEquals(sorted(res_list), sorted(ok_list))
876
877     def test_search3(self):
878         """Make sure users can't see ous if access is explicitly denied - 2"""
879         self.create_clean_ou("OU=ou1," + self.base_dn)
880         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
881         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
882         self.create_ou(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn,
883                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
884         self.create_ou(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
885                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
886         self.create_ou(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
887                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
888         self.create_ou(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
889                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
890         self.create_ou(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
891                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
892
893         print "Testing correct behavior on nonaccessible search base"
894         try:
895              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
896                                    scope=SCOPE_BASE)
897         except LdbError, (num, _):
898             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
899         else:
900             self.fail()
901
902         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
903         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
904
905         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
906                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
907
908         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
909                                     scope=SCOPE_SUBTREE)
910         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
911         self.assertEquals(sorted(res_list), sorted(ok_list))
912
913         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
914                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
915                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
916                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
917
918         #should not see ou3 and ou4, but should see ou5 and ou6
919         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
920                                     scope=SCOPE_SUBTREE)
921         self.assertEquals(len(res), 4)
922         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
923         self.assertEquals(sorted(res_list), sorted(ok_list))
924
925         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
926                                     scope=SCOPE_SUBTREE)
927         self.assertEquals(len(res), 4)
928         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
929         self.assertEquals(sorted(res_list), sorted(ok_list))
930
931     def test_search4(self):
932         """There is no difference in visibility if the user is also creator"""
933         self.create_clean_ou("OU=ou1," + self.base_dn)
934         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
935         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
936         self.create_ou(self.ldb_user, "OU=ou2,OU=ou1," + self.base_dn,
937                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
938         self.create_ou(self.ldb_user, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
939                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
940         self.create_ou(self.ldb_user, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
941                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
942         self.create_ou(self.ldb_user, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
943                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
944         self.create_ou(self.ldb_user, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
945                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
946
947         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
948                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
949         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
950                                     scope=SCOPE_SUBTREE)
951         self.assertEquals(len(res), 2)
952         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
953         self.assertEquals(sorted(res_list), sorted(ok_list))
954
955         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
956                                     scope=SCOPE_SUBTREE)
957         self.assertEquals(len(res), 2)
958         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
959         self.assertEquals(sorted(res_list), sorted(ok_list))
960
961     def test_search5(self):
962         """Make sure users can see only attributes they are allowed to see"""
963         self.create_clean_ou("OU=ou1," + self.base_dn)
964         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
965         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
966         self.create_ou(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn,
967                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
968         # assert user can only see dn
969         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
970                                     scope=SCOPE_SUBTREE)
971         ok_list = ['dn']
972         self.assertEquals(len(res), 1)
973         res_list = res[0].keys()
974         self.assertEquals(res_list, ok_list)
975
976         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
977                                     scope=SCOPE_BASE, attrs=["ou"])
978
979         self.assertEquals(len(res), 1)
980         res_list = res[0].keys()
981         self.assertEquals(res_list, ok_list)
982
983         #give read property on ou and assert user can only see dn and ou
984         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
985         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
986         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
987         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
988                                     scope=SCOPE_SUBTREE)
989         ok_list = ['dn', 'ou']
990         self.assertEquals(len(res), 1)
991         res_list = res[0].keys()
992         self.assertEquals(sorted(res_list), sorted(ok_list))
993
994         #give read property on Public Information and assert user can see ou and other members
995         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
996         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
997         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
998         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
999                                     scope=SCOPE_SUBTREE)
1000
1001         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
1002         res_list = res[0].keys()
1003         self.assertEquals(sorted(res_list), sorted(ok_list))
1004
1005     def test_search6(self):
1006         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
1007         self.create_clean_ou("OU=ou1," + self.base_dn)
1008         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
1009         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
1010         self.create_ou(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn,
1011                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
1012         self.create_ou(self.ldb_user, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
1013                        "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
1014
1015         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
1016                                     scope=SCOPE_SUBTREE)
1017         #nothing should be returned as ou is not accessible
1018         self.assertEquals(res, [])
1019
1020         #give read property on ou and assert user can only see dn and ou
1021         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
1022         self.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
1023         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
1024                                     scope=SCOPE_SUBTREE)
1025         self.assertEquals(len(res), 1)
1026         ok_list = ['dn', 'ou']
1027         res_list = res[0].keys()
1028         self.assertEquals(sorted(res_list), sorted(ok_list))
1029
1030         #give read property on Public Information and assert user can see ou and other members
1031         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
1032         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
1033         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
1034                                    scope=SCOPE_SUBTREE)
1035         self.assertEquals(len(res), 1)
1036         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
1037         res_list = res[0].keys()
1038         self.assertEquals(sorted(res_list), sorted(ok_list))
1039
1040 #tests on ldap delete operations
1041 class AclDeleteTests(AclTests):
1042
1043     def setUp(self):
1044         super(AclDeleteTests, self).setUp()
1045         self.regular_user = "acl_delete_user1"
1046             # Create regular user
1047         self.create_enable_user(self.regular_user)
1048         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1049
1050     def tearDown(self):
1051         super(AclDeleteTests, self).tearDown()
1052         self.delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
1053         self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1054
1055     def test_delete_u1(self):
1056         """User is prohibited by default to delete another User object"""
1057         # Create user that we try to delete
1058         self.create_test_user(self.ldb_admin, self.get_user_dn("test_delete_user1"))
1059         # Here delete User object should ALWAYS through exception
1060         try:
1061             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
1062         except LdbError, (num, _):
1063             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1064         else:
1065             self.fail()
1066
1067     def test_delete_u2(self):
1068         """User's group has RIGHT_DELETE to another User object"""
1069         user_dn = self.get_user_dn("test_delete_user1")
1070         # Create user that we try to delete
1071         self.create_test_user(self.ldb_admin, user_dn)
1072         mod = "(A;;SD;;;AU)"
1073         self.dacl_add_ace(user_dn, mod)
1074         # Try to delete User object
1075         self.ldb_user.delete(user_dn)
1076         res = self.ldb_admin.search(self.base_dn,
1077                 expression="(distinguishedName=%s)" % user_dn)
1078         self.assertEqual(res, [])
1079
1080     def test_delete_u3(self):
1081         """User indentified by SID has RIGHT_DELETE to another User object"""
1082         user_dn = self.get_user_dn("test_delete_user1")
1083         # Create user that we try to delete
1084         self.create_test_user(self.ldb_admin, user_dn)
1085         mod = "(A;;SD;;;%s)" % self.get_object_sid(self.get_user_dn(self.regular_user))
1086         self.dacl_add_ace(user_dn, mod)
1087         # Try to delete User object
1088         self.ldb_user.delete(user_dn)
1089         res = self.ldb_admin.search(self.base_dn,
1090                 expression="(distinguishedName=%s)" % user_dn)
1091         self.assertEqual(res, [])
1092
1093 #tests on ldap rename operations
1094 class AclRenameTests(AclTests):
1095
1096     def setUp(self):
1097         super(AclRenameTests, self).setUp()
1098         self.regular_user = "acl_rename_user1"
1099
1100         # Create regular user
1101         self.create_enable_user(self.regular_user)
1102         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1103
1104     def tearDown(self):
1105         super(AclRenameTests, self).tearDown()
1106         # Rename OU3
1107         self.delete_force(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
1108         self.delete_force(self.ldb_admin, "CN=test_rename_user2,OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
1109         self.delete_force(self.ldb_admin, "CN=test_rename_user5,OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
1110         self.delete_force(self.ldb_admin, "OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
1111         # Rename OU2
1112         self.delete_force(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou2," + self.base_dn)
1113         self.delete_force(self.ldb_admin, "CN=test_rename_user2,OU=test_rename_ou2," + self.base_dn)
1114         self.delete_force(self.ldb_admin, "CN=test_rename_user5,OU=test_rename_ou2," + self.base_dn)
1115         self.delete_force(self.ldb_admin, "OU=test_rename_ou2," + self.base_dn)
1116         # Rename OU1
1117         self.delete_force(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn)
1118         self.delete_force(self.ldb_admin, "CN=test_rename_user2,OU=test_rename_ou1," + self.base_dn)
1119         self.delete_force(self.ldb_admin, "CN=test_rename_user5,OU=test_rename_ou1," + self.base_dn)
1120         self.delete_force(self.ldb_admin, "OU=test_rename_ou3,OU=test_rename_ou1," + self.base_dn)
1121         self.delete_force(self.ldb_admin, "OU=test_rename_ou1," + self.base_dn)
1122         self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1123
1124     def test_rename_u1(self):
1125         """Regular user fails to rename 'User object' within single OU"""
1126         # Create OU structure
1127         self.create_ou(self.ldb_admin, "OU=test_rename_ou1," + self.base_dn)
1128         self.create_test_user(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn)
1129         try:
1130             self.ldb_user.rename("CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn, \
1131                     "CN=test_rename_user5,OU=test_rename_ou1," + self.base_dn)
1132         except LdbError, (num, _):
1133             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1134         else:
1135             self.fail()
1136
1137     def test_rename_u2(self):
1138         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1139         ou_dn = "OU=test_rename_ou1," + self.base_dn
1140         user_dn = "CN=test_rename_user1," + ou_dn
1141         rename_user_dn = "CN=test_rename_user5," + ou_dn
1142         # Create OU structure
1143         self.create_ou(self.ldb_admin, ou_dn)
1144         self.create_test_user(self.ldb_admin, user_dn)
1145         mod = "(A;;WP;;;AU)"
1146         self.dacl_add_ace(user_dn, mod)
1147         # Rename 'User object' having WP to AU
1148         self.ldb_user.rename(user_dn, rename_user_dn)
1149         res = self.ldb_admin.search(self.base_dn,
1150                 expression="(distinguishedName=%s)" % user_dn)
1151         self.assertEqual(res, [])
1152         res = self.ldb_admin.search(self.base_dn,
1153                 expression="(distinguishedName=%s)" % rename_user_dn)
1154         self.assertNotEqual(res, [])
1155
1156     def test_rename_u3(self):
1157         """Test rename with rights granted to 'User object' SID"""
1158         ou_dn = "OU=test_rename_ou1," + self.base_dn
1159         user_dn = "CN=test_rename_user1," + ou_dn
1160         rename_user_dn = "CN=test_rename_user5," + ou_dn
1161         # Create OU structure
1162         self.create_ou(self.ldb_admin, ou_dn)
1163         self.create_test_user(self.ldb_admin, user_dn)
1164         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1165         mod = "(A;;WP;;;%s)" % str(sid)
1166         self.dacl_add_ace(user_dn, mod)
1167         # Rename 'User object' having WP to AU
1168         self.ldb_user.rename(user_dn, rename_user_dn)
1169         res = self.ldb_admin.search(self.base_dn,
1170                 expression="(distinguishedName=%s)" % user_dn)
1171         self.assertEqual(res, [])
1172         res = self.ldb_admin.search(self.base_dn,
1173                 expression="(distinguishedName=%s)" % rename_user_dn)
1174         self.assertNotEqual(res, [])
1175
1176     def test_rename_u4(self):
1177         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1178         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1179         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1180         user_dn = "CN=test_rename_user2," + ou1_dn
1181         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1182         # Create OU structure
1183         self.create_ou(self.ldb_admin, ou1_dn)
1184         self.create_ou(self.ldb_admin, ou2_dn)
1185         self.create_test_user(self.ldb_admin, user_dn)
1186         mod = "(A;;WPSD;;;AU)"
1187         self.dacl_add_ace(user_dn, mod)
1188         mod = "(A;;CC;;;AU)"
1189         self.dacl_add_ace(ou2_dn, mod)
1190         # Rename 'User object' having SD and CC to AU
1191         self.ldb_user.rename(user_dn, rename_user_dn)
1192         res = self.ldb_admin.search(self.base_dn,
1193                 expression="(distinguishedName=%s)" % user_dn)
1194         self.assertEqual(res, [])
1195         res = self.ldb_admin.search(self.base_dn,
1196                 expression="(distinguishedName=%s)" % rename_user_dn)
1197         self.assertNotEqual(res, [])
1198
1199     def test_rename_u5(self):
1200         """Test rename with rights granted to 'User object' SID"""
1201         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1202         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1203         user_dn = "CN=test_rename_user2," + ou1_dn
1204         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1205         # Create OU structure
1206         self.create_ou(self.ldb_admin, ou1_dn)
1207         self.create_ou(self.ldb_admin, ou2_dn)
1208         self.create_test_user(self.ldb_admin, user_dn)
1209         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1210         mod = "(A;;WPSD;;;%s)" % str(sid)
1211         self.dacl_add_ace(user_dn, mod)
1212         mod = "(A;;CC;;;%s)" % str(sid)
1213         self.dacl_add_ace(ou2_dn, mod)
1214         # Rename 'User object' having SD and CC to AU
1215         self.ldb_user.rename(user_dn, rename_user_dn)
1216         res = self.ldb_admin.search(self.base_dn,
1217                 expression="(distinguishedName=%s)" % user_dn)
1218         self.assertEqual(res, [])
1219         res = self.ldb_admin.search(self.base_dn,
1220                 expression="(distinguishedName=%s)" % rename_user_dn)
1221         self.assertNotEqual(res, [])
1222
1223     def test_rename_u6(self):
1224         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1225         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1226         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1227         user_dn = "CN=test_rename_user2," + ou1_dn
1228         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1229         # Create OU structure
1230         self.create_ou(self.ldb_admin, ou1_dn)
1231         self.create_ou(self.ldb_admin, ou2_dn)
1232         #mod = "(A;CI;DCWP;;;AU)"
1233         mod = "(A;;DC;;;AU)"
1234         self.dacl_add_ace(ou1_dn, mod)
1235         mod = "(A;;CC;;;AU)"
1236         self.dacl_add_ace(ou2_dn, mod)
1237         self.create_test_user(self.ldb_admin, user_dn)
1238         mod = "(A;;WP;;;AU)"
1239         self.dacl_add_ace(user_dn, mod)
1240         # Rename 'User object' having SD and CC to AU
1241         self.ldb_user.rename(user_dn, rename_user_dn)
1242         res = self.ldb_admin.search(self.base_dn,
1243                 expression="(distinguishedName=%s)" % user_dn)
1244         self.assertEqual(res, [])
1245         res = self.ldb_admin.search(self.base_dn,
1246                 expression="(distinguishedName=%s)" % rename_user_dn)
1247         self.assertNotEqual(res, [])
1248
1249     def test_rename_u7(self):
1250         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1251         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1252         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1253         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1254         user_dn = "CN=test_rename_user2," + ou1_dn
1255         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1256         # Create OU structure
1257         self.create_ou(self.ldb_admin, ou1_dn)
1258         self.create_ou(self.ldb_admin, ou2_dn)
1259         self.create_ou(self.ldb_admin, ou3_dn)
1260         mod = "(A;CI;WPDC;;;AU)"
1261         self.dacl_add_ace(ou1_dn, mod)
1262         mod = "(A;;CC;;;AU)"
1263         self.dacl_add_ace(ou3_dn, mod)
1264         self.create_test_user(self.ldb_admin, user_dn)
1265         # Rename 'User object' having SD and CC to AU
1266         self.ldb_user.rename(user_dn, rename_user_dn)
1267         res = self.ldb_admin.search(self.base_dn,
1268                 expression="(distinguishedName=%s)" % user_dn)
1269         self.assertEqual(res, [])
1270         res = self.ldb_admin.search(self.base_dn,
1271                 expression="(distinguishedName=%s)" % rename_user_dn)
1272         self.assertNotEqual(res, [])
1273
1274     def test_rename_u8(self):
1275         """Test rename on an object with and without modify access on the RDN attribute"""
1276         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1277         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1278         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1279         # Create OU structure
1280         self.create_ou(self.ldb_admin, ou1_dn)
1281         self.create_ou(self.ldb_admin, ou2_dn)
1282         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1283         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1284         self.dacl_add_ace(ou2_dn, mod)
1285         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1286         self.dacl_add_ace(ou2_dn, mod)
1287         try:
1288             self.ldb_user.rename(ou2_dn, ou3_dn)
1289         except LdbError, (num, _):
1290             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1291         else:
1292             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1293             self.fail()
1294         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1295         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1296         self.dacl_add_ace(ou2_dn, mod)
1297         self.ldb_user.rename(ou2_dn, ou3_dn)
1298         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1299         self.assertEqual(res, [])
1300         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1301         self.assertNotEqual(res, [])
1302
1303 #tests on Control Access Rights
1304 class AclCARTests(AclTests):
1305
1306     def setUp(self):
1307         super(AclCARTests, self).setUp()
1308         self.user_with_wp = "acl_car_user1"
1309         self.user_with_pc = "acl_car_user2"
1310         self.create_enable_user(self.user_with_wp)
1311         self.create_enable_user(self.user_with_pc)
1312         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1313         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1314
1315         res = self.ldb_admin.search("CN=Directory Service, CN=Windows NT, CN=Services, "
1316                  + self.configuration_dn, scope=SCOPE_BASE, attrs=["dSHeuristics"])
1317         if "dSHeuristics" in res[0]:
1318             self.dsheuristics = res[0]["dSHeuristics"][0]
1319         else:
1320             self.dsheuristics = None
1321
1322         self.minPwdAge = self.ldb_admin.get_minPwdAge()
1323
1324         # Set the "dSHeuristics" to have the tests run against Windows Server
1325         self.ldb_admin.set_dsheuristics("000000001")
1326         # Set minPwdAge to 0
1327         self.ldb_admin.set_minPwdAge("0")
1328
1329     def tearDown(self):
1330         super(AclCARTests, self).tearDown()
1331         #restore original values
1332         self.ldb_admin.set_dsheuristics(self.dsheuristics)
1333         self.ldb_admin.set_minPwdAge(self.minPwdAge)
1334         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1335         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1336
1337     def test_change_password1(self):
1338         """Try a password change operation without any CARs given"""
1339         #users have change password by default - remove for negative testing
1340         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1341         sddl = desc.as_sddl(self.domain_sid)
1342         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1343         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1344         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1345         try:
1346             self.ldb_user.modify_ldif("""
1347 dn: """ + self.get_user_dn(self.user_with_wp) + """
1348 changetype: modify
1349 delete: unicodePwd
1350 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1351 add: unicodePwd
1352 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1353 """)
1354         except LdbError, (num, _):
1355             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1356         else:
1357             # for some reason we get constraint violation instead of insufficient access error
1358             self.fail()
1359
1360     def test_change_password2(self):
1361         """Make sure WP has no influence"""
1362         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1363         sddl = desc.as_sddl(self.domain_sid)
1364         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1365         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1366         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1367         mod = "(A;;WP;;;PS)"
1368         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1369         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1370         sddl = desc.as_sddl(self.domain_sid)
1371         try:
1372             self.ldb_user.modify_ldif("""
1373 dn: """ + self.get_user_dn(self.user_with_wp) + """
1374 changetype: modify
1375 delete: unicodePwd
1376 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1377 add: unicodePwd
1378 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1379 """)
1380         except LdbError, (num, _):
1381             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1382         else:
1383             # for some reason we get constraint violation instead of insufficient access error
1384             self.fail()
1385
1386     def test_change_password3(self):
1387         """Make sure WP has no influence"""
1388         mod = "(D;;WP;;;PS)"
1389         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1390         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1391         sddl = desc.as_sddl(self.domain_sid)
1392         self.ldb_user.modify_ldif("""
1393 dn: """ + self.get_user_dn(self.user_with_wp) + """
1394 changetype: modify
1395 delete: unicodePwd
1396 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1397 add: unicodePwd
1398 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1399 """)
1400
1401     def test_change_password5(self):
1402         """Make sure rights have no influence on dBCSPwd"""
1403         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1404         sddl = desc.as_sddl(self.domain_sid)
1405         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1406         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1407         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1408         mod = "(D;;WP;;;PS)"
1409         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1410         try:
1411             self.ldb_user.modify_ldif("""
1412 dn: """ + self.get_user_dn(self.user_with_wp) + """
1413 changetype: modify
1414 delete: dBCSPwd
1415 dBCSPwd: XXXXXXXXXXXXXXXX
1416 add: dBCSPwd
1417 dBCSPwd: YYYYYYYYYYYYYYYY
1418 """)
1419         except LdbError, (num, _):
1420             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1421         else:
1422             self.fail()
1423
1424     def test_change_password6(self):
1425         """Test uneven delete/adds"""
1426         try:
1427             self.ldb_user.modify_ldif("""
1428 dn: """ + self.get_user_dn(self.user_with_wp) + """
1429 changetype: modify
1430 delete: userPassword
1431 userPassword: thatsAcomplPASS1
1432 delete: userPassword
1433 userPassword: thatsAcomplPASS1
1434 add: userPassword
1435 userPassword: thatsAcomplPASS2
1436 """)
1437         except LdbError, (num, _):
1438             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1439         else:
1440             self.fail()
1441         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1442         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1443         try:
1444             self.ldb_user.modify_ldif("""
1445 dn: """ + self.get_user_dn(self.user_with_wp) + """
1446 changetype: modify
1447 delete: userPassword
1448 userPassword: thatsAcomplPASS1
1449 delete: userPassword
1450 userPassword: thatsAcomplPASS1
1451 add: userPassword
1452 userPassword: thatsAcomplPASS2
1453 """)
1454             # This fails on Windows 2000 domain level with constraint violation
1455         except LdbError, (num, _):
1456             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1457                             num == ERR_UNWILLING_TO_PERFORM)
1458         else:
1459             self.fail()
1460
1461
1462     def test_change_password7(self):
1463         """Try a password change operation without any CARs given"""
1464         #users have change password by default - remove for negative testing
1465         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1466         sddl = desc.as_sddl(self.domain_sid)
1467         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1468         #first change our own password
1469         self.ldb_user2.modify_ldif("""
1470 dn: """ + self.get_user_dn(self.user_with_pc) + """
1471 changetype: modify
1472 delete: unicodePwd
1473 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1474 add: unicodePwd
1475 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1476 """)
1477         #then someone else's
1478         self.ldb_user2.modify_ldif("""
1479 dn: """ + self.get_user_dn(self.user_with_wp) + """
1480 changetype: modify
1481 delete: unicodePwd
1482 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1483 add: unicodePwd
1484 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1485 """)
1486
1487     def test_reset_password1(self):
1488         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1489         try:
1490             self.ldb_user.modify_ldif("""
1491 dn: """ + self.get_user_dn(self.user_with_wp) + """
1492 changetype: modify
1493 replace: unicodePwd
1494 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1495 """)
1496         except LdbError, (num, _):
1497             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1498         else:
1499             self.fail()
1500         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1501         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1502         self.ldb_user.modify_ldif("""
1503 dn: """ + self.get_user_dn(self.user_with_wp) + """
1504 changetype: modify
1505 replace: unicodePwd
1506 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1507 """)
1508
1509     def test_reset_password2(self):
1510         """Try a user password reset operation (userPassword) before and after granting CAR"""
1511         try:
1512             self.ldb_user.modify_ldif("""
1513 dn: """ + self.get_user_dn(self.user_with_wp) + """
1514 changetype: modify
1515 replace: userPassword
1516 userPassword: thatsAcomplPASS1
1517 """)
1518         except LdbError, (num, _):
1519             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1520         else:
1521             self.fail()
1522         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1523         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1524         try:
1525             self.ldb_user.modify_ldif("""
1526 dn: """ + self.get_user_dn(self.user_with_wp) + """
1527 changetype: modify
1528 replace: userPassword
1529 userPassword: thatsAcomplPASS1
1530 """)
1531             # This fails on Windows 2000 domain level with constraint violation
1532         except LdbError, (num, _):
1533             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1534
1535     def test_reset_password3(self):
1536         """Grant WP and see what happens (unicodePwd)"""
1537         mod = "(A;;WP;;;PS)"
1538         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1539         try:
1540             self.ldb_user.modify_ldif("""
1541 dn: """ + self.get_user_dn(self.user_with_wp) + """
1542 changetype: modify
1543 replace: unicodePwd
1544 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1545 """)
1546         except LdbError, (num, _):
1547             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1548         else:
1549             self.fail()
1550
1551     def test_reset_password4(self):
1552         """Grant WP and see what happens (userPassword)"""
1553         mod = "(A;;WP;;;PS)"
1554         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1555         try:
1556             self.ldb_user.modify_ldif("""
1557 dn: """ + self.get_user_dn(self.user_with_wp) + """
1558 changetype: modify
1559 replace: userPassword
1560 userPassword: thatsAcomplPASS1
1561 """)
1562         except LdbError, (num, _):
1563             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1564         else:
1565             self.fail()
1566
1567     def test_reset_password5(self):
1568         """Explicitly deny WP but grant CAR (unicodePwd)"""
1569         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1570         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1571         self.ldb_user.modify_ldif("""
1572 dn: """ + self.get_user_dn(self.user_with_wp) + """
1573 changetype: modify
1574 replace: unicodePwd
1575 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1576 """)
1577
1578     def test_reset_password6(self):
1579         """Explicitly deny WP but grant CAR (userPassword)"""
1580         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1581         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1582         try:
1583             self.ldb_user.modify_ldif("""
1584 dn: """ + self.get_user_dn(self.user_with_wp) + """
1585 changetype: modify
1586 replace: userPassword
1587 userPassword: thatsAcomplPASS1
1588 """)
1589             # This fails on Windows 2000 domain level with constraint violation
1590         except LdbError, (num, _):
1591             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1592
1593 class AclExtendedTests(AclTests):
1594
1595     def setUp(self):
1596         super(AclExtendedTests, self).setUp()
1597         #regular user, will be the creator
1598         self.u1 = "ext_u1"
1599         #regular user
1600         self.u2 = "ext_u2"
1601         #admin user
1602         self.u3 = "ext_u3"
1603         self.create_enable_user(self.u1)
1604         self.create_enable_user(self.u2)
1605         self.create_enable_user(self.u3)
1606         self.ldb_admin.add_remove_group_members("Domain Admins", self.u3,
1607                                                 add_members_operation=True)
1608         self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
1609         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
1610         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
1611         self.user_sid1 = self.get_object_sid(self.get_user_dn(self.u1))
1612         self.user_sid2 = self.get_object_sid(self.get_user_dn(self.u2))
1613
1614     def tearDown(self):
1615         super(AclExtendedTests, self).tearDown()
1616         self.delete_force(self.ldb_admin, self.get_user_dn(self.u1))
1617         self.delete_force(self.ldb_admin, self.get_user_dn(self.u2))
1618         self.delete_force(self.ldb_admin, self.get_user_dn(self.u3))
1619         self.delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
1620         self.delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)
1621
1622     def test_ntSecurityDescriptor(self):
1623         #create empty ou
1624         self.create_ou(self.ldb_admin, "ou=ext_ou1," + self.base_dn)
1625         #give u1 Create children access
1626         mod = "(A;;CC;;;%s)" % str(self.user_sid1)
1627         self.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1628         mod = "(A;;LC;;;%s)" % str(self.user_sid2)
1629         self.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1630         #create a group under that, grant RP to u2
1631         self.create_group(self.ldb_user1, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
1632         mod = "(A;;RP;;;%s)" % str(self.user_sid2)
1633         self.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1634         #u2 must not read the descriptor
1635         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1636                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1637         self.assertNotEqual(res,[])
1638         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1639         #grant RC to u2 - still no access
1640         mod = "(A;;RC;;;%s)" % str(self.user_sid2)
1641         self.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1642         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1643                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1644         self.assertNotEqual(res,[])
1645         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1646         #u3 is member of administrators group, should be able to read sd
1647         res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1648                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1649         self.assertEqual(len(res),1)
1650         self.assertTrue("nTSecurityDescriptor" in res[0].keys())
1651
1652 # Important unit running information
1653
1654 if not "://" in host:
1655     host = "ldap://%s" % host
1656 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp)
1657
1658 runner = SubunitTestRunner()
1659 rc = 0
1660 if not runner.run(unittest.makeSuite(AclAddTests)).wasSuccessful():
1661     rc = 1
1662 if not runner.run(unittest.makeSuite(AclModifyTests)).wasSuccessful():
1663     rc = 1
1664 if not runner.run(unittest.makeSuite(AclDeleteTests)).wasSuccessful():
1665     rc = 1
1666 if not runner.run(unittest.makeSuite(AclRenameTests)).wasSuccessful():
1667     rc = 1
1668 if not runner.run(unittest.makeSuite(AclCARTests)).wasSuccessful():
1669     rc = 1
1670 if not runner.run(unittest.makeSuite(AclSearchTests)).wasSuccessful():
1671     rc = 1
1672 if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful():
1673     rc = 1
1674
1675
1676 sys.exit(rc)