s4-dsdb/tests/python: Explicitly pass command line LoadParm() instance to system_sess...
[amitay/samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9
10 sys.path.append("bin/python")
11 import samba
12 samba.ensure_external_module("testtools", "testtools")
13 samba.ensure_external_module("subunit", "subunit/python")
14
15 import samba.getopt as options
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
24 from samba.ndr import ndr_pack, ndr_unpack
25 from samba.dcerpc import security
26
27 from samba.auth import system_session
28 from samba import gensec, sd_utils
29 from samba.samdb import SamDB
30 from samba.credentials import Credentials
31 import samba.tests
32 from samba.tests import delete_force
33 from subunit.run import SubunitTestRunner
34 import unittest
35
# Command-line handling: Samba's shared option groups plus one mandatory
# positional argument naming the host the tests connect to.
parser = optparse.OptionParser("acl.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# Credentials supplied on the command line are used when available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

# Without a host there is nothing to test against: show usage and bail out.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Add the gensec SEAL feature on top of whatever features are already set.
creds.set_gensec_features(
    creds.get_gensec_features() | gensec.FEATURE_SEAL)
55
56 #
57 # Tests start here
58 #
59
class AclTests(samba.tests.TestCase):
    """Common fixture and helpers shared by the ACL test classes."""

    def setUp(self):
        """Cache the admin connection and the domain facts the tests need."""
        super(AclTests, self).setUp()
        # NOTE(review): `ldb` is a module-level name that is not defined in
        # the visible part of this file -- presumably the administrator
        # connection created near the bottom of the script; confirm.
        self.ldb_admin = ldb
        self.base_dn = ldb.domain_dn()
        self.domain_sid = security.dom_sid(ldb.get_domain_sid())
        # Password given to every account the tests create.
        self.user_pass = "samba123@"
        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(ldb)
        # Parenthesised single-argument form prints identically on Python 2
        # and is valid syntax on Python 3.
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of the object called `name` in CN=Users."""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Open a SamDB connection to `host` as the given user.

        Domain, realm and workstation are copied from the command-line
        credentials; the gensec SEAL feature is enabled on the new ones.
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                      | gensec.FEATURE_SEAL)
        return SamDB(url=host, credentials=creds_tmp, lp=lp)

    # Test if we have any additional groups for users than default ones
    def assert_user_no_group_member(self, username):
        """Fail unless the user's entry has no memberOf attribute at all.

        The check passes only when looking up "memberOf" raises KeyError.
        """
        res = self.ldb_admin.search(
            self.base_dn,
            expression="(distinguishedName=%s)" % self.get_user_dn(username))
        try:
            res[0]["memberOf"]
        except KeyError:
            return
        self.fail("user %s unexpectedly has memberOf values" % username)
96
97 #tests on ldap add operations
class AclAddTests(AclTests):
    """Access checks on LDAP add operations inside a two-level OU tree."""

    def setUp(self):
        """Create two domain admins, one regular user and their connections."""
        super(AclAddTests, self).setUp()
        # Domain admin that will be creator of OU parent-child structure
        self.usr_admin_owner = "acl_add_user1"
        # Second domain admin that will not be creator of OU parent-child structure
        self.usr_admin_not_owner = "acl_add_user2"
        # Regular user
        self.regular_user = "acl_add_user3"
        self.test_user1 = "test_add_user1"
        self.test_group1 = "test_add_group1"
        self.ou1 = "OU=test_add_ou1"
        self.ou2 = "OU=test_add_ou2,%s" % self.ou1
        # Full DNs of everything the tests create, built once from the
        # names above instead of being re-spelled in every test.
        self.ou1_dn = "%s,%s" % (self.ou1, self.base_dn)
        self.ou2_dn = "%s,%s" % (self.ou2, self.base_dn)
        self.test_user1_dn = "CN=%s,%s" % (self.test_user1, self.ou2_dn)
        self.test_group1_dn = "CN=%s,%s" % (self.test_group1, self.ou2_dn)

        self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
        self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
        self.ldb_admin.newuser(self.regular_user, self.user_pass)

        # add admins to the Domain Admins group
        self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
                       add_members_operation=True)
        self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
                       add_members_operation=True)

        self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
        self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Remove the test objects (children first), then the accounts."""
        super(AclAddTests, self).tearDown()
        delete_force(self.ldb_admin, self.test_user1_dn)
        delete_force(self.ldb_admin, self.test_group1_dn)
        delete_force(self.ldb_admin, self.ou2_dn)
        delete_force(self.ldb_admin, self.ou1_dn)
        delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
        delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))

    def _search_dn(self, dn):
        """Search, as administrator, for the object whose DN is `dn`."""
        return self.ldb_admin.search(
            self.base_dn, expression="(distinguishedName=%s)" % dn)

    # Make sure top OU is deleted (and so everything under it)
    def assert_top_ou_deleted(self):
        """Assert that the top-level test OU does not exist."""
        res = self._search_dn(self.ou1_dn)
        self.assertEqual(res, [])

    def test_add_u1(self):
        """Testing OU with the rights of Domain Admin not creator of the OU """
        self.assert_top_ou_deleted()
        # Change descriptor for top level OU
        self.ldb_owner.create_ou(self.ou1_dn)
        self.ldb_owner.create_ou(self.ou2_dn)
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
        mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(self.ou1_dn, mod)
        # Test user and group creation with another domain admin's credentials
        self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_notowner.newgroup(self.test_group1, groupou=self.ou2,
                                   grouptype=4)
        # Make sure we HAVE created the two objects -- user and group
        # !!! We should not be able to do that, but however because of ACE ordering our inherited Deny ACE
        # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
        res = self._search_dn(self.test_user1_dn)
        self.assertTrue(len(res) > 0)
        res = self._search_dn(self.test_group1_dn)
        self.assertTrue(len(res) > 0)

    def test_add_u2(self):
        """Testing OU with the regular user that has no rights granted over the OU """
        self.assert_top_ou_deleted()
        # Create a parent-child OU structure with domain admin credentials
        self.ldb_owner.create_ou(self.ou1_dn)
        self.ldb_owner.create_ou(self.ou2_dn)
        # Test user and group creation with regular user credentials
        # NOTE(review): both calls share one try block, so when newuser()
        # raises, newgroup() is never attempted; the group check below then
        # only shows the group is absent, not that its creation was denied.
        try:
            self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
            self.ldb_user.newgroup(self.test_group1, groupou=self.ou2,
                                   grouptype=4)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        # Make sure we HAVEN'T created any of two objects -- user or group
        res = self._search_dn(self.test_user1_dn)
        self.assertEqual(res, [])
        res = self._search_dn(self.test_group1_dn)
        self.assertEqual(res, [])

    def test_add_u3(self):
        """Testing OU with the rights of regular user granted the right 'Create User child objects' """
        self.assert_top_ou_deleted()
        # Change descriptor for top level OU
        self.ldb_owner.create_ou(self.ou1_dn)
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(self.ou1_dn, mod)
        # The child OU is created only after the ACE has been added to
        # the parent.
        self.ldb_owner.create_ou(self.ou2_dn)
        # Test user and group creation with granted user only to one of the objects
        self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
        try:
            self.ldb_user.newgroup(self.test_group1, groupou=self.ou2,
                                   grouptype=4)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        # Make sure we HAVE created the one of two objects -- user
        res = self._search_dn(self.test_user1_dn)
        self.assertNotEqual(len(res), 0)
        res = self._search_dn(self.test_group1_dn)
        self.assertEqual(res, [])

    def test_add_u4(self):
        """ 4 Testing OU with the rights of Domain Admin creator of the OU"""
        self.assert_top_ou_deleted()
        self.ldb_owner.create_ou(self.ou1_dn)
        self.ldb_owner.create_ou(self.ou2_dn)
        self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_owner.newgroup(self.test_group1, groupou=self.ou2,
                                grouptype=4)
        # Make sure we have successfully created the two objects -- user and group
        res = self._search_dn(self.test_user1_dn)
        self.assertTrue(len(res) > 0)
        res = self._search_dn(self.test_group1_dn)
        self.assertTrue(len(res) > 0)
