Fix various spelling errors
[ambi/samba-autobuild/.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9 sys.path.insert(0, "bin/python")
10 import samba
11
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
13
14 import samba.getopt as options
15 from samba.join import dc_join
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
24 from samba.dcerpc import security, drsuapi, misc
25
26 from samba.auth import system_session
27 from samba import gensec, sd_utils
28 from samba.samdb import SamDB
29 from samba.credentials import Credentials, DONT_USE_KERBEROS
30 import samba.tests
31 from samba.tests import delete_force
32 import samba.dsdb
33
# Command-line handling: Samba's standard option groups plus one positional
# argument naming the server under test.
parser = optparse.OptionParser("acl.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# Accept either a bare host name or a full URL.  ``ldaphost`` is always a URL
# suitable for SamDB; ``host`` is reduced to the part after the scheme.
host = args[0]
if "://" not in host:
    ldaphost = "ldap://%s" % host
else:
    ldaphost = host
    start = host.rindex("://")
    # Slice the scheme off.  str.lstrip() takes a set of characters, not an
    # offset, so the previous ``host.lstrip(start+3)`` raised TypeError
    # whenever a URL was given.
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Request sealing in addition to whatever gensec features are already set.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
62
63 #
64 # Tests start here
65 #
66
class AclTests(samba.tests.TestCase):
    """Shared fixture for the LDAP access-check test classes.

    setUp() opens an administrative SamDB connection to ``ldaphost`` and
    caches the values derived from it (base DN, domain SID, configuration DN,
    security-descriptor helper) together with the password given to every
    test account and a set of empty-name credentials for anonymous binds.
    """

    def setUp(self):
        super(AclTests, self).setUp()
        self.ldb_admin = SamDB(ldaphost, credentials=creds,
                               session_info=system_session(lp), lp=lp)
        self.base_dn = self.ldb_admin.domain_dn()
        self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
        self.user_pass = "samba123@"
        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
        # Credentials with empty user name and password, used for anonymous login
        self.creds_tmp = Credentials()
        self.creds_tmp.set_username("")
        self.creds_tmp.set_password("")
        self.creds_tmp.set_domain(creds.get_domain())
        self.creds_tmp.set_realm(creds.get_realm())
        self.creds_tmp.set_workstation(creds.get_workstation())
        # Parenthesised single argument: prints the same text whether ``print``
        # is the Python 2 statement or the Python 3 function.
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of the object called ``name`` in CN=Users under the base DN."""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Return a new SamDB connection to ``ldaphost`` bound as the given user.

        Domain, realm and workstation are copied from the command-line
        credentials; only the user name and password differ.
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                      | gensec.FEATURE_SEAL)
        # kinit is too expensive to use in a tight loop
        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
        return ldb_target

    def assert_user_no_group_member(self, username):
        """Fail unless the user's entry has no ``memberOf`` attribute.

        An entry without ``memberOf`` (KeyError on lookup) is the only passing
        outcome; any ``memberOf`` value at all is reported as a failure.
        """
        res = self.ldb_admin.search(
            self.base_dn,
            expression="(distinguishedName=%s)" % self.get_user_dn(username))
        try:
            member_of = res[0]["memberOf"]
        except KeyError:
            # No memberOf attribute: the user is in no additional group.
            return
        self.fail("user %s unexpectedly has memberOf values: %s"
                  % (username, list(member_of)))
111
112 #tests on ldap add operations
class AclAddTests(AclTests):
    """Access checks on LDAP add operations.

    Three accounts are created for every test: two that are added to
    "Domain Admins" (one creates the OU tree, the other does not) and one
    regular user.  The tests create users and groups inside a two-level OU
    tree (``self.ou1`` / ``self.ou2``) through each account's own connection.
    """

    def setUp(self):
        super(AclAddTests, self).setUp()
        # Domain admin that will be creator of OU parent-child structure
        self.usr_admin_owner = "acl_add_user1"
        # Second domain admin that will not be creator of OU parent-child structure
        self.usr_admin_not_owner = "acl_add_user2"
        # Regular user
        self.regular_user = "acl_add_user3"
        self.test_user1 = "test_add_user1"
        self.test_group1 = "test_add_group1"
        self.ou1 = "OU=test_add_ou1"
        self.ou2 = "OU=test_add_ou2,%s" % self.ou1
        self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
        self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
        self.ldb_admin.newuser(self.regular_user, self.user_pass)

        # add admins to the Domain Admins group
        self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner],
                                                add_members_operation=True)
        self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner],
                                                add_members_operation=True)

        self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
        self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        super(AclAddTests, self).tearDown()
        # Children first, then the OUs, then the accounts created in setUp().
        delete_force(self.ldb_admin, self._ou2_child_dn(self.test_user1))
        delete_force(self.ldb_admin, self._ou2_child_dn(self.test_group1))
        delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
        delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
        delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
        delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
        delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))

    def _ou2_child_dn(self, cn):
        """Return the DN of the object ``CN=<cn>`` directly below the inner test OU."""
        return "CN=%s,%s,%s" % (cn, self.ou2, self.base_dn)

    def _count_in_ou2(self, cn):
        """Return how many entries the admin connection finds for ``CN=<cn>`` in the inner OU."""
        res = self.ldb_admin.search(
            self.base_dn,
            expression="(distinguishedName=%s)" % self._ou2_child_dn(cn))
        return len(res)

    def _assert_ldb_error(self, expected, operation, *args, **kwargs):
        """Call ``operation(*args, **kwargs)`` and require an LdbError with code ``expected``.

        The test fails if the call succeeds or if it raises an LdbError
        carrying a different error number.
        """
        try:
            operation(*args, **kwargs)
        except LdbError as e:
            # e.args is (error number, message); this spelling is valid on
            # both Python 2.6+ and Python 3.
            (num, _) = e.args
            self.assertEqual(num, expected)
        else:
            self.fail("operation succeeded, expected LDB error %d" % expected)

    # Make sure top OU is deleted (and so everything under it)
    def assert_top_ou_deleted(self):
        """Assert that the outer test OU does not exist."""
        res = self.ldb_admin.search(
            self.base_dn,
            expression="(distinguishedName=%s,%s)" % (self.ou1, self.base_dn))
        self.assertEqual(len(res), 0)

    def test_add_u1(self):
        """Testing OU with the rights of Domain Admin not creator of the OU """
        self.assert_top_ou_deleted()
        ou1_dn = "%s,%s" % (self.ou1, self.base_dn)
        ou2_dn = "%s,%s" % (self.ou2, self.base_dn)
        # Change descriptor for top level OU
        self.ldb_owner.create_ou(ou1_dn)
        self.ldb_owner.create_ou(ou2_dn)
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
        mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(ou1_dn, mod)
        # Test user and group creation with another domain admin's credentials
        self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_notowner.newgroup(self.test_group1, groupou=self.ou2,
                                   grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we HAVE created the two objects -- user and group
        # !!! We should not be able to do that, but because of ACE ordering our inherited Deny ACE
        # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
        self.assertTrue(self._count_in_ou2(self.test_user1) > 0)
        self.assertTrue(self._count_in_ou2(self.test_group1) > 0)

    def test_add_u2(self):
        """Testing OU with the regular user that has no rights granted over the OU """
        self.assert_top_ou_deleted()
        # Create a parent-child OU structure with domain admin credentials
        self.ldb_owner.create_ou("%s,%s" % (self.ou1, self.base_dn))
        self.ldb_owner.create_ou("%s,%s" % (self.ou2, self.base_dn))
        # Test user and group creation with regular user credentials.  The
        # two calls are checked separately: when they shared one try block the
        # group creation was never attempted once the user creation raised.
        self._assert_ldb_error(ERR_INSUFFICIENT_ACCESS_RIGHTS,
                               self.ldb_user.newuser,
                               self.test_user1, self.user_pass, userou=self.ou2)
        self._assert_ldb_error(ERR_INSUFFICIENT_ACCESS_RIGHTS,
                               self.ldb_user.newgroup,
                               self.test_group1, groupou=self.ou2,
                               grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we HAVEN'T created any of two objects -- user or group
        self.assertEqual(self._count_in_ou2(self.test_user1), 0)
        self.assertEqual(self._count_in_ou2(self.test_group1), 0)

    def test_add_u3(self):
        """Testing OU with the rights of regular user granted the right 'Create User child objects' """
        self.assert_top_ou_deleted()
        ou1_dn = "%s,%s" % (self.ou1, self.base_dn)
        # Change descriptor for top level OU
        self.ldb_owner.create_ou(ou1_dn)
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(ou1_dn, mod)
        # The inner OU is created only after the ACE has been added to its parent.
        self.ldb_owner.create_ou("%s,%s" % (self.ou2, self.base_dn))
        # Test user and group creation with granted user only to one of the objects
        self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2,
                              setpassword=False)
        self._assert_ldb_error(ERR_INSUFFICIENT_ACCESS_RIGHTS,
                               self.ldb_user.newgroup,
                               self.test_group1, groupou=self.ou2,
                               grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we HAVE created the one of two objects -- user
        self.assertNotEqual(self._count_in_ou2(self.test_user1), 0)
        self.assertEqual(self._count_in_ou2(self.test_group1), 0)

    def test_add_u4(self):
        """ 4 Testing OU with the rights of Domain Admin creator of the OU"""
        self.assert_top_ou_deleted()
        self.ldb_owner.create_ou("%s,%s" % (self.ou1, self.base_dn))
        self.ldb_owner.create_ou("%s,%s" % (self.ou2, self.base_dn))
        self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_owner.newgroup(self.test_group1, groupou=self.ou2,
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we have successfully created the two objects -- user and group
        self.assertTrue(self._count_in_ou2(self.test_user1) > 0)
        self.assertTrue(self._count_in_ou2(self.test_group1) > 0)

    def test_add_anonymous(self):
        """Test add operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               anonymous.newuser,
                               "test_add_anonymous", self.user_pass)
257
258 #tests on ldap modify operations
class AclModifyTests(AclTests):
    """Access checks on LDAP modify operations.

    setUp() creates three accounts with their own connections
    (``ldb_user``, ``ldb_user2``, ``ldb_user3``), two distribution groups and
    one extra user.  Each test grants a specific ACE (or none) and then checks
    which modifications the account may perform.
    """

    def setUp(self):
        super(AclModifyTests, self).setUp()
        self.user_with_wp = "acl_mod_user1"
        self.user_with_sm = "acl_mod_user2"
        self.user_with_group_sm = "acl_mod_user3"
        self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
        self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
        self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
        self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
        self.ldb_admin.newgroup("test_modify_group2",
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        self.ldb_admin.newgroup("test_modify_group3",
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        self.ldb_admin.newuser("test_modify_user2", self.user_pass)

    def tearDown(self):
        super(AclModifyTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
        delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
        delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
        delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
        delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
        delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
        delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _modify_ldif(dn, operation, attr, *values):
        """Build the LDIF text for one modification of ``attr`` on ``dn``.

        ``operation`` is the LDIF keyword ("add", "replace" or "delete").
        Each entry in ``values`` becomes one ``attr: value`` line; with no
        values only the operation line is emitted.  The text starts with an
        empty line and has no trailing newline.
        """
        lines = ["",
                 "dn: " + dn,
                 "changetype: modify",
                 "%s: %s" % (operation, attr)]
        lines.extend("%s: %s" % (attr, value) for value in values)
        return "\n".join(lines)

    def _assert_ldb_error(self, expected, operation, *args, **kwargs):
        """Call ``operation(*args, **kwargs)`` and require an LdbError with code ``expected``."""
        try:
            operation(*args, **kwargs)
        except LdbError as e:
            # e.args is (error number, message); this spelling is valid on
            # both Python 2.6+ and Python 3.
            (num, _) = e.args
            self.assertEqual(num, expected)
        else:
            self.fail("operation succeeded, expected LDB error %d" % expected)

    def _assert_modify_denied(self, ldb, ldif):
        """Require that applying ``ldif`` through ``ldb`` raises ERR_INSUFFICIENT_ACCESS_RIGHTS."""
        self._assert_ldb_error(ERR_INSUFFICIENT_ACCESS_RIGHTS, ldb.modify_ldif, ldif)

    def _first_value(self, dn, attr):
        """Return the first value of ``attr`` on ``dn`` as read by the admin connection."""
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % dn,
                                    attrs=[attr])
        return res[0][attr][0]

    def _check_display_name_allowed(self, dn):
        """Replace displayName on ``dn`` as ``ldb_user`` and verify the new value is stored."""
        self.ldb_user.modify_ldif(
            self._modify_ldif(dn, "replace", "displayName", "test_changed"))
        self.assertEqual(self._first_value(dn, "displayName"), "test_changed")

    def _check_url_denied(self, dn):
        """Require that ``ldb_user`` is refused when replacing the url attribute on ``dn``."""
        self._assert_modify_denied(
            self.ldb_user,
            self._modify_ldif(dn, "replace", "url", "www.samba.org"))

    def _new_test_group(self):
        """Create test_modify_group1 as admin and return its DN."""
        self.ldb_admin.newgroup("test_modify_group1",
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        return "CN=test_modify_group1,CN=Users," + self.base_dn

    def _new_test_user(self):
        """Create test_modify_user1 as admin and return its DN."""
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        return self.get_user_dn("test_modify_user1")

    def _new_test_ou(self):
        """Create OU=test_modify_ou1 as admin and return its DN."""
        ou_dn = "OU=test_modify_ou1," + self.base_dn
        self.ldb_admin.create_ou(ou_dn)
        return ou_dn

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_modify_u1(self):
        """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        # First test object -- User
        print("Testing modify on User object")
        user_dn = self._new_test_user()
        self.sd_utils.dacl_add_ace(user_dn, mod)
        self._check_display_name_allowed(user_dn)
        # Second test object -- Group
        print("Testing modify on Group object")
        group_dn = self._new_test_group()
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._check_display_name_allowed(group_dn)
        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        ou_dn = self._new_test_ou()
        self.sd_utils.dacl_add_ace(ou_dn, mod)
        self._check_display_name_allowed(ou_dn)

    def test_modify_u2(self):
        """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        # First test object -- User
        print("Testing modify on User object")
        user_dn = self._new_test_user()
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Modify on attribute you have rights for
        self._check_display_name_allowed(user_dn)
        # Modify on attribute you do not have rights for granted
        self._check_url_denied(user_dn)
        # Second test object -- Group
        print("Testing modify on Group object")
        group_dn = self._new_test_group()
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._check_display_name_allowed(group_dn)
        # Modify on attribute you do not have rights for granted
        self._check_url_denied(group_dn)
        # Modify on attribute you do not have rights for granted while also
        # modifying something you do have rights for
        ldif = "\n".join([
            self._modify_ldif(group_dn, "replace", "url", "www.samba.org"),
            "replace: displayName",
            "displayName: test_changed"])
        self._assert_modify_denied(self.ldb_user, ldif)
        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        ou_dn = self._new_test_ou()
        self.sd_utils.dacl_add_ace(ou_dn, mod)
        self._check_display_name_allowed(ou_dn)
        # Modify on attribute you do not have rights for granted
        self._check_url_denied(ou_dn)

    def test_modify_u3(self):
        """7 Modify one attribute as you have no what so ever rights granted"""
        # First test object -- User
        print("Testing modify on User object")
        self._check_url_denied(self._new_test_user())

        # Second test object -- Group
        print("Testing modify on Group object")
        self._check_url_denied(self._new_test_group())

        # Third test object -- Organizational Unit
        print("Testing modify on OU object")
        self._check_url_denied(self._new_test_ou())

    def test_modify_u4(self):
        """11 Grant WP to PRINCIPAL_SELF and test modify"""
        own_dn = self.get_user_dn(self.user_with_wp)
        ldif = self._modify_ldif(own_dn, "add", "adminDescription", "blah blah blah")
        # Before the ACE is added the user may not write the attribute
        self._assert_modify_denied(self.ldb_user, ldif)

        mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
        self.sd_utils.dacl_add_ace(own_dn, mod)
        # Modify on attribute you have rights for
        self.ldb_user.modify_ldif(ldif)
        self.assertEqual(self._first_value(own_dn, "adminDescription"), "blah blah blah")

    def test_modify_u5(self):
        """12 test self membership"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_sm)
        ldif = self._modify_ldif(group_dn, "add", "Member", own_dn)
        # the user has no rights granted, this should fail
        self._assert_modify_denied(self.ldb_user2, ldif)

        # grant self-membership, should be able to add himself
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self.ldb_user2.modify_ldif(ldif)
        self.assertEqual(self._first_value(group_dn, "Member"), own_dn)
        # but not other users
        ldif = self._modify_ldif(group_dn, "add", "Member",
                                 "CN=test_modify_user2,CN=Users," + self.base_dn)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u6(self):
        """13 test self membership"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_sm)
        ldif = self._modify_ldif(group_dn, "add", "Member",
                                 own_dn,
                                 "CN=test_modify_user2,CN=Users," + self.base_dn)

        # grant self-membership, should be able to add himself but not others at the same time
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u7(self):
        """13 User with WP modifying Member"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_wp)
        other_dn = "CN=test_modify_user2,CN=Users," + self.base_dn
        # a second user is given write property permission
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(A;;WP;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        # add itself as a member
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "add", "Member", own_dn))
        self.assertEqual(self._first_value(group_dn, "Member"), own_dn)
        # remove the Member attribute, then add a different user
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "delete", "Member"))
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "add", "Member", other_dn))
        self.assertEqual(self._first_value(group_dn, "Member"), other_dn)

    def test_modify_anonymous(self):
        """Test modify operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self.ldb_admin.newuser("test_anonymous", self.user_pass)
        m = Message()
        m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))

        m["description"] = MessageElement("sambauser2",
                                          FLAG_MOD_ADD,
                                          "description")
        self._assert_ldb_error(ERR_OPERATIONS_ERROR, anonymous.modify, m)
618
619 #enable these when we have search implemented
class AclSearchTests(AclTests):
    """Tests for access checks on LDAP search operations

    setUp creates three users and one group: "search_u1" (self.user_sid),
    "search_u2" (member of "group1", self.group_sid) and "search_u3" (not
    referenced by any ACE in these tests).  Most tests build the OU tree

        ou1 -> ou2 -> ou3 -> ou5
                   -> ou4 -> ou6

    below self.base_dn, add SDDL ACEs for user_sid/group_sid and then check
    which entries and attributes each user gets back from a search.
    """

    def setUp(self):
        """Adjust domain settings and create the users, group and connections"""
        super(AclSearchTests, self).setUp()
        # Get the old "dSHeuristics" if it was set
        dsheuristics = self.ldb_admin.get_dsheuristics()
        # Reset the "dSHeuristics" as they were before
        self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.ldb_admin.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        minPwdAge = self.ldb_admin.get_minPwdAge()
        # Reset the "minPwdAge" as it was before
        self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
        # Set it temporarily to "0"
        self.ldb_admin.set_minPwdAge("0")

        self.u1 = "search_u1"
        self.u2 = "search_u2"
        self.u3 = "search_u3"
        self.group1 = "group1"
        self.ldb_admin.newuser(self.u1, self.user_pass)
        self.ldb_admin.newuser(self.u2, self.user_pass)
        self.ldb_admin.newuser(self.u3, self.user_pass)
        self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
        # Only u2 becomes a member of the group
        self.ldb_admin.add_remove_group_members(self.group1, [self.u2],
                                                add_members_operation=True)
        self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
        # DNs of every OU of the test tree
        self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
                          Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
                          Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
                          Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
                          Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
                          Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
        self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
        self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))

    def create_clean_ou(self, object_dn):
        """ Base repeating setup for unittests to follow

        Creates the OU object_dn (which must not exist yet) and strips every
        inheritable ACE ("CI"/"OI") from its security descriptor.
        """
        res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE,
                                    expression="distinguishedName=%s" % object_dn)
        # Make sure top testing OU has been deleted before starting the test
        self.assertEqual(len(res), 0)
        self.ldb_admin.create_ou(object_dn)
        desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
        # Make sure there are inheritable ACEs initially
        self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
        # Find and remove all inherit ACEs; each "(...)" group is one ACE
        res = re.findall(r"\(.*?\)", desc_sddl)
        res = [x for x in res if ("CI" in x) or ("OI" in x)]
        for x in res:
            desc_sddl = desc_sddl.replace(x, "")
        # Add flag 'protected' (":AI" -> ":AIP") wherever the descriptor
        # carries the ":AI" flag so no inherit ACEs can propagate from above.
        # NOTE: the SACL itself is left in place, only the flags are changed.
        desc_sddl = desc_sddl.replace(":AI", ":AIP")
        self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
        # Verify all inheritable ACEs are gone
        desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
        self.assertFalse("CI" in desc_sddl)
        self.assertFalse("OI" in desc_sddl)

    def tearDown(self):
        """Remove every OU, user and group a test of this class may have created"""
        super(AclSearchTests, self).tearDown()
        # Children are listed before their parents
        for ou in ("OU=test_search_ou2,OU=test_search_ou1,",
                   "OU=test_search_ou1,",
                   "OU=ou6,OU=ou4,OU=ou2,OU=ou1,",
                   "OU=ou5,OU=ou3,OU=ou2,OU=ou1,",
                   "OU=ou4,OU=ou2,OU=ou1,",
                   "OU=ou3,OU=ou2,OU=ou1,",
                   "OU=ou2,OU=ou1,",
                   "OU=ou1,"):
            delete_force(self.ldb_admin, ou + self.base_dn)
        for name in ("search_u1", "search_u2", "search_u3", "group1"):
            delete_force(self.ldb_admin, self.get_user_dn(name))

    def test_search_anonymous1(self):
        """Verify access of rootDSE with the correct request"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
        self.assertEquals(len(res), 1)
        #verify some of the attributes
        #don't care about values
        for attr in ("ldapServiceName",
                     "namingContexts",
                     "isSynchronized",
                     "dsServiceName",
                     "supportedSASLMechanisms",
                     "isGlobalCatalogReady",
                     "domainControllerFunctionality",
                     "serverName"):
            self.assertTrue(attr in res[0])

    def test_search_anonymous2(self):
        """Make sure we cannot access anything else"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        # Subtree search starting at the rootDSE
        try:
            res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
        except LdbError as e:
            (num, _) = e.args
            self.assertEquals(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail()
        # Subtree search starting at the domain base
        try:
            res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
        except LdbError as e:
            (num, _) = e.args
            self.assertEquals(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail()
        # Subtree search starting at the configuration partition
        try:
            res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)
        except LdbError as e:
            (num, _) = e.args
            self.assertEquals(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail()

    def test_search_anonymous3(self):
        """Set dsHeuristics and repeat"""
        # The previous dSHeuristics value is restored by the cleanup
        # registered in setUp
        self.ldb_admin.set_dsheuristics("0000002")
        self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
        # Inheritable LC for "AN", added on the parent before the child is created
        mod = "(A;CI;LC;;;AN)"
        self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
        self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
                               expression="(objectClass=*)", scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 1)
        self.assertTrue("dn" in res[0])
        self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
                                           "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
        res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
                               scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 1)
        self.assertTrue("dn" in res[0])
        self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))

    def test_search1(self):
        """Make sure users can see us if given LC to user and group"""
        self.create_clean_ou("OU=ou1," + self.base_dn)
        mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
        self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
        # Explicit descriptor for the OUs below ou1: full rights for DA
        # plus the LC ACEs from above
        tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
                                                 self.domain_sid)
        self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)

        #regular users must see only ou1 and ou2
        res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                    scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 2)
        ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]

        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

        #these users should see all ous
        res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 6)
        res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
        self.assertEquals(sorted(res_list), sorted(self.full_list))

        res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                    scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 6)
        res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
        self.assertEquals(sorted(res_list), sorted(self.full_list))

    def test_search2(self):
        """Make sure users can't see us if access is explicitly denied"""
        self.create_clean_ou("OU=ou1," + self.base_dn)
        self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
        self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
        self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
        self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
        self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
        # Deny LC on ou2 to the user and to the group
        mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
        self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
        res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                    scope=SCOPE_SUBTREE)
        #this user should see all ous
        res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
        self.assertEquals(sorted(res_list), sorted(self.full_list))

        #these users should see ou1, 2, 5 and 6 but not 3 and 4
        res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)
        ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

        res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                    scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 4)
        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

    def test_search3(self):
        """Make sure users can't see ous if access is explicitly denied - 2"""
        self.create_clean_ou("OU=ou1," + self.base_dn)
        mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
        self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
        tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
                                                 self.domain_sid)
        self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)

        # Single parenthesised argument: prints the same text on Python 2 and 3
        print("Testing correct behavior on nonaccessible search base")
        try:
            self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                  scope=SCOPE_BASE)
        except LdbError as e:
            (num, _) = e.args
            self.assertEquals(num, ERR_NO_SUCH_OBJECT)
        else:
            self.fail()

        # Now additionally deny LC on ou2 to the user and to the group
        mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
        self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)

        ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]

        res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                    scope=SCOPE_SUBTREE)
        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

        ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]

        #should not see ou3 and ou4, but should see ou5 and ou6
        res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 4)
        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

        res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                    scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 4)
        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

    def test_search4(self):
        """There is no difference in visibility if the user is also creator"""
        self.create_clean_ou("OU=ou1," + self.base_dn)
        # Inheritable CC so that the user may create the OUs below ou1 itself
        mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
        self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
        tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
                                                 self.domain_sid)
        # Unlike in the other tests the OUs are created over the user's connection
        self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)

        ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
                   Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
        res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                    scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 2)
        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

        res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 2)
        res_list = [x["dn"] for x in res if x["dn"] in ok_list]
        self.assertEquals(sorted(res_list), sorted(ok_list))

    def test_search5(self):
        """Make sure users can see only attributes they are allowed to see"""
        self.create_clean_ou("OU=ou1," + self.base_dn)
        mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
        self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
        tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
                                                 self.domain_sid)
        self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        # assert user can only see dn
        res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)
        ok_list = ['dn']
        self.assertEquals(len(res), 1)
        res_list = res[0].keys()
        self.assertEquals(res_list, ok_list)

        # explicitly asking for "ou" must not reveal it either
        res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_BASE, attrs=["ou"])

        self.assertEquals(len(res), 1)
        res_list = res[0].keys()
        self.assertEquals(res_list, ok_list)

        #give read property on ou and assert user can only see dn and ou
        mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
        self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
        self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
        res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)
        ok_list = ['dn', 'ou']
        self.assertEquals(len(res), 1)
        res_list = res[0].keys()
        self.assertEquals(sorted(res_list), sorted(ok_list))

        #give read property on Public Information and assert user can see ou and other members
        mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
        self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
        self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
        res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
                                   scope=SCOPE_SUBTREE)

        ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
        res_list = res[0].keys()
        self.assertEquals(sorted(res_list), sorted(ok_list))

    def test_search6(self):
        """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
        self.create_clean_ou("OU=ou1," + self.base_dn)
        mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
        self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
        tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
                                                 self.domain_sid)
        self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
        # ou3 is created by the user itself
        self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)

        res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
                                   scope=SCOPE_SUBTREE)
        #nothing should be returned as ou is not accessible
        self.assertEquals(len(res), 0)

        #give read property on ou and assert user can only see dn and ou
        mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
        self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
        res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
                                   scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 1)
        ok_list = ['dn', 'ou']
        res_list = res[0].keys()
        self.assertEquals(sorted(res_list), sorted(ok_list))

        #give read property on Public Information and assert user can see ou and other members
        mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
        self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
        res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
                                   scope=SCOPE_SUBTREE)
        self.assertEquals(len(res), 1)
        ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
        res_list = res[0].keys()
        self.assertEquals(sorted(res_list), sorted(ok_list))
983
984 #tests on ldap delete operations
class AclDeleteTests(AclTests):
    """Tests for access checks on LDAP delete operations

    Every test lets the administrator create a victim user and then tries to
    delete it over another connection.
    """

    def setUp(self):
        """Create the regular user and open a connection as that user"""
        super(AclDeleteTests, self).setUp()
        self.regular_user = "acl_delete_user1"
        # Create regular user
        self.ldb_admin.newuser(self.regular_user, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Remove all users the tests of this class may have left behind"""
        super(AclDeleteTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
        delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))

    def test_delete_u1(self):
        """User is prohibited by default to delete another User object"""
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Here delete User object should ALWAYS throw an exception
        try:
            self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
        except LdbError as e:
            # Same two-element unpacking as the former
            # "except LdbError, (num, _)" clause, valid on Python 2.6+ and 3
            (num, _) = e.args
            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

    def test_delete_u2(self):
        """User's group has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Grant SD on the victim to "AU"
        mod = "(A;;SD;;;AU)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        # The object must be gone afterwards
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_u3(self):
        """User identified by SID has RIGHT_DELETE to another User object"""
        user_dn = self.get_user_dn("test_delete_user1")
        # Create user that we try to delete
        self.ldb_admin.newuser("test_delete_user1", self.user_pass)
        # Grant SD on the victim to the SID of the regular user
        mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Try to delete User object
        self.ldb_user.delete(user_dn)
        # The object must be gone afterwards
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(distinguishedName=%s)" % user_dn)
        self.assertEqual(len(res), 0)

    def test_delete_anonymous(self):
        """Test delete operation with anonymous user"""
        # self.creds_tmp is prepared outside this class (presumably
        # anonymous credentials)
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self.ldb_admin.newuser("test_anonymous", "samba123@")

        try:
            anonymous.delete(self.get_user_dn("test_anonymous"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEquals(num, ERR_OPERATIONS_ERROR)
        else:
            self.fail()
1049
1050 #tests on ldap rename operations
1051 class AclRenameTests(AclTests):
1052
1053     def setUp(self):
1054         super(AclRenameTests, self).setUp()
1055         self.regular_user = "acl_rename_user1"
1056         self.ou1 = "OU=test_rename_ou1"
1057         self.ou2 = "OU=test_rename_ou2"
1058         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1059         self.testuser1 = "test_rename_user1"
1060         self.testuser2 = "test_rename_user2"
1061         self.testuser3 = "test_rename_user3"
1062         self.testuser4 = "test_rename_user4"
1063         self.testuser5 = "test_rename_user5"
1064         # Create regular user
1065         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1066         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1067
1068     def tearDown(self):
1069         super(AclRenameTests, self).tearDown()
1070         # Rename OU3
1071         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1072         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1073         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1074         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1075         # Rename OU2
1076         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1077         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1078         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1079         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1080         # Rename OU1
1081         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1082         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1083         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1084         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1085         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1086         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1087
1088     def test_rename_u1(self):
1089         """Regular user fails to rename 'User object' within single OU"""
1090         # Create OU structure
1091         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1092         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1093         try:
1094             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1095                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1096         except LdbError, (num, _):
1097             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1098         else:
1099             self.fail()
1100
1101     def test_rename_u2(self):
1102         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1103         ou_dn = "OU=test_rename_ou1," + self.base_dn
1104         user_dn = "CN=test_rename_user1," + ou_dn
1105         rename_user_dn = "CN=test_rename_user5," + ou_dn
1106         # Create OU structure
1107         self.ldb_admin.create_ou(ou_dn)
1108         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1109         mod = "(A;;WP;;;AU)"
1110         self.sd_utils.dacl_add_ace(user_dn, mod)
1111         # Rename 'User object' having WP to AU
1112         self.ldb_user.rename(user_dn, rename_user_dn)
1113         res = self.ldb_admin.search(self.base_dn,
1114                 expression="(distinguishedName=%s)" % user_dn)
1115         self.assertEqual(len(res), 0)
1116         res = self.ldb_admin.search(self.base_dn,
1117                 expression="(distinguishedName=%s)" % rename_user_dn)
1118         self.assertNotEqual(len(res), 0)
1119
1120     def test_rename_u3(self):
1121         """Test rename with rights granted to 'User object' SID"""
1122         ou_dn = "OU=test_rename_ou1," + self.base_dn
1123         user_dn = "CN=test_rename_user1," + ou_dn
1124         rename_user_dn = "CN=test_rename_user5," + ou_dn
1125         # Create OU structure
1126         self.ldb_admin.create_ou(ou_dn)
1127         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1128         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1129         mod = "(A;;WP;;;%s)" % str(sid)
1130         self.sd_utils.dacl_add_ace(user_dn, mod)
1131         # Rename 'User object' having WP to AU
1132         self.ldb_user.rename(user_dn, rename_user_dn)
1133         res = self.ldb_admin.search(self.base_dn,
1134                 expression="(distinguishedName=%s)" % user_dn)
1135         self.assertEqual(len(res), 0)
1136         res = self.ldb_admin.search(self.base_dn,
1137                 expression="(distinguishedName=%s)" % rename_user_dn)
1138         self.assertNotEqual(len(res), 0)
1139
1140     def test_rename_u4(self):
1141         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1142         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1143         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1144         user_dn = "CN=test_rename_user2," + ou1_dn
1145         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1146         # Create OU structure
1147         self.ldb_admin.create_ou(ou1_dn)
1148         self.ldb_admin.create_ou(ou2_dn)
1149         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1150         mod = "(A;;WPSD;;;AU)"
1151         self.sd_utils.dacl_add_ace(user_dn, mod)
1152         mod = "(A;;CC;;;AU)"
1153         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1154         # Rename 'User object' having SD and CC to AU
1155         self.ldb_user.rename(user_dn, rename_user_dn)
1156         res = self.ldb_admin.search(self.base_dn,
1157                 expression="(distinguishedName=%s)" % user_dn)
1158         self.assertEqual(len(res), 0)
1159         res = self.ldb_admin.search(self.base_dn,
1160                 expression="(distinguishedName=%s)" % rename_user_dn)
1161         self.assertNotEqual(len(res), 0)
1162
1163     def test_rename_u5(self):
1164         """Test rename with rights granted to 'User object' SID"""
1165         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1166         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1167         user_dn = "CN=test_rename_user2," + ou1_dn
1168         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1169         # Create OU structure
1170         self.ldb_admin.create_ou(ou1_dn)
1171         self.ldb_admin.create_ou(ou2_dn)
1172         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1173         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1174         mod = "(A;;WPSD;;;%s)" % str(sid)
1175         self.sd_utils.dacl_add_ace(user_dn, mod)
1176         mod = "(A;;CC;;;%s)" % str(sid)
1177         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1178         # Rename 'User object' having SD and CC to AU
1179         self.ldb_user.rename(user_dn, rename_user_dn)
1180         res = self.ldb_admin.search(self.base_dn,
1181                 expression="(distinguishedName=%s)" % user_dn)
1182         self.assertEqual(len(res), 0)
1183         res = self.ldb_admin.search(self.base_dn,
1184                 expression="(distinguishedName=%s)" % rename_user_dn)
1185         self.assertNotEqual(len(res), 0)
1186
1187     def test_rename_u6(self):
1188         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1189         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1190         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1191         user_dn = "CN=test_rename_user2," + ou1_dn
1192         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1193         # Create OU structure
1194         self.ldb_admin.create_ou(ou1_dn)
1195         self.ldb_admin.create_ou(ou2_dn)
1196         #mod = "(A;CI;DCWP;;;AU)"
1197         mod = "(A;;DC;;;AU)"
1198         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1199         mod = "(A;;CC;;;AU)"
1200         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1201         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1202         mod = "(A;;WP;;;AU)"
1203         self.sd_utils.dacl_add_ace(user_dn, mod)
1204         # Rename 'User object' having SD and CC to AU
1205         self.ldb_user.rename(user_dn, rename_user_dn)
1206         res = self.ldb_admin.search(self.base_dn,
1207                 expression="(distinguishedName=%s)" % user_dn)
1208         self.assertEqual(len(res), 0)
1209         res = self.ldb_admin.search(self.base_dn,
1210                 expression="(distinguishedName=%s)" % rename_user_dn)
1211         self.assertNotEqual(len(res), 0)
1212
1213     def test_rename_u7(self):
1214         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1215         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1216         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1217         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1218         user_dn = "CN=test_rename_user2," + ou1_dn
1219         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1220         # Create OU structure
1221         self.ldb_admin.create_ou(ou1_dn)
1222         self.ldb_admin.create_ou(ou2_dn)
1223         self.ldb_admin.create_ou(ou3_dn)
1224         mod = "(A;CI;WPDC;;;AU)"
1225         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1226         mod = "(A;;CC;;;AU)"
1227         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1228         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1229         # Rename 'User object' having SD and CC to AU
1230         self.ldb_user.rename(user_dn, rename_user_dn)
1231         res = self.ldb_admin.search(self.base_dn,
1232                 expression="(distinguishedName=%s)" % user_dn)
1233         self.assertEqual(len(res), 0)
1234         res = self.ldb_admin.search(self.base_dn,
1235                 expression="(distinguishedName=%s)" % rename_user_dn)
1236         self.assertNotEqual(len(res), 0)
1237
1238     def test_rename_u8(self):
1239         """Test rename on an object with and without modify access on the RDN attribute"""
1240         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1241         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1242         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1243         # Create OU structure
1244         self.ldb_admin.create_ou(ou1_dn)
1245         self.ldb_admin.create_ou(ou2_dn)
1246         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1247         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1248         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1249         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1250         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1251         try:
1252             self.ldb_user.rename(ou2_dn, ou3_dn)
1253         except LdbError, (num, _):
1254             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1255         else:
1256             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1257             self.fail()
1258         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1259         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1260         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1261         self.ldb_user.rename(ou2_dn, ou3_dn)
1262         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1263         self.assertEqual(len(res), 0)
1264         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1265         self.assertNotEqual(len(res), 0)
1266
1267     def test_rename_u9(self):
1268         """Rename 'User object' cross OU, with explicit deny on sd and dc"""
1269         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1270         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1271         user_dn = "CN=test_rename_user2," + ou1_dn
1272         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1273         # Create OU structure
1274         self.ldb_admin.create_ou(ou1_dn)
1275         self.ldb_admin.create_ou(ou2_dn)
1276         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1277         mod = "(D;;SD;;;DA)"
1278         self.sd_utils.dacl_add_ace(user_dn, mod)
1279         mod = "(D;;DC;;;DA)"
1280         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1281         # Rename 'User object' having SD and CC to AU
1282         try:
1283             self.ldb_admin.rename(user_dn, rename_user_dn)
1284         except LdbError, (num, _):
1285             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1286         else:
1287             self.fail()
1288         #add an allow ace so we can delete this ou
1289         mod = "(A;;DC;;;DA)"
1290         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1291
1292
1293 #tests on Control Access Rights
class AclCARTests(AclTests):
    """Tests for the password related Control Access Rights (CARs).

    The tests modify "unicodePwd", "userPassword" and "dBCSPwd" on a test
    account through non-admin connections while ACEs on that account are
    added or removed, and check the LDAP error (if any) that results.
    """

    def setUp(self):
        """Adjust dSHeuristics/minPwdAge and create the two test users."""
        super(AclCARTests, self).setUp()

        # Get the old "dSHeuristics" if it was set
        dsheuristics = self.ldb_admin.get_dsheuristics()
        # Reset the "dSHeuristics" as they were before
        self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.ldb_admin.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        minPwdAge = self.ldb_admin.get_minPwdAge()
        # Reset the "minPwdAge" as it was before
        self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
        # Set it temporarily to "0"
        self.ldb_admin.set_minPwdAge("0")

        self.user_with_wp = "acl_car_user1"
        self.user_with_pc = "acl_car_user2"
        self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
        self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)

    def tearDown(self):
        """Remove the two test users again."""
        super(AclCARTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
        delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))

    def _encoded_unicode_pwd(self, password):
        """Return the base64 "unicodePwd::" value: password in double quotes, UTF-16-LE encoded."""
        return base64.b64encode(("\"%s\"" % password).encode('utf-16-le'))

    def _unicode_pwd_change_ldif(self, dn, old_password, new_password):
        """Return LDIF that changes "unicodePwd" by deleting the old and adding the new value."""
        return ("\n"
                "dn: " + dn + "\n"
                "changetype: modify\n"
                "delete: unicodePwd\n"
                "unicodePwd:: " + self._encoded_unicode_pwd(old_password) + "\n"
                "add: unicodePwd\n"
                "unicodePwd:: " + self._encoded_unicode_pwd(new_password) + "\n")

    def _unicode_pwd_reset_ldif(self, dn, new_password):
        """Return LDIF that resets "unicodePwd" with a single replace operation."""
        return ("\n"
                "dn: " + dn + "\n"
                "changetype: modify\n"
                "replace: unicodePwd\n"
                "unicodePwd:: " + self._encoded_unicode_pwd(new_password) + "\n")

    def _user_password_reset_ldif(self, dn, new_password):
        """Return LDIF that resets "userPassword" with a single replace operation."""
        return ("\n"
                "dn: " + dn + "\n"
                "changetype: modify\n"
                "replace: userPassword\n"
                "userPassword: " + new_password + "\n")

    def _uneven_user_password_ldif(self, dn):
        """Return LDIF with two deletes but only one add of "userPassword"."""
        return ("\n"
                "dn: " + dn + "\n"
                "changetype: modify\n"
                "delete: userPassword\n"
                "userPassword: thatsAcomplPASS1\n"
                "delete: userPassword\n"
                "userPassword: thatsAcomplPASS1\n"
                "add: userPassword\n"
                "userPassword: thatsAcomplPASS2\n")

    def _remove_change_password_car(self, user_dn):
        """Strip the default "change password" CAR ACEs (for WD and PS) from the SD of user_dn."""
        desc = self.sd_utils.read_sd_on_dn(user_dn)
        sddl = desc.as_sddl(self.domain_sid)
        sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
        sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
        self.sd_utils.modify_sd_on_dn(user_dn, sddl)

    def test_change_password1(self):
        """Try a password change operation without any CARs given"""
        user_dn = self.get_user_dn(self.user_with_wp)
        # users have change password by default - remove for negative testing
        self._remove_change_password_car(user_dn)
        try:
            self.ldb_user.modify_ldif(
                self._unicode_pwd_change_ldif(user_dn, "samba123@", "thatsAcomplPASS2"))
        except LdbError as e:
            (num, _) = e.args
            # for some reason we get constraint violation instead of insufficient access error
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail()

    def test_change_password2(self):
        """Make sure WP has no influence"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self._remove_change_password_car(user_dn)
        # Granting plain write-property must not be enough to change the password
        self.sd_utils.dacl_add_ace(user_dn, "(A;;WP;;;PS)")
        try:
            self.ldb_user.modify_ldif(
                self._unicode_pwd_change_ldif(user_dn, "samba123@", "thatsAcomplPASS2"))
        except LdbError as e:
            (num, _) = e.args
            # for some reason we get constraint violation instead of insufficient access error
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail()

    def test_change_password3(self):
        """Make sure a denied WP has no influence"""
        user_dn = self.get_user_dn(self.user_with_wp)
        # The default "change password" CAR is left in place, only WP is denied
        self.sd_utils.dacl_add_ace(user_dn, "(D;;WP;;;PS)")
        self.ldb_user.modify_ldif(
            self._unicode_pwd_change_ldif(user_dn, "samba123@", "thatsAcomplPASS2"))

    def test_change_password5(self):
        """Make sure rights have no influence on dBCSPwd"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self._remove_change_password_car(user_dn)
        self.sd_utils.dacl_add_ace(user_dn, "(D;;WP;;;PS)")
        try:
            self.ldb_user.modify_ldif("\n"
                                      "dn: " + user_dn + "\n"
                                      "changetype: modify\n"
                                      "delete: dBCSPwd\n"
                                      "dBCSPwd: XXXXXXXXXXXXXXXX\n"
                                      "add: dBCSPwd\n"
                                      "dBCSPwd: YYYYYYYYYYYYYYYY\n")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
        else:
            self.fail()

    def test_change_password6(self):
        """Test uneven delete/adds"""
        user_dn = self.get_user_dn(self.user_with_wp)
        # Without the reset CAR the request is refused for lack of access
        try:
            self.ldb_user.modify_ldif(self._uneven_user_password_ldif(user_dn))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # With the CAR granted the request still has to fail, but with a
        # different error
        try:
            self.ldb_user.modify_ldif(self._uneven_user_password_ldif(user_dn))
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
                            num == ERR_UNWILLING_TO_PERFORM)
        else:
            self.fail()

    def test_change_password7(self):
        """Change the own and another user's password with the SD left unchanged"""
        wp_dn = self.get_user_dn(self.user_with_wp)
        # The descriptor is read and written back as it is; unlike in
        # test_change_password1 no ACE is removed here
        desc = self.sd_utils.read_sd_on_dn(wp_dn)
        sddl = desc.as_sddl(self.domain_sid)
        self.sd_utils.modify_sd_on_dn(wp_dn, sddl)
        # first change our own password
        self.ldb_user2.modify_ldif(
            self._unicode_pwd_change_ldif(self.get_user_dn(self.user_with_pc),
                                          "samba123@", "thatsAcomplPASS1"))
        # then someone else's
        self.ldb_user2.modify_ldif(
            self._unicode_pwd_change_ldif(wp_dn, "samba123@", "thatsAcomplPASS2"))

    def test_reset_password1(self):
        """Try a user password reset operation (unicodePwd) before and after granting CAR"""
        user_dn = self.get_user_dn(self.user_with_wp)
        try:
            self.ldb_user.modify_ldif(
                self._unicode_pwd_reset_ldif(user_dn, "thatsAcomplPASS1"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # With the CAR granted the same request has to succeed
        self.ldb_user.modify_ldif(
            self._unicode_pwd_reset_ldif(user_dn, "thatsAcomplPASS1"))

    def test_reset_password2(self):
        """Try a user password reset operation (userPassword) before and after granting CAR"""
        user_dn = self.get_user_dn(self.user_with_wp)
        try:
            self.ldb_user.modify_ldif(
                self._user_password_reset_ldif(user_dn, "thatsAcomplPASS1"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
        mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Success is accepted here as well as a constraint violation, hence
        # there is deliberately no "else: self.fail()"
        try:
            self.ldb_user.modify_ldif(
                self._user_password_reset_ldif(user_dn, "thatsAcomplPASS1"))
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)

    def test_reset_password3(self):
        """Grant WP and see what happens (unicodePwd)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self.sd_utils.dacl_add_ace(user_dn, "(A;;WP;;;PS)")
        try:
            self.ldb_user.modify_ldif(
                self._unicode_pwd_reset_ldif(user_dn, "thatsAcomplPASS1"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

    def test_reset_password4(self):
        """Grant WP and see what happens (userPassword)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        self.sd_utils.dacl_add_ace(user_dn, "(A;;WP;;;PS)")
        try:
            self.ldb_user.modify_ldif(
                self._user_password_reset_ldif(user_dn, "thatsAcomplPASS1"))
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

    def test_reset_password5(self):
        """Explicitly deny WP but grant CAR (unicodePwd)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        self.ldb_user.modify_ldif(
            self._unicode_pwd_reset_ldif(user_dn, "thatsAcomplPASS1"))

    def test_reset_password6(self):
        """Explicitly deny WP but grant CAR (userPassword)"""
        user_dn = self.get_user_dn(self.user_with_wp)
        mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Success is accepted here as well as a constraint violation, hence
        # there is deliberately no "else: self.fail()"
        try:
            self.ldb_user.modify_ldif(
                self._user_password_reset_ldif(user_dn, "thatsAcomplPASS1"))
        except LdbError as e:
            (num, _) = e.args
            # This fails on Windows 2000 domain level with constraint violation
            self.assertEqual(num, ERR_CONSTRAINT_VIOLATION)
1579
class AclExtendedTests(AclTests):
    """Checks which users get to see "nTSecurityDescriptor" in a search."""

    def setUp(self):
        """Create two regular users and one member of "Domain Admins"."""
        super(AclExtendedTests, self).setUp()
        # u1: regular user, will be the creator
        # u2: regular user
        # u3: admin user
        self.u1 = "ext_u1"
        self.u2 = "ext_u2"
        self.u3 = "ext_u3"
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members(
            "Domain Admins", [self.u3], add_members_operation=True)

        # One connection per user
        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)

        # SIDs are only needed for the two regular users
        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        """Delete the users, the group and the OU created by the test."""
        super(AclExtendedTests, self).tearDown()
        leftovers = (
            self.get_user_dn(self.u1),
            self.get_user_dn(self.u2),
            self.get_user_dn(self.u3),
            "CN=ext_group1,OU=ext_ou1," + self.base_dn,
            "ou=ext_ou1," + self.base_dn,
        )
        for dn in leftovers:
            delete_force(self.ldb_admin, dn)

    def test_ntSecurityDescriptor(self):
        """Only the admin user may read the security descriptor of the group."""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1,OU=ext_ou1," + self.base_dn
        sd_attrs = ["nTSecurityDescriptor"]
        sid1 = str(self.user_sid1)
        sid2 = str(self.user_sid2)

        # create empty ou
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        # give u1 Create children access, u2 gets LC on the OU
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % sid1)
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % sid2)

        # create a group under that (as u1), grant RP to u2
        self.ldb_user1.newgroup(
            "ext_group1", groupou="OU=ext_ou1",
            grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % sid2)

        # u2 must not read the descriptor
        found = self.ldb_user2.search(group_dn, SCOPE_BASE, None, sd_attrs)
        self.assertNotEqual(len(found), 0)
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())

        # grant RC to u2 - still no access
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % sid2)
        found = self.ldb_user2.search(group_dn, SCOPE_BASE, None, sd_attrs)
        self.assertNotEqual(len(found), 0)
        self.assertFalse("nTSecurityDescriptor" in found[0].keys())

        # u3 is member of administrators group, should be able to read sd
        found = self.ldb_user3.search(group_dn, SCOPE_BASE, None, sd_attrs)
        self.assertEqual(len(found), 1)
        self.assertTrue("nTSecurityDescriptor" in found[0].keys())
1639
class AclUndeleteTests(AclTests):
    """Access checks for restoring (undeleting) deleted user objects."""

    def setUp(self):
        """Create and delete five users, then set up the undeleting user and the target OU."""
        super(AclUndeleteTests, self).setUp()
        self.regular_user = "undeleter1"
        self.ou1 = "OU=undeleted_ou,"
        self.testuser1 = "to_be_undeleted1"
        self.testuser2 = "to_be_undeleted2"
        self.testuser3 = "to_be_undeleted3"
        self.testuser4 = "to_be_undeleted4"
        self.testuser5 = "to_be_undeleted5"
        self.testuser6 = "to_be_undeleted6"

        # Alternative restore target for testuser4 inside the test OU
        self.new_dn_ou = "CN="+ self.testuser4 + "," + self.ou1 + self.base_dn

        # DNs the users are restored to
        self.testuser1_dn = self.get_user_dn(self.testuser1)
        self.testuser2_dn = self.get_user_dn(self.testuser2)
        self.testuser3_dn = self.get_user_dn(self.testuser3)
        self.testuser4_dn = self.get_user_dn(self.testuser4)
        self.testuser5_dn = self.get_user_dn(self.testuser5)
        # DNs of the objects after they have been created and deleted again
        self.deleted_dn1 = self.create_delete_user(self.testuser1)
        self.deleted_dn2 = self.create_delete_user(self.testuser2)
        self.deleted_dn3 = self.create_delete_user(self.testuser3)
        self.deleted_dn4 = self.create_delete_user(self.testuser4)
        self.deleted_dn5 = self.create_delete_user(self.testuser5)

        self.ldb_admin.create_ou(self.ou1 + self.base_dn)

        # Create the user performing the undelete; it is made a member of
        # "Domain Admins"
        self.ldb_admin.newuser(self.regular_user, self.user_pass)
        self.ldb_admin.add_remove_group_members("Domain Admins", [self.regular_user],
                       add_members_operation=True)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
        self.sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))

    def tearDown(self):
        """Delete the undeleting user, any restored users and the test OU."""
        super(AclUndeleteTests, self).tearDown()
        delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser1))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser2))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser3))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser4))
        delete_force(self.ldb_admin, self.get_user_dn(self.testuser5))
        delete_force(self.ldb_admin, self.new_dn_ou)
        delete_force(self.ldb_admin, self.ou1 + self.base_dn)

    def GUID_string(self, guid):
        """Return the schema-formatted value of an "objectGUID" attribute value."""
        # Use the admin connection of this test case rather than a
        # module-level "ldb" object
        return self.ldb_admin.schema_format_value("objectGUID", guid)

    def create_delete_user(self, new_user):
        """Create new_user, delete it again and return the DN of the deleted object."""
        self.ldb_admin.newuser(new_user, self.user_pass)

        # Remember the GUID, it is used to find the object after the delete
        res = self.ldb_admin.search(expression="(objectClass=*)",
                                    base=self.get_user_dn(new_user),
                                    scope=SCOPE_BASE,
                                    controls=["show_deleted:1"])
        guid = res[0]["objectGUID"][0]
        self.ldb_admin.delete(self.get_user_dn(new_user))
        res = self.ldb_admin.search(base="<GUID=%s>" % self.GUID_string(guid),
                         scope=SCOPE_BASE, controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return str(res[0].dn)

    def undelete_deleted(self, olddn, newdn):
        """Restore olddn as newdn through self.ldb_user.

        The modify removes "isDeleted" and replaces "distinguishedName";
        an LdbError raised by it is passed on to the caller.
        """
        msg = Message()
        msg.dn = Dn(self.ldb_user, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        self.ldb_user.modify(msg, ["show_recycled:1"])

    def undelete_deleted_with_mod(self, olddn, newdn):
        """Like undelete_deleted(), but replace the "url" attribute in the same request."""
        msg = Message()
        # Same connection as in undelete_deleted() instead of a module-level "ldb"
        msg.dn = Dn(self.ldb_user, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
        self.ldb_user.modify(msg, ["show_deleted:1"])

    def test_undelete(self):
        """Undelete objects with various ACEs in place and check which attempts are refused."""
        # it appears the user has to have LC on the old parent to be able to move the object
        # otherwise we get no such object. Since only System can modify the SD on deleted object
        # we cannot grant this permission via LDAP, and this leaves us with "negative" tests at the moment
        sid = str(self.sid)

        # deny write property on rdn, should fail
        mod = "(OD;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % sid
        self.sd_utils.dacl_add_ace(self.deleted_dn1, mod)
        try:
            self.undelete_deleted(self.deleted_dn1, self.testuser1_dn)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

        # seems that permissions on isDeleted and distinguishedName are irrelevant
        mod = "(OD;;WP;bf96798f-0de6-11d0-a285-00aa003049e2;;%s)" % sid
        self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
        mod = "(OD;;WP;bf9679e4-0de6-11d0-a285-00aa003049e2;;%s)" % sid
        self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
        self.undelete_deleted(self.deleted_dn2, self.testuser2_dn)

        # attempt undelete with simultaneous addition of url, WP to which is denied
        mod = "(OD;;WP;9a9a0221-4a5b-11d1-a9c3-0000f80367c1;;%s)" % sid
        self.sd_utils.dacl_add_ace(self.deleted_dn3, mod)
        try:
            self.undelete_deleted_with_mod(self.deleted_dn3, self.testuser3_dn)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

        # undelete in an ou, in which we have no right to create children
        mod = "(D;;CC;;;%s)" % sid
        self.sd_utils.dacl_add_ace(self.ou1 + self.base_dn, mod)
        try:
            self.undelete_deleted(self.deleted_dn4, self.new_dn_ou)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

        # delete is not required
        mod = "(D;;SD;;;%s)" % sid
        self.sd_utils.dacl_add_ace(self.deleted_dn5, mod)
        self.undelete_deleted(self.deleted_dn5, self.testuser5_dn)

        # deny Reanimate-Tombstone, should fail
        # NOTE(review): this ACE is added on the domain base DN and is not
        # removed by tearDown() of this class - confirm it is cleaned up elsewhere
        mod = "(OD;;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;%s)" % sid
        self.sd_utils.dacl_add_ace(self.base_dn, mod)
        try:
            self.undelete_deleted(self.deleted_dn4, self.testuser4_dn)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()
1770
1771 class AclSPNTests(AclTests):
1772
1773     def setUp(self):
1774         super(AclSPNTests, self).setUp()
1775         self.dcname = "TESTSRV8"
1776         self.rodcname = "TESTRODC8"
1777         self.computername = "testcomp8"
1778         self.test_user = "spn_test_user8"
1779         self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
1780         self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
1781         self.site = "Default-First-Site-Name"
1782         self.rodcctx = dc_join(server=host, creds=creds, lp=lp,
1783             site=self.site, netbios_name=self.rodcname, targetdir=None,
1784             domain=None)
1785         self.dcctx = dc_join(server=host, creds=creds, lp=lp, site=self.site,
1786                 netbios_name=self.dcname, targetdir=None, domain=None)
1787         self.ldb_admin.newuser(self.test_user, self.user_pass)
1788         self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
1789         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
1790         self.create_computer(self.computername, self.dcctx.dnsdomain)
1791         self.create_rodc(self.rodcctx)
1792         self.create_dc(self.dcctx)
1793
1794     def tearDown(self):
1795         super(AclSPNTests, self).tearDown()
1796         self.rodcctx.cleanup_old_join()
1797         self.dcctx.cleanup_old_join()
1798         delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
1799         delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
1800
1801     def replace_spn(self, _ldb, dn, spn):
1802         print "Setting spn %s on %s" % (spn, dn)
1803         res = self.ldb_admin.search(dn, expression="(objectClass=*)",
1804                                     scope=SCOPE_BASE, attrs=["servicePrincipalName"])
1805         if "servicePrincipalName" in res[0].keys():
1806             flag = FLAG_MOD_REPLACE
1807         else:
1808             flag = FLAG_MOD_ADD
1809
1810         msg = Message()
1811         msg.dn = Dn(self.ldb_admin, dn)
1812         msg["servicePrincipalName"] = MessageElement(spn, flag,
1813                                                          "servicePrincipalName")
1814         _ldb.modify(msg)
1815
1816     def create_computer(self, computername, domainname):
1817         dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
1818         samaccountname = computername + "$"
1819         dnshostname = "%s.%s" % (computername, domainname)
1820         self.ldb_admin.add({
1821             "dn": dn,
1822             "objectclass": "computer",
1823             "sAMAccountName": samaccountname,
1824             "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
1825             "dNSHostName": dnshostname})
1826
1827     # same as for join_RODC, but do not set any SPNs
1828     def create_rodc(self, ctx):
1829          ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1830          ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1831          ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
1832
1833          ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
1834                                   "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
1835                                   "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
1836                                   "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
1837                                   "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
1838          ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
1839
1840          mysid = ctx.get_mysid()
1841          admin_dn = "<SID=%s>" % mysid
1842          ctx.managedby = admin_dn
1843
1844          ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
1845                               samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1846                               samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
1847
1848          ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
1849          ctx.secure_channel_type = misc.SEC_CHAN_RODC
1850          ctx.RODC = True
1851          ctx.replica_flags  =  (drsuapi.DRSUAPI_DRS_INIT_SYNC |
1852                                 drsuapi.DRSUAPI_DRS_PER_SYNC |
1853                                 drsuapi.DRSUAPI_DRS_GET_ANC |
1854                                 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
1855                                 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
1856
1857          ctx.join_add_objects()
1858
1859     def create_dc(self, ctx):
1860         ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1861         ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1862         ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
1863         ctx.secure_channel_type = misc.SEC_CHAN_BDC
1864         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
1865                              drsuapi.DRSUAPI_DRS_INIT_SYNC |
1866                              drsuapi.DRSUAPI_DRS_PER_SYNC |
1867                              drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
1868                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
1869
1870         ctx.join_add_objects()
1871
1872     def dc_spn_test(self, ctx):
1873         netbiosdomain = self.dcctx.get_domain_name()
1874         try:
1875             self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1876         except LdbError, (num, _):
1877             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1878
1879         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1880         self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
1881         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1882         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
1883         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1884                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1885         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
1886         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1887                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1888         self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1889                          (ctx.myname, ctx.dnsdomain, ctx.dnsforest))
1890         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
1891         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1892                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1893         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
1894         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
1895         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1896                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1897         self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
1898         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
1899                          (ctx.myname, ctx.dnsdomain))
1900         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
1901                          (ctx.myname))
1902         self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1903                          (ctx.myname, ctx.dnsdomain))
1904         self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1905                          (ctx.myname, ctx.dnsdomain))
1906         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
1907                          (ctx.ntds_guid, ctx.dnsdomain))
1908
1909         #the following spns do not match the restrictions and should fail
1910         try:
1911             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
1912                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1913         except LdbError, (num, _):
1914             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1915         try:
1916             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
1917                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1918         except LdbError, (num, _):
1919             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1920         try:
1921             self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1922         except LdbError, (num, _):
1923             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1924         try:
1925             self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1926                              (ctx.myname, ctx.dnsdomain, netbiosdomain))
1927         except LdbError, (num, _):
1928             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1929         try:
1930             self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
1931                              (ctx.ntds_guid, ctx.dnsdomain))
1932         except LdbError, (num, _):
1933             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1934
1935     def test_computer_spn(self):
1936         # with WP, any value can be set
1937         netbiosdomain = self.dcctx.get_domain_name()
1938         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1939                          (self.computername, netbiosdomain))
1940         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
1941         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1942                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1943         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1944                          (self.computername, self.dcctx.dnsdomain))
1945         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1946                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1947         self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
1948                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
1949         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1950         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1951                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1952         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
1953                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1954         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1955                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1956         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
1957         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
1958                          (self.computername, self.dcctx.dnsdomain))
1959         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1960                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1961         self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
1962                          (self.computername, self.dcctx.dnsdomain))
1963         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
1964                          (self.computername, self.dcctx.dnsdomain))
1965         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
1966                          (self.computername))
1967         self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1968                          (self.computername, self.dcctx.dnsdomain))
1969         self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1970                          (self.computername, self.dcctx.dnsdomain))
1971         self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1972
1973         #user has neither WP nor Validated-SPN, access denied expected
1974         try:
1975             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1976         except LdbError, (num, _):
1977             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1978
1979         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1980         self.sd_utils.dacl_add_ace(self.computerdn, mod)
1981         #grant Validated-SPN and check which values are accepted
1982         #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
1983
1984         # for regular computer objects we shouldalways get constraint violation
1985
1986         # This does not pass against Windows, although it should according to docs
1987         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
1988         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
1989                              (self.computername, self.dcctx.dnsdomain))
1990
1991         try:
1992             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1993         except LdbError, (num, _):
1994             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1995         try:
1996             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
1997                              (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1998         except LdbError, (num, _):
1999             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2000         try:
2001             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
2002                              (self.computername, self.dcctx.dnsdomain))
2003         except LdbError, (num, _):
2004             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2005         try:
2006             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
2007                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2008         except LdbError, (num, _):
2009             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2010         try:
2011             self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
2012                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
2013         except LdbError, (num, _):
2014             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2015         try:
2016             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
2017         except LdbError, (num, _):
2018             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2019         try:
2020             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
2021                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2022         except LdbError, (num, _):
2023             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2024
2025     def test_spn_rwdc(self):
2026         self.dc_spn_test(self.dcctx)
2027
2028     def test_spn_rodc(self):
2029         self.dc_spn_test(self.rodcctx)
2030
2031
# Important unit running information

# Module-level connection to the server given on the command line, opened
# with the command-line credentials and a system session.
# NOTE(review): presumably the test classes pick this up as their admin
# connection -- confirm against the setUp methods earlier in the file.
ldb = SamDB(
    ldaphost,
    credentials=creds,
    session_info=system_session(lp),
    lp=lp)

# Hand control to the subunit-aware runner, which runs the tests defined
# in this module.
TestProgram(
    module=__name__,
    opts=subunitopts)