s4-tests: Modified acl.py to use common delete_force instead of defining its own.
[kai/samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9
10 sys.path.append("bin/python")
11 import samba
12 samba.ensure_external_module("testtools", "testtools")
13 samba.ensure_external_module("subunit", "subunit/python")
14
15 import samba.getopt as options
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
24 from samba.ndr import ndr_pack, ndr_unpack
25 from samba.dcerpc import security
26
27 from samba.auth import system_session
28 from samba import gensec
29 from samba.samdb import SamDB
30 from samba.credentials import Credentials
31 import samba.tests
32 from samba.tests import delete_force
33 from subunit.run import SubunitTestRunner
34 import unittest
35
# Build the command line: the samba, version and credentials option groups
# are all registered on a single parser.
parser = optparse.OptionParser("acl.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

# The target host is the one mandatory positional argument.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Keep whatever gensec features are already set and OR in FEATURE_SEAL.
creds.set_gensec_features(
    creds.get_gensec_features() | gensec.FEATURE_SEAL)
55
56 #
57 # Tests start here
58 #
59
60 class AclTests(samba.tests.TestCase):
61
62     def find_domain_sid(self, ldb):
63         res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
64         return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
65
66     def setUp(self):
67         super(AclTests, self).setUp()
68         self.ldb_admin = ldb
69         self.base_dn = ldb.domain_dn()
70         self.domain_sid = self.find_domain_sid(self.ldb_admin)
71         self.user_pass = "samba123@"
72         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
73         print "baseDN: %s" % self.base_dn
74
75     def get_user_dn(self, name):
76         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
77
78     def modify_desc(self, object_dn, desc):
79         """ Modify security descriptor using either SDDL string
80             or security.descriptor object
81         """
82         assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
83         mod = """
84 dn: """ + object_dn + """
85 changetype: modify
86 replace: nTSecurityDescriptor
87 """
88         if isinstance(desc, str):
89             mod += "nTSecurityDescriptor: %s" % desc
90         elif isinstance(desc, security.descriptor):
91             mod += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
92         self.ldb_admin.modify_ldif(mod)
93
94     def read_desc(self, object_dn):
95         res = self.ldb_admin.search(object_dn, SCOPE_BASE, None, ["nTSecurityDescriptor"])
96         desc = res[0]["nTSecurityDescriptor"][0]
97         return ndr_unpack(security.descriptor, desc)
98
99     def get_ldb_connection(self, target_username, target_password):
100         creds_tmp = Credentials()
101         creds_tmp.set_username(target_username)
102         creds_tmp.set_password(target_password)
103         creds_tmp.set_domain(creds.get_domain())
104         creds_tmp.set_realm(creds.get_realm())
105         creds_tmp.set_workstation(creds.get_workstation())
106         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
107                                       | gensec.FEATURE_SEAL)
108         ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
109         return ldb_target
110
111     def get_object_sid(self, object_dn):
112         res = self.ldb_admin.search(object_dn)
113         return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
114
115     def dacl_add_ace(self, object_dn, ace):
116         desc = self.read_desc(object_dn)
117         desc_sddl = desc.as_sddl(self.domain_sid)
118         if ace in desc_sddl:
119             return
120         if desc_sddl.find("(") >= 0:
121             desc_sddl = desc_sddl[:desc_sddl.index("(")] + ace + desc_sddl[desc_sddl.index("("):]
122         else:
123             desc_sddl = desc_sddl + ace
124         self.modify_desc(object_dn, desc_sddl)
125
126     def get_desc_sddl(self, object_dn):
127         """ Return object nTSecutiryDescriptor in SDDL format
128         """
129         desc = self.read_desc(object_dn)
130         return desc.as_sddl(self.domain_sid)
131
132     # Test if we have any additional groups for users than default ones
133     def assert_user_no_group_member(self, username):
134         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
135         try:
136             self.assertEqual(res[0]["memberOf"][0], "")
137         except KeyError:
138             pass
139         else:
140             self.fail()
141
142 #tests on ldap add operations
143 class AclAddTests(AclTests):
144
145     def setUp(self):
146         super(AclAddTests, self).setUp()
147         # Domain admin that will be creator of OU parent-child structure
148         self.usr_admin_owner = "acl_add_user1"
149         # Second domain admin that will not be creator of OU parent-child structure
150         self.usr_admin_not_owner = "acl_add_user2"
151         # Regular user
152         self.regular_user = "acl_add_user3"
153         self.test_user1 = "test_add_user1"
154         self.test_group1 = "test_add_group1"
155         self.ou1 = "OU=test_add_ou1"
156         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
157         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
158         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
159         self.ldb_admin.newuser(self.regular_user, self.user_pass)
160
161         # add admins to the Domain Admins group
162         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
163                        add_members_operation=True)
164         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
165                        add_members_operation=True)
166
167         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
168         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
169         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
170
171     def tearDown(self):
172         super(AclAddTests, self).tearDown()
173         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
174                           (self.test_user1, self.ou2, self.base_dn))
175         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
176                           (self.test_group1, self.ou2, self.base_dn))
177         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
178         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
179         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
180         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
181         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
182
183     # Make sure top OU is deleted (and so everything under it)
184     def assert_top_ou_deleted(self):
185         res = self.ldb_admin.search(self.base_dn,
186             expression="(distinguishedName=%s,%s)" % (
187                 "OU=test_add_ou1", self.base_dn))
188         self.assertEqual(res, [])
189
190     def test_add_u1(self):
191         """Testing OU with the rights of Doman Admin not creator of the OU """
192         self.assert_top_ou_deleted()
193         # Change descriptor for top level OU
194         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
195         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
196         user_sid = self.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
197         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
198         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
199         # Test user and group creation with another domain admin's credentials
200         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
201         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
202                                    grouptype=4)
203         # Make sure we HAVE created the two objects -- user and group
204         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
205         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
206         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
207         self.assertTrue(len(res) > 0)
208         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
209         self.assertTrue(len(res) > 0)
210
211     def test_add_u2(self):
212         """Testing OU with the regular user that has no rights granted over the OU """
213         self.assert_top_ou_deleted()
214         # Create a parent-child OU structure with domain admin credentials
215         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
216         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
217         # Test user and group creation with regular user credentials
218         try:
219             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
220             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
221                                    grouptype=4)
222         except LdbError, (num, _):
223             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
224         else:
225             self.fail()
226         # Make sure we HAVEN'T created any of two objects -- user or group
227         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
228         self.assertEqual(res, [])
229         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
230         self.assertEqual(res, [])
231
232     def test_add_u3(self):
233         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
234         self.assert_top_ou_deleted()
235         # Change descriptor for top level OU
236         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
237         user_sid = self.get_object_sid(self.get_user_dn(self.regular_user))
238         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
239         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
240         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
241         # Test user and group creation with granted user only to one of the objects
242         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
243         try:
244             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
245                                    grouptype=4)
246         except LdbError, (num, _):
247             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
248         else:
249             self.fail()
250         # Make sure we HAVE created the one of two objects -- user
251         res = self.ldb_admin.search(self.base_dn,
252                 expression="(distinguishedName=%s,%s)" %
253                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
254                     self.base_dn))
255         self.assertNotEqual(len(res), 0)
256         res = self.ldb_admin.search(self.base_dn,
257                 expression="(distinguishedName=%s,%s)" %
258                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
259                     self.base_dn) )
260         self.assertEqual(res, [])
261
262     def test_add_u4(self):
263         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
264         self.assert_top_ou_deleted()
265         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
266         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
267         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
268         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
269                                  grouptype=4)
270         # Make sure we have successfully created the two objects -- user and group
271         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
272         self.assertTrue(len(res) > 0)
273         res = self.ldb_admin.search(self.base_dn,
274                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
275         self.assertTrue(len(res) > 0)
276
277 #tests on ldap modify operations
278 class AclModifyTests(AclTests):
279
280     def setUp(self):
281         super(AclModifyTests, self).setUp()
282         self.user_with_wp = "acl_mod_user1"
283         self.user_with_sm = "acl_mod_user2"
284         self.user_with_group_sm = "acl_mod_user3"
285         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
286         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
287         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
288         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
289         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
290         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
291         self.user_sid = self.get_object_sid( self.get_user_dn(self.user_with_wp))
292         self.ldb_admin.newgroup("test_modify_group2", grouptype=4)
293         self.ldb_admin.newgroup("test_modify_group3", grouptype=4)
294         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
295
296     def tearDown(self):
297         super(AclModifyTests, self).tearDown()
298         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
299         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
300         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
301         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
302         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
303         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
304         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
305         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
306         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
307
308     def test_modify_u1(self):
309         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
310         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
311         # First test object -- User
312         print "Testing modify on User object"
313         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
314         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
315         ldif = """
316 dn: """ + self.get_user_dn("test_modify_user1") + """
317 changetype: modify
318 replace: displayName
319 displayName: test_changed"""
320         self.ldb_user.modify_ldif(ldif)
321         res = self.ldb_admin.search(self.base_dn,
322                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
323         self.assertEqual(res[0]["displayName"][0], "test_changed")
324         # Second test object -- Group
325         print "Testing modify on Group object"
326         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
327         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
328         ldif = """
329 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
330 changetype: modify
331 replace: displayName
332 displayName: test_changed"""
333         self.ldb_user.modify_ldif(ldif)
334         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
335         self.assertEqual(res[0]["displayName"][0], "test_changed")
336         # Third test object -- Organizational Unit
337         print "Testing modify on OU object"
338         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
339         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
340         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
341         ldif = """
342 dn: OU=test_modify_ou1,""" + self.base_dn + """
343 changetype: modify
344 replace: displayName
345 displayName: test_changed"""
346         self.ldb_user.modify_ldif(ldif)
347         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
348         self.assertEqual(res[0]["displayName"][0], "test_changed")
349
350     def test_modify_u2(self):
351         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
352         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
353         # First test object -- User
354         print "Testing modify on User object"
355         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
356         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
357         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
358         # Modify on attribute you have rights for
359         ldif = """
360 dn: """ + self.get_user_dn("test_modify_user1") + """
361 changetype: modify
362 replace: displayName
363 displayName: test_changed"""
364         self.ldb_user.modify_ldif(ldif)
365         res = self.ldb_admin.search(self.base_dn,
366                 expression="(distinguishedName=%s)" %
367                 self.get_user_dn("test_modify_user1"))
368         self.assertEqual(res[0]["displayName"][0], "test_changed")
369         # Modify on attribute you do not have rights for granted
370         ldif = """
371 dn: """ + self.get_user_dn("test_modify_user1") + """
372 changetype: modify
373 replace: url
374 url: www.samba.org"""
375         try:
376             self.ldb_user.modify_ldif(ldif)
377         except LdbError, (num, _):
378             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
379         else:
380             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
381             self.fail()
382         # Second test object -- Group
383         print "Testing modify on Group object"
384         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
385         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
386         ldif = """
387 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
388 changetype: modify
389 replace: displayName
390 displayName: test_changed"""
391         self.ldb_user.modify_ldif(ldif)
392         res = self.ldb_admin.search(self.base_dn,
393                 expression="(distinguishedName=%s)" %
394                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
395         self.assertEqual(res[0]["displayName"][0], "test_changed")
396         # Modify on attribute you do not have rights for granted
397         ldif = """
398 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
399 changetype: modify
400 replace: url
401 url: www.samba.org"""
402         try:
403             self.ldb_user.modify_ldif(ldif)
404         except LdbError, (num, _):
405             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
406         else:
407             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
408             self.fail()
409         # Second test object -- Organizational Unit
410         print "Testing modify on OU object"
411         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
412         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
413         ldif = """
414 dn: OU=test_modify_ou1,""" + self.base_dn + """
415 changetype: modify
416 replace: displayName
417 displayName: test_changed"""
418         self.ldb_user.modify_ldif(ldif)
419         res = self.ldb_admin.search(self.base_dn,
420                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
421                     + self.base_dn))
422         self.assertEqual(res[0]["displayName"][0], "test_changed")
423         # Modify on attribute you do not have rights for granted
424         ldif = """
425 dn: OU=test_modify_ou1,""" + self.base_dn + """
426 changetype: modify
427 replace: url
428 url: www.samba.org"""
429         try:
430             self.ldb_user.modify_ldif(ldif)
431         except LdbError, (num, _):
432             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
433         else:
434             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
435             self.fail()
436
437     def test_modify_u3(self):
438         """7 Modify one attribute as you have no what so ever rights granted"""
439         # First test object -- User
440         print "Testing modify on User object"
441         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
442         # Modify on attribute you do not have rights for granted
443         ldif = """
444 dn: """ + self.get_user_dn("test_modify_user1") + """
445 changetype: modify
446 replace: url
447 url: www.samba.org"""
448         try:
449             self.ldb_user.modify_ldif(ldif)
450         except LdbError, (num, _):
451             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
452         else:
453             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
454             self.fail()
455
456         # Second test object -- Group
457         print "Testing modify on Group object"
458         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
459         # Modify on attribute you do not have rights for granted
460         ldif = """
461 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
462 changetype: modify
463 replace: url
464 url: www.samba.org"""
465         try:
466             self.ldb_user.modify_ldif(ldif)
467         except LdbError, (num, _):
468             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
469         else:
470             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
471             self.fail()
472
473         # Second test object -- Organizational Unit
474         print "Testing modify on OU object"
475         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
476         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
477         # Modify on attribute you do not have rights for granted
478         ldif = """
479 dn: OU=test_modify_ou1,""" + self.base_dn + """
480 changetype: modify
481 replace: url
482 url: www.samba.org"""
483         try:
484             self.ldb_user.modify_ldif(ldif)
485         except LdbError, (num, _):
486             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
487         else:
488             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
489             self.fail()
490
491
492     def test_modify_u4(self):
493         """11 Grant WP to PRINCIPAL_SELF and test modify"""
494         ldif = """
495 dn: """ + self.get_user_dn(self.user_with_wp) + """
496 changetype: modify
497 add: adminDescription
498 adminDescription: blah blah blah"""
499         try:
500             self.ldb_user.modify_ldif(ldif)
501         except LdbError, (num, _):
502             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
503         else:
504             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
505             self.fail()
506
507         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
508         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
509         # Modify on attribute you have rights for
510         self.ldb_user.modify_ldif(ldif)
511         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
512                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
513         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
514
515     def test_modify_u5(self):
516         """12 test self membership"""
517         ldif = """
518 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
519 changetype: modify
520 add: Member
521 Member: """ +  self.get_user_dn(self.user_with_sm)
522 #the user has no rights granted, this should fail
523         try:
524             self.ldb_user2.modify_ldif(ldif)
525         except LdbError, (num, _):
526             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
527         else:
528             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
529             self.fail()
530
531 #grant self-membership, should be able to add himself
532         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
533         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
534         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
535         self.ldb_user2.modify_ldif(ldif)
536         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
537                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
538         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
539 #but not other users
540         ldif = """
541 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
542 changetype: modify
543 add: Member
544 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
545         try:
546             self.ldb_user2.modify_ldif(ldif)
547         except LdbError, (num, _):
548             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
549         else:
550             self.fail()
551
552     def test_modify_u6(self):
553         """13 test self membership"""
554         ldif = """
555 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
556 changetype: modify
557 add: Member
558 Member: """ +  self.get_user_dn(self.user_with_sm) + """
559 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
560
561 #grant self-membership, should be able to add himself  but not others at the same time
562         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
563         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
564         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
565         try:
566             self.ldb_user2.modify_ldif(ldif)
567         except LdbError, (num, _):
568             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
569         else:
570             self.fail()
571
572     def test_modify_u7(self):
573         """13 User with WP modifying Member"""
574 #a second user is given write property permission
575         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_wp))
576         mod = "(A;;WP;;;%s)" % str(user_sid)
577         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
578         ldif = """
579 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
580 changetype: modify
581 add: Member
582 Member: """ +  self.get_user_dn(self.user_with_wp)
583         self.ldb_user.modify_ldif(ldif)
584         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
585                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
586         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
587         ldif = """
588 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
589 changetype: modify
590 delete: Member"""
591         self.ldb_user.modify_ldif(ldif)
592         ldif = """
593 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
594 changetype: modify
595 add: Member
596 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
597         self.ldb_user.modify_ldif(ldif)
598         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
599                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
600         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
601
602 #enable these when we have search implemented
603 class AclSearchTests(AclTests):
604
605     def setUp(self):
606         super(AclSearchTests, self).setUp()
607         self.u1 = "search_u1"
608         self.u2 = "search_u2"
609         self.u3 = "search_u3"
610         self.group1 = "group1"
611         self.creds_tmp = Credentials()
612         self.creds_tmp.set_username("")
613         self.creds_tmp.set_password("")
614         self.creds_tmp.set_domain(creds.get_domain())
615         self.creds_tmp.set_realm(creds.get_realm())
616         self.creds_tmp.set_workstation(creds.get_workstation())
617         self.anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
618         self.dsheuristics = self.ldb_admin.get_dsheuristics()
619         self.ldb_admin.newuser(self.u1, self.user_pass)
620         self.ldb_admin.newuser(self.u2, self.user_pass)
621         self.ldb_admin.newuser(self.u3, self.user_pass)
622         self.ldb_admin.newgroup(self.group1, grouptype=-2147483646)
623         self.ldb_admin.add_remove_group_members(self.group1, self.u2,
624                                                 add_members_operation=True)
625         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
626         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
627         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
628         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
629                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
630                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
631                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
632                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
633                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
634         self.user_sid = self.get_object_sid(self.get_user_dn(self.u1))
635         self.group_sid = self.get_object_sid(self.get_user_dn(self.group1))
636
637     def create_clean_ou(self, object_dn):
638         """ Base repeating setup for unittests to follow """
639         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
640                 expression="distinguishedName=%s" % object_dn)
641         # Make sure top testing OU has been deleted before starting the test
642         self.assertEqual(res, [])
643         self.ldb_admin.create_ou(object_dn)
644         desc_sddl = self.get_desc_sddl(object_dn)
645         # Make sure there are inheritable ACEs initially
646         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
647         # Find and remove all inherit ACEs
648         res = re.findall("\(.*?\)", desc_sddl)
649         res = [x for x in res if ("CI" in x) or ("OI" in x)]
650         for x in res:
651             desc_sddl = desc_sddl.replace(x, "")
652         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
653         # can propagate from above
654         # remove SACL, we are not interested
655         desc_sddl = desc_sddl.replace(":AI", ":AIP")
656         self.modify_desc(object_dn, desc_sddl)
657         # Verify all inheritable ACEs are gone
658         desc_sddl = self.get_desc_sddl(object_dn)
659         self.assertFalse("CI" in desc_sddl)
660         self.assertFalse("OI" in desc_sddl)
661
662     def tearDown(self):
663         super(AclSearchTests, self).tearDown()
664         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
665         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
666         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
667         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
668         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
669         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
670         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
671         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
672         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
673         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
674         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
675         delete_force(self.ldb_admin, self.get_user_dn("group1"))
676
677     def test_search_anonymous1(self):
678         """Verify access of rootDSE with the correct request"""
679         res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
680         self.assertEquals(len(res), 1)
681         #verify some of the attributes
682         #dont care about values
683         self.assertTrue("ldapServiceName" in res[0])
684         self.assertTrue("namingContexts" in res[0])
685         self.assertTrue("isSynchronized" in res[0])
686         self.assertTrue("dsServiceName" in res[0])
687         self.assertTrue("supportedSASLMechanisms" in res[0])
688         self.assertTrue("isGlobalCatalogReady" in res[0])
689         self.assertTrue("domainControllerFunctionality" in res[0])
690         self.assertTrue("serverName" in res[0])
691
692     def test_search_anonymous2(self):
693         """Make sure we cannot access anything else"""
694         try:
695             res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
696         except LdbError, (num, _):
697             self.assertEquals(num, ERR_OPERATIONS_ERROR)
698         else:
699             self.fail()
700         try:
701             res = self.anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
702         except LdbError, (num, _):
703             self.assertEquals(num, ERR_OPERATIONS_ERROR)
704         else:
705             self.fail()
706         try:
707             res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
708                                         scope=SCOPE_SUBTREE)
709         except LdbError, (num, _):
710             self.assertEquals(num, ERR_OPERATIONS_ERROR)
711         else:
712             self.fail()
713
714     def test_search_anonymous3(self):
715         """Set dsHeuristics and repeat"""
716         self.ldb_admin.set_dsheuristics("0000002")
717         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
718         mod = "(A;CI;LC;;;AN)"
719         self.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
720         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
721         res = self.anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
722                                     expression="(objectClass=*)", scope=SCOPE_SUBTREE)
723         self.assertEquals(len(res), 1)
724         self.assertTrue("dn" in res[0])
725         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
726                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
727         res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
728                                     scope=SCOPE_SUBTREE)
729         self.assertEquals(len(res), 1)
730         self.assertTrue("dn" in res[0])
731         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
732         self.ldb_admin.set_dsheuristics(self.dsheuristics)
733
734     def test_search1(self):
735         """Make sure users can see us if given LC to user and group"""
736         self.create_clean_ou("OU=ou1," + self.base_dn)
737         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
738         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
739         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn,
740                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
741         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
742                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
743         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
744                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
745         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
746                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
747         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
748                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
749
750         #regular users must see only ou1 and ou2
751         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
752                                     scope=SCOPE_SUBTREE)
753         self.assertEquals(len(res), 2)
754         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
755                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
756
757         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
758         self.assertEquals(sorted(res_list), sorted(ok_list))
759
760         #these users should see all ous
761         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
762                                     scope=SCOPE_SUBTREE)
763         self.assertEquals(len(res), 6)
764         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
765         self.assertEquals(sorted(res_list), sorted(self.full_list))
766
767         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
768                                     scope=SCOPE_SUBTREE)
769         self.assertEquals(len(res), 6)
770         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
771         self.assertEquals(sorted(res_list), sorted(self.full_list))
772
773     def test_search2(self):
774         """Make sure users can't see us if access is explicitly denied"""
775         self.create_clean_ou("OU=ou1," + self.base_dn)
776         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
777         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
778         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
779         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
780         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
781         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
782         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
783         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
784                                     scope=SCOPE_SUBTREE)
785         #this user should see all ous
786         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
787         self.assertEquals(sorted(res_list), sorted(self.full_list))
788
789         #these users should see ou1, 2, 5 and 6 but not 3 and 4
790         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
791                                     scope=SCOPE_SUBTREE)
792         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
793                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
794                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
795                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
796         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
797         self.assertEquals(sorted(res_list), sorted(ok_list))
798
799         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
800                                     scope=SCOPE_SUBTREE)
801         self.assertEquals(len(res), 4)
802         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
803         self.assertEquals(sorted(res_list), sorted(ok_list))
804
805     def test_search3(self):
806         """Make sure users can't see ous if access is explicitly denied - 2"""
807         self.create_clean_ou("OU=ou1," + self.base_dn)
808         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
809         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
810         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn,
811                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
812         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
813                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
814         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
815                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
816         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
817                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
818         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
819                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
820
821         print "Testing correct behavior on nonaccessible search base"
822         try:
823              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
824                                    scope=SCOPE_BASE)
825         except LdbError, (num, _):
826             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
827         else:
828             self.fail()
829
830         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
831         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
832
833         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
834                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
835
836         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
837                                     scope=SCOPE_SUBTREE)
838         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
839         self.assertEquals(sorted(res_list), sorted(ok_list))
840
841         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
842                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
843                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
844                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
845
846         #should not see ou3 and ou4, but should see ou5 and ou6
847         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
848                                     scope=SCOPE_SUBTREE)
849         self.assertEquals(len(res), 4)
850         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
851         self.assertEquals(sorted(res_list), sorted(ok_list))
852
853         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
854                                     scope=SCOPE_SUBTREE)
855         self.assertEquals(len(res), 4)
856         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
857         self.assertEquals(sorted(res_list), sorted(ok_list))
858
859     def test_search4(self):
860         """There is no difference in visibility if the user is also creator"""
861         self.create_clean_ou("OU=ou1," + self.base_dn)
862         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
863         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
864         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn,
865                                 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
866         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
867                                 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
868         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
869                                 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
870         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
871                                 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
872         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
873                                 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
874
875         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
876                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
877         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
878                                     scope=SCOPE_SUBTREE)
879         self.assertEquals(len(res), 2)
880         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
881         self.assertEquals(sorted(res_list), sorted(ok_list))
882
883         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
884                                     scope=SCOPE_SUBTREE)
885         self.assertEquals(len(res), 2)
886         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
887         self.assertEquals(sorted(res_list), sorted(ok_list))
888
889     def test_search5(self):
890         """Make sure users can see only attributes they are allowed to see"""
891         self.create_clean_ou("OU=ou1," + self.base_dn)
892         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
893         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
894         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn,
895                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
896         # assert user can only see dn
897         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
898                                     scope=SCOPE_SUBTREE)
899         ok_list = ['dn']
900         self.assertEquals(len(res), 1)
901         res_list = res[0].keys()
902         self.assertEquals(res_list, ok_list)
903
904         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
905                                     scope=SCOPE_BASE, attrs=["ou"])
906
907         self.assertEquals(len(res), 1)
908         res_list = res[0].keys()
909         self.assertEquals(res_list, ok_list)
910
911         #give read property on ou and assert user can only see dn and ou
912         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
913         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
914         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
915         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
916                                     scope=SCOPE_SUBTREE)
917         ok_list = ['dn', 'ou']
918         self.assertEquals(len(res), 1)
919         res_list = res[0].keys()
920         self.assertEquals(sorted(res_list), sorted(ok_list))
921
922         #give read property on Public Information and assert user can see ou and other members
923         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
924         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
925         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
926         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
927                                     scope=SCOPE_SUBTREE)
928
929         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
930         res_list = res[0].keys()
931         self.assertEquals(sorted(res_list), sorted(ok_list))
932
933     def test_search6(self):
934         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
935         self.create_clean_ou("OU=ou1," + self.base_dn)
936         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
937         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
938         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn,
939                                  "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod)
940         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
941                                 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
942
943         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
944                                     scope=SCOPE_SUBTREE)
945         #nothing should be returned as ou is not accessible
946         self.assertEquals(res, [])
947
948         #give read property on ou and assert user can only see dn and ou
949         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
950         self.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
951         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
952                                     scope=SCOPE_SUBTREE)
953         self.assertEquals(len(res), 1)
954         ok_list = ['dn', 'ou']
955         res_list = res[0].keys()
956         self.assertEquals(sorted(res_list), sorted(ok_list))
957
958         #give read property on Public Information and assert user can see ou and other members
959         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
960         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
961         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
962                                    scope=SCOPE_SUBTREE)
963         self.assertEquals(len(res), 1)
964         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
965         res_list = res[0].keys()
966         self.assertEquals(sorted(res_list), sorted(ok_list))
967
968 #tests on ldap delete operations
969 class AclDeleteTests(AclTests):
970
971     def setUp(self):
972         super(AclDeleteTests, self).setUp()
973         self.regular_user = "acl_delete_user1"
974         # Create regular user
975         self.ldb_admin.newuser(self.regular_user, self.user_pass)
976         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
977
978     def tearDown(self):
979         super(AclDeleteTests, self).tearDown()
980         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
981         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
982
983     def test_delete_u1(self):
984         """User is prohibited by default to delete another User object"""
985         # Create user that we try to delete
986         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
987         # Here delete User object should ALWAYS through exception
988         try:
989             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
990         except LdbError, (num, _):
991             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
992         else:
993             self.fail()
994
995     def test_delete_u2(self):
996         """User's group has RIGHT_DELETE to another User object"""
997         user_dn = self.get_user_dn("test_delete_user1")
998         # Create user that we try to delete
999         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1000         mod = "(A;;SD;;;AU)"
1001         self.dacl_add_ace(user_dn, mod)
1002         # Try to delete User object
1003         self.ldb_user.delete(user_dn)
1004         res = self.ldb_admin.search(self.base_dn,
1005                 expression="(distinguishedName=%s)" % user_dn)
1006         self.assertEqual(res, [])
1007
1008     def test_delete_u3(self):
1009         """User indentified by SID has RIGHT_DELETE to another User object"""
1010         user_dn = self.get_user_dn("test_delete_user1")
1011         # Create user that we try to delete
1012         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1013         mod = "(A;;SD;;;%s)" % self.get_object_sid(self.get_user_dn(self.regular_user))
1014         self.dacl_add_ace(user_dn, mod)
1015         # Try to delete User object
1016         self.ldb_user.delete(user_dn)
1017         res = self.ldb_admin.search(self.base_dn,
1018                 expression="(distinguishedName=%s)" % user_dn)
1019         self.assertEqual(res, [])
1020
1021 #tests on ldap rename operations
1022 class AclRenameTests(AclTests):
1023
1024     def setUp(self):
1025         super(AclRenameTests, self).setUp()
1026         self.regular_user = "acl_rename_user1"
1027         self.ou1 = "OU=test_rename_ou1"
1028         self.ou2 = "OU=test_rename_ou2"
1029         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1030         self.testuser1 = "test_rename_user1"
1031         self.testuser2 = "test_rename_user2"
1032         self.testuser3 = "test_rename_user3"
1033         self.testuser4 = "test_rename_user4"
1034         self.testuser5 = "test_rename_user5"
1035         # Create regular user
1036         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1037         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1038
1039     def tearDown(self):
1040         super(AclRenameTests, self).tearDown()
1041         # Rename OU3
1042         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1043         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1044         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1045         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1046         # Rename OU2
1047         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1048         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1049         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1050         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1051         # Rename OU1
1052         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1053         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1054         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1055         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1056         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1057         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1058
1059     def test_rename_u1(self):
1060         """Regular user fails to rename 'User object' within single OU"""
1061         # Create OU structure
1062         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1063         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1064         try:
1065             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1066                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1067         except LdbError, (num, _):
1068             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1069         else:
1070             self.fail()
1071
1072     def test_rename_u2(self):
1073         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1074         ou_dn = "OU=test_rename_ou1," + self.base_dn
1075         user_dn = "CN=test_rename_user1," + ou_dn
1076         rename_user_dn = "CN=test_rename_user5," + ou_dn
1077         # Create OU structure
1078         self.ldb_admin.create_ou(ou_dn)
1079         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1080         mod = "(A;;WP;;;AU)"
1081         self.dacl_add_ace(user_dn, mod)
1082         # Rename 'User object' having WP to AU
1083         self.ldb_user.rename(user_dn, rename_user_dn)
1084         res = self.ldb_admin.search(self.base_dn,
1085                 expression="(distinguishedName=%s)" % user_dn)
1086         self.assertEqual(res, [])
1087         res = self.ldb_admin.search(self.base_dn,
1088                 expression="(distinguishedName=%s)" % rename_user_dn)
1089         self.assertNotEqual(res, [])
1090
1091     def test_rename_u3(self):
1092         """Test rename with rights granted to 'User object' SID"""
1093         ou_dn = "OU=test_rename_ou1," + self.base_dn
1094         user_dn = "CN=test_rename_user1," + ou_dn
1095         rename_user_dn = "CN=test_rename_user5," + ou_dn
1096         # Create OU structure
1097         self.ldb_admin.create_ou(ou_dn)
1098         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1099         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1100         mod = "(A;;WP;;;%s)" % str(sid)
1101         self.dacl_add_ace(user_dn, mod)
1102         # Rename 'User object' having WP to AU
1103         self.ldb_user.rename(user_dn, rename_user_dn)
1104         res = self.ldb_admin.search(self.base_dn,
1105                 expression="(distinguishedName=%s)" % user_dn)
1106         self.assertEqual(res, [])
1107         res = self.ldb_admin.search(self.base_dn,
1108                 expression="(distinguishedName=%s)" % rename_user_dn)
1109         self.assertNotEqual(res, [])
1110
1111     def test_rename_u4(self):
1112         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1113         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1114         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1115         user_dn = "CN=test_rename_user2," + ou1_dn
1116         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1117         # Create OU structure
1118         self.ldb_admin.create_ou(ou1_dn)
1119         self.ldb_admin.create_ou(ou2_dn)
1120         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1121         mod = "(A;;WPSD;;;AU)"
1122         self.dacl_add_ace(user_dn, mod)
1123         mod = "(A;;CC;;;AU)"
1124         self.dacl_add_ace(ou2_dn, mod)
1125         # Rename 'User object' having SD and CC to AU
1126         self.ldb_user.rename(user_dn, rename_user_dn)
1127         res = self.ldb_admin.search(self.base_dn,
1128                 expression="(distinguishedName=%s)" % user_dn)
1129         self.assertEqual(res, [])
1130         res = self.ldb_admin.search(self.base_dn,
1131                 expression="(distinguishedName=%s)" % rename_user_dn)
1132         self.assertNotEqual(res, [])
1133
1134     def test_rename_u5(self):
1135         """Test rename with rights granted to 'User object' SID"""
1136         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1137         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1138         user_dn = "CN=test_rename_user2," + ou1_dn
1139         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1140         # Create OU structure
1141         self.ldb_admin.create_ou(ou1_dn)
1142         self.ldb_admin.create_ou(ou2_dn)
1143         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1144         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1145         mod = "(A;;WPSD;;;%s)" % str(sid)
1146         self.dacl_add_ace(user_dn, mod)
1147         mod = "(A;;CC;;;%s)" % str(sid)
1148         self.dacl_add_ace(ou2_dn, mod)
1149         # Rename 'User object' having SD and CC to AU
1150         self.ldb_user.rename(user_dn, rename_user_dn)
1151         res = self.ldb_admin.search(self.base_dn,
1152                 expression="(distinguishedName=%s)" % user_dn)
1153         self.assertEqual(res, [])
1154         res = self.ldb_admin.search(self.base_dn,
1155                 expression="(distinguishedName=%s)" % rename_user_dn)
1156         self.assertNotEqual(res, [])
1157
1158     def test_rename_u6(self):
1159         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1160         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1161         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1162         user_dn = "CN=test_rename_user2," + ou1_dn
1163         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1164         # Create OU structure
1165         self.ldb_admin.create_ou(ou1_dn)
1166         self.ldb_admin.create_ou(ou2_dn)
1167         #mod = "(A;CI;DCWP;;;AU)"
1168         mod = "(A;;DC;;;AU)"
1169         self.dacl_add_ace(ou1_dn, mod)
1170         mod = "(A;;CC;;;AU)"
1171         self.dacl_add_ace(ou2_dn, mod)
1172         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1173         mod = "(A;;WP;;;AU)"
1174         self.dacl_add_ace(user_dn, mod)
1175         # Rename 'User object' having SD and CC to AU
1176         self.ldb_user.rename(user_dn, rename_user_dn)
1177         res = self.ldb_admin.search(self.base_dn,
1178                 expression="(distinguishedName=%s)" % user_dn)
1179         self.assertEqual(res, [])
1180         res = self.ldb_admin.search(self.base_dn,
1181                 expression="(distinguishedName=%s)" % rename_user_dn)
1182         self.assertNotEqual(res, [])
1183
1184     def test_rename_u7(self):
1185         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1186         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1187         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1188         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1189         user_dn = "CN=test_rename_user2," + ou1_dn
1190         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1191         # Create OU structure
1192         self.ldb_admin.create_ou(ou1_dn)
1193         self.ldb_admin.create_ou(ou2_dn)
1194         self.ldb_admin.create_ou(ou3_dn)
1195         mod = "(A;CI;WPDC;;;AU)"
1196         self.dacl_add_ace(ou1_dn, mod)
1197         mod = "(A;;CC;;;AU)"
1198         self.dacl_add_ace(ou3_dn, mod)
1199         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1200         # Rename 'User object' having SD and CC to AU
1201         self.ldb_user.rename(user_dn, rename_user_dn)
1202         res = self.ldb_admin.search(self.base_dn,
1203                 expression="(distinguishedName=%s)" % user_dn)
1204         self.assertEqual(res, [])
1205         res = self.ldb_admin.search(self.base_dn,
1206                 expression="(distinguishedName=%s)" % rename_user_dn)
1207         self.assertNotEqual(res, [])
1208
1209     def test_rename_u8(self):
1210         """Test rename on an object with and without modify access on the RDN attribute"""
1211         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1212         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1213         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1214         # Create OU structure
1215         self.ldb_admin.create_ou(ou1_dn)
1216         self.ldb_admin.create_ou(ou2_dn)
1217         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1218         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1219         self.dacl_add_ace(ou2_dn, mod)
1220         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1221         self.dacl_add_ace(ou2_dn, mod)
1222         try:
1223             self.ldb_user.rename(ou2_dn, ou3_dn)
1224         except LdbError, (num, _):
1225             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1226         else:
1227             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1228             self.fail()
1229         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1230         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1231         self.dacl_add_ace(ou2_dn, mod)
1232         self.ldb_user.rename(ou2_dn, ou3_dn)
1233         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1234         self.assertEqual(res, [])
1235         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1236         self.assertNotEqual(res, [])
1237
1238 #tests on Control Access Rights
1239 class AclCARTests(AclTests):
1240
1241     def setUp(self):
1242         super(AclCARTests, self).setUp()
1243         self.user_with_wp = "acl_car_user1"
1244         self.user_with_pc = "acl_car_user2"
1245         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1246         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
1247         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1248         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1249
1250         res = self.ldb_admin.search("CN=Directory Service, CN=Windows NT, CN=Services, "
1251                  + self.configuration_dn, scope=SCOPE_BASE, attrs=["dSHeuristics"])
1252         if "dSHeuristics" in res[0]:
1253             self.dsheuristics = res[0]["dSHeuristics"][0]
1254         else:
1255             self.dsheuristics = None
1256
1257         self.minPwdAge = self.ldb_admin.get_minPwdAge()
1258
1259         # Set the "dSHeuristics" to have the tests run against Windows Server
1260         self.ldb_admin.set_dsheuristics("000000001")
1261         # Set minPwdAge to 0
1262         self.ldb_admin.set_minPwdAge("0")
1263
1264     def tearDown(self):
1265         super(AclCARTests, self).tearDown()
1266         #restore original values
1267         self.ldb_admin.set_dsheuristics(self.dsheuristics)
1268         self.ldb_admin.set_minPwdAge(self.minPwdAge)
1269         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1270         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1271
1272     def test_change_password1(self):
1273         """Try a password change operation without any CARs given"""
1274         #users have change password by default - remove for negative testing
1275         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1276         sddl = desc.as_sddl(self.domain_sid)
1277         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1278         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1279         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1280         try:
1281             self.ldb_user.modify_ldif("""
1282 dn: """ + self.get_user_dn(self.user_with_wp) + """
1283 changetype: modify
1284 delete: unicodePwd
1285 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1286 add: unicodePwd
1287 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1288 """)
1289         except LdbError, (num, _):
1290             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1291         else:
1292             # for some reason we get constraint violation instead of insufficient access error
1293             self.fail()
1294
1295     def test_change_password2(self):
1296         """Make sure WP has no influence"""
1297         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1298         sddl = desc.as_sddl(self.domain_sid)
1299         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1300         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1301         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1302         mod = "(A;;WP;;;PS)"
1303         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1304         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1305         sddl = desc.as_sddl(self.domain_sid)
1306         try:
1307             self.ldb_user.modify_ldif("""
1308 dn: """ + self.get_user_dn(self.user_with_wp) + """
1309 changetype: modify
1310 delete: unicodePwd
1311 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1312 add: unicodePwd
1313 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1314 """)
1315         except LdbError, (num, _):
1316             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1317         else:
1318             # for some reason we get constraint violation instead of insufficient access error
1319             self.fail()
1320
1321     def test_change_password3(self):
1322         """Make sure WP has no influence"""
1323         mod = "(D;;WP;;;PS)"
1324         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1325         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1326         sddl = desc.as_sddl(self.domain_sid)
1327         self.ldb_user.modify_ldif("""
1328 dn: """ + self.get_user_dn(self.user_with_wp) + """
1329 changetype: modify
1330 delete: unicodePwd
1331 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1332 add: unicodePwd
1333 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1334 """)
1335
1336     def test_change_password5(self):
1337         """Make sure rights have no influence on dBCSPwd"""
1338         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1339         sddl = desc.as_sddl(self.domain_sid)
1340         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1341         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1342         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1343         mod = "(D;;WP;;;PS)"
1344         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1345         try:
1346             self.ldb_user.modify_ldif("""
1347 dn: """ + self.get_user_dn(self.user_with_wp) + """
1348 changetype: modify
1349 delete: dBCSPwd
1350 dBCSPwd: XXXXXXXXXXXXXXXX
1351 add: dBCSPwd
1352 dBCSPwd: YYYYYYYYYYYYYYYY
1353 """)
1354         except LdbError, (num, _):
1355             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1356         else:
1357             self.fail()
1358
1359     def test_change_password6(self):
1360         """Test uneven delete/adds"""
1361         try:
1362             self.ldb_user.modify_ldif("""
1363 dn: """ + self.get_user_dn(self.user_with_wp) + """
1364 changetype: modify
1365 delete: userPassword
1366 userPassword: thatsAcomplPASS1
1367 delete: userPassword
1368 userPassword: thatsAcomplPASS1
1369 add: userPassword
1370 userPassword: thatsAcomplPASS2
1371 """)
1372         except LdbError, (num, _):
1373             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1374         else:
1375             self.fail()
1376         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1377         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1378         try:
1379             self.ldb_user.modify_ldif("""
1380 dn: """ + self.get_user_dn(self.user_with_wp) + """
1381 changetype: modify
1382 delete: userPassword
1383 userPassword: thatsAcomplPASS1
1384 delete: userPassword
1385 userPassword: thatsAcomplPASS1
1386 add: userPassword
1387 userPassword: thatsAcomplPASS2
1388 """)
1389             # This fails on Windows 2000 domain level with constraint violation
1390         except LdbError, (num, _):
1391             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1392                             num == ERR_UNWILLING_TO_PERFORM)
1393         else:
1394             self.fail()
1395
1396
1397     def test_change_password7(self):
1398         """Try a password change operation without any CARs given"""
1399         #users have change password by default - remove for negative testing
1400         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1401         sddl = desc.as_sddl(self.domain_sid)
1402         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1403         #first change our own password
1404         self.ldb_user2.modify_ldif("""
1405 dn: """ + self.get_user_dn(self.user_with_pc) + """
1406 changetype: modify
1407 delete: unicodePwd
1408 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1409 add: unicodePwd
1410 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1411 """)
1412         #then someone else's
1413         self.ldb_user2.modify_ldif("""
1414 dn: """ + self.get_user_dn(self.user_with_wp) + """
1415 changetype: modify
1416 delete: unicodePwd
1417 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1418 add: unicodePwd
1419 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1420 """)
1421
1422     def test_reset_password1(self):
1423         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1424         try:
1425             self.ldb_user.modify_ldif("""
1426 dn: """ + self.get_user_dn(self.user_with_wp) + """
1427 changetype: modify
1428 replace: unicodePwd
1429 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1430 """)
1431         except LdbError, (num, _):
1432             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1433         else:
1434             self.fail()
1435         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1436         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1437         self.ldb_user.modify_ldif("""
1438 dn: """ + self.get_user_dn(self.user_with_wp) + """
1439 changetype: modify
1440 replace: unicodePwd
1441 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1442 """)
1443
1444     def test_reset_password2(self):
1445         """Try a user password reset operation (userPassword) before and after granting CAR"""
1446         try:
1447             self.ldb_user.modify_ldif("""
1448 dn: """ + self.get_user_dn(self.user_with_wp) + """
1449 changetype: modify
1450 replace: userPassword
1451 userPassword: thatsAcomplPASS1
1452 """)
1453         except LdbError, (num, _):
1454             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1455         else:
1456             self.fail()
1457         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1458         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1459         try:
1460             self.ldb_user.modify_ldif("""
1461 dn: """ + self.get_user_dn(self.user_with_wp) + """
1462 changetype: modify
1463 replace: userPassword
1464 userPassword: thatsAcomplPASS1
1465 """)
1466             # This fails on Windows 2000 domain level with constraint violation
1467         except LdbError, (num, _):
1468             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1469
1470     def test_reset_password3(self):
1471         """Grant WP and see what happens (unicodePwd)"""
1472         mod = "(A;;WP;;;PS)"
1473         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1474         try:
1475             self.ldb_user.modify_ldif("""
1476 dn: """ + self.get_user_dn(self.user_with_wp) + """
1477 changetype: modify
1478 replace: unicodePwd
1479 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1480 """)
1481         except LdbError, (num, _):
1482             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1483         else:
1484             self.fail()
1485
1486     def test_reset_password4(self):
1487         """Grant WP and see what happens (userPassword)"""
1488         mod = "(A;;WP;;;PS)"
1489         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1490         try:
1491             self.ldb_user.modify_ldif("""
1492 dn: """ + self.get_user_dn(self.user_with_wp) + """
1493 changetype: modify
1494 replace: userPassword
1495 userPassword: thatsAcomplPASS1
1496 """)
1497         except LdbError, (num, _):
1498             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1499         else:
1500             self.fail()
1501
1502     def test_reset_password5(self):
1503         """Explicitly deny WP but grant CAR (unicodePwd)"""
1504         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1505         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1506         self.ldb_user.modify_ldif("""
1507 dn: """ + self.get_user_dn(self.user_with_wp) + """
1508 changetype: modify
1509 replace: unicodePwd
1510 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1511 """)
1512
1513     def test_reset_password6(self):
1514         """Explicitly deny WP but grant CAR (userPassword)"""
1515         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1516         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1517         try:
1518             self.ldb_user.modify_ldif("""
1519 dn: """ + self.get_user_dn(self.user_with_wp) + """
1520 changetype: modify
1521 replace: userPassword
1522 userPassword: thatsAcomplPASS1
1523 """)
1524             # This fails on Windows 2000 domain level with constraint violation
1525         except LdbError, (num, _):
1526             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1527
class AclExtendedTests(AclTests):
    """Checks which users get to see nTSecurityDescriptor on a group."""

    def setUp(self):
        super(AclExtendedTests, self).setUp()
        # u1: regular user, will be the creator
        # u2: regular user
        # u3: admin user
        self.u1, self.u2, self.u3 = "ext_u1", "ext_u2", "ext_u3"
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members(
            "Domain Admins", self.u3, add_members_operation=True)
        # One connection per user
        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
        # The SIDs of the two regular users are used in the ACEs of the test
        self.user_sid1 = self.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        super(AclExtendedTests, self).tearDown()
        # Users first, then the group and finally the OU that contains it
        for account in (self.u1, self.u2, self.u3):
            delete_force(self.ldb_admin, self.get_user_dn(account))
        delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
        delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)

    def test_ntSecurityDescriptor(self):
        """Read nTSecurityDescriptor of a group as u2 (with RP, then RC) and as u3"""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1," + ou_dn

        def fetch_descriptor(connection):
            # Base search on the group asking for the descriptor only
            return connection.search(group_dn, SCOPE_BASE, None,
                                     ["nTSecurityDescriptor"])

        #create empty ou
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        #give u1 Create children access
        self.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % str(self.user_sid1))
        self.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % str(self.user_sid2))
        #create a group under that, grant RP to u2
        self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", grouptype=4)
        self.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % str(self.user_sid2))
        #u2 must not read the descriptor
        found = fetch_descriptor(self.ldb_user2)
        self.assertNotEqual(found, [])
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())
        #grant RC to u2 - still no access
        self.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % str(self.user_sid2))
        found = fetch_descriptor(self.ldb_user2)
        self.assertNotEqual(found, [])
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())
        #u3 is member of administrators group, should be able to read sd
        found = fetch_descriptor(self.ldb_user3)
        self.assertEqual(len(found), 1)
        self.assertTrue("nTSecurityDescriptor" in found[0].keys())
1586
# Important unit running information

# A bare host name is turned into an LDAP URL
if "://" not in host:
    host = "ldap://%s" % host
ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp)

runner = SubunitTestRunner()
rc = 0
# Every suite is run, even if an earlier one failed; a single failing
# suite is enough for a non-zero exit code
for suite_class in (AclAddTests, AclModifyTests, AclDeleteTests,
                    AclRenameTests, AclCARTests, AclSearchTests,
                    AclExtendedTests):
    if not runner.run(unittest.makeSuite(suite_class)).wasSuccessful():
        rc = 1


sys.exit(rc)