231
232 #tests on ldap modify operations
class AclModifyTests(AclTests):
    """Access checks on LDAP modify operations."""

    def setUp(self):
        """Create the acting users, their connections and the fixed targets."""
        super(AclModifyTests, self).setUp()
        self.user_with_wp = "acl_mod_user1"
        self.user_with_sm = "acl_mod_user2"
        self.user_with_group_sm = "acl_mod_user3"
        # DNs of the objects the tests create and modify.
        self.user1_dn = self.get_user_dn("test_modify_user1")
        self.user2_dn = self.get_user_dn("test_modify_user2")
        self.group1_dn = "CN=test_modify_group1,CN=Users," + self.base_dn
        self.group2_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        self.group3_dn = "CN=test_modify_group3,CN=Users," + self.base_dn
        self.ou1_dn = "OU=test_modify_ou1," + self.base_dn

        self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
        self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
        self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
        self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
        self.ldb_admin.newgroup("test_modify_group2", grouptype=4)
        self.ldb_admin.newgroup("test_modify_group3", grouptype=4)
        self.ldb_admin.newuser("test_modify_user2", self.user_pass)

    def tearDown(self):
        """Delete every object a test in this class may have created."""
        super(AclModifyTests, self).tearDown()
        delete_force(self.ldb_admin, self.user1_dn)
        delete_force(self.ldb_admin, self.group1_dn)
        delete_force(self.ldb_admin, self.group2_dn)
        delete_force(self.ldb_admin, self.group3_dn)
        delete_force(self.ldb_admin, self.ou1_dn)
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
        delete_force(self.ldb_admin, self.user2_dn)

    @staticmethod
    def _modify_ldif(dn, operation, attribute, *values):
        """Build the LDIF text of a single-attribute modify request.

        The text starts with an empty line and has no trailing newline,
        e.g. "\\ndn: X\\nchangetype: modify\\nadd: Member\\nMember: Y".
        """
        lines = ["",
                 "dn: %s" % dn,
                 "changetype: modify",
                 "%s: %s" % (operation, attribute)]
        lines.extend("%s: %s" % (attribute, value) for value in values)
        return "\n".join(lines)

    def _search_dn(self, dn, **search_args):
        """Search, as administrator, for the object whose DN is `dn`."""
        return self.ldb_admin.search(
            self.base_dn, expression="(distinguishedName=%s)" % dn,
            **search_args)

    def _assert_modify_denied(self, samdb, ldif):
        """Assert that `samdb` is refused `ldif` for insufficient rights."""
        try:
            samdb.modify_ldif(ldif)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
            self.fail()

    def _assert_display_name_writable(self, dn, ace):
        """Add `ace` to `dn`, then check ldb_user can replace displayName."""
        self.sd_utils.dacl_add_ace(dn, ace)
        self.ldb_user.modify_ldif(
            self._modify_ldif(dn, "replace", "displayName", "test_changed"))
        res = self._search_dn(dn)
        self.assertEqual(res[0]["displayName"][0], "test_changed")

    def _assert_url_write_denied(self, dn):
        """Check that ldb_user is not allowed to replace url on `dn`."""
        self._assert_modify_denied(
            self.ldb_user,
            self._modify_ldif(dn, "replace", "url", "www.samba.org"))

    def test_modify_u1(self):
        """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        # First test object -- User
        print("Testing modify on User object")
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        self._assert_display_name_writable(self.user1_dn, mod)
        # Second test object -- Group
        print("Testing modify on Group object")
        self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
        self._assert_display_name_writable(self.group1_dn, mod)
        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        self.ldb_admin.create_ou(self.ou1_dn)
        self._assert_display_name_writable(self.ou1_dn, mod)

    def test_modify_u2(self):
        """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        # First test object -- User
        print("Testing modify on User object")
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        # Modify on attribute you have rights for
        self._assert_display_name_writable(self.user1_dn, mod)
        # Modify on attribute you do not have rights for granted
        self._assert_url_write_denied(self.user1_dn)
        # Second test object -- Group
        print("Testing modify on Group object")
        self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
        self._assert_display_name_writable(self.group1_dn, mod)
        # Modify on attribute you do not have rights for granted
        self._assert_url_write_denied(self.group1_dn)
        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        self.ldb_admin.create_ou(self.ou1_dn)
        self._assert_display_name_writable(self.ou1_dn, mod)
        # Modify on attribute you do not have rights for granted
        self._assert_url_write_denied(self.ou1_dn)

    def test_modify_u3(self):
        """7 Modify one attribute as you have no what so ever rights granted"""
        # First test object -- User
        print("Testing modify on User object")
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        # Modify on attribute you do not have rights for granted
        self._assert_url_write_denied(self.user1_dn)

        # Second test object -- Group
        print("Testing modify on Group object")
        self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
        # Modify on attribute you do not have rights for granted
        self._assert_url_write_denied(self.group1_dn)

        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        self.ldb_admin.create_ou(self.ou1_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_write_denied(self.ou1_dn)

    def test_modify_u4(self):
        """11 Grant WP to PRINCIPAL_SELF and test modify"""
        own_dn = self.get_user_dn(self.user_with_wp)
        ldif = self._modify_ldif(own_dn, "add", "adminDescription",
                                 "blah blah blah")
        # Before the ACE is added the user may not write the attribute.
        self._assert_modify_denied(self.ldb_user, ldif)

        mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
        self.sd_utils.dacl_add_ace(own_dn, mod)
        # Modify on attribute you have rights for
        self.ldb_user.modify_ldif(ldif)
        res = self._search_dn(own_dn, attrs=["adminDescription"])
        self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")

    def test_modify_u5(self):
        """12 test self membership"""
        own_dn = self.get_user_dn(self.user_with_sm)
        ldif = self._modify_ldif(self.group2_dn, "add", "Member", own_dn)
        # the user has no rights granted, this should fail
        self._assert_modify_denied(self.ldb_user2, ldif)

        # grant self-membership, should be able to add himself
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(self.group2_dn, mod)
        self.ldb_user2.modify_ldif(ldif)
        res = self._search_dn(self.group2_dn, attrs=["Member"])
        self.assertEqual(res[0]["Member"][0], own_dn)
        # but not other users
        ldif = self._modify_ldif(self.group2_dn, "add", "Member", self.user2_dn)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u6(self):
        """13 test self membership"""
        own_dn = self.get_user_dn(self.user_with_sm)
        ldif = self._modify_ldif(self.group2_dn, "add", "Member",
                                 own_dn, self.user2_dn)

        # grant self-membership, should be able to add himself  but not others at the same time
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(self.group2_dn, mod)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u7(self):
        """13 User with WP modifying Member"""
        # a second user is given write property permission
        own_dn = self.get_user_dn(self.user_with_wp)
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(A;;WP;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(self.group2_dn, mod)
        # The user adds itself to the group.
        self.ldb_user.modify_ldif(
            self._modify_ldif(self.group2_dn, "add", "Member", own_dn))
        res = self._search_dn(self.group2_dn, attrs=["Member"])
        self.assertEqual(res[0]["Member"][0], own_dn)
        # Delete the Member attribute, then add a different user instead.
        self.ldb_user.modify_ldif(
            self._modify_ldif(self.group2_dn, "delete", "Member"))
        self.ldb_user.modify_ldif(
            self._modify_ldif(self.group2_dn, "add", "Member", self.user2_dn))
        res = self._search_dn(self.group2_dn, attrs=["Member"])
        self.assertEqual(res[0]["Member"][0], self.user2_dn)
556
557 #enable these when we have search implemented
558 class AclSearchTests(AclTests):
559
560     def setUp(self):
561         super(AclSearchTests, self).setUp()
562         self.u1 = "search_u1"
563         self.u2 = "search_u2"
564         self.u3 = "search_u3"
565         self.group1 = "group1"
566         self.creds_tmp = Credentials()
567         self.creds_tmp.set_username("")
568         self.creds_tmp.set_password("")
569         self.creds_tmp.set_domain(creds.get_domain())
570         self.creds_tmp.set_realm(creds.get_realm())
571         self.creds_tmp.set_workstation(creds.get_workstation())
572         self.ldb_admin.newuser(self.u1, self.user_pass)
573         self.ldb_admin.newuser(self.u2, self.user_pass)
574         self.ldb_admin.newuser(self.u3, self.user_pass)
575         self.ldb_admin.newgroup(self.group1, grouptype=-2147483646)
576         self.ldb_admin.add_remove_group_members(self.group1, self.u2,
577                                                 add_members_operation=True)
578         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
579         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
580         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
581         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
582                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
583                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
584                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
585                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
586                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
587         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
588         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
589
590     def create_clean_ou(self, object_dn):
591         """ Base repeating setup for unittests to follow """
592         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
593                 expression="distinguishedName=%s" % object_dn)
594         # Make sure top testing OU has been deleted before starting the test
595         self.assertEqual(res, [])
596         self.ldb_admin.create_ou(object_dn)
597         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
598         # Make sure there are inheritable ACEs initially
599         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
600         # Find and remove all inherit ACEs
601         res = re.findall("\(.*?\)", desc_sddl)
602         res = [x for x in res if ("CI" in x) or ("OI" in x)]
603         for x in res:
604             desc_sddl = desc_sddl.replace(x, "")
605         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
606         # can propagate from above
607         # remove SACL, we are not interested
608         desc_sddl = desc_sddl.replace(":AI", ":AIP")
609         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
610         # Verify all inheritable ACEs are gone
611         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
612         self.assertFalse("CI" in desc_sddl)
613         self.assertFalse("OI" in desc_sddl)
614
615     def tearDown(self):
616         super(AclSearchTests, self).tearDown()
617         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
618         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
619         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
620         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
621         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
622         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
623         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
624         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
625         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
626         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
627         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
628         delete_force(self.ldb_admin, self.get_user_dn("group1"))
629
630     def test_search_anonymous1(self):
631         """Verify access of rootDSE with the correct request"""
632         anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
633         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
634         self.assertEquals(len(res), 1)
635         #verify some of the attributes
636         #dont care about values
637         self.assertTrue("ldapServiceName" in res[0])
638         self.assertTrue("namingContexts" in res[0])
639         self.assertTrue("isSynchronized" in res[0])
640         self.assertTrue("dsServiceName" in res[0])
641         self.assertTrue("supportedSASLMechanisms" in res[0])
642         self.assertTrue("isGlobalCatalogReady" in res[0])
643         self.assertTrue("domainControllerFunctionality" in res[0])
644         self.assertTrue("serverName" in res[0])
645
646     def test_search_anonymous2(self):
647         """Make sure we cannot access anything else"""
648         anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
649         try:
650             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
651         except LdbError, (num, _):
652             self.assertEquals(num, ERR_OPERATIONS_ERROR)
653         else:
654             self.fail()
655         try:
656             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
657         except LdbError, (num, _):
658             self.assertEquals(num, ERR_OPERATIONS_ERROR)
659         else:
660             self.fail()
661         try:
662             res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
663                                         scope=SCOPE_SUBTREE)
664         except LdbError, (num, _):
665             self.assertEquals(num, ERR_OPERATIONS_ERROR)
666         else:
667             self.fail()
668
669     def test_search_anonymous3(self):
670         """Set dsHeuristics and repeat"""
671         self.ldb_admin.set_dsheuristics("0000002")
672         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
673         mod = "(A;CI;LC;;;AN)"
674         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
675         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
676         anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
677         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
678                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
679         self.assertEquals(len(res), 1)
680         self.assertTrue("dn" in res[0])
681         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
682                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
683         res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
684                                scope=SCOPE_SUBTREE)
685         self.assertEquals(len(res), 1)
686         self.assertTrue("dn" in res[0])
687         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
688
689     def test_search1(self):
690         """Make sure users can see us if given LC to user and group"""
691         self.create_clean_ou("OU=ou1," + self.base_dn)
692         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
693         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
694         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
695                                                  self.domain_sid)
696         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
697         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
698         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
699         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
700         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
701
702         #regular users must see only ou1 and ou2
703         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
704                                     scope=SCOPE_SUBTREE)
705         self.assertEquals(len(res), 2)
706         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
707                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
708
709         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
710         self.assertEquals(sorted(res_list), sorted(ok_list))
711
712         #these users should see all ous
713         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
714                                     scope=SCOPE_SUBTREE)
715         self.assertEquals(len(res), 6)
716         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
717         self.assertEquals(sorted(res_list), sorted(self.full_list))
718
719         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
720                                     scope=SCOPE_SUBTREE)
721         self.assertEquals(len(res), 6)
722         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
723         self.assertEquals(sorted(res_list), sorted(self.full_list))
724
725     def test_search2(self):
726         """Make sure users can't see us if access is explicitly denied"""
727         self.create_clean_ou("OU=ou1," + self.base_dn)
728         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
729         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
730         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
731         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
732         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
733         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
734         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
735         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
736                                     scope=SCOPE_SUBTREE)
737         #this user should see all ous
738         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
739         self.assertEquals(sorted(res_list), sorted(self.full_list))
740
741         #these users should see ou1, 2, 5 and 6 but not 3 and 4
742         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
743                                     scope=SCOPE_SUBTREE)
744         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
745                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
746                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
747                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
748         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
749         self.assertEquals(sorted(res_list), sorted(ok_list))
750
751         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
752                                     scope=SCOPE_SUBTREE)
753         self.assertEquals(len(res), 4)
754         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
755         self.assertEquals(sorted(res_list), sorted(ok_list))
756
757     def test_search3(self):
758         """Make sure users can't see ous if access is explicitly denied - 2"""
759         self.create_clean_ou("OU=ou1," + self.base_dn)
760         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
761         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
762         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
763                                                  self.domain_sid)
764         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
765         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
766         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
767         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
768         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
769
770         print "Testing correct behavior on nonaccessible search base"
771         try:
772              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
773                                    scope=SCOPE_BASE)
774         except LdbError, (num, _):
775             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
776         else:
777             self.fail()
778
779         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
780         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
781
782         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
783                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
784
785         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
786                                     scope=SCOPE_SUBTREE)
787         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
788         self.assertEquals(sorted(res_list), sorted(ok_list))
789
790         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
791                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
792                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
793                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
794
795         #should not see ou3 and ou4, but should see ou5 and ou6
796         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
797                                     scope=SCOPE_SUBTREE)
798         self.assertEquals(len(res), 4)
799         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
800         self.assertEquals(sorted(res_list), sorted(ok_list))
801
802         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
803                                     scope=SCOPE_SUBTREE)
804         self.assertEquals(len(res), 4)
805         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
806         self.assertEquals(sorted(res_list), sorted(ok_list))
807
808     def test_search4(self):
809         """There is no difference in visibility if the user is also creator"""
810         self.create_clean_ou("OU=ou1," + self.base_dn)
811         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
812         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
813         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
814                                                  self.domain_sid)
815         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
816         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
817         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
818         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
819         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
820
821         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
822                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
823         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
824                                     scope=SCOPE_SUBTREE)
825         self.assertEquals(len(res), 2)
826         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
827         self.assertEquals(sorted(res_list), sorted(ok_list))
828
829         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
830                                     scope=SCOPE_SUBTREE)
831         self.assertEquals(len(res), 2)
832         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
833         self.assertEquals(sorted(res_list), sorted(ok_list))
834
835     def test_search5(self):
836         """Make sure users can see only attributes they are allowed to see"""
837         self.create_clean_ou("OU=ou1," + self.base_dn)
838         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
839         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
840         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
841                                                  self.domain_sid)
842         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
843         # assert user can only see dn
844         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
845                                     scope=SCOPE_SUBTREE)
846         ok_list = ['dn']
847         self.assertEquals(len(res), 1)
848         res_list = res[0].keys()
849         self.assertEquals(res_list, ok_list)
850
851         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
852                                     scope=SCOPE_BASE, attrs=["ou"])
853
854         self.assertEquals(len(res), 1)
855         res_list = res[0].keys()
856         self.assertEquals(res_list, ok_list)
857
858         #give read property on ou and assert user can only see dn and ou
859         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
860         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
861         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
862         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
863                                     scope=SCOPE_SUBTREE)
864         ok_list = ['dn', 'ou']
865         self.assertEquals(len(res), 1)
866         res_list = res[0].keys()
867         self.assertEquals(sorted(res_list), sorted(ok_list))
868
869         #give read property on Public Information and assert user can see ou and other members
870         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
871         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
872         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
873         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
874                                     scope=SCOPE_SUBTREE)
875
876         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
877         res_list = res[0].keys()
878         self.assertEquals(sorted(res_list), sorted(ok_list))
879
880     def test_search6(self):
881         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
882         self.create_clean_ou("OU=ou1," + self.base_dn)
883         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
884         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
885         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
886                                                  self.domain_sid)
887         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
888         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
889
890         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
891                                     scope=SCOPE_SUBTREE)
892         #nothing should be returned as ou is not accessible
893         self.assertEquals(res, [])
894
895         #give read property on ou and assert user can only see dn and ou
896         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
897         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
898         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
899                                     scope=SCOPE_SUBTREE)
900         self.assertEquals(len(res), 1)
901         ok_list = ['dn', 'ou']
902         res_list = res[0].keys()
903         self.assertEquals(sorted(res_list), sorted(ok_list))
904
905         #give read property on Public Information and assert user can see ou and other members
906         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
907         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
908         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
909                                    scope=SCOPE_SUBTREE)
910         self.assertEquals(len(res), 1)
911         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
912         res_list = res[0].keys()
913         self.assertEquals(sorted(res_list), sorted(ok_list))
914
915 #tests on ldap delete operations
916 class AclDeleteTests(AclTests):
917
918     def setUp(self):
919         super(AclDeleteTests, self).setUp()
920         self.regular_user = "acl_delete_user1"
921         # Create regular user
922         self.ldb_admin.newuser(self.regular_user, self.user_pass)
923         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
924
925     def tearDown(self):
926         super(AclDeleteTests, self).tearDown()
927         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
928         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
929
930     def test_delete_u1(self):
931         """User is prohibited by default to delete another User object"""
932         # Create user that we try to delete
933         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
934         # Here delete User object should ALWAYS through exception
935         try:
936             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
937         except LdbError, (num, _):
938             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
939         else:
940             self.fail()
941
942     def test_delete_u2(self):
943         """User's group has RIGHT_DELETE to another User object"""
944         user_dn = self.get_user_dn("test_delete_user1")
945         # Create user that we try to delete
946         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
947         mod = "(A;;SD;;;AU)"
948         self.sd_utils.dacl_add_ace(user_dn, mod)
949         # Try to delete User object
950         self.ldb_user.delete(user_dn)
951         res = self.ldb_admin.search(self.base_dn,
952                 expression="(distinguishedName=%s)" % user_dn)
953         self.assertEqual(res, [])
954
955     def test_delete_u3(self):
956         """User indentified by SID has RIGHT_DELETE to another User object"""
957         user_dn = self.get_user_dn("test_delete_user1")
958         # Create user that we try to delete
959         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
960         mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
961         self.sd_utils.dacl_add_ace(user_dn, mod)
962         # Try to delete User object
963         self.ldb_user.delete(user_dn)
964         res = self.ldb_admin.search(self.base_dn,
965                 expression="(distinguishedName=%s)" % user_dn)
966         self.assertEqual(res, [])
967
968 #tests on ldap rename operations
969 class AclRenameTests(AclTests):
970
971     def setUp(self):
972         super(AclRenameTests, self).setUp()
973         self.regular_user = "acl_rename_user1"
974         self.ou1 = "OU=test_rename_ou1"
975         self.ou2 = "OU=test_rename_ou2"
976         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
977         self.testuser1 = "test_rename_user1"
978         self.testuser2 = "test_rename_user2"
979         self.testuser3 = "test_rename_user3"
980         self.testuser4 = "test_rename_user4"
981         self.testuser5 = "test_rename_user5"
982         # Create regular user
983         self.ldb_admin.newuser(self.regular_user, self.user_pass)
984         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
985
986     def tearDown(self):
987         super(AclRenameTests, self).tearDown()
988         # Rename OU3
989         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
990         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
991         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
992         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
993         # Rename OU2
994         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
995         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
996         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
997         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
998         # Rename OU1
999         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1000         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1001         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1002         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1003         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1004         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1005
1006     def test_rename_u1(self):
1007         """Regular user fails to rename 'User object' within single OU"""
1008         # Create OU structure
1009         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1010         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1011         try:
1012             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1013                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1014         except LdbError, (num, _):
1015             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1016         else:
1017             self.fail()
1018
1019     def test_rename_u2(self):
1020         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1021         ou_dn = "OU=test_rename_ou1," + self.base_dn
1022         user_dn = "CN=test_rename_user1," + ou_dn
1023         rename_user_dn = "CN=test_rename_user5," + ou_dn
1024         # Create OU structure
1025         self.ldb_admin.create_ou(ou_dn)
1026         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1027         mod = "(A;;WP;;;AU)"
1028         self.sd_utils.dacl_add_ace(user_dn, mod)
1029         # Rename 'User object' having WP to AU
1030         self.ldb_user.rename(user_dn, rename_user_dn)
1031         res = self.ldb_admin.search(self.base_dn,
1032                 expression="(distinguishedName=%s)" % user_dn)
1033         self.assertEqual(res, [])
1034         res = self.ldb_admin.search(self.base_dn,
1035                 expression="(distinguishedName=%s)" % rename_user_dn)
1036         self.assertNotEqual(res, [])
1037
1038     def test_rename_u3(self):
1039         """Test rename with rights granted to 'User object' SID"""
1040         ou_dn = "OU=test_rename_ou1," + self.base_dn
1041         user_dn = "CN=test_rename_user1," + ou_dn
1042         rename_user_dn = "CN=test_rename_user5," + ou_dn
1043         # Create OU structure
1044         self.ldb_admin.create_ou(ou_dn)
1045         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1046         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1047         mod = "(A;;WP;;;%s)" % str(sid)
1048         self.sd_utils.dacl_add_ace(user_dn, mod)
1049         # Rename 'User object' having WP to AU
1050         self.ldb_user.rename(user_dn, rename_user_dn)
1051         res = self.ldb_admin.search(self.base_dn,
1052                 expression="(distinguishedName=%s)" % user_dn)
1053         self.assertEqual(res, [])
1054         res = self.ldb_admin.search(self.base_dn,
1055                 expression="(distinguishedName=%s)" % rename_user_dn)
1056         self.assertNotEqual(res, [])
1057
1058     def test_rename_u4(self):
1059         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1060         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1061         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1062         user_dn = "CN=test_rename_user2," + ou1_dn
1063         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1064         # Create OU structure
1065         self.ldb_admin.create_ou(ou1_dn)
1066         self.ldb_admin.create_ou(ou2_dn)
1067         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1068         mod = "(A;;WPSD;;;AU)"
1069         self.sd_utils.dacl_add_ace(user_dn, mod)
1070         mod = "(A;;CC;;;AU)"
1071         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1072         # Rename 'User object' having SD and CC to AU
1073         self.ldb_user.rename(user_dn, rename_user_dn)
1074         res = self.ldb_admin.search(self.base_dn,
1075                 expression="(distinguishedName=%s)" % user_dn)
1076         self.assertEqual(res, [])
1077         res = self.ldb_admin.search(self.base_dn,
1078                 expression="(distinguishedName=%s)" % rename_user_dn)
1079         self.assertNotEqual(res, [])
1080
1081     def test_rename_u5(self):
1082         """Test rename with rights granted to 'User object' SID"""
1083         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1084         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1085         user_dn = "CN=test_rename_user2," + ou1_dn
1086         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1087         # Create OU structure
1088         self.ldb_admin.create_ou(ou1_dn)
1089         self.ldb_admin.create_ou(ou2_dn)
1090         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1091         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1092         mod = "(A;;WPSD;;;%s)" % str(sid)
1093         self.sd_utils.dacl_add_ace(user_dn, mod)
1094         mod = "(A;;CC;;;%s)" % str(sid)
1095         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1096         # Rename 'User object' having SD and CC to AU
1097         self.ldb_user.rename(user_dn, rename_user_dn)
1098         res = self.ldb_admin.search(self.base_dn,
1099                 expression="(distinguishedName=%s)" % user_dn)
1100         self.assertEqual(res, [])
1101         res = self.ldb_admin.search(self.base_dn,
1102                 expression="(distinguishedName=%s)" % rename_user_dn)
1103         self.assertNotEqual(res, [])
1104
1105     def test_rename_u6(self):
1106         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1107         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1108         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1109         user_dn = "CN=test_rename_user2," + ou1_dn
1110         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1111         # Create OU structure
1112         self.ldb_admin.create_ou(ou1_dn)
1113         self.ldb_admin.create_ou(ou2_dn)
1114         #mod = "(A;CI;DCWP;;;AU)"
1115         mod = "(A;;DC;;;AU)"
1116         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1117         mod = "(A;;CC;;;AU)"
1118         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1119         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1120         mod = "(A;;WP;;;AU)"
1121         self.sd_utils.dacl_add_ace(user_dn, mod)
1122         # Rename 'User object' having SD and CC to AU
1123         self.ldb_user.rename(user_dn, rename_user_dn)
1124         res = self.ldb_admin.search(self.base_dn,
1125                 expression="(distinguishedName=%s)" % user_dn)
1126         self.assertEqual(res, [])
1127         res = self.ldb_admin.search(self.base_dn,
1128                 expression="(distinguishedName=%s)" % rename_user_dn)
1129         self.assertNotEqual(res, [])
1130
1131     def test_rename_u7(self):
1132         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1133         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1134         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1135         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1136         user_dn = "CN=test_rename_user2," + ou1_dn
1137         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1138         # Create OU structure
1139         self.ldb_admin.create_ou(ou1_dn)
1140         self.ldb_admin.create_ou(ou2_dn)
1141         self.ldb_admin.create_ou(ou3_dn)
1142         mod = "(A;CI;WPDC;;;AU)"
1143         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1144         mod = "(A;;CC;;;AU)"
1145         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1146         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1147         # Rename 'User object' having SD and CC to AU
1148         self.ldb_user.rename(user_dn, rename_user_dn)
1149         res = self.ldb_admin.search(self.base_dn,
1150                 expression="(distinguishedName=%s)" % user_dn)
1151         self.assertEqual(res, [])
1152         res = self.ldb_admin.search(self.base_dn,
1153                 expression="(distinguishedName=%s)" % rename_user_dn)
1154         self.assertNotEqual(res, [])
1155
1156     def test_rename_u8(self):
1157         """Test rename on an object with and without modify access on the RDN attribute"""
1158         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1159         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1160         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1161         # Create OU structure
1162         self.ldb_admin.create_ou(ou1_dn)
1163         self.ldb_admin.create_ou(ou2_dn)
1164         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1165         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1166         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1167         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1168         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1169         try:
1170             self.ldb_user.rename(ou2_dn, ou3_dn)
1171         except LdbError, (num, _):
1172             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1173         else:
1174             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1175             self.fail()
1176         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1177         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1178         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1179         self.ldb_user.rename(ou2_dn, ou3_dn)
1180         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1181         self.assertEqual(res, [])
1182         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1183         self.assertNotEqual(res, [])
1184
1185 #tests on Control Access Rights
class AclCARTests(AclTests):
    """Tests on Control Access Rights (CARs) for password change and reset.

    Two users are created per test: ``user_with_wp`` (connected as
    ``self.ldb_user``) and ``user_with_pc`` (connected as ``self.ldb_user2``).
    Most tests modify the security descriptor of ``user_with_wp`` and then try
    to change or reset that user's password over its own connection.
    """

    def setUp(self):
        """Create the two test users and open an LDAP connection for each."""
        super(AclCARTests, self).setUp()
        self.user_with_wp = "acl_car_user1"
        self.user_with_pc = "acl_car_user2"
        self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
        self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)

    def tearDown(self):
        """Remove the two test users created in setUp."""
        super(AclCARTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))

    def _remove_default_change_password_aces(self, user_dn):
        """Strip the default change-password CAR ACEs (for WD and PS) from user_dn.

        Users have change password by default; the tests that need a negative
        case remove both ACEs from the SDDL and write the descriptor back.
        """
        desc = self.sd_utils.read_sd_on_dn(user_dn)
        sddl = desc.as_sddl(self.domain_sid)
        sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
        sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
        self.sd_utils.modify_sd_on_dn(user_dn, sddl)

    def test_change_password1(self):
        """Try a password change operation without any CARs given"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self._remove_default_change_password_aces(user_dn)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
delete: unicodePwd
unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
add: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
""")
        except LdbError as e:
            (num, _) = e.args
            # for some reason we get constraint violation instead of insufficient access error
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("password change succeeded without the change-password CAR")

    def test_change_password2(self):
        """Make sure an allowed WP has no influence on a password change"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self._remove_default_change_password_aces(user_dn)
        # Grant plain write-property instead of the removed CAR
        mod = "(A;;WP;;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
delete: unicodePwd
unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
add: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
""")
        except LdbError as e:
            (num, _) = e.args
            # for some reason we get constraint violation instead of insufficient access error
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("password change succeeded with WP but without the change-password CAR")

    def test_change_password3(self):
        """Make sure a denied WP has no influence on a password change"""
        user_dn = self.get_user_dn(self.user_with_wp)
        mod = "(D;;WP;;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # The default ACEs are left in place, so this is expected to succeed
        self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
