fb6676693ec8d6783fc952ebfa833421c8d2c6dd
[kai/samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Unit tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9
10 sys.path.append("bin/python")
11 import samba
12 samba.ensure_external_module("testtools", "testtools")
13 samba.ensure_external_module("subunit", "subunit/python")
14
15 import samba.getopt as options
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
24 from samba.ndr import ndr_pack, ndr_unpack
25 from samba.dcerpc import security
26
27 from samba.auth import system_session
28 from samba import gensec
29 from samba.samdb import SamDB
30 from samba.credentials import Credentials
31 import samba.tests
32 from samba.tests import delete_force
33 from subunit.run import SubunitTestRunner
34 import unittest
35
# Command line: generic Samba options, version options and credentials
# options, followed by exactly one mandatory positional <host> argument.
parser = optparse.OptionParser("acl.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# Credentials supplied on the command line are used when available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

# Without a host there is nothing to test against: show usage and bail out.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Add gensec.FEATURE_SEAL on top of whatever features are already set.
creds.set_gensec_features(
    creds.get_gensec_features() | gensec.FEATURE_SEAL)
55
56 #
57 # Tests start here
58 #
59
60 class AclTests(samba.tests.TestCase):
61
62     def find_domain_sid(self, ldb):
63         res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
64         return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
65
66     def setUp(self):
67         super(AclTests, self).setUp()
68         self.ldb_admin = ldb
69         self.base_dn = ldb.domain_dn()
70         self.domain_sid = self.find_domain_sid(self.ldb_admin)
71         self.user_pass = "samba123@"
72         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
73         print "baseDN: %s" % self.base_dn
74
75     def get_user_dn(self, name):
76         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
77
78     def modify_desc(self, object_dn, desc):
79         """ Modify security descriptor using either SDDL string
80             or security.descriptor object
81         """
82         assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
83         mod = """
84 dn: """ + object_dn + """
85 changetype: modify
86 replace: nTSecurityDescriptor
87 """
88         if isinstance(desc, str):
89             mod += "nTSecurityDescriptor: %s" % desc
90         elif isinstance(desc, security.descriptor):
91             mod += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
92         self.ldb_admin.modify_ldif(mod)
93
94     def read_desc(self, object_dn):
95         res = self.ldb_admin.search(object_dn, SCOPE_BASE, None, ["nTSecurityDescriptor"])
96         desc = res[0]["nTSecurityDescriptor"][0]
97         return ndr_unpack(security.descriptor, desc)
98
99     def get_ldb_connection(self, target_username, target_password):
100         creds_tmp = Credentials()
101         creds_tmp.set_username(target_username)
102         creds_tmp.set_password(target_password)
103         creds_tmp.set_domain(creds.get_domain())
104         creds_tmp.set_realm(creds.get_realm())
105         creds_tmp.set_workstation(creds.get_workstation())
106         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
107                                       | gensec.FEATURE_SEAL)
108         ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
109         return ldb_target
110
111     def get_object_sid(self, object_dn):
112         res = self.ldb_admin.search(object_dn)
113         return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
114
115     def dacl_add_ace(self, object_dn, ace):
116         desc = self.read_desc(object_dn)
117         desc_sddl = desc.as_sddl(self.domain_sid)
118         if ace in desc_sddl:
119             return
120         if desc_sddl.find("(") >= 0:
121             desc_sddl = desc_sddl[:desc_sddl.index("(")] + ace + desc_sddl[desc_sddl.index("("):]
122         else:
123             desc_sddl = desc_sddl + ace
124         self.modify_desc(object_dn, desc_sddl)
125
126     def get_desc_sddl(self, object_dn):
127         """ Return object nTSecutiryDescriptor in SDDL format
128         """
129         desc = self.read_desc(object_dn)
130         return desc.as_sddl(self.domain_sid)
131
132     # Test if we have any additional groups for users than default ones
133     def assert_user_no_group_member(self, username):
134         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
135         try:
136             self.assertEqual(res[0]["memberOf"][0], "")
137         except KeyError:
138             pass
139         else:
140             self.fail()
141
142 #tests on ldap add operations
143 class AclAddTests(AclTests):
144
145     def setUp(self):
146         super(AclAddTests, self).setUp()
147         # Domain admin that will be creator of OU parent-child structure
148         self.usr_admin_owner = "acl_add_user1"
149         # Second domain admin that will not be creator of OU parent-child structure
150         self.usr_admin_not_owner = "acl_add_user2"
151         # Regular user
152         self.regular_user = "acl_add_user3"
153         self.test_user1 = "test_add_user1"
154         self.test_group1 = "test_add_group1"
155         self.ou1 = "OU=test_add_ou1"
156         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
157         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
158         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
159         self.ldb_admin.newuser(self.regular_user, self.user_pass)
160
161         # add admins to the Domain Admins group
162         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
163                        add_members_operation=True)
164         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
165                        add_members_operation=True)
166
167         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
168         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
169         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
170
171     def tearDown(self):
172         super(AclAddTests, self).tearDown()
173         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
174                           (self.test_user1, self.ou2, self.base_dn))
175         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
176                           (self.test_group1, self.ou2, self.base_dn))
177         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
178         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
179         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
180         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
181         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
182
183     # Make sure top OU is deleted (and so everything under it)
184     def assert_top_ou_deleted(self):
185         res = self.ldb_admin.search(self.base_dn,
186             expression="(distinguishedName=%s,%s)" % (
187                 "OU=test_add_ou1", self.base_dn))
188         self.assertEqual(res, [])
189
190     def test_add_u1(self):
191         """Testing OU with the rights of Doman Admin not creator of the OU """
192         self.assert_top_ou_deleted()
193         # Change descriptor for top level OU
194         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
195         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
196         user_sid = self.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
197         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
198         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
199         # Test user and group creation with another domain admin's credentials
200         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
201         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
202                                    grouptype=4)
203         # Make sure we HAVE created the two objects -- user and group
204         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
205         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
206         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
207         self.assertTrue(len(res) > 0)
208         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
209         self.assertTrue(len(res) > 0)
210
211     def test_add_u2(self):
212         """Testing OU with the regular user that has no rights granted over the OU """
213         self.assert_top_ou_deleted()
214         # Create a parent-child OU structure with domain admin credentials
215         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
216         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
217         # Test user and group creation with regular user credentials
218         try:
219             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
220             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
221                                    grouptype=4)
222         except LdbError, (num, _):
223             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
224         else:
225             self.fail()
226         # Make sure we HAVEN'T created any of two objects -- user or group
227         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
228         self.assertEqual(res, [])
229         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
230         self.assertEqual(res, [])
231
232     def test_add_u3(self):
233         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
234         self.assert_top_ou_deleted()
235         # Change descriptor for top level OU
236         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
237         user_sid = self.get_object_sid(self.get_user_dn(self.regular_user))
238         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
239         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
240         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
241         # Test user and group creation with granted user only to one of the objects
242         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
243         try:
244             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
245                                    grouptype=4)
246         except LdbError, (num, _):
247             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
248         else:
249             self.fail()
250         # Make sure we HAVE created the one of two objects -- user
251         res = self.ldb_admin.search(self.base_dn,
252                 expression="(distinguishedName=%s,%s)" %
253                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
254                     self.base_dn))
255         self.assertNotEqual(len(res), 0)
256         res = self.ldb_admin.search(self.base_dn,
257                 expression="(distinguishedName=%s,%s)" %
258                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
259                     self.base_dn) )
260         self.assertEqual(res, [])
261
262     def test_add_u4(self):
263         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
264         self.assert_top_ou_deleted()
265         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
266         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
267         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
268         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
269                                  grouptype=4)
270         # Make sure we have successfully created the two objects -- user and group
271         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
272         self.assertTrue(len(res) > 0)
273         res = self.ldb_admin.search(self.base_dn,
274                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
275         self.assertTrue(len(res) > 0)
276
277 #tests on ldap modify operations
278 class AclModifyTests(AclTests):
279
280     def setUp(self):
281         super(AclModifyTests, self).setUp()
282         self.user_with_wp = "acl_mod_user1"
283         self.user_with_sm = "acl_mod_user2"
284         self.user_with_group_sm = "acl_mod_user3"
285         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
286         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
287         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
288         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
289         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
290         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
291         self.user_sid = self.get_object_sid( self.get_user_dn(self.user_with_wp))
292         self.ldb_admin.newgroup("test_modify_group2", grouptype=4)
293         self.ldb_admin.newgroup("test_modify_group3", grouptype=4)
294         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
295
296     def tearDown(self):
297         super(AclModifyTests, self).tearDown()
298         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
299         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
300         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
301         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
302         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
303         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
304         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
305         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
306         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
307
308     def test_modify_u1(self):
309         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
310         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
311         # First test object -- User
312         print "Testing modify on User object"
313         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
314         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
315         ldif = """
316 dn: """ + self.get_user_dn("test_modify_user1") + """
317 changetype: modify
318 replace: displayName
319 displayName: test_changed"""
320         self.ldb_user.modify_ldif(ldif)
321         res = self.ldb_admin.search(self.base_dn,
322                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
323         self.assertEqual(res[0]["displayName"][0], "test_changed")
324         # Second test object -- Group
325         print "Testing modify on Group object"
326         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
327         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
328         ldif = """
329 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
330 changetype: modify
331 replace: displayName
332 displayName: test_changed"""
333         self.ldb_user.modify_ldif(ldif)
334         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
335         self.assertEqual(res[0]["displayName"][0], "test_changed")
336         # Third test object -- Organizational Unit
337         print "Testing modify on OU object"
338         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
339         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
340         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
341         ldif = """
342 dn: OU=test_modify_ou1,""" + self.base_dn + """
343 changetype: modify
344 replace: displayName
345 displayName: test_changed"""
346         self.ldb_user.modify_ldif(ldif)
347         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
348         self.assertEqual(res[0]["displayName"][0], "test_changed")
349
350     def test_modify_u2(self):
351         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
352         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
353         # First test object -- User
354         print "Testing modify on User object"
355         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
356         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
357         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
358         # Modify on attribute you have rights for
359         ldif = """
360 dn: """ + self.get_user_dn("test_modify_user1") + """
361 changetype: modify
362 replace: displayName
363 displayName: test_changed"""
364         self.ldb_user.modify_ldif(ldif)
365         res = self.ldb_admin.search(self.base_dn,
366                 expression="(distinguishedName=%s)" %
367                 self.get_user_dn("test_modify_user1"))
368         self.assertEqual(res[0]["displayName"][0], "test_changed")
369         # Modify on attribute you do not have rights for granted
370         ldif = """
371 dn: """ + self.get_user_dn("test_modify_user1") + """
372 changetype: modify
373 replace: url
374 url: www.samba.org"""
375         try:
376             self.ldb_user.modify_ldif(ldif)
377         except LdbError, (num, _):
378             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
379         else:
380             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
381             self.fail()
382         # Second test object -- Group
383         print "Testing modify on Group object"
384         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
385         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
386         ldif = """
387 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
388 changetype: modify
389 replace: displayName
390 displayName: test_changed"""
391         self.ldb_user.modify_ldif(ldif)
392         res = self.ldb_admin.search(self.base_dn,
393                 expression="(distinguishedName=%s)" %
394                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
395         self.assertEqual(res[0]["displayName"][0], "test_changed")
396         # Modify on attribute you do not have rights for granted
397         ldif = """
398 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
399 changetype: modify
400 replace: url
401 url: www.samba.org"""
402         try:
403             self.ldb_user.modify_ldif(ldif)
404         except LdbError, (num, _):
405             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
406         else:
407             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
408             self.fail()
409         # Second test object -- Organizational Unit
410         print "Testing modify on OU object"
411         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
412         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
413         ldif = """
414 dn: OU=test_modify_ou1,""" + self.base_dn + """
415 changetype: modify
416 replace: displayName
417 displayName: test_changed"""
418         self.ldb_user.modify_ldif(ldif)
419         res = self.ldb_admin.search(self.base_dn,
420                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
421                     + self.base_dn))
422         self.assertEqual(res[0]["displayName"][0], "test_changed")
423         # Modify on attribute you do not have rights for granted
424         ldif = """
425 dn: OU=test_modify_ou1,""" + self.base_dn + """
426 changetype: modify
427 replace: url
428 url: www.samba.org"""
429         try:
430             self.ldb_user.modify_ldif(ldif)
431         except LdbError, (num, _):
432             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
433         else:
434             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
435             self.fail()
436
437     def test_modify_u3(self):
438         """7 Modify one attribute as you have no what so ever rights granted"""
439         # First test object -- User
440         print "Testing modify on User object"
441         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
442         # Modify on attribute you do not have rights for granted
443         ldif = """
444 dn: """ + self.get_user_dn("test_modify_user1") + """
445 changetype: modify
446 replace: url
447 url: www.samba.org"""
448         try:
449             self.ldb_user.modify_ldif(ldif)
450         except LdbError, (num, _):
451             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
452         else:
453             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
454             self.fail()
455
456         # Second test object -- Group
457         print "Testing modify on Group object"
458         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
459         # Modify on attribute you do not have rights for granted
460         ldif = """
461 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
462 changetype: modify
463 replace: url
464 url: www.samba.org"""
465         try:
466             self.ldb_user.modify_ldif(ldif)
467         except LdbError, (num, _):
468             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
469         else:
470             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
471             self.fail()
472
473         # Second test object -- Organizational Unit
474         print "Testing modify on OU object"
475         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
476         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
477         # Modify on attribute you do not have rights for granted
478         ldif = """
479 dn: OU=test_modify_ou1,""" + self.base_dn + """
480 changetype: modify
481 replace: url
482 url: www.samba.org"""
483         try:
484             self.ldb_user.modify_ldif(ldif)
485         except LdbError, (num, _):
486             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
487         else:
488             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
489             self.fail()
490
491
492     def test_modify_u4(self):
493         """11 Grant WP to PRINCIPAL_SELF and test modify"""
494         ldif = """
495 dn: """ + self.get_user_dn(self.user_with_wp) + """
496 changetype: modify
497 add: adminDescription
498 adminDescription: blah blah blah"""
499         try:
500             self.ldb_user.modify_ldif(ldif)
501         except LdbError, (num, _):
502             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
503         else:
504             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
505             self.fail()
506
507         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
508         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
509         # Modify on attribute you have rights for
510         self.ldb_user.modify_ldif(ldif)
511         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
512                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
513         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
514
515     def test_modify_u5(self):
516         """12 test self membership"""
517         ldif = """
518 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
519 changetype: modify
520 add: Member
521 Member: """ +  self.get_user_dn(self.user_with_sm)
522 #the user has no rights granted, this should fail
523         try:
524             self.ldb_user2.modify_ldif(ldif)
525         except LdbError, (num, _):
526             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
527         else:
528             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
529             self.fail()
530
531 #grant self-membership, should be able to add himself
532         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
533         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
534         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
535         self.ldb_user2.modify_ldif(ldif)
536         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
537                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
538         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
539 #but not other users
540         ldif = """
541 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
542 changetype: modify
543 add: Member
544 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
545         try:
546             self.ldb_user2.modify_ldif(ldif)
547         except LdbError, (num, _):
548             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
549         else:
550             self.fail()
551
552     def test_modify_u6(self):
553         """13 test self membership"""
554         ldif = """
555 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
556 changetype: modify
557 add: Member
558 Member: """ +  self.get_user_dn(self.user_with_sm) + """
559 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
560
561 #grant self-membership, should be able to add himself  but not others at the same time
562         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
563         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
564         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
565         try:
566             self.ldb_user2.modify_ldif(ldif)
567         except LdbError, (num, _):
568             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
569         else:
570             self.fail()
571
572     def test_modify_u7(self):
573         """13 User with WP modifying Member"""
574 #a second user is given write property permission
575         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_wp))
576         mod = "(A;;WP;;;%s)" % str(user_sid)
577         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
578         ldif = """
579 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
580 changetype: modify
581 add: Member
582 Member: """ +  self.get_user_dn(self.user_with_wp)
583         self.ldb_user.modify_ldif(ldif)
584         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
585                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
586         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
587         ldif = """
588 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
589 changetype: modify
590 delete: Member"""
591         self.ldb_user.modify_ldif(ldif)
592         ldif = """
593 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
594 changetype: modify
595 add: Member
596 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
597         self.ldb_user.modify_ldif(ldif)
598         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
599                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
600         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
601
602 #enable these when we have search implemented
603 class AclSearchTests(AclTests):
604
605     def setUp(self):
606         super(AclSearchTests, self).setUp()
607         self.u1 = "search_u1"
608         self.u2 = "search_u2"
609         self.u3 = "search_u3"
610         self.group1 = "group1"
611         self.creds_tmp = Credentials()
612         self.creds_tmp.set_username("")
613         self.creds_tmp.set_password("")
614         self.creds_tmp.set_domain(creds.get_domain())
615         self.creds_tmp.set_realm(creds.get_realm())
616         self.creds_tmp.set_workstation(creds.get_workstation())
617         self.anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
618         self.dsheuristics = self.ldb_admin.get_dsheuristics()
619         self.ldb_admin.newuser(self.u1, self.user_pass)
620         self.ldb_admin.newuser(self.u2, self.user_pass)
621         self.ldb_admin.newuser(self.u3, self.user_pass)
622         self.ldb_admin.newgroup(self.group1, grouptype=-2147483646)
623         self.ldb_admin.add_remove_group_members(self.group1, self.u2,
624                                                 add_members_operation=True)
625         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
626         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
627         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
628         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
629                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
630                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
631                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
632                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
633                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
634         self.user_sid = self.get_object_sid(self.get_user_dn(self.u1))
635         self.group_sid = self.get_object_sid(self.get_user_dn(self.group1))
636
637     def create_clean_ou(self, object_dn):
638         """ Base repeating setup for unittests to follow """
639         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
640                 expression="distinguishedName=%s" % object_dn)
641         # Make sure top testing OU has been deleted before starting the test
642         self.assertEqual(res, [])
643         self.ldb_admin.create_ou(object_dn)
644         desc_sddl = self.get_desc_sddl(object_dn)
645         # Make sure there are inheritable ACEs initially
646         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
647         # Find and remove all inherit ACEs
648         res = re.findall("\(.*?\)", desc_sddl)
649         res = [x for x in res if ("CI" in x) or ("OI" in x)]
650         for x in res:
651             desc_sddl = desc_sddl.replace(x, "")
652         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
653         # can propagate from above
654         # remove SACL, we are not interested
655         desc_sddl = desc_sddl.replace(":AI", ":AIP")
656         self.modify_desc(object_dn, desc_sddl)
657         # Verify all inheritable ACEs are gone
658         desc_sddl = self.get_desc_sddl(object_dn)
659         self.assertFalse("CI" in desc_sddl)
660         self.assertFalse("OI" in desc_sddl)
661
662     def tearDown(self):
663         super(AclSearchTests, self).tearDown()
664         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
665         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
666         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
667         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
668         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
669         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
670         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
671         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
672         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
673         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
674         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
675         delete_force(self.ldb_admin, self.get_user_dn("group1"))
676
677     def test_search_anonymous1(self):
678         """Verify access of rootDSE with the correct request"""
679         res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
680         self.assertEquals(len(res), 1)
681         #verify some of the attributes
682         #dont care about values
683         self.assertTrue("ldapServiceName" in res[0])
684         self.assertTrue("namingContexts" in res[0])
685         self.assertTrue("isSynchronized" in res[0])
686         self.assertTrue("dsServiceName" in res[0])
687         self.assertTrue("supportedSASLMechanisms" in res[0])
688         self.assertTrue("isGlobalCatalogReady" in res[0])
689         self.assertTrue("domainControllerFunctionality" in res[0])
690         self.assertTrue("serverName" in res[0])
691
692     def test_search_anonymous2(self):
693         """Make sure we cannot access anything else"""
694         try:
695             res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
696         except LdbError, (num, _):
697             self.assertEquals(num, ERR_OPERATIONS_ERROR)
698         else:
699             self.fail()
700         try:
701             res = self.anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
702         except LdbError, (num, _):
703             self.assertEquals(num, ERR_OPERATIONS_ERROR)
704         else:
705             self.fail()
706         try:
707             res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
708                                         scope=SCOPE_SUBTREE)
709         except LdbError, (num, _):
710             self.assertEquals(num, ERR_OPERATIONS_ERROR)
711         else:
712             self.fail()
713
714     def test_search_anonymous3(self):
715         """Set dsHeuristics and repeat"""
716         self.ldb_admin.set_dsheuristics("0000002")
717         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
718         mod = "(A;CI;LC;;;AN)"
719         self.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
720         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
721         res = self.anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
722                                     expression="(objectClass=*)", scope=SCOPE_SUBTREE)
723         self.assertEquals(len(res), 1)
724         self.assertTrue("dn" in res[0])
725         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
726                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
727         res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
728                                     scope=SCOPE_SUBTREE)
729         self.assertEquals(len(res), 1)
730         self.assertTrue("dn" in res[0])
731         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
732         self.ldb_admin.set_dsheuristics(self.dsheuristics)
733
734     def test_search1(self):
735         """Make sure users can see us if given LC to user and group"""
736         self.create_clean_ou("OU=ou1," + self.base_dn)
737         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
738         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
739         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
740                                                  self.domain_sid)
741         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
742         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
743         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
744         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
745         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
746
747         #regular users must see only ou1 and ou2
748         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
749                                     scope=SCOPE_SUBTREE)
750         self.assertEquals(len(res), 2)
751         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
752                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
753
754         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
755         self.assertEquals(sorted(res_list), sorted(ok_list))
756
757         #these users should see all ous
758         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
759                                     scope=SCOPE_SUBTREE)
760         self.assertEquals(len(res), 6)
761         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
762         self.assertEquals(sorted(res_list), sorted(self.full_list))
763
764         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
765                                     scope=SCOPE_SUBTREE)
766         self.assertEquals(len(res), 6)
767         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
768         self.assertEquals(sorted(res_list), sorted(self.full_list))
769
770     def test_search2(self):
771         """Make sure users can't see us if access is explicitly denied"""
772         self.create_clean_ou("OU=ou1," + self.base_dn)
773         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
774         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
775         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
776         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
777         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
778         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
779         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
780         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
781                                     scope=SCOPE_SUBTREE)
782         #this user should see all ous
783         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
784         self.assertEquals(sorted(res_list), sorted(self.full_list))
785
786         #these users should see ou1, 2, 5 and 6 but not 3 and 4
787         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
788                                     scope=SCOPE_SUBTREE)
789         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
790                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
791                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
792                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
793         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
794         self.assertEquals(sorted(res_list), sorted(ok_list))
795
796         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
797                                     scope=SCOPE_SUBTREE)
798         self.assertEquals(len(res), 4)
799         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
800         self.assertEquals(sorted(res_list), sorted(ok_list))
801
802     def test_search3(self):
803         """Make sure users can't see ous if access is explicitly denied - 2"""
804         self.create_clean_ou("OU=ou1," + self.base_dn)
805         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
806         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
807         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
808                                                  self.domain_sid)
809         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
810         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
811         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
812         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
813         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
814
815         print "Testing correct behavior on nonaccessible search base"
816         try:
817              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
818                                    scope=SCOPE_BASE)
819         except LdbError, (num, _):
820             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
821         else:
822             self.fail()
823
824         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
825         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
826
827         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
828                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
829
830         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
831                                     scope=SCOPE_SUBTREE)
832         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
833         self.assertEquals(sorted(res_list), sorted(ok_list))
834
835         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
836                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
837                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
838                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
839
840         #should not see ou3 and ou4, but should see ou5 and ou6
841         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
842                                     scope=SCOPE_SUBTREE)
843         self.assertEquals(len(res), 4)
844         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
845         self.assertEquals(sorted(res_list), sorted(ok_list))
846
847         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
848                                     scope=SCOPE_SUBTREE)
849         self.assertEquals(len(res), 4)
850         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
851         self.assertEquals(sorted(res_list), sorted(ok_list))
852
853     def test_search4(self):
854         """There is no difference in visibility if the user is also creator"""
855         self.create_clean_ou("OU=ou1," + self.base_dn)
856         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
857         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
858         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
859                                                  self.domain_sid)
860         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
861         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
862         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
863         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
864         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
865
866         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
867                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
868         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
869                                     scope=SCOPE_SUBTREE)
870         self.assertEquals(len(res), 2)
871         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
872         self.assertEquals(sorted(res_list), sorted(ok_list))
873
874         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
875                                     scope=SCOPE_SUBTREE)
876         self.assertEquals(len(res), 2)
877         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
878         self.assertEquals(sorted(res_list), sorted(ok_list))
879
880     def test_search5(self):
881         """Make sure users can see only attributes they are allowed to see"""
882         self.create_clean_ou("OU=ou1," + self.base_dn)
883         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
884         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
885         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
886                                                  self.domain_sid)
887         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
888         # assert user can only see dn
889         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
890                                     scope=SCOPE_SUBTREE)
891         ok_list = ['dn']
892         self.assertEquals(len(res), 1)
893         res_list = res[0].keys()
894         self.assertEquals(res_list, ok_list)
895
896         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
897                                     scope=SCOPE_BASE, attrs=["ou"])
898
899         self.assertEquals(len(res), 1)
900         res_list = res[0].keys()
901         self.assertEquals(res_list, ok_list)
902
903         #give read property on ou and assert user can only see dn and ou
904         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
905         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
906         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
907         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
908                                     scope=SCOPE_SUBTREE)
909         ok_list = ['dn', 'ou']
910         self.assertEquals(len(res), 1)
911         res_list = res[0].keys()
912         self.assertEquals(sorted(res_list), sorted(ok_list))
913
914         #give read property on Public Information and assert user can see ou and other members
915         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
916         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
917         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
918         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
919                                     scope=SCOPE_SUBTREE)
920
921         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
922         res_list = res[0].keys()
923         self.assertEquals(sorted(res_list), sorted(ok_list))
924
925     def test_search6(self):
926         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
927         self.create_clean_ou("OU=ou1," + self.base_dn)
928         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
929         self.dacl_add_ace("OU=ou1," + self.base_dn, mod)
930         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
931                                                  self.domain_sid)
932         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
933         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
934
935         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
936                                     scope=SCOPE_SUBTREE)
937         #nothing should be returned as ou is not accessible
938         self.assertEquals(res, [])
939
940         #give read property on ou and assert user can only see dn and ou
941         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
942         self.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
943         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
944                                     scope=SCOPE_SUBTREE)
945         self.assertEquals(len(res), 1)
946         ok_list = ['dn', 'ou']
947         res_list = res[0].keys()
948         self.assertEquals(sorted(res_list), sorted(ok_list))
949
950         #give read property on Public Information and assert user can see ou and other members
951         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
952         self.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
953         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
954                                    scope=SCOPE_SUBTREE)
955         self.assertEquals(len(res), 1)
956         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
957         res_list = res[0].keys()
958         self.assertEquals(sorted(res_list), sorted(ok_list))
959
960 #tests on ldap delete operations
961 class AclDeleteTests(AclTests):
962
963     def setUp(self):
964         super(AclDeleteTests, self).setUp()
965         self.regular_user = "acl_delete_user1"
966         # Create regular user
967         self.ldb_admin.newuser(self.regular_user, self.user_pass)
968         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
969
970     def tearDown(self):
971         super(AclDeleteTests, self).tearDown()
972         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
973         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
974
975     def test_delete_u1(self):
976         """User is prohibited by default to delete another User object"""
977         # Create user that we try to delete
978         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
979         # Here delete User object should ALWAYS through exception
980         try:
981             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
982         except LdbError, (num, _):
983             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
984         else:
985             self.fail()
986
987     def test_delete_u2(self):
988         """User's group has RIGHT_DELETE to another User object"""
989         user_dn = self.get_user_dn("test_delete_user1")
990         # Create user that we try to delete
991         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
992         mod = "(A;;SD;;;AU)"
993         self.dacl_add_ace(user_dn, mod)
994         # Try to delete User object
995         self.ldb_user.delete(user_dn)
996         res = self.ldb_admin.search(self.base_dn,
997                 expression="(distinguishedName=%s)" % user_dn)
998         self.assertEqual(res, [])
999
1000     def test_delete_u3(self):
1001         """User indentified by SID has RIGHT_DELETE to another User object"""
1002         user_dn = self.get_user_dn("test_delete_user1")
1003         # Create user that we try to delete
1004         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1005         mod = "(A;;SD;;;%s)" % self.get_object_sid(self.get_user_dn(self.regular_user))
1006         self.dacl_add_ace(user_dn, mod)
1007         # Try to delete User object
1008         self.ldb_user.delete(user_dn)
1009         res = self.ldb_admin.search(self.base_dn,
1010                 expression="(distinguishedName=%s)" % user_dn)
1011         self.assertEqual(res, [])
1012
1013 #tests on ldap rename operations
1014 class AclRenameTests(AclTests):
1015
1016     def setUp(self):
1017         super(AclRenameTests, self).setUp()
1018         self.regular_user = "acl_rename_user1"
1019         self.ou1 = "OU=test_rename_ou1"
1020         self.ou2 = "OU=test_rename_ou2"
1021         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1022         self.testuser1 = "test_rename_user1"
1023         self.testuser2 = "test_rename_user2"
1024         self.testuser3 = "test_rename_user3"
1025         self.testuser4 = "test_rename_user4"
1026         self.testuser5 = "test_rename_user5"
1027         # Create regular user
1028         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1029         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1030
1031     def tearDown(self):
1032         super(AclRenameTests, self).tearDown()
1033         # Rename OU3
1034         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1035         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1036         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1037         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1038         # Rename OU2
1039         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1040         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1041         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1042         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1043         # Rename OU1
1044         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1045         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1046         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1047         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1048         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1049         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1050
1051     def test_rename_u1(self):
1052         """Regular user fails to rename 'User object' within single OU"""
1053         # Create OU structure
1054         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1055         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1056         try:
1057             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1058                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1059         except LdbError, (num, _):
1060             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1061         else:
1062             self.fail()
1063
1064     def test_rename_u2(self):
1065         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1066         ou_dn = "OU=test_rename_ou1," + self.base_dn
1067         user_dn = "CN=test_rename_user1," + ou_dn
1068         rename_user_dn = "CN=test_rename_user5," + ou_dn
1069         # Create OU structure
1070         self.ldb_admin.create_ou(ou_dn)
1071         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1072         mod = "(A;;WP;;;AU)"
1073         self.dacl_add_ace(user_dn, mod)
1074         # Rename 'User object' having WP to AU
1075         self.ldb_user.rename(user_dn, rename_user_dn)
1076         res = self.ldb_admin.search(self.base_dn,
1077                 expression="(distinguishedName=%s)" % user_dn)
1078         self.assertEqual(res, [])
1079         res = self.ldb_admin.search(self.base_dn,
1080                 expression="(distinguishedName=%s)" % rename_user_dn)
1081         self.assertNotEqual(res, [])
1082
1083     def test_rename_u3(self):
1084         """Test rename with rights granted to 'User object' SID"""
1085         ou_dn = "OU=test_rename_ou1," + self.base_dn
1086         user_dn = "CN=test_rename_user1," + ou_dn
1087         rename_user_dn = "CN=test_rename_user5," + ou_dn
1088         # Create OU structure
1089         self.ldb_admin.create_ou(ou_dn)
1090         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1091         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1092         mod = "(A;;WP;;;%s)" % str(sid)
1093         self.dacl_add_ace(user_dn, mod)
1094         # Rename 'User object' having WP to AU
1095         self.ldb_user.rename(user_dn, rename_user_dn)
1096         res = self.ldb_admin.search(self.base_dn,
1097                 expression="(distinguishedName=%s)" % user_dn)
1098         self.assertEqual(res, [])
1099         res = self.ldb_admin.search(self.base_dn,
1100                 expression="(distinguishedName=%s)" % rename_user_dn)
1101         self.assertNotEqual(res, [])
1102
1103     def test_rename_u4(self):
1104         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1105         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1106         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1107         user_dn = "CN=test_rename_user2," + ou1_dn
1108         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1109         # Create OU structure
1110         self.ldb_admin.create_ou(ou1_dn)
1111         self.ldb_admin.create_ou(ou2_dn)
1112         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1113         mod = "(A;;WPSD;;;AU)"
1114         self.dacl_add_ace(user_dn, mod)
1115         mod = "(A;;CC;;;AU)"
1116         self.dacl_add_ace(ou2_dn, mod)
1117         # Rename 'User object' having SD and CC to AU
1118         self.ldb_user.rename(user_dn, rename_user_dn)
1119         res = self.ldb_admin.search(self.base_dn,
1120                 expression="(distinguishedName=%s)" % user_dn)
1121         self.assertEqual(res, [])
1122         res = self.ldb_admin.search(self.base_dn,
1123                 expression="(distinguishedName=%s)" % rename_user_dn)
1124         self.assertNotEqual(res, [])
1125
1126     def test_rename_u5(self):
1127         """Test rename with rights granted to 'User object' SID"""
1128         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1129         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1130         user_dn = "CN=test_rename_user2," + ou1_dn
1131         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1132         # Create OU structure
1133         self.ldb_admin.create_ou(ou1_dn)
1134         self.ldb_admin.create_ou(ou2_dn)
1135         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1136         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1137         mod = "(A;;WPSD;;;%s)" % str(sid)
1138         self.dacl_add_ace(user_dn, mod)
1139         mod = "(A;;CC;;;%s)" % str(sid)
1140         self.dacl_add_ace(ou2_dn, mod)
1141         # Rename 'User object' having SD and CC to AU
1142         self.ldb_user.rename(user_dn, rename_user_dn)
1143         res = self.ldb_admin.search(self.base_dn,
1144                 expression="(distinguishedName=%s)" % user_dn)
1145         self.assertEqual(res, [])
1146         res = self.ldb_admin.search(self.base_dn,
1147                 expression="(distinguishedName=%s)" % rename_user_dn)
1148         self.assertNotEqual(res, [])
1149
1150     def test_rename_u6(self):
1151         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1152         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1153         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1154         user_dn = "CN=test_rename_user2," + ou1_dn
1155         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1156         # Create OU structure
1157         self.ldb_admin.create_ou(ou1_dn)
1158         self.ldb_admin.create_ou(ou2_dn)
1159         #mod = "(A;CI;DCWP;;;AU)"
1160         mod = "(A;;DC;;;AU)"
1161         self.dacl_add_ace(ou1_dn, mod)
1162         mod = "(A;;CC;;;AU)"
1163         self.dacl_add_ace(ou2_dn, mod)
1164         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1165         mod = "(A;;WP;;;AU)"
1166         self.dacl_add_ace(user_dn, mod)
1167         # Rename 'User object' having SD and CC to AU
1168         self.ldb_user.rename(user_dn, rename_user_dn)
1169         res = self.ldb_admin.search(self.base_dn,
1170                 expression="(distinguishedName=%s)" % user_dn)
1171         self.assertEqual(res, [])
1172         res = self.ldb_admin.search(self.base_dn,
1173                 expression="(distinguishedName=%s)" % rename_user_dn)
1174         self.assertNotEqual(res, [])
1175
1176     def test_rename_u7(self):
1177         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1178         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1179         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1180         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1181         user_dn = "CN=test_rename_user2," + ou1_dn
1182         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1183         # Create OU structure
1184         self.ldb_admin.create_ou(ou1_dn)
1185         self.ldb_admin.create_ou(ou2_dn)
1186         self.ldb_admin.create_ou(ou3_dn)
1187         mod = "(A;CI;WPDC;;;AU)"
1188         self.dacl_add_ace(ou1_dn, mod)
1189         mod = "(A;;CC;;;AU)"
1190         self.dacl_add_ace(ou3_dn, mod)
1191         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1192         # Rename 'User object' having SD and CC to AU
1193         self.ldb_user.rename(user_dn, rename_user_dn)
1194         res = self.ldb_admin.search(self.base_dn,
1195                 expression="(distinguishedName=%s)" % user_dn)
1196         self.assertEqual(res, [])
1197         res = self.ldb_admin.search(self.base_dn,
1198                 expression="(distinguishedName=%s)" % rename_user_dn)
1199         self.assertNotEqual(res, [])
1200
1201     def test_rename_u8(self):
1202         """Test rename on an object with and without modify access on the RDN attribute"""
1203         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1204         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1205         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1206         # Create OU structure
1207         self.ldb_admin.create_ou(ou1_dn)
1208         self.ldb_admin.create_ou(ou2_dn)
1209         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1210         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1211         self.dacl_add_ace(ou2_dn, mod)
1212         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1213         self.dacl_add_ace(ou2_dn, mod)
1214         try:
1215             self.ldb_user.rename(ou2_dn, ou3_dn)
1216         except LdbError, (num, _):
1217             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1218         else:
1219             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1220             self.fail()
1221         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1222         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1223         self.dacl_add_ace(ou2_dn, mod)
1224         self.ldb_user.rename(ou2_dn, ou3_dn)
1225         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1226         self.assertEqual(res, [])
1227         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1228         self.assertNotEqual(res, [])
1229
1230 #tests on Control Access Rights
1231 class AclCARTests(AclTests):
1232
1233     def setUp(self):
1234         super(AclCARTests, self).setUp()
1235         self.user_with_wp = "acl_car_user1"
1236         self.user_with_pc = "acl_car_user2"
1237         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1238         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
1239         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1240         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1241
1242         res = self.ldb_admin.search("CN=Directory Service, CN=Windows NT, CN=Services, "
1243                  + self.configuration_dn, scope=SCOPE_BASE, attrs=["dSHeuristics"])
1244         if "dSHeuristics" in res[0]:
1245             self.dsheuristics = res[0]["dSHeuristics"][0]
1246         else:
1247             self.dsheuristics = None
1248
1249         self.minPwdAge = self.ldb_admin.get_minPwdAge()
1250
1251         # Set the "dSHeuristics" to have the tests run against Windows Server
1252         self.ldb_admin.set_dsheuristics("000000001")
1253         # Set minPwdAge to 0
1254         self.ldb_admin.set_minPwdAge("0")
1255
1256     def tearDown(self):
1257         super(AclCARTests, self).tearDown()
1258         #restore original values
1259         self.ldb_admin.set_dsheuristics(self.dsheuristics)
1260         self.ldb_admin.set_minPwdAge(self.minPwdAge)
1261         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1262         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1263
1264     def test_change_password1(self):
1265         """Try a password change operation without any CARs given"""
1266         #users have change password by default - remove for negative testing
1267         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1268         sddl = desc.as_sddl(self.domain_sid)
1269         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1270         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1271         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1272         try:
1273             self.ldb_user.modify_ldif("""
1274 dn: """ + self.get_user_dn(self.user_with_wp) + """
1275 changetype: modify
1276 delete: unicodePwd
1277 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1278 add: unicodePwd
1279 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1280 """)
1281         except LdbError, (num, _):
1282             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1283         else:
1284             # for some reason we get constraint violation instead of insufficient access error
1285             self.fail()
1286
1287     def test_change_password2(self):
1288         """Make sure WP has no influence"""
1289         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1290         sddl = desc.as_sddl(self.domain_sid)
1291         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1292         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1293         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1294         mod = "(A;;WP;;;PS)"
1295         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1296         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1297         sddl = desc.as_sddl(self.domain_sid)
1298         try:
1299             self.ldb_user.modify_ldif("""
1300 dn: """ + self.get_user_dn(self.user_with_wp) + """
1301 changetype: modify
1302 delete: unicodePwd
1303 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1304 add: unicodePwd
1305 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1306 """)
1307         except LdbError, (num, _):
1308             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1309         else:
1310             # for some reason we get constraint violation instead of insufficient access error
1311             self.fail()
1312
1313     def test_change_password3(self):
1314         """Make sure WP has no influence"""
1315         mod = "(D;;WP;;;PS)"
1316         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1317         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1318         sddl = desc.as_sddl(self.domain_sid)
1319         self.ldb_user.modify_ldif("""
1320 dn: """ + self.get_user_dn(self.user_with_wp) + """
1321 changetype: modify
1322 delete: unicodePwd
1323 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1324 add: unicodePwd
1325 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1326 """)
1327
1328     def test_change_password5(self):
1329         """Make sure rights have no influence on dBCSPwd"""
1330         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1331         sddl = desc.as_sddl(self.domain_sid)
1332         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1333         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1334         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1335         mod = "(D;;WP;;;PS)"
1336         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1337         try:
1338             self.ldb_user.modify_ldif("""
1339 dn: """ + self.get_user_dn(self.user_with_wp) + """
1340 changetype: modify
1341 delete: dBCSPwd
1342 dBCSPwd: XXXXXXXXXXXXXXXX
1343 add: dBCSPwd
1344 dBCSPwd: YYYYYYYYYYYYYYYY
1345 """)
1346         except LdbError, (num, _):
1347             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1348         else:
1349             self.fail()
1350
1351     def test_change_password6(self):
1352         """Test uneven delete/adds"""
1353         try:
1354             self.ldb_user.modify_ldif("""
1355 dn: """ + self.get_user_dn(self.user_with_wp) + """
1356 changetype: modify
1357 delete: userPassword
1358 userPassword: thatsAcomplPASS1
1359 delete: userPassword
1360 userPassword: thatsAcomplPASS1
1361 add: userPassword
1362 userPassword: thatsAcomplPASS2
1363 """)
1364         except LdbError, (num, _):
1365             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1366         else:
1367             self.fail()
1368         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1369         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1370         try:
1371             self.ldb_user.modify_ldif("""
1372 dn: """ + self.get_user_dn(self.user_with_wp) + """
1373 changetype: modify
1374 delete: userPassword
1375 userPassword: thatsAcomplPASS1
1376 delete: userPassword
1377 userPassword: thatsAcomplPASS1
1378 add: userPassword
1379 userPassword: thatsAcomplPASS2
1380 """)
1381             # This fails on Windows 2000 domain level with constraint violation
1382         except LdbError, (num, _):
1383             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1384                             num == ERR_UNWILLING_TO_PERFORM)
1385         else:
1386             self.fail()
1387
1388
1389     def test_change_password7(self):
1390         """Try a password change operation without any CARs given"""
1391         #users have change password by default - remove for negative testing
1392         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1393         sddl = desc.as_sddl(self.domain_sid)
1394         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1395         #first change our own password
1396         self.ldb_user2.modify_ldif("""
1397 dn: """ + self.get_user_dn(self.user_with_pc) + """
1398 changetype: modify
1399 delete: unicodePwd
1400 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1401 add: unicodePwd
1402 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1403 """)
1404         #then someone else's
1405         self.ldb_user2.modify_ldif("""
1406 dn: """ + self.get_user_dn(self.user_with_wp) + """
1407 changetype: modify
1408 delete: unicodePwd
1409 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1410 add: unicodePwd
1411 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1412 """)
1413
1414     def test_reset_password1(self):
1415         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1416         try:
1417             self.ldb_user.modify_ldif("""
1418 dn: """ + self.get_user_dn(self.user_with_wp) + """
1419 changetype: modify
1420 replace: unicodePwd
1421 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1422 """)
1423         except LdbError, (num, _):
1424             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1425         else:
1426             self.fail()
1427         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1428         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1429         self.ldb_user.modify_ldif("""
1430 dn: """ + self.get_user_dn(self.user_with_wp) + """
1431 changetype: modify
1432 replace: unicodePwd
1433 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1434 """)
1435
1436     def test_reset_password2(self):
1437         """Try a user password reset operation (userPassword) before and after granting CAR"""
1438         try:
1439             self.ldb_user.modify_ldif("""
1440 dn: """ + self.get_user_dn(self.user_with_wp) + """
1441 changetype: modify
1442 replace: userPassword
1443 userPassword: thatsAcomplPASS1
1444 """)
1445         except LdbError, (num, _):
1446             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1447         else:
1448             self.fail()
1449         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1450         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1451         try:
1452             self.ldb_user.modify_ldif("""
1453 dn: """ + self.get_user_dn(self.user_with_wp) + """
1454 changetype: modify
1455 replace: userPassword
1456 userPassword: thatsAcomplPASS1
1457 """)
1458             # This fails on Windows 2000 domain level with constraint violation
1459         except LdbError, (num, _):
1460             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1461
1462     def test_reset_password3(self):
1463         """Grant WP and see what happens (unicodePwd)"""
1464         mod = "(A;;WP;;;PS)"
1465         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1466         try:
1467             self.ldb_user.modify_ldif("""
1468 dn: """ + self.get_user_dn(self.user_with_wp) + """
1469 changetype: modify
1470 replace: unicodePwd
1471 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1472 """)
1473         except LdbError, (num, _):
1474             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1475         else:
1476             self.fail()
1477
1478     def test_reset_password4(self):
1479         """Grant WP and see what happens (userPassword)"""
1480         mod = "(A;;WP;;;PS)"
1481         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1482         try:
1483             self.ldb_user.modify_ldif("""
1484 dn: """ + self.get_user_dn(self.user_with_wp) + """
1485 changetype: modify
1486 replace: userPassword
1487 userPassword: thatsAcomplPASS1
1488 """)
1489         except LdbError, (num, _):
1490             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1491         else:
1492             self.fail()
1493
1494     def test_reset_password5(self):
1495         """Explicitly deny WP but grant CAR (unicodePwd)"""
1496         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1497         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1498         self.ldb_user.modify_ldif("""
1499 dn: """ + self.get_user_dn(self.user_with_wp) + """
1500 changetype: modify
1501 replace: unicodePwd
1502 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1503 """)
1504
1505     def test_reset_password6(self):
1506         """Explicitly deny WP but grant CAR (userPassword)"""
1507         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1508         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1509         try:
1510             self.ldb_user.modify_ldif("""
1511 dn: """ + self.get_user_dn(self.user_with_wp) + """
1512 changetype: modify
1513 replace: userPassword
1514 userPassword: thatsAcomplPASS1
1515 """)
1516             # This fails on Windows 2000 domain level with constraint violation
1517         except LdbError, (num, _):
1518             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1519
class AclExtendedTests(AclTests):
    """Checks which users get nTSecurityDescriptor returned on a search.

    Works with three users: ext_u1 (regular, creates the group), ext_u2
    (regular) and ext_u3 (added to "Domain Admins").
    """

    def setUp(self):
        """Create the three users, one connection per user, and look up the
        SIDs of the two regular users."""
        super(AclExtendedTests, self).setUp()
        self.u1 = "ext_u1"   # regular user, will be the creator
        self.u2 = "ext_u2"   # regular user
        self.u3 = "ext_u3"   # admin user
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members(
            "Domain Admins", self.u3, add_members_operation=True)
        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
        self.user_sid1 = self.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        """Delete the users first, then the group, then the OU holding it."""
        super(AclExtendedTests, self).tearDown()
        for account in (self.u1, self.u2, self.u3):
            delete_force(self.ldb_admin, self.get_user_dn(account))
        delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
        delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)

    def test_ntSecurityDescriptor(self):
        """u2 must not get nTSecurityDescriptor with RP or RC granted, while
        the Domain Admins member u3 does get it"""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1,OU=ext_ou1," + self.base_dn
        sid1 = str(self.user_sid1)
        sid2 = str(self.user_sid2)

        def fetch_sd(connection):
            # Base search on the group asking only for its security descriptor
            return connection.search(group_dn, SCOPE_BASE, None,
                                     ["nTSecurityDescriptor"])

        # Start from an empty OU
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        # u1 may create children in it, u2 gets LC on it
        self.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % sid1)
        self.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % sid2)
        # u1 creates a group in the OU; u2 gets RP on that group
        self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", grouptype=4)
        self.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % sid2)

        # With RP only, u2 finds the group but not its descriptor
        found = fetch_sd(self.ldb_user2)
        self.assertNotEqual(found, [])
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())

        # Adding RC for u2 does not change that
        self.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % sid2)
        found = fetch_sd(self.ldb_user2)
        self.assertNotEqual(found, [])
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())

        # u3 was added to "Domain Admins" in setUp and should be able to read sd
        found = fetch_sd(self.ldb_user3)
        self.assertEqual(len(found), 1)
        self.assertTrue("nTSecurityDescriptor" in found[0].keys())
1578
# Connect to the server under test and run every test suite

# A bare host name is turned into an ldap:// URL
if "://" not in host:
    host = "ldap://%s" % host
# presumably the module-level connection that the test classes pick up in
# their setUp - that code is not part of this section
ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp)

runner = SubunitTestRunner()
rc = 0
# Every suite is run even when an earlier one failed; the exit status is 1
# as soon as any of them was not successful.
for suite_class in (AclAddTests, AclModifyTests, AclDeleteTests,
                    AclRenameTests, AclCARTests, AclSearchTests,
                    AclExtendedTests):
    if not runner.run(unittest.makeSuite(suite_class)).wasSuccessful():
        rc = 1

sys.exit(rc)