s4-dsdb: Implementation of User-Change-Password and User-Force-Password-Change
[nivanova/samba-autobuild/.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9
10 sys.path.append("bin/python")
11 import samba
12 samba.ensure_external_module("subunit", "subunit/python")
13 samba.ensure_external_module("testtools", "testtools")
14
15 import samba.getopt as options
16
17 from ldb import (
18     SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import Message, MessageElement, Dn
22 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
23 from samba.ndr import ndr_pack, ndr_unpack
24 from samba.dcerpc import security
25
26 from samba.auth import system_session
27 from samba import gensec
28 from samba.samdb import SamDB
29 from samba.credentials import Credentials
30 import samba.tests
31 from subunit.run import SubunitTestRunner
32 import unittest
33
34 parser = optparse.OptionParser("ldap [options] <host>")
35 sambaopts = options.SambaOptions(parser)
36 parser.add_option_group(sambaopts)
37 parser.add_option_group(options.VersionOptions(parser))
38
39 # use command line creds if available
40 credopts = options.CredentialsOptions(parser)
41 parser.add_option_group(credopts)
42 opts, args = parser.parse_args()
43
44 if len(args) < 1:
45     parser.print_usage()
46     sys.exit(1)
47
48 host = args[0]
49
50 lp = sambaopts.get_loadparm()
51 creds = credopts.get_credentials(lp)
52 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
53
54 #
55 # Tests start here
56 #
57
58 class AclTests(samba.tests.TestCase):
59
60     def delete_force(self, ldb, dn):
61         try:
62             ldb.delete(dn)
63         except LdbError, (num, _):
64             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
65
66     def find_basedn(self, ldb):
67         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
68                          attrs=["defaultNamingContext"])
69         self.assertEquals(len(res), 1)
70         return res[0]["defaultNamingContext"][0]
71
72     def find_domain_sid(self, ldb):
73         res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
74         return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
75
76     def setUp(self):
77         super(AclTests, self).setUp()
78         self.ldb_admin = ldb
79         self.base_dn = self.find_basedn(self.ldb_admin)
80         self.domain_sid = self.find_domain_sid(self.ldb_admin)
81         self.user_pass = "samba123@"
82         print "baseDN: %s" % self.base_dn
83
84     def get_user_dn(self, name):
85         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
86
87     def modify_desc(self, object_dn, desc):
88         """ Modify security descriptor using either SDDL string
89             or security.descriptor object
90         """
91         assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
92         mod = """
93 dn: """ + object_dn + """
94 changetype: modify
95 replace: nTSecurityDescriptor
96 """
97         if isinstance(desc, str):
98             mod += "nTSecurityDescriptor: %s" % desc
99         elif isinstance(desc, security.descriptor):
100             mod += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
101         self.ldb_admin.modify_ldif(mod)
102
103     def add_group_member(self, _ldb, group_dn, member_dn):
104         """ Modify user to ge member of a group 
105             e.g. User to be 'Doamin Admin' group member
106         """
107         ldif = """
108 dn: """ + group_dn + """
109 changetype: modify
110 add: member
111 member: """ + member_dn
112         _ldb.modify_ldif(ldif)
113     
114     def create_ou(self, _ldb, ou_dn, desc=None):
115         ldif = """
116 dn: """ + ou_dn + """
117 ou: """ + ou_dn.split(",")[0][3:] + """
118 objectClass: organizationalUnit
119 url: www.example.com
120 """
121         if desc:
122             assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
123             if isinstance(desc, str):
124                 ldif += "nTSecurityDescriptor: %s" % desc
125             elif isinstance(desc, security.descriptor):
126                 ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
127         _ldb.add_ldif(ldif)
128
129     def create_active_user(self, _ldb, user_dn):
130         ldif = """
131 dn: """ + user_dn + """
132 sAMAccountName: """ + user_dn.split(",")[0][3:] + """
133 objectClass: user
134 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
135 url: www.example.com
136 """
137         _ldb.add_ldif(ldif)
138
139     def create_test_user(self, _ldb, user_dn, desc=None):
140         ldif = """
141 dn: """ + user_dn + """
142 sAMAccountName: """ + user_dn.split(",")[0][3:] + """
143 objectClass: user
144 userPassword: """ + self.user_pass + """
145 url: www.example.com
146 """
147         if desc:
148             assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
149             if isinstance(desc, str):
150                 ldif += "nTSecurityDescriptor: %s" % desc
151             elif isinstance(desc, security.descriptor):
152                 ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
153         _ldb.add_ldif(ldif)
154
155     def create_group(self, _ldb, group_dn, desc=None):
156         ldif = """
157 dn: """ + group_dn + """
158 objectClass: group
159 sAMAccountName: """ + group_dn.split(",")[0][3:] + """
160 groupType: 4
161 url: www.example.com
162 """
163         if desc:
164             assert(isinstance(desc, str) or isinstance(desc, security.descriptor))
165             if isinstance(desc, str):
166                 ldif += "nTSecurityDescriptor: %s" % desc
167             elif isinstance(desc, security.descriptor):
168                 ldif += "nTSecurityDescriptor:: %s" % base64.b64encode(ndr_pack(desc))
169         _ldb.add_ldif(ldif)
170
171     def read_desc(self, object_dn):
172         res = self.ldb_admin.search(object_dn, SCOPE_BASE, None, ["nTSecurityDescriptor"])
173         desc = res[0]["nTSecurityDescriptor"][0]
174         return ndr_unpack(security.descriptor, desc)
175
176     def get_ldb_connection(self, target_username, target_password):
177         creds_tmp = Credentials()
178         creds_tmp.set_username(target_username)
179         creds_tmp.set_password(target_password)
180         creds_tmp.set_domain(creds.get_domain())
181         creds_tmp.set_realm(creds.get_realm())
182         creds_tmp.set_workstation(creds.get_workstation())
183         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
184                                       | gensec.FEATURE_SEAL)
185         ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
186         return ldb_target
187
188     def get_object_sid(self, object_dn):
189         res = self.ldb_admin.search(object_dn)
190         return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
191
192     def dacl_add_ace(self, object_dn, ace):
193         desc = self.read_desc(object_dn)
194         desc_sddl = desc.as_sddl(self.domain_sid)
195         if ace in desc_sddl:
196             return
197         if desc_sddl.find("(") >= 0:
198             desc_sddl = desc_sddl[:desc_sddl.index("(")] + ace + desc_sddl[desc_sddl.index("("):]
199         else:
200             desc_sddl = desc_sddl + ace
201         self.modify_desc(object_dn, desc_sddl)
202
203     def get_desc_sddl(self, object_dn):
204         """ Return object nTSecutiryDescriptor in SDDL format
205         """
206         desc = self.read_desc(object_dn)
207         return desc.as_sddl(self.domain_sid)
208
209     # Test if we have any additional groups for users than default ones
210     def assert_user_no_group_member(self, username):
211         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
212         try:
213             self.assertEqual(res[0]["memberOf"][0], "")
214         except KeyError:
215             pass
216         else:
217             self.fail()
218     
219     def create_enable_user(self, username):
220         self.create_active_user(self.ldb_admin, self.get_user_dn(username))
221         self.ldb_admin.enable_account("(sAMAccountName=" + username + ")")
222
223 #tests on ldap add operations
224 class AclAddTests(AclTests):
225
226     def setUp(self):
227         super(AclAddTests, self).setUp()
228         # Domain admin that will be creator of OU parent-child structure
229         self.usr_admin_owner = "acl_add_user1"
230         # Second domain admin that will not be creator of OU parent-child structure
231         self.usr_admin_not_owner = "acl_add_user2"
232         # Regular user
233         self.regular_user = "acl_add_user3"
234         self.create_enable_user(self.usr_admin_owner)
235         self.create_enable_user(self.usr_admin_not_owner)
236         self.create_enable_user(self.regular_user)
237
238         # add admins to the Domain Admins group
239         self.add_group_member(self.ldb_admin, "CN=Domain Admins,CN=Users," + self.base_dn, \
240                 self.get_user_dn(self.usr_admin_owner))
241         self.add_group_member(self.ldb_admin, "CN=Domain Admins,CN=Users," + self.base_dn, \
242                 self.get_user_dn(self.usr_admin_not_owner))
243
244         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
245         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
246         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
247
248     def tearDown(self):
249         super(AclAddTests, self).tearDown()
250         self.delete_force(self.ldb_admin, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
251         self.delete_force(self.ldb_admin, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
252         self.delete_force(self.ldb_admin, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
253         self.delete_force(self.ldb_admin, "OU=test_add_ou1," + self.base_dn)
254         self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
255         self.delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
256         self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
257
258     # Make sure top OU is deleted (and so everything under it)
259     def assert_top_ou_deleted(self):
260         res = self.ldb_admin.search(self.base_dn,
261             expression="(distinguishedName=%s,%s)" % (
262                 "OU=test_add_ou1", self.base_dn))
263         self.assertEqual(res, [])
264
265     def test_add_u1(self):
266         """Testing OU with the rights of Doman Admin not creator of the OU """
267         self.assert_top_ou_deleted()
268         # Change descriptor for top level OU
269         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
270         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
271         user_sid = self.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
272         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
273         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
274         # Test user and group creation with another domain admin's credentials
275         self.create_test_user(self.ldb_notowner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
276         self.create_group(self.ldb_notowner, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
277         # Make sure we HAVE created the two objects -- user and group
278         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
279         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
280         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
281         self.assertTrue(len(res) > 0)
282         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
283         self.assertTrue(len(res) > 0)
284
285     def test_add_u2(self):
286         """Testing OU with the regular user that has no rights granted over the OU """
287         self.assert_top_ou_deleted()
288         # Create a parent-child OU structure with domain admin credentials
289         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
290         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
291         # Test user and group creation with regular user credentials
292         try:
293             self.create_test_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
294             self.create_group(self.ldb_user, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
295         except LdbError, (num, _):
296             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
297         else:
298             self.fail()
299         # Make sure we HAVEN'T created any of two objects -- user or group
300         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
301         self.assertEqual(res, [])
302         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
303         self.assertEqual(res, [])
304
305     def test_add_u3(self):
306         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
307         self.assert_top_ou_deleted()
308         # Change descriptor for top level OU
309         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
310         user_sid = self.get_object_sid(self.get_user_dn(self.regular_user))
311         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
312         self.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
313         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
314         # Test user and group creation with granted user only to one of the objects
315         self.create_test_user(self.ldb_user, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
316         try:
317             self.create_group(self.ldb_user, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
318         except LdbError, (num, _):
319             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
320         else:
321             self.fail()
322         # Make sure we HAVE created the one of two objects -- user
323         res = self.ldb_admin.search(self.base_dn,
324                 expression="(distinguishedName=%s,%s)" %
325                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
326                     self.base_dn))
327         self.assertNotEqual(len(res), 0)
328         res = self.ldb_admin.search(self.base_dn,
329                 expression="(distinguishedName=%s,%s)" %
330                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
331                     self.base_dn) )
332         self.assertEqual(res, [])
333
334     def test_add_u4(self):
335         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
336         self.assert_top_ou_deleted()
337         self.create_ou(self.ldb_owner, "OU=test_add_ou1," + self.base_dn)
338         self.create_ou(self.ldb_owner, "OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
339         self.create_test_user(self.ldb_owner, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
340         self.create_group(self.ldb_owner, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
341         # Make sure we have successfully created the two objects -- user and group
342         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
343         self.assertTrue(len(res) > 0)
344         res = self.ldb_admin.search(self.base_dn,
345                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
346         self.assertTrue(len(res) > 0)
347
348 #tests on ldap modify operations
349 class AclModifyTests(AclTests):
350
351     def setUp(self):
352         super(AclModifyTests, self).setUp()
353         self.user_with_wp = "acl_mod_user1"
354         self.user_with_sm = "acl_mod_user2"
355         self.user_with_group_sm = "acl_mod_user3"
356         self.create_enable_user(self.user_with_wp)
357         self.create_enable_user(self.user_with_sm)
358         self.create_enable_user(self.user_with_group_sm)
359         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
360         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
361         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
362         self.user_sid = self.get_object_sid( self.get_user_dn(self.user_with_wp))
363         self.create_group(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
364         self.create_group(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
365         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user2"))
366
367     def tearDown(self):
368         super(AclModifyTests, self).tearDown()
369         self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
370         self.delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
371         self.delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
372         self.delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
373         self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
374         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
375         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
376         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
377         self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
378
379     def test_modify_u1(self):
380         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
381         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
382         # First test object -- User
383         print "Testing modify on User object"
384         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))
385         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
386         ldif = """
387 dn: """ + self.get_user_dn("test_modify_user1") + """
388 changetype: modify
389 replace: displayName
390 displayName: test_changed"""
391         self.ldb_user.modify_ldif(ldif)
392         res = self.ldb_admin.search(self.base_dn,
393                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
394         self.assertEqual(res[0]["displayName"][0], "test_changed")
395         # Second test object -- Group
396         print "Testing modify on Group object"
397         self.create_group(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
398         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
399         ldif = """
400 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
401 changetype: modify
402 replace: displayName
403 displayName: test_changed"""
404         self.ldb_user.modify_ldif(ldif)
405         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
406         self.assertEqual(res[0]["displayName"][0], "test_changed")
407         # Third test object -- Organizational Unit
408         print "Testing modify on OU object"
409         #self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
410         self.create_ou(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
411         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
412         ldif = """
413 dn: OU=test_modify_ou1,""" + self.base_dn + """
414 changetype: modify
415 replace: displayName
416 displayName: test_changed"""
417         self.ldb_user.modify_ldif(ldif)
418         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
419         self.assertEqual(res[0]["displayName"][0], "test_changed")
420
421     def test_modify_u2(self):
422         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
423         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
424         # First test object -- User
425         print "Testing modify on User object"
426         #self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
427         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))
428         self.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
429         # Modify on attribute you have rights for
430         ldif = """
431 dn: """ + self.get_user_dn("test_modify_user1") + """
432 changetype: modify
433 replace: displayName
434 displayName: test_changed"""
435         self.ldb_user.modify_ldif(ldif)
436         res = self.ldb_admin.search(self.base_dn,
437                 expression="(distinguishedName=%s)" %
438                 self.get_user_dn("test_modify_user1"))
439         self.assertEqual(res[0]["displayName"][0], "test_changed")
440         # Modify on attribute you do not have rights for granted
441         ldif = """
442 dn: """ + self.get_user_dn("test_modify_user1") + """
443 changetype: modify
444 replace: url
445 url: www.samba.org"""
446         try:
447             self.ldb_user.modify_ldif(ldif)
448         except LdbError, (num, _):
449             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
450         else:
451             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
452             self.fail()
453         # Second test object -- Group
454         print "Testing modify on Group object"
455         self.create_group(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
456         self.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
457         ldif = """
458 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
459 changetype: modify
460 replace: displayName
461 displayName: test_changed"""
462         self.ldb_user.modify_ldif(ldif)
463         res = self.ldb_admin.search(self.base_dn,
464                 expression="(distinguishedName=%s)" %
465                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
466         self.assertEqual(res[0]["displayName"][0], "test_changed")
467         # Modify on attribute you do not have rights for granted
468         ldif = """
469 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
470 changetype: modify
471 replace: url
472 url: www.samba.org"""
473         try:
474             self.ldb_user.modify_ldif(ldif)
475         except LdbError, (num, _):
476             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
477         else:
478             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
479             self.fail()
480         # Second test object -- Organizational Unit
481         print "Testing modify on OU object"
482         self.create_ou(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
483         self.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
484         ldif = """
485 dn: OU=test_modify_ou1,""" + self.base_dn + """
486 changetype: modify
487 replace: displayName
488 displayName: test_changed"""
489         self.ldb_user.modify_ldif(ldif)
490         res = self.ldb_admin.search(self.base_dn,
491                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
492                     + self.base_dn))
493         self.assertEqual(res[0]["displayName"][0], "test_changed")
494         # Modify on attribute you do not have rights for granted
495         ldif = """
496 dn: OU=test_modify_ou1,""" + self.base_dn + """
497 changetype: modify
498 replace: url
499 url: www.samba.org"""
500         try:
501             self.ldb_user.modify_ldif(ldif)
502         except LdbError, (num, _):
503             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
504         else:
505             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
506             self.fail()
507
508     def test_modify_u3(self):
509         """7 Modify one attribute as you have no what so ever rights granted"""
510         # First test object -- User
511         print "Testing modify on User object"
512         self.create_test_user(self.ldb_admin, self.get_user_dn("test_modify_user1"))
513         # Modify on attribute you do not have rights for granted
514         ldif = """
515 dn: """ + self.get_user_dn("test_modify_user1") + """
516 changetype: modify
517 replace: url
518 url: www.samba.org"""
519         try:
520             self.ldb_user.modify_ldif(ldif)
521         except LdbError, (num, _):
522             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
523         else:
524             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
525             self.fail()
526
527         # Second test object -- Group
528         print "Testing modify on Group object"
529         self.create_group(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
530         # Modify on attribute you do not have rights for granted
531         ldif = """
532 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
533 changetype: modify
534 replace: url
535 url: www.samba.org"""
536         try:
537             self.ldb_user.modify_ldif(ldif)
538         except LdbError, (num, _):
539             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
540         else:
541             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
542             self.fail()
543
544         # Second test object -- Organizational Unit
545         print "Testing modify on OU object"
546         #self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
547         self.create_ou(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
548         # Modify on attribute you do not have rights for granted
549         ldif = """
550 dn: OU=test_modify_ou1,""" + self.base_dn + """
551 changetype: modify
552 replace: url
553 url: www.samba.org"""
554         try:
555             self.ldb_user.modify_ldif(ldif)
556         except LdbError, (num, _):
557             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
558         else:
559             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
560             self.fail()
561
562
563     def test_modify_u4(self):
564         """11 Grant WP to PRINCIPAL_SELF and test modify"""
565         ldif = """
566 dn: """ + self.get_user_dn(self.user_with_wp) + """
567 changetype: modify
568 add: adminDescription
569 adminDescription: blah blah blah"""
570         try:
571             self.ldb_user.modify_ldif(ldif)
572         except LdbError, (num, _):
573             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
574         else:
575             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
576             self.fail()
577
578         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
579         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
580         # Modify on attribute you have rights for
581         self.ldb_user.modify_ldif(ldif)
582         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
583                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
584         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
585
586     def test_modify_u5(self):
587         """12 test self membership"""
588         ldif = """
589 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
590 changetype: modify
591 add: Member
592 Member: """ +  self.get_user_dn(self.user_with_sm)
593 #the user has no rights granted, this should fail
594         try:
595             self.ldb_user2.modify_ldif(ldif)
596         except LdbError, (num, _):
597             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
598         else:
599             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
600             self.fail()
601
602 #grant self-membership, should be able to add himself
603         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
604         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
605         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
606         self.ldb_user2.modify_ldif(ldif)
607         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
608                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
609         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
610 #but not other users
611         ldif = """
612 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
613 changetype: modify
614 add: Member
615 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
616         try:
617             self.ldb_user2.modify_ldif(ldif)
618         except LdbError, (num, _):
619             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
620         else:
621             self.fail()
622
623     def test_modify_u6(self):
624         """13 test self membership"""
625         ldif = """
626 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
627 changetype: modify
628 add: Member
629 Member: """ +  self.get_user_dn(self.user_with_sm) + """
630 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
631
632 #grant self-membership, should be able to add himself  but not others at the same time
633         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_sm))
634         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
635         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
636         try:
637             self.ldb_user2.modify_ldif(ldif)
638         except LdbError, (num, _):
639             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
640         else:
641             self.fail()
642
643     def test_modify_u7(self):
644         """13 User with WP modifying Member"""
645 #a second user is given write property permission
646         user_sid = self.get_object_sid(self.get_user_dn(self.user_with_wp))
647         mod = "(A;;WP;;;%s)" % str(user_sid)
648         self.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
649         ldif = """
650 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
651 changetype: modify
652 add: Member
653 Member: """ +  self.get_user_dn(self.user_with_wp)
654         self.ldb_user.modify_ldif(ldif)
655         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
656                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
657         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
658         ldif = """
659 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
660 changetype: modify
661 delete: Member"""
662         self.ldb_user.modify_ldif(ldif)
663         ldif = """
664 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
665 changetype: modify
666 add: Member
667 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
668         self.ldb_user.modify_ldif(ldif)
669         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
670                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
671         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
672
673 #enable these when we have search implemented
674 class AclSearchTests(AclTests):
675
676     def setUp(self):
677         super(AclTests, self).setUp()
678         self.regular_user = "acl_search_user1"
679         self.create_enable_user(self.regular_user)
680         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
681
682     def tearDown(self):
683         super(AclSearchTests, self).tearDown()
684         self.delete_force(self.ldb_admin, "CN=test_search_user1,OU=test_search_ou1," + self.base_dn)
685         self.delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
686         self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
687
688     def test_search_u1(self):
689         """See if can prohibit user to read another User object"""
690         ou_dn = "OU=test_search_ou1," + self.base_dn
691         user_dn = "CN=test_search_user1," + ou_dn
692         # Create clean OU
693         self.delete_force(self.ldb_admin, ou_dn)
694         self.create_ou(self.ldb_admin, ou_dn)
695         desc = self.read_desc(ou_dn)
696         desc_sddl = desc.as_sddl(self.domain_sid)
697         # Parse descriptor's SDDL and remove all inherited ACEs reffering
698         # to 'Registered Users' or 'Authenticated Users'
699         desc_aces = re.findall("\(.*?\)", desc_sddl)
700         for ace in desc_aces:
701             if ("I" in ace) and (("RU" in ace) or ("AU" in ace)):
702                 desc_sddl = desc_sddl.replace(ace, "")
703         # Add 'P' in the DACL so it breaks further inheritance
704         desc_sddl = desc_sddl.replace("D:AI(", "D:PAI(")
705         # Create a security descriptor object and OU with that descriptor
706         desc = security.descriptor.from_sddl(desc_sddl, self.domain_sid)
707         self.delete_force(self.ldb_admin, ou_dn)
708         self.create_ou(self.ldb_admin, ou_dn, desc)
709         # Create clean user
710         self.delete_force(self.ldb_admin, user_dn)
711         self.create_test_user(self.ldb_admin, user_dn)
712         desc = self.read_desc(user_dn)
713         desc_sddl = desc.as_sddl(self.domain_sid)
714         # Parse security descriptor SDDL and remove all 'Read' ACEs
715         # reffering to AU
716         desc_aces = re.findall("\(.*?\)", desc_sddl)
717         for ace in desc_aces:
718             if ("AU" in ace) and ("R" in ace):
719                 desc_sddl = desc_sddl.replace(ace, "")
720         # Create user with the edited descriptor
721         desc = security.descriptor.from_sddl(desc_sddl, self.domain_sid)
722         self.delete_force(self.ldb_admin, user_dn)
723         self.create_test_user(self.ldb_admin, user_dn, desc)
724
725         res = self.ldb_user.search(self.base_dn,
726                 expression="(distinguishedName=%s)" % user_dn)
727         self.assertEqual(res, [])
728
729     def test_search_u2(self):
730         """User's group ACEs cleared and after that granted RIGHT_DS_READ_PROPERTY to another User object"""
731         ou_dn = "OU=test_search_ou1," + self.base_dn
732         user_dn = "CN=test_search_user1," + ou_dn
733         # Create clean OU
734         self.delete_force(self.ldb_admin, ou_dn)
735         self.create_ou(self.ldb_admin, ou_dn)
736         desc = self.read_desc(ou_dn)
737         desc_sddl = desc.as_sddl(self.domain_sid)
738         # Parse descriptor's SDDL and remove all inherited ACEs reffering
739         # to 'Registered Users' or 'Authenticated Users'
740         desc_aces = re.findall("\(.*?\)", desc_sddl)
741         for ace in desc_aces:
742             if ("I" in ace) and (("RU" in ace) or ("AU" in ace)):
743                 desc_sddl = desc_sddl.replace(ace, "")
744         # Add 'P' in the DACL so it breaks further inheritance
745         desc_sddl = desc_sddl.replace("D:AI(", "D:PAI(")
746         # Create a security descriptor object and OU with that descriptor
747         desc = security.descriptor.from_sddl(desc_sddl, self.domain_sid)
748         self.delete_force(self.ldb_admin, ou_dn)
749         self.create_ou(self.ldb_admin, ou_dn, desc)
750         # Create clean user
751         self.delete_force(self.ldb_admin, user_dn)
752         self.create_test_user(self.ldb_admin, user_dn)
753         # Parse security descriptor SDDL and remove all 'Read' ACEs
754         # reffering to AU
755         desc_aces = re.findall("\(.*?\)", desc_sddl)
756         for ace in desc_aces:
757             if ("AU" in ace) and ("R" in ace):
758                 desc_sddl = desc_sddl.replace(ace, "")
759         #mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;AU)"
760         mod = "(A;;RP;;;AU)"
761         self.dacl_add_ace(user_dn, mod)
762         res = self.ldb_user.search(self.base_dn,
763                 expression="(distinguishedName=%s)" % user_dn)
764         self.assertNotEqual(res, [])
765
766 #tests on ldap delete operations
767 class AclDeleteTests(AclTests):
768
769     def setUp(self):
770         super(AclDeleteTests, self).setUp()
771         self.regular_user = "acl_delete_user1"
772             # Create regular user
773         self.create_enable_user(self.regular_user)
774         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
775
776     def tearDown(self):
777         super(AclDeleteTests, self).tearDown()
778         self.delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
779         self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
780
781     def test_delete_u1(self):
782         """User is prohibited by default to delete another User object"""
783         # Create user that we try to delete
784         self.create_test_user(self.ldb_admin, self.get_user_dn("test_delete_user1"))
785         # Here delete User object should ALWAYS through exception
786         try:
787             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
788         except LdbError, (num, _):
789             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
790         else:
791             self.fail()
792
793     def test_delete_u2(self):
794         """User's group has RIGHT_DELETE to another User object"""
795         user_dn = self.get_user_dn("test_delete_user1")
796         # Create user that we try to delete
797         self.create_test_user(self.ldb_admin, user_dn)
798         mod = "(A;;SD;;;AU)"
799         self.dacl_add_ace(user_dn, mod)
800         # Try to delete User object
801         self.ldb_user.delete(user_dn)
802         res = self.ldb_admin.search(self.base_dn,
803                 expression="(distinguishedName=%s)" % user_dn)
804         self.assertEqual(res, [])
805
806     def test_delete_u3(self):
807         """User indentified by SID has RIGHT_DELETE to another User object"""
808         user_dn = self.get_user_dn("test_delete_user1")
809         # Create user that we try to delete
810         self.create_test_user(self.ldb_admin, user_dn)
811         mod = "(A;;SD;;;%s)" % self.get_object_sid(self.get_user_dn(self.regular_user))
812         self.dacl_add_ace(user_dn, mod)
813         # Try to delete User object
814         self.ldb_user.delete(user_dn)
815         res = self.ldb_admin.search(self.base_dn,
816                 expression="(distinguishedName=%s)" % user_dn)
817         self.assertEqual(res, [])
818
819 #tests on ldap rename operations
820 class AclRenameTests(AclTests):
821
822     def setUp(self):
823         super(AclRenameTests, self).setUp()
824         self.regular_user = "acl_rename_user1"
825
826         # Create regular user
827         self.create_enable_user(self.regular_user)
828         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
829
830     def tearDown(self):
831         super(AclRenameTests, self).tearDown()
832         # Rename OU3
833         self.delete_force(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
834         self.delete_force(self.ldb_admin, "CN=test_rename_user2,OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
835         self.delete_force(self.ldb_admin, "CN=test_rename_user5,OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
836         self.delete_force(self.ldb_admin, "OU=test_rename_ou3,OU=test_rename_ou2," + self.base_dn)
837         # Rename OU2
838         self.delete_force(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou2," + self.base_dn)
839         self.delete_force(self.ldb_admin, "CN=test_rename_user2,OU=test_rename_ou2," + self.base_dn)
840         self.delete_force(self.ldb_admin, "CN=test_rename_user5,OU=test_rename_ou2," + self.base_dn)
841         self.delete_force(self.ldb_admin, "OU=test_rename_ou2," + self.base_dn)
842         # Rename OU1
843         self.delete_force(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn)
844         self.delete_force(self.ldb_admin, "CN=test_rename_user2,OU=test_rename_ou1," + self.base_dn)
845         self.delete_force(self.ldb_admin, "CN=test_rename_user5,OU=test_rename_ou1," + self.base_dn)
846         self.delete_force(self.ldb_admin, "OU=test_rename_ou3,OU=test_rename_ou1," + self.base_dn)
847         self.delete_force(self.ldb_admin, "OU=test_rename_ou1," + self.base_dn)
848         self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
849
850     def test_rename_u1(self):
851         """Regular user fails to rename 'User object' within single OU"""
852         # Create OU structure
853         self.create_ou(self.ldb_admin, "OU=test_rename_ou1," + self.base_dn)
854         self.create_test_user(self.ldb_admin, "CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn)
855         try:
856             self.ldb_user.rename("CN=test_rename_user1,OU=test_rename_ou1," + self.base_dn, \
857                     "CN=test_rename_user5,OU=test_rename_ou1," + self.base_dn)
858         except LdbError, (num, _):
859             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
860         else:
861             self.fail()
862
863     def test_rename_u2(self):
864         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
865         ou_dn = "OU=test_rename_ou1," + self.base_dn
866         user_dn = "CN=test_rename_user1," + ou_dn
867         rename_user_dn = "CN=test_rename_user5," + ou_dn
868         # Create OU structure
869         self.create_ou(self.ldb_admin, ou_dn)
870         self.create_test_user(self.ldb_admin, user_dn)
871         mod = "(A;;WP;;;AU)"
872         self.dacl_add_ace(user_dn, mod)
873         # Rename 'User object' having WP to AU
874         self.ldb_user.rename(user_dn, rename_user_dn)
875         res = self.ldb_admin.search(self.base_dn,
876                 expression="(distinguishedName=%s)" % user_dn)
877         self.assertEqual(res, [])
878         res = self.ldb_admin.search(self.base_dn,
879                 expression="(distinguishedName=%s)" % rename_user_dn)
880         self.assertNotEqual(res, [])
881
882     def test_rename_u3(self):
883         """Test rename with rights granted to 'User object' SID"""
884         ou_dn = "OU=test_rename_ou1," + self.base_dn
885         user_dn = "CN=test_rename_user1," + ou_dn
886         rename_user_dn = "CN=test_rename_user5," + ou_dn
887         # Create OU structure
888         self.create_ou(self.ldb_admin, ou_dn)
889         self.create_test_user(self.ldb_admin, user_dn)
890         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
891         mod = "(A;;WP;;;%s)" % str(sid)
892         self.dacl_add_ace(user_dn, mod)
893         # Rename 'User object' having WP to AU
894         self.ldb_user.rename(user_dn, rename_user_dn)
895         res = self.ldb_admin.search(self.base_dn,
896                 expression="(distinguishedName=%s)" % user_dn)
897         self.assertEqual(res, [])
898         res = self.ldb_admin.search(self.base_dn,
899                 expression="(distinguishedName=%s)" % rename_user_dn)
900         self.assertNotEqual(res, [])
901
902     def test_rename_u4(self):
903         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
904         ou1_dn = "OU=test_rename_ou1," + self.base_dn
905         ou2_dn = "OU=test_rename_ou2," + self.base_dn
906         user_dn = "CN=test_rename_user2," + ou1_dn
907         rename_user_dn = "CN=test_rename_user5," + ou2_dn
908         # Create OU structure
909         self.create_ou(self.ldb_admin, ou1_dn)
910         self.create_ou(self.ldb_admin, ou2_dn)
911         self.create_test_user(self.ldb_admin, user_dn)
912         mod = "(A;;WPSD;;;AU)"
913         self.dacl_add_ace(user_dn, mod)
914         mod = "(A;;CC;;;AU)"
915         self.dacl_add_ace(ou2_dn, mod)
916         # Rename 'User object' having SD and CC to AU
917         self.ldb_user.rename(user_dn, rename_user_dn)
918         res = self.ldb_admin.search(self.base_dn,
919                 expression="(distinguishedName=%s)" % user_dn)
920         self.assertEqual(res, [])
921         res = self.ldb_admin.search(self.base_dn,
922                 expression="(distinguishedName=%s)" % rename_user_dn)
923         self.assertNotEqual(res, [])
924
925     def test_rename_u5(self):
926         """Test rename with rights granted to 'User object' SID"""
927         ou1_dn = "OU=test_rename_ou1," + self.base_dn
928         ou2_dn = "OU=test_rename_ou2," + self.base_dn
929         user_dn = "CN=test_rename_user2," + ou1_dn
930         rename_user_dn = "CN=test_rename_user5," + ou2_dn
931         # Create OU structure
932         self.create_ou(self.ldb_admin, ou1_dn)
933         self.create_ou(self.ldb_admin, ou2_dn)
934         self.create_test_user(self.ldb_admin, user_dn)
935         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
936         mod = "(A;;WPSD;;;%s)" % str(sid)
937         self.dacl_add_ace(user_dn, mod)
938         mod = "(A;;CC;;;%s)" % str(sid)
939         self.dacl_add_ace(ou2_dn, mod)
940         # Rename 'User object' having SD and CC to AU
941         self.ldb_user.rename(user_dn, rename_user_dn)
942         res = self.ldb_admin.search(self.base_dn,
943                 expression="(distinguishedName=%s)" % user_dn)
944         self.assertEqual(res, [])
945         res = self.ldb_admin.search(self.base_dn,
946                 expression="(distinguishedName=%s)" % rename_user_dn)
947         self.assertNotEqual(res, [])
948
949     def test_rename_u6(self):
950         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
951         ou1_dn = "OU=test_rename_ou1," + self.base_dn
952         ou2_dn = "OU=test_rename_ou2," + self.base_dn
953         user_dn = "CN=test_rename_user2," + ou1_dn
954         rename_user_dn = "CN=test_rename_user2," + ou2_dn
955         # Create OU structure
956         self.create_ou(self.ldb_admin, ou1_dn)
957         self.create_ou(self.ldb_admin, ou2_dn)
958         #mod = "(A;CI;DCWP;;;AU)"
959         mod = "(A;;DC;;;AU)"
960         self.dacl_add_ace(ou1_dn, mod)
961         mod = "(A;;CC;;;AU)"
962         self.dacl_add_ace(ou2_dn, mod)
963         self.create_test_user(self.ldb_admin, user_dn)
964         mod = "(A;;WP;;;AU)"
965         self.dacl_add_ace(user_dn, mod)
966         # Rename 'User object' having SD and CC to AU
967         self.ldb_user.rename(user_dn, rename_user_dn)
968         res = self.ldb_admin.search(self.base_dn,
969                 expression="(distinguishedName=%s)" % user_dn)
970         self.assertEqual(res, [])
971         res = self.ldb_admin.search(self.base_dn,
972                 expression="(distinguishedName=%s)" % rename_user_dn)
973         self.assertNotEqual(res, [])
974
975     def test_rename_u7(self):
976         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
977         ou1_dn = "OU=test_rename_ou1," + self.base_dn
978         ou2_dn = "OU=test_rename_ou2," + self.base_dn
979         ou3_dn = "OU=test_rename_ou3," + ou2_dn
980         user_dn = "CN=test_rename_user2," + ou1_dn
981         rename_user_dn = "CN=test_rename_user5," + ou3_dn
982         # Create OU structure
983         self.create_ou(self.ldb_admin, ou1_dn)
984         self.create_ou(self.ldb_admin, ou2_dn)
985         self.create_ou(self.ldb_admin, ou3_dn)
986         mod = "(A;CI;WPDC;;;AU)"
987         self.dacl_add_ace(ou1_dn, mod)
988         mod = "(A;;CC;;;AU)"
989         self.dacl_add_ace(ou3_dn, mod)
990         self.create_test_user(self.ldb_admin, user_dn)
991         # Rename 'User object' having SD and CC to AU
992         self.ldb_user.rename(user_dn, rename_user_dn)
993         res = self.ldb_admin.search(self.base_dn,
994                 expression="(distinguishedName=%s)" % user_dn)
995         self.assertEqual(res, [])
996         res = self.ldb_admin.search(self.base_dn,
997                 expression="(distinguishedName=%s)" % rename_user_dn)
998         self.assertNotEqual(res, [])
999
1000     def test_rename_u8(self):
1001         """Test rename on an object with and without modify access on the RDN attribute"""
1002         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1003         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1004         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1005         # Create OU structure
1006         self.create_ou(self.ldb_admin, ou1_dn)
1007         self.create_ou(self.ldb_admin, ou2_dn)
1008         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1009         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1010         self.dacl_add_ace(ou2_dn, mod)
1011         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1012         self.dacl_add_ace(ou2_dn, mod)
1013         try:
1014             self.ldb_user.rename(ou2_dn, ou3_dn)
1015         except LdbError, (num, _):
1016             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1017         else:
1018             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1019             self.fail()
1020         sid = self.get_object_sid(self.get_user_dn(self.regular_user))
1021         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1022         self.dacl_add_ace(ou2_dn, mod)
1023         self.ldb_user.rename(ou2_dn, ou3_dn)
1024         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1025         self.assertEqual(res, [])
1026         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1027         self.assertNotEqual(res, [])
1028
1029 #tests on Control Access Rights
1030 class AclCARTests(AclTests):
1031
1032     def setUp(self):
1033         super(AclCARTests, self).setUp()
1034         self.user_with_wp = "acl_car_user1"
1035         self.user_with_pc = "acl_car_user2"
1036         self.create_enable_user(self.user_with_wp)
1037         self.create_enable_user(self.user_with_pc)
1038         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1039         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1040
1041     def tearDown(self):
1042         super(AclCARTests, self).tearDown()
1043         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1044         self.delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1045
1046     def test_change_password1(self):
1047         """Try a password change operation without any CARs given"""
1048         #users have change password by default - remove for negative testing
1049         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1050         sddl = desc.as_sddl(self.domain_sid)
1051         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1052         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1053         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1054         try:
1055             self.ldb_user.modify_ldif("""
1056 dn: """ + self.get_user_dn(self.user_with_wp) + """
1057 changetype: modify
1058 delete: unicodePwd
1059 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1060 add: unicodePwd
1061 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1062 """)
1063         except LdbError, (num, _):
1064             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1065         else:
1066             # for some reason we get constraint violation instead of insufficient access error
1067             self.fail()
1068
1069     def test_change_password2(self):
1070         """Make sure WP has no influence"""
1071         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1072         sddl = desc.as_sddl(self.domain_sid)
1073         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1074         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1075         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1076         mod = "(A;;WP;;;PS)"
1077         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1078         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1079         sddl = desc.as_sddl(self.domain_sid)
1080         try:
1081             self.ldb_user.modify_ldif("""
1082 dn: """ + self.get_user_dn(self.user_with_wp) + """
1083 changetype: modify
1084 delete: unicodePwd
1085 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1086 add: unicodePwd
1087 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1088 """)
1089         except LdbError, (num, _):
1090             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1091         else:
1092             # for some reason we get constraint violation instead of insufficient access error
1093             self.fail()
1094
1095     def test_change_password3(self):
1096         """Make sure WP has no influence"""
1097         mod = "(D;;WP;;;PS)"
1098         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1099         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1100         sddl = desc.as_sddl(self.domain_sid)
1101         self.ldb_user.modify_ldif("""
1102 dn: """ + self.get_user_dn(self.user_with_wp) + """
1103 changetype: modify
1104 delete: unicodePwd
1105 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1106 add: unicodePwd
1107 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1108 """)
1109
1110     def test_change_password5(self):
1111         """Make sure rights have no influence on dBCSPwd"""
1112         desc = self.read_desc(self.get_user_dn(self.user_with_wp))
1113         sddl = desc.as_sddl(self.domain_sid)
1114         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1115         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1116         self.modify_desc(self.get_user_dn(self.user_with_wp), sddl)
1117         mod = "(D;;WP;;;PS)"
1118         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1119         try:
1120             self.ldb_user.modify_ldif("""
1121 dn: """ + self.get_user_dn(self.user_with_wp) + """
1122 changetype: modify
1123 delete: dBCSPwd
1124 dBCSPwd: XXXXXXXXXXXXXXXX
1125 add: dBCSPwd
1126 dBCSPwd: YYYYYYYYYYYYYYYY
1127 """)
1128         except LdbError, (num, _):
1129             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1130         else:
1131             self.fail()
1132
1133     def test_change_password6(self):
1134         """Test uneven delete/adds"""
1135         try:
1136             self.ldb_user.modify_ldif("""
1137 dn: """ + self.get_user_dn(self.user_with_wp) + """
1138 changetype: modify
1139 delete: userPassword
1140 userPassword: thatsAcomplPASS1
1141 delete: userPassword
1142 userPassword: thatsAcomplPASS1
1143 add: userPassword
1144 userPassword: thatsAcomplPASS2
1145 """)
1146         except LdbError, (num, _):
1147             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1148         else:
1149             self.fail()
1150         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1151         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1152         try:
1153             self.ldb_user.modify_ldif("""
1154 dn: """ + self.get_user_dn(self.user_with_wp) + """
1155 changetype: modify
1156 delete: userPassword
1157 userPassword: thatsAcomplPASS1
1158 delete: userPassword
1159 userPassword: thatsAcomplPASS1
1160 add: userPassword
1161 userPassword: thatsAcomplPASS2
1162 """)
1163         except LdbError, (num, _):
1164             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1165         else:
1166             self.fail()
1167
1168     def test_reset_password1(self):
1169         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1170         try:
1171             self.ldb_user.modify_ldif("""
1172 dn: """ + self.get_user_dn(self.user_with_wp) + """
1173 changetype: modify
1174 replace: unicodePwd
1175 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1176 """)
1177         except LdbError, (num, _):
1178             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1179         else:
1180             self.fail()
1181         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1182         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1183         self.ldb_user.modify_ldif("""
1184 dn: """ + self.get_user_dn(self.user_with_wp) + """
1185 changetype: modify
1186 replace: unicodePwd
1187 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1188 """)
1189
1190     def test_reset_password2(self):
1191         """Try a user password reset operation (userPassword) before and after granting CAR"""
1192         try:
1193             self.ldb_user.modify_ldif("""
1194 dn: """ + self.get_user_dn(self.user_with_wp) + """
1195 changetype: modify
1196 replace: userPassword
1197 userPassword: thatsAcomplPASS1
1198 """)
1199         except LdbError, (num, _):
1200             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1201         else:
1202             self.fail()
1203         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1204         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1205         self.ldb_user.modify_ldif("""
1206 dn: """ + self.get_user_dn(self.user_with_wp) + """
1207 changetype: modify
1208 replace: userPassword
1209 userPassword: thatsAcomplPASS1
1210 """)
1211
1212     def test_reset_password3(self):
1213         """Grant WP and see what happens (unicodePwd)"""
1214         mod = "(A;;WP;;;PS)"
1215         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1216         try:
1217             self.ldb_user.modify_ldif("""
1218 dn: """ + self.get_user_dn(self.user_with_wp) + """
1219 changetype: modify
1220 replace: unicodePwd
1221 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1222 """)
1223         except LdbError, (num, _):
1224             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1225         else:
1226             self.fail()
1227
1228     def test_reset_password4(self):
1229         """Grant WP and see what happens (userPassword)"""
1230         mod = "(A;;WP;;;PS)"
1231         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1232         try:
1233             self.ldb_user.modify_ldif("""
1234 dn: """ + self.get_user_dn(self.user_with_wp) + """
1235 changetype: modify
1236 replace: userPassword
1237 userPassword: thatsAcomplPASS1
1238 """)
1239         except LdbError, (num, _):
1240             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1241         else:
1242             self.fail()
1243
1244     def test_reset_password5(self):
1245         """Explicitly deny WP but grant CAR (unicodePwd)"""
1246         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1247         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1248         self.ldb_user.modify_ldif("""
1249 dn: """ + self.get_user_dn(self.user_with_wp) + """
1250 changetype: modify
1251 replace: unicodePwd
1252 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1253 """)
1254
1255     def test_reset_password6(self):
1256         """Explicitly deny WP but grant CAR (userPassword)"""
1257         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1258         self.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1259         self.ldb_user.modify_ldif("""
1260 dn: """ + self.get_user_dn(self.user_with_wp) + """
1261 changetype: modify
1262 replace: userPassword
1263 userPassword: thatsAcomplPASS1
1264 """)
1265
1266 # Important unit running information
1267
1268 if not "://" in host:
1269     host = "ldap://%s" % host
1270 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp)
1271
1272 # Gets back the configuration basedn
1273 res = ldb.search(base="", expression="", scope=SCOPE_BASE,
1274                  attrs=["configurationNamingContext"])
1275 configuration_dn = res[0]["configurationNamingContext"][0]
1276
1277 # Gets back the cbasedn
1278 res = ldb.search(base="", expression="", scope=SCOPE_BASE,
1279                  attrs=["defaultNamingContext"])
1280 base_dn = res[0]["defaultNamingContext"][0]
1281
1282 # Get the old "dSHeuristics" if it was set
1283 res = ldb.search("CN=Directory Service, CN=Windows NT, CN=Services, "
1284                  + configuration_dn, scope=SCOPE_BASE, attrs=["dSHeuristics"])
1285 if "dSHeuristics" in res[0]:
1286   dsheuristics = res[0]["dSHeuristics"][0]
1287 else:
1288   dsheuristics = None
1289
1290 # Set the "dSHeuristics" to have the tests run against Windows Server
1291 m = Message()
1292 m.dn = Dn(ldb, "CN=Directory Service, CN=Windows NT, CN=Services, "
1293   + configuration_dn)
1294 m["dSHeuristics"] = MessageElement("000000001", FLAG_MOD_REPLACE,
1295   "dSHeuristics")
1296 ldb.modify(m)
1297
1298 # Get the current minPwdAge
1299 res = ldb.search(base_dn, scope=SCOPE_BASE, attrs=["minPwdAge"])
1300 minPwdAge = res[0]["minPwdAge"][0]
1301
1302 #set minPwdAge to 0 so password tests can against Windows server
1303 m = Message()
1304 m.dn = Dn(ldb, base_dn)
1305 m["minPwdAge"] = MessageElement("0", FLAG_MOD_REPLACE, "minPwdAge")
1306 ldb.modify(m)
1307
1308 runner = SubunitTestRunner()
1309 rc = 0
1310 if not runner.run(unittest.makeSuite(AclAddTests)).wasSuccessful():
1311     rc = 1
1312 if not runner.run(unittest.makeSuite(AclModifyTests)).wasSuccessful():
1313     rc = 1
1314 if not runner.run(unittest.makeSuite(AclDeleteTests)).wasSuccessful():
1315     rc = 1
1316 if not runner.run(unittest.makeSuite(AclRenameTests)).wasSuccessful():
1317     rc = 1
1318 if not runner.run(unittest.makeSuite(AclCARTests)).wasSuccessful():
1319     rc = 1
1320
1321 # Reset the "dSHeuristics" as they were before
1322 m = Message()
1323 m.dn = Dn(ldb, "CN=Directory Service, CN=Windows NT, CN=Services, "
1324   + configuration_dn)
1325 if dsheuristics is not None:
1326     m["dSHeuristics"] = MessageElement(dsheuristics, FLAG_MOD_REPLACE,
1327       "dSHeuristics")
1328 else:
1329     m["dSHeuristics"] = MessageElement([], FLAG_MOD_DELETE, "dsHeuristics")
1330 ldb.modify(m)
1331
1332 # Reset minPwdAge as before
1333 m = Message()
1334 m.dn = Dn(ldb, base_dn)
1335 m["minPwdAge"] = MessageElement(minPwdAge, FLAG_MOD_REPLACE, "minPwdAge")
1336 ldb.modify(m)
1337
1338 sys.exit(rc)