delete: unicodePwd
unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
add: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
""")

    def test_change_password5(self):
        """Make sure rights have no influence on dBCSPwd"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self._remove_default_change_password_aces(user_dn)
        mod = "(D;;WP;;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
delete: dBCSPwd
dBCSPwd: XXXXXXXXXXXXXXXX
add: dBCSPwd
dBCSPwd: YYYYYYYYYYYYYYYY
""")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
        else:
            self.fail("modifying dBCSPwd unexpectedly succeeded")

    def test_change_password6(self):
        """Test uneven delete/adds"""
        user_dn = self.get_user_dn(self.user_with_wp)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
delete: userPassword
userPassword: thatsAcomplPASS1
delete: userPassword
userPassword: thatsAcomplPASS1
add: userPassword
userPassword: thatsAcomplPASS2
""")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("uneven delete/add succeeded before the CAR was granted")
        # Grant the CAR and repeat the same request; it must still be rejected,
        # but with a different error code
        mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
delete: userPassword
userPassword: thatsAcomplPASS1
delete: userPassword
userPassword: thatsAcomplPASS1
add: userPassword
userPassword: thatsAcomplPASS2
""")
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
                            num == ERR_UNWILLING_TO_PERFORM)
        else:
            self.fail("uneven delete/add succeeded after the CAR was granted")

    def test_change_password7(self):
        """Change passwords over user_with_pc's connection: its own, then user_with_wp's.

        Both changes supply the old value (delete) and the new value (add) of
        unicodePwd and are expected to succeed.
        """
        wp_dn = self.get_user_dn(self.user_with_wp)
        # NOTE(review): the descriptor is read and written back unchanged, so
        # the default change-password ACEs stay in place - confirm this
        # round-trip is intentional.
        desc = self.sd_utils.read_sd_on_dn(wp_dn)
        sddl = desc.as_sddl(self.domain_sid)
        self.sd_utils.modify_sd_on_dn(wp_dn, sddl)
        #first change our own password
        self.ldb_user2.modify_ldif("""
dn: """ + self.get_user_dn(self.user_with_pc) + """
changetype: modify
delete: unicodePwd
unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
add: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
""")
        #then someone else's
        self.ldb_user2.modify_ldif("""
dn: """ + wp_dn + """
changetype: modify
delete: unicodePwd
unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
add: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
""")

    def test_reset_password1(self):
        """Try a user password reset operation (unicodePwd) before and after granting CAR"""
        user_dn = self.get_user_dn(self.user_with_wp)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
""")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("password reset succeeded before the CAR was granted")
        mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # With the CAR granted the reset is expected to succeed
        self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
""")

    def test_reset_password2(self):
        """Try a user password reset operation (userPassword) before and after granting CAR"""
        user_dn = self.get_user_dn(self.user_with_wp)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: userPassword
userPassword: thatsAcomplPASS1
""")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("password reset succeeded before the CAR was granted")
        mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Success is accepted here; the only tolerated error is a constraint
        # violation.
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: userPassword
userPassword: thatsAcomplPASS1
""")
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)

    def test_reset_password3(self):
        """Grant WP and see what happens (unicodePwd)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        mod = "(A;;WP;;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
""")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("password reset succeeded with only WP granted")

    def test_reset_password4(self):
        """Grant WP and see what happens (userPassword)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        mod = "(A;;WP;;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: userPassword
userPassword: thatsAcomplPASS1
""")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail("password reset succeeded with only WP granted")

    def test_reset_password5(self):
        """Explicitly deny WP but grant CAR (unicodePwd)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Expected to succeed despite the WP deny
        self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
""")

    def test_reset_password6(self):
        """Explicitly deny WP but grant CAR (userPassword)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Success is accepted here; the only tolerated error is a constraint
        # violation.
        try:
            self.ldb_user.modify_ldif("""
dn: """ + user_dn + """
changetype: modify
replace: userPassword
userPassword: thatsAcomplPASS1
""")
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
1457
class AclExtendedTests(AclTests):
    """Checks which users get nTSecurityDescriptor back from a search."""

    def setUp(self):
        """Create three users (u3 joins Domain Admins) and connect as each of them."""
        super(AclExtendedTests, self).setUp()
        # u1: regular user, will be the creator
        # u2: regular user
        # u3: admin user
        self.u1, self.u2, self.u3 = "ext_u1", "ext_u2", "ext_u3"
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members(
            "Domain Admins", self.u3, add_members_operation=True)
        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        """Delete the users, then the group and the OU the test may have created."""
        super(AclExtendedTests, self).tearDown()
        for account in (self.u1, self.u2, self.u3):
            delete_force(self.ldb_admin, self.get_user_dn(account))
        # The group goes first because it lives inside the OU
        for prefix in ("CN=ext_group1,OU=ext_ou1,", "ou=ext_ou1,"):
            delete_force(self.ldb_admin, prefix + self.base_dn)

    def test_ntSecurityDescriptor(self):
        """Check that u2 never sees nTSecurityDescriptor on the group while u3 does."""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1,OU=ext_ou1," + self.base_dn

        def fetch_sd_attr(connection):
            # Base-scope search on the group asking only for the descriptor
            return connection.search(group_dn, SCOPE_BASE, None,
                                     ["nTSecurityDescriptor"])

        # Start from an empty OU
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        # u1 gets CC on the OU, u2 gets LC on it
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % str(self.user_sid1))
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % str(self.user_sid2))
        # u1 creates the group inside the OU; u2 is then given RP on it
        self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", grouptype=4)
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % str(self.user_sid2))
        # With RP only, u2 finds the group but not its descriptor
        found = fetch_sd_attr(self.ldb_user2)
        self.assertNotEqual(found, [])
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())
        # Adding RC for u2 must not change that
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % str(self.user_sid2))
        found = fetch_sd_attr(self.ldb_user2)
        self.assertNotEqual(found, [])
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())
        # u3 was added to Domain Admins in setUp and should get the descriptor
        found = fetch_sd_attr(self.ldb_user3)
        self.assertEqual(len(found), 1)
        self.assertTrue("nTSecurityDescriptor" in found[0].keys())
1516
# Test runner: connect to the server and run every suite

if "://" not in host:
    host = "ldap://%s" % host
ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)

runner = SubunitTestRunner()
rc = 0
for testcase in (AclAddTests, AclModifyTests, AclDeleteTests, AclRenameTests):
    if not runner.run(unittest.makeSuite(testcase)).wasSuccessful():
        rc = 1

# Get the old "dSHeuristics" if it was set
dsheuristics = ldb.get_dsheuristics()
# Get the old "minPwdAge"
minPwdAge = ldb.get_minPwdAge()
# Both values are read before either is changed, and the restore runs in a
# "finally" so the server settings are put back even if a run raises.
try:
    # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
    ldb.set_dsheuristics("000000001")
    # Set it temporarily to "0"
    ldb.set_minPwdAge("0")
    for testcase in (AclCARTests, AclSearchTests):
        if not runner.run(unittest.makeSuite(testcase)).wasSuccessful():
            rc = 1
finally:
    try:
        # Reset the "dSHeuristics" as they were before
        ldb.set_dsheuristics(dsheuristics)
    finally:
        # Reset the "minPwdAge" as it was before
        ldb.set_minPwdAge(minPwdAge)

if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful():
    rc = 1


sys.exit(rc)