s4-tests: Tests for Validated-SPN implementation.
[amitay/samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9 sys.path.append("bin/python")
10 import samba
11 samba.ensure_external_module("testtools", "testtools")
12 samba.ensure_external_module("subunit", "subunit/python")
13
14 import samba.getopt as options
15
16 from ldb import (
17     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
18     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
19 from ldb import ERR_CONSTRAINT_VIOLATION
20 from ldb import ERR_OPERATIONS_ERROR
21 from ldb import Message, MessageElement, Dn
22 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
23 from samba.ndr import ndr_pack, ndr_unpack
24 from samba.dcerpc import security, drsuapi, misc, netlogon, nbt
25
26 from samba.auth import system_session
27 from samba import gensec, sd_utils, join
28 from samba.samdb import SamDB
29 from samba.credentials import Credentials
30 import samba.tests
31 from samba.tests import delete_force
32 from subunit.run import SubunitTestRunner
33 import unittest
34
35 parser = optparse.OptionParser("acl.py [options] <host>")
36 sambaopts = options.SambaOptions(parser)
37 parser.add_option_group(sambaopts)
38 parser.add_option_group(options.VersionOptions(parser))
39
40 # use command line creds if available
41 credopts = options.CredentialsOptions(parser)
42 parser.add_option_group(credopts)
43 opts, args = parser.parse_args()
44
45 if len(args) < 1:
46     parser.print_usage()
47     sys.exit(1)
48
49 host = args[0]
50 if not "://" in host:
51     ldaphost = "ldap://%s" % host
52 else:
53     ldaphost = host
54     start = host.rindex("://")
55     host = host.lstrip(start+3)
56
57 lp = sambaopts.get_loadparm()
58 creds = credopts.get_credentials(lp)
59 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
60
61 #
62 # Tests start here
63 #
64
65 class AclTests(samba.tests.TestCase):
66
67     def setUp(self):
68         super(AclTests, self).setUp()
69         self.ldb_admin = ldb
70         self.base_dn = ldb.domain_dn()
71         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
72         self.user_pass = "samba123@"
73         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
74         self.sd_utils = sd_utils.SDUtils(ldb)
75         #used for anonymous login
76         self.creds_tmp = Credentials()
77         self.creds_tmp.set_username("")
78         self.creds_tmp.set_password("")
79         self.creds_tmp.set_domain(creds.get_domain())
80         self.creds_tmp.set_realm(creds.get_realm())
81         self.creds_tmp.set_workstation(creds.get_workstation())
82         print "baseDN: %s" % self.base_dn
83
84     def get_user_dn(self, name):
85         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
86
87     def get_ldb_connection(self, target_username, target_password):
88         creds_tmp = Credentials()
89         creds_tmp.set_username(target_username)
90         creds_tmp.set_password(target_password)
91         creds_tmp.set_domain(creds.get_domain())
92         creds_tmp.set_realm(creds.get_realm())
93         creds_tmp.set_workstation(creds.get_workstation())
94         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
95                                       | gensec.FEATURE_SEAL)
96         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
97         return ldb_target
98
99     # Test if we have any additional groups for users than default ones
100     def assert_user_no_group_member(self, username):
101         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
102         try:
103             self.assertEqual(res[0]["memberOf"][0], "")
104         except KeyError:
105             pass
106         else:
107             self.fail()
108
109 #tests on ldap add operations
110 class AclAddTests(AclTests):
111
112     def setUp(self):
113         super(AclAddTests, self).setUp()
114         # Domain admin that will be creator of OU parent-child structure
115         self.usr_admin_owner = "acl_add_user1"
116         # Second domain admin that will not be creator of OU parent-child structure
117         self.usr_admin_not_owner = "acl_add_user2"
118         # Regular user
119         self.regular_user = "acl_add_user3"
120         self.test_user1 = "test_add_user1"
121         self.test_group1 = "test_add_group1"
122         self.ou1 = "OU=test_add_ou1"
123         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
124         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
125         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
126         self.ldb_admin.newuser(self.regular_user, self.user_pass)
127
128         # add admins to the Domain Admins group
129         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_owner,
130                        add_members_operation=True)
131         self.ldb_admin.add_remove_group_members("Domain Admins", self.usr_admin_not_owner,
132                        add_members_operation=True)
133
134         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
135         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
136         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
137
138     def tearDown(self):
139         super(AclAddTests, self).tearDown()
140         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
141                           (self.test_user1, self.ou2, self.base_dn))
142         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
143                           (self.test_group1, self.ou2, self.base_dn))
144         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
145         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
146         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
147         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
148         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
149         delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))
150
151     # Make sure top OU is deleted (and so everything under it)
152     def assert_top_ou_deleted(self):
153         res = self.ldb_admin.search(self.base_dn,
154             expression="(distinguishedName=%s,%s)" % (
155                 "OU=test_add_ou1", self.base_dn))
156         self.assertEqual(res, [])
157
158     def test_add_u1(self):
159         """Testing OU with the rights of Doman Admin not creator of the OU """
160         self.assert_top_ou_deleted()
161         # Change descriptor for top level OU
162         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
163         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
164         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
165         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
166         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
167         # Test user and group creation with another domain admin's credentials
168         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
169         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
170                                    grouptype=4)
171         # Make sure we HAVE created the two objects -- user and group
172         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
173         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
174         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
175         self.assertTrue(len(res) > 0)
176         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
177         self.assertTrue(len(res) > 0)
178
179     def test_add_u2(self):
180         """Testing OU with the regular user that has no rights granted over the OU """
181         self.assert_top_ou_deleted()
182         # Create a parent-child OU structure with domain admin credentials
183         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
184         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
185         # Test user and group creation with regular user credentials
186         try:
187             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
188             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
189                                    grouptype=4)
190         except LdbError, (num, _):
191             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
192         else:
193             self.fail()
194         # Make sure we HAVEN'T created any of two objects -- user or group
195         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
196         self.assertEqual(res, [])
197         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
198         self.assertEqual(res, [])
199
200     def test_add_u3(self):
201         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
202         self.assert_top_ou_deleted()
203         # Change descriptor for top level OU
204         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
205         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
206         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
207         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
208         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
209         # Test user and group creation with granted user only to one of the objects
210         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
211         try:
212             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
213                                    grouptype=4)
214         except LdbError, (num, _):
215             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
216         else:
217             self.fail()
218         # Make sure we HAVE created the one of two objects -- user
219         res = self.ldb_admin.search(self.base_dn,
220                 expression="(distinguishedName=%s,%s)" %
221                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
222                     self.base_dn))
223         self.assertNotEqual(len(res), 0)
224         res = self.ldb_admin.search(self.base_dn,
225                 expression="(distinguishedName=%s,%s)" %
226                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
227                     self.base_dn) )
228         self.assertEqual(res, [])
229
230     def test_add_u4(self):
231         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
232         self.assert_top_ou_deleted()
233         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
234         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
235         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
236         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
237                                  grouptype=4)
238         # Make sure we have successfully created the two objects -- user and group
239         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
240         self.assertTrue(len(res) > 0)
241         res = self.ldb_admin.search(self.base_dn,
242                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
243         self.assertTrue(len(res) > 0)
244
245     def test_add_anonymous(self):
246         """Test add operation with anonymous user"""
247         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
248         try:
249             anonymous.newuser("test_add_anonymous", self.user_pass)
250         except LdbError, (num, _):
251             self.assertEquals(num, ERR_OPERATIONS_ERROR)
252         else:
253             self.fail()
254
255 #tests on ldap modify operations
256 class AclModifyTests(AclTests):
257
258     def setUp(self):
259         super(AclModifyTests, self).setUp()
260         self.user_with_wp = "acl_mod_user1"
261         self.user_with_sm = "acl_mod_user2"
262         self.user_with_group_sm = "acl_mod_user3"
263         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
264         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
265         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
266         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
267         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
268         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
269         self.user_sid = self.sd_utils.get_object_sid( self.get_user_dn(self.user_with_wp))
270         self.ldb_admin.newgroup("test_modify_group2", grouptype=4)
271         self.ldb_admin.newgroup("test_modify_group3", grouptype=4)
272         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
273
274     def tearDown(self):
275         super(AclModifyTests, self).tearDown()
276         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
277         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
278         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
279         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
280         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
281         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
282         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
283         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
284         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
285         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
286
287     def test_modify_u1(self):
288         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
289         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
290         # First test object -- User
291         print "Testing modify on User object"
292         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
293         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
294         ldif = """
295 dn: """ + self.get_user_dn("test_modify_user1") + """
296 changetype: modify
297 replace: displayName
298 displayName: test_changed"""
299         self.ldb_user.modify_ldif(ldif)
300         res = self.ldb_admin.search(self.base_dn,
301                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
302         self.assertEqual(res[0]["displayName"][0], "test_changed")
303         # Second test object -- Group
304         print "Testing modify on Group object"
305         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
306         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
307         ldif = """
308 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
309 changetype: modify
310 replace: displayName
311 displayName: test_changed"""
312         self.ldb_user.modify_ldif(ldif)
313         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
314         self.assertEqual(res[0]["displayName"][0], "test_changed")
315         # Third test object -- Organizational Unit
316         print "Testing modify on OU object"
317         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
318         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
319         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
320         ldif = """
321 dn: OU=test_modify_ou1,""" + self.base_dn + """
322 changetype: modify
323 replace: displayName
324 displayName: test_changed"""
325         self.ldb_user.modify_ldif(ldif)
326         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
327         self.assertEqual(res[0]["displayName"][0], "test_changed")
328
329     def test_modify_u2(self):
330         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
331         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
332         # First test object -- User
333         print "Testing modify on User object"
334         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
335         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
336         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
337         # Modify on attribute you have rights for
338         ldif = """
339 dn: """ + self.get_user_dn("test_modify_user1") + """
340 changetype: modify
341 replace: displayName
342 displayName: test_changed"""
343         self.ldb_user.modify_ldif(ldif)
344         res = self.ldb_admin.search(self.base_dn,
345                 expression="(distinguishedName=%s)" %
346                 self.get_user_dn("test_modify_user1"))
347         self.assertEqual(res[0]["displayName"][0], "test_changed")
348         # Modify on attribute you do not have rights for granted
349         ldif = """
350 dn: """ + self.get_user_dn("test_modify_user1") + """
351 changetype: modify
352 replace: url
353 url: www.samba.org"""
354         try:
355             self.ldb_user.modify_ldif(ldif)
356         except LdbError, (num, _):
357             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
358         else:
359             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
360             self.fail()
361         # Second test object -- Group
362         print "Testing modify on Group object"
363         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
364         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
365         ldif = """
366 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
367 changetype: modify
368 replace: displayName
369 displayName: test_changed"""
370         self.ldb_user.modify_ldif(ldif)
371         res = self.ldb_admin.search(self.base_dn,
372                 expression="(distinguishedName=%s)" %
373                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
374         self.assertEqual(res[0]["displayName"][0], "test_changed")
375         # Modify on attribute you do not have rights for granted
376         ldif = """
377 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
378 changetype: modify
379 replace: url
380 url: www.samba.org"""
381         try:
382             self.ldb_user.modify_ldif(ldif)
383         except LdbError, (num, _):
384             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
385         else:
386             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
387             self.fail()
388         # Second test object -- Organizational Unit
389         print "Testing modify on OU object"
390         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
391         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
392         ldif = """
393 dn: OU=test_modify_ou1,""" + self.base_dn + """
394 changetype: modify
395 replace: displayName
396 displayName: test_changed"""
397         self.ldb_user.modify_ldif(ldif)
398         res = self.ldb_admin.search(self.base_dn,
399                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
400                     + self.base_dn))
401         self.assertEqual(res[0]["displayName"][0], "test_changed")
402         # Modify on attribute you do not have rights for granted
403         ldif = """
404 dn: OU=test_modify_ou1,""" + self.base_dn + """
405 changetype: modify
406 replace: url
407 url: www.samba.org"""
408         try:
409             self.ldb_user.modify_ldif(ldif)
410         except LdbError, (num, _):
411             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
412         else:
413             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
414             self.fail()
415
416     def test_modify_u3(self):
417         """7 Modify one attribute as you have no what so ever rights granted"""
418         # First test object -- User
419         print "Testing modify on User object"
420         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
421         # Modify on attribute you do not have rights for granted
422         ldif = """
423 dn: """ + self.get_user_dn("test_modify_user1") + """
424 changetype: modify
425 replace: url
426 url: www.samba.org"""
427         try:
428             self.ldb_user.modify_ldif(ldif)
429         except LdbError, (num, _):
430             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
431         else:
432             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
433             self.fail()
434
435         # Second test object -- Group
436         print "Testing modify on Group object"
437         self.ldb_admin.newgroup("test_modify_group1", grouptype=4)
438         # Modify on attribute you do not have rights for granted
439         ldif = """
440 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
441 changetype: modify
442 replace: url
443 url: www.samba.org"""
444         try:
445             self.ldb_user.modify_ldif(ldif)
446         except LdbError, (num, _):
447             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
448         else:
449             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
450             self.fail()
451
452         # Second test object -- Organizational Unit
453         print "Testing modify on OU object"
454         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
455         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
456         # Modify on attribute you do not have rights for granted
457         ldif = """
458 dn: OU=test_modify_ou1,""" + self.base_dn + """
459 changetype: modify
460 replace: url
461 url: www.samba.org"""
462         try:
463             self.ldb_user.modify_ldif(ldif)
464         except LdbError, (num, _):
465             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
466         else:
467             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
468             self.fail()
469
470
471     def test_modify_u4(self):
472         """11 Grant WP to PRINCIPAL_SELF and test modify"""
473         ldif = """
474 dn: """ + self.get_user_dn(self.user_with_wp) + """
475 changetype: modify
476 add: adminDescription
477 adminDescription: blah blah blah"""
478         try:
479             self.ldb_user.modify_ldif(ldif)
480         except LdbError, (num, _):
481             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
482         else:
483             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
484             self.fail()
485
486         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
487         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
488         # Modify on attribute you have rights for
489         self.ldb_user.modify_ldif(ldif)
490         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
491                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
492         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
493
494     def test_modify_u5(self):
495         """12 test self membership"""
496         ldif = """
497 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
498 changetype: modify
499 add: Member
500 Member: """ +  self.get_user_dn(self.user_with_sm)
501 #the user has no rights granted, this should fail
502         try:
503             self.ldb_user2.modify_ldif(ldif)
504         except LdbError, (num, _):
505             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
506         else:
507             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
508             self.fail()
509
510 #grant self-membership, should be able to add himself
511         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
512         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
513         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
514         self.ldb_user2.modify_ldif(ldif)
515         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
516                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
517         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
518 #but not other users
519         ldif = """
520 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
521 changetype: modify
522 add: Member
523 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
524         try:
525             self.ldb_user2.modify_ldif(ldif)
526         except LdbError, (num, _):
527             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
528         else:
529             self.fail()
530
531     def test_modify_u6(self):
532         """13 test self membership"""
533         ldif = """
534 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
535 changetype: modify
536 add: Member
537 Member: """ +  self.get_user_dn(self.user_with_sm) + """
538 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
539
540 #grant self-membership, should be able to add himself  but not others at the same time
541         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
542         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
543         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
544         try:
545             self.ldb_user2.modify_ldif(ldif)
546         except LdbError, (num, _):
547             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
548         else:
549             self.fail()
550
551     def test_modify_u7(self):
552         """13 User with WP modifying Member"""
553 #a second user is given write property permission
554         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
555         mod = "(A;;WP;;;%s)" % str(user_sid)
556         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
557         ldif = """
558 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
559 changetype: modify
560 add: Member
561 Member: """ +  self.get_user_dn(self.user_with_wp)
562         self.ldb_user.modify_ldif(ldif)
563         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
564                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
565         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
566         ldif = """
567 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
568 changetype: modify
569 delete: Member"""
570         self.ldb_user.modify_ldif(ldif)
571         ldif = """
572 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
573 changetype: modify
574 add: Member
575 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
576         self.ldb_user.modify_ldif(ldif)
577         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
578                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
579         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
580
581     def test_modify_anonymous(self):
582         """Test add operation with anonymous user"""
583         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
584         self.ldb_admin.newuser("test_anonymous", "samba123@")
585         m = Message()
586         m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
587
588         m["description"] = MessageElement("sambauser2",
589                                           FLAG_MOD_ADD,
590                                           "description")
591         try:
592             anonymous.modify(m)
593         except LdbError, (num, _):
594             self.assertEquals(num, ERR_OPERATIONS_ERROR)
595         else:
596             self.fail()
597
598 #enable these when we have search implemented
599 class AclSearchTests(AclTests):
600
601     def setUp(self):
602         super(AclSearchTests, self).setUp()
603         self.u1 = "search_u1"
604         self.u2 = "search_u2"
605         self.u3 = "search_u3"
606         self.group1 = "group1"
607         self.ldb_admin.newuser(self.u1, self.user_pass)
608         self.ldb_admin.newuser(self.u2, self.user_pass)
609         self.ldb_admin.newuser(self.u3, self.user_pass)
610         self.ldb_admin.newgroup(self.group1, grouptype=-2147483646)
611         self.ldb_admin.add_remove_group_members(self.group1, self.u2,
612                                                 add_members_operation=True)
613         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
614         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
615         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
616         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
617                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
618                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
619                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
620                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
621                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
622         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
623         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
624
625     def create_clean_ou(self, object_dn):
626         """ Base repeating setup for unittests to follow """
627         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
628                 expression="distinguishedName=%s" % object_dn)
629         # Make sure top testing OU has been deleted before starting the test
630         self.assertEqual(res, [])
631         self.ldb_admin.create_ou(object_dn)
632         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
633         # Make sure there are inheritable ACEs initially
634         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
635         # Find and remove all inherit ACEs
636         res = re.findall("\(.*?\)", desc_sddl)
637         res = [x for x in res if ("CI" in x) or ("OI" in x)]
638         for x in res:
639             desc_sddl = desc_sddl.replace(x, "")
640         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
641         # can propagate from above
642         # remove SACL, we are not interested
643         desc_sddl = desc_sddl.replace(":AI", ":AIP")
644         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
645         # Verify all inheritable ACEs are gone
646         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
647         self.assertFalse("CI" in desc_sddl)
648         self.assertFalse("OI" in desc_sddl)
649
650     def tearDown(self):
651         super(AclSearchTests, self).tearDown()
652         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
653         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
654         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
655         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
656         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
657         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
658         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
659         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
660         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
661         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
662         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
663         delete_force(self.ldb_admin, self.get_user_dn("group1"))
664
665     def test_search_anonymous1(self):
666         """Verify access of rootDSE with the correct request"""
667         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
668         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
669         self.assertEquals(len(res), 1)
670         #verify some of the attributes
671         #dont care about values
672         self.assertTrue("ldapServiceName" in res[0])
673         self.assertTrue("namingContexts" in res[0])
674         self.assertTrue("isSynchronized" in res[0])
675         self.assertTrue("dsServiceName" in res[0])
676         self.assertTrue("supportedSASLMechanisms" in res[0])
677         self.assertTrue("isGlobalCatalogReady" in res[0])
678         self.assertTrue("domainControllerFunctionality" in res[0])
679         self.assertTrue("serverName" in res[0])
680
681     def test_search_anonymous2(self):
682         """Make sure we cannot access anything else"""
683         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
684         try:
685             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
686         except LdbError, (num, _):
687             self.assertEquals(num, ERR_OPERATIONS_ERROR)
688         else:
689             self.fail()
690         try:
691             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
692         except LdbError, (num, _):
693             self.assertEquals(num, ERR_OPERATIONS_ERROR)
694         else:
695             self.fail()
696         try:
697             res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
698                                         scope=SCOPE_SUBTREE)
699         except LdbError, (num, _):
700             self.assertEquals(num, ERR_OPERATIONS_ERROR)
701         else:
702             self.fail()
703
704     def test_search_anonymous3(self):
705         """Set dsHeuristics and repeat"""
706         self.ldb_admin.set_dsheuristics("0000002")
707         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
708         mod = "(A;CI;LC;;;AN)"
709         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
710         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
711         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
712         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
713                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
714         self.assertEquals(len(res), 1)
715         self.assertTrue("dn" in res[0])
716         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
717                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
718         res = anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
719                                scope=SCOPE_SUBTREE)
720         self.assertEquals(len(res), 1)
721         self.assertTrue("dn" in res[0])
722         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
723
724     def test_search1(self):
725         """Make sure users can see us if given LC to user and group"""
726         self.create_clean_ou("OU=ou1," + self.base_dn)
727         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
728         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
729         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
730                                                  self.domain_sid)
731         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
732         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
733         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
734         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
735         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
736
737         #regular users must see only ou1 and ou2
738         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
739                                     scope=SCOPE_SUBTREE)
740         self.assertEquals(len(res), 2)
741         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
742                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
743
744         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
745         self.assertEquals(sorted(res_list), sorted(ok_list))
746
747         #these users should see all ous
748         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
749                                     scope=SCOPE_SUBTREE)
750         self.assertEquals(len(res), 6)
751         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
752         self.assertEquals(sorted(res_list), sorted(self.full_list))
753
754         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
755                                     scope=SCOPE_SUBTREE)
756         self.assertEquals(len(res), 6)
757         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
758         self.assertEquals(sorted(res_list), sorted(self.full_list))
759
760     def test_search2(self):
761         """Make sure users can't see us if access is explicitly denied"""
762         self.create_clean_ou("OU=ou1," + self.base_dn)
763         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
764         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
765         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
766         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
767         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
768         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
769         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
770         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
771                                     scope=SCOPE_SUBTREE)
772         #this user should see all ous
773         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
774         self.assertEquals(sorted(res_list), sorted(self.full_list))
775
776         #these users should see ou1, 2, 5 and 6 but not 3 and 4
777         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
778                                     scope=SCOPE_SUBTREE)
779         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
780                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
781                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
782                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
783         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
784         self.assertEquals(sorted(res_list), sorted(ok_list))
785
786         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
787                                     scope=SCOPE_SUBTREE)
788         self.assertEquals(len(res), 4)
789         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
790         self.assertEquals(sorted(res_list), sorted(ok_list))
791
792     def test_search3(self):
793         """Make sure users can't see ous if access is explicitly denied - 2"""
794         self.create_clean_ou("OU=ou1," + self.base_dn)
795         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
796         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
797         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
798                                                  self.domain_sid)
799         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
800         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
801         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
802         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
803         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
804
805         print "Testing correct behavior on nonaccessible search base"
806         try:
807              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
808                                    scope=SCOPE_BASE)
809         except LdbError, (num, _):
810             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
811         else:
812             self.fail()
813
814         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
815         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
816
817         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
818                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
819
820         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
821                                     scope=SCOPE_SUBTREE)
822         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
823         self.assertEquals(sorted(res_list), sorted(ok_list))
824
825         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
826                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
827                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
828                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
829
830         #should not see ou3 and ou4, but should see ou5 and ou6
831         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
832                                     scope=SCOPE_SUBTREE)
833         self.assertEquals(len(res), 4)
834         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
835         self.assertEquals(sorted(res_list), sorted(ok_list))
836
837         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
838                                     scope=SCOPE_SUBTREE)
839         self.assertEquals(len(res), 4)
840         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
841         self.assertEquals(sorted(res_list), sorted(ok_list))
842
843     def test_search4(self):
844         """There is no difference in visibility if the user is also creator"""
845         self.create_clean_ou("OU=ou1," + self.base_dn)
846         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
847         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
848         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
849                                                  self.domain_sid)
850         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
851         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
852         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
853         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
854         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
855
856         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
857                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
858         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
859                                     scope=SCOPE_SUBTREE)
860         self.assertEquals(len(res), 2)
861         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
862         self.assertEquals(sorted(res_list), sorted(ok_list))
863
864         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
865                                     scope=SCOPE_SUBTREE)
866         self.assertEquals(len(res), 2)
867         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
868         self.assertEquals(sorted(res_list), sorted(ok_list))
869
870     def test_search5(self):
871         """Make sure users can see only attributes they are allowed to see"""
872         self.create_clean_ou("OU=ou1," + self.base_dn)
873         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
874         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
875         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
876                                                  self.domain_sid)
877         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
878         # assert user can only see dn
879         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
880                                     scope=SCOPE_SUBTREE)
881         ok_list = ['dn']
882         self.assertEquals(len(res), 1)
883         res_list = res[0].keys()
884         self.assertEquals(res_list, ok_list)
885
886         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
887                                     scope=SCOPE_BASE, attrs=["ou"])
888
889         self.assertEquals(len(res), 1)
890         res_list = res[0].keys()
891         self.assertEquals(res_list, ok_list)
892
893         #give read property on ou and assert user can only see dn and ou
894         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
895         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
896         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
897         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
898                                     scope=SCOPE_SUBTREE)
899         ok_list = ['dn', 'ou']
900         self.assertEquals(len(res), 1)
901         res_list = res[0].keys()
902         self.assertEquals(sorted(res_list), sorted(ok_list))
903
904         #give read property on Public Information and assert user can see ou and other members
905         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
906         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
907         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
908         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
909                                     scope=SCOPE_SUBTREE)
910
911         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
912         res_list = res[0].keys()
913         self.assertEquals(sorted(res_list), sorted(ok_list))
914
915     def test_search6(self):
916         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
917         self.create_clean_ou("OU=ou1," + self.base_dn)
918         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
919         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
920         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
921                                                  self.domain_sid)
922         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
923         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
924
925         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
926                                     scope=SCOPE_SUBTREE)
927         #nothing should be returned as ou is not accessible
928         self.assertEquals(res, [])
929
930         #give read property on ou and assert user can only see dn and ou
931         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
932         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
933         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
934                                     scope=SCOPE_SUBTREE)
935         self.assertEquals(len(res), 1)
936         ok_list = ['dn', 'ou']
937         res_list = res[0].keys()
938         self.assertEquals(sorted(res_list), sorted(ok_list))
939
940         #give read property on Public Information and assert user can see ou and other members
941         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
942         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
943         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
944                                    scope=SCOPE_SUBTREE)
945         self.assertEquals(len(res), 1)
946         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
947         res_list = res[0].keys()
948         self.assertEquals(sorted(res_list), sorted(ok_list))
949
950 #tests on ldap delete operations
951 class AclDeleteTests(AclTests):
952
953     def setUp(self):
954         super(AclDeleteTests, self).setUp()
955         self.regular_user = "acl_delete_user1"
956         # Create regular user
957         self.ldb_admin.newuser(self.regular_user, self.user_pass)
958         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
959
960     def tearDown(self):
961         super(AclDeleteTests, self).tearDown()
962         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
963         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
964         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
965
966     def test_delete_u1(self):
967         """User is prohibited by default to delete another User object"""
968         # Create user that we try to delete
969         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
970         # Here delete User object should ALWAYS through exception
971         try:
972             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
973         except LdbError, (num, _):
974             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
975         else:
976             self.fail()
977
978     def test_delete_u2(self):
979         """User's group has RIGHT_DELETE to another User object"""
980         user_dn = self.get_user_dn("test_delete_user1")
981         # Create user that we try to delete
982         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
983         mod = "(A;;SD;;;AU)"
984         self.sd_utils.dacl_add_ace(user_dn, mod)
985         # Try to delete User object
986         self.ldb_user.delete(user_dn)
987         res = self.ldb_admin.search(self.base_dn,
988                 expression="(distinguishedName=%s)" % user_dn)
989         self.assertEqual(res, [])
990
991     def test_delete_u3(self):
992         """User indentified by SID has RIGHT_DELETE to another User object"""
993         user_dn = self.get_user_dn("test_delete_user1")
994         # Create user that we try to delete
995         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
996         mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
997         self.sd_utils.dacl_add_ace(user_dn, mod)
998         # Try to delete User object
999         self.ldb_user.delete(user_dn)
1000         res = self.ldb_admin.search(self.base_dn,
1001                 expression="(distinguishedName=%s)" % user_dn)
1002         self.assertEqual(res, [])
1003
1004     def test_delete_anonymous(self):
1005         """Test add operation with anonymous user"""
1006         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
1007         self.ldb_admin.newuser("test_anonymous", "samba123@")
1008
1009         try:
1010             anonymous.delete(self.get_user_dn("test_anonymous"))
1011         except LdbError, (num, _):
1012             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1013         else:
1014             self.fail()
1015
1016 #tests on ldap rename operations
1017 class AclRenameTests(AclTests):
1018
1019     def setUp(self):
1020         super(AclRenameTests, self).setUp()
1021         self.regular_user = "acl_rename_user1"
1022         self.ou1 = "OU=test_rename_ou1"
1023         self.ou2 = "OU=test_rename_ou2"
1024         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1025         self.testuser1 = "test_rename_user1"
1026         self.testuser2 = "test_rename_user2"
1027         self.testuser3 = "test_rename_user3"
1028         self.testuser4 = "test_rename_user4"
1029         self.testuser5 = "test_rename_user5"
1030         # Create regular user
1031         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1032         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1033
1034     def tearDown(self):
1035         super(AclRenameTests, self).tearDown()
1036         # Rename OU3
1037         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1038         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1039         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1040         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1041         # Rename OU2
1042         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1043         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1044         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1045         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1046         # Rename OU1
1047         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1048         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1049         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1050         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1051         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1052         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1053
1054     def test_rename_u1(self):
1055         """Regular user fails to rename 'User object' within single OU"""
1056         # Create OU structure
1057         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1058         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1059         try:
1060             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1061                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1062         except LdbError, (num, _):
1063             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1064         else:
1065             self.fail()
1066
1067     def test_rename_u2(self):
1068         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1069         ou_dn = "OU=test_rename_ou1," + self.base_dn
1070         user_dn = "CN=test_rename_user1," + ou_dn
1071         rename_user_dn = "CN=test_rename_user5," + ou_dn
1072         # Create OU structure
1073         self.ldb_admin.create_ou(ou_dn)
1074         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1075         mod = "(A;;WP;;;AU)"
1076         self.sd_utils.dacl_add_ace(user_dn, mod)
1077         # Rename 'User object' having WP to AU
1078         self.ldb_user.rename(user_dn, rename_user_dn)
1079         res = self.ldb_admin.search(self.base_dn,
1080                 expression="(distinguishedName=%s)" % user_dn)
1081         self.assertEqual(res, [])
1082         res = self.ldb_admin.search(self.base_dn,
1083                 expression="(distinguishedName=%s)" % rename_user_dn)
1084         self.assertNotEqual(res, [])
1085
1086     def test_rename_u3(self):
1087         """Test rename with rights granted to 'User object' SID"""
1088         ou_dn = "OU=test_rename_ou1," + self.base_dn
1089         user_dn = "CN=test_rename_user1," + ou_dn
1090         rename_user_dn = "CN=test_rename_user5," + ou_dn
1091         # Create OU structure
1092         self.ldb_admin.create_ou(ou_dn)
1093         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1094         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1095         mod = "(A;;WP;;;%s)" % str(sid)
1096         self.sd_utils.dacl_add_ace(user_dn, mod)
1097         # Rename 'User object' having WP to AU
1098         self.ldb_user.rename(user_dn, rename_user_dn)
1099         res = self.ldb_admin.search(self.base_dn,
1100                 expression="(distinguishedName=%s)" % user_dn)
1101         self.assertEqual(res, [])
1102         res = self.ldb_admin.search(self.base_dn,
1103                 expression="(distinguishedName=%s)" % rename_user_dn)
1104         self.assertNotEqual(res, [])
1105
1106     def test_rename_u4(self):
1107         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1108         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1109         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1110         user_dn = "CN=test_rename_user2," + ou1_dn
1111         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1112         # Create OU structure
1113         self.ldb_admin.create_ou(ou1_dn)
1114         self.ldb_admin.create_ou(ou2_dn)
1115         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1116         mod = "(A;;WPSD;;;AU)"
1117         self.sd_utils.dacl_add_ace(user_dn, mod)
1118         mod = "(A;;CC;;;AU)"
1119         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1120         # Rename 'User object' having SD and CC to AU
1121         self.ldb_user.rename(user_dn, rename_user_dn)
1122         res = self.ldb_admin.search(self.base_dn,
1123                 expression="(distinguishedName=%s)" % user_dn)
1124         self.assertEqual(res, [])
1125         res = self.ldb_admin.search(self.base_dn,
1126                 expression="(distinguishedName=%s)" % rename_user_dn)
1127         self.assertNotEqual(res, [])
1128
1129     def test_rename_u5(self):
1130         """Test rename with rights granted to 'User object' SID"""
1131         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1132         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1133         user_dn = "CN=test_rename_user2," + ou1_dn
1134         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1135         # Create OU structure
1136         self.ldb_admin.create_ou(ou1_dn)
1137         self.ldb_admin.create_ou(ou2_dn)
1138         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1139         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1140         mod = "(A;;WPSD;;;%s)" % str(sid)
1141         self.sd_utils.dacl_add_ace(user_dn, mod)
1142         mod = "(A;;CC;;;%s)" % str(sid)
1143         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1144         # Rename 'User object' having SD and CC to AU
1145         self.ldb_user.rename(user_dn, rename_user_dn)
1146         res = self.ldb_admin.search(self.base_dn,
1147                 expression="(distinguishedName=%s)" % user_dn)
1148         self.assertEqual(res, [])
1149         res = self.ldb_admin.search(self.base_dn,
1150                 expression="(distinguishedName=%s)" % rename_user_dn)
1151         self.assertNotEqual(res, [])
1152
1153     def test_rename_u6(self):
1154         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1155         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1156         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1157         user_dn = "CN=test_rename_user2," + ou1_dn
1158         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1159         # Create OU structure
1160         self.ldb_admin.create_ou(ou1_dn)
1161         self.ldb_admin.create_ou(ou2_dn)
1162         #mod = "(A;CI;DCWP;;;AU)"
1163         mod = "(A;;DC;;;AU)"
1164         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1165         mod = "(A;;CC;;;AU)"
1166         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1167         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1168         mod = "(A;;WP;;;AU)"
1169         self.sd_utils.dacl_add_ace(user_dn, mod)
1170         # Rename 'User object' having SD and CC to AU
1171         self.ldb_user.rename(user_dn, rename_user_dn)
1172         res = self.ldb_admin.search(self.base_dn,
1173                 expression="(distinguishedName=%s)" % user_dn)
1174         self.assertEqual(res, [])
1175         res = self.ldb_admin.search(self.base_dn,
1176                 expression="(distinguishedName=%s)" % rename_user_dn)
1177         self.assertNotEqual(res, [])
1178
1179     def test_rename_u7(self):
1180         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1181         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1182         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1183         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1184         user_dn = "CN=test_rename_user2," + ou1_dn
1185         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1186         # Create OU structure
1187         self.ldb_admin.create_ou(ou1_dn)
1188         self.ldb_admin.create_ou(ou2_dn)
1189         self.ldb_admin.create_ou(ou3_dn)
1190         mod = "(A;CI;WPDC;;;AU)"
1191         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1192         mod = "(A;;CC;;;AU)"
1193         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1194         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1195         # Rename 'User object' having SD and CC to AU
1196         self.ldb_user.rename(user_dn, rename_user_dn)
1197         res = self.ldb_admin.search(self.base_dn,
1198                 expression="(distinguishedName=%s)" % user_dn)
1199         self.assertEqual(res, [])
1200         res = self.ldb_admin.search(self.base_dn,
1201                 expression="(distinguishedName=%s)" % rename_user_dn)
1202         self.assertNotEqual(res, [])
1203
1204     def test_rename_u8(self):
1205         """Test rename on an object with and without modify access on the RDN attribute"""
1206         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1207         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1208         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1209         # Create OU structure
1210         self.ldb_admin.create_ou(ou1_dn)
1211         self.ldb_admin.create_ou(ou2_dn)
1212         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1213         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1214         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1215         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1216         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1217         try:
1218             self.ldb_user.rename(ou2_dn, ou3_dn)
1219         except LdbError, (num, _):
1220             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1221         else:
1222             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1223             self.fail()
1224         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1225         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1226         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1227         self.ldb_user.rename(ou2_dn, ou3_dn)
1228         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1229         self.assertEqual(res, [])
1230         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1231         self.assertNotEqual(res, [])
1232
1233 #tests on Control Access Rights
1234 class AclCARTests(AclTests):
1235
1236     def setUp(self):
1237         super(AclCARTests, self).setUp()
1238         self.user_with_wp = "acl_car_user1"
1239         self.user_with_pc = "acl_car_user2"
1240         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1241         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
1242         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1243         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1244
1245     def tearDown(self):
1246         super(AclCARTests, self).tearDown()
1247         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1248         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1249
1250     def test_change_password1(self):
1251         """Try a password change operation without any CARs given"""
1252         #users have change password by default - remove for negative testing
1253         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1254         sddl = desc.as_sddl(self.domain_sid)
1255         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1256         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1257         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1258         try:
1259             self.ldb_user.modify_ldif("""
1260 dn: """ + self.get_user_dn(self.user_with_wp) + """
1261 changetype: modify
1262 delete: unicodePwd
1263 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1264 add: unicodePwd
1265 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1266 """)
1267         except LdbError, (num, _):
1268             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1269         else:
1270             # for some reason we get constraint violation instead of insufficient access error
1271             self.fail()
1272
1273     def test_change_password2(self):
1274         """Make sure WP has no influence"""
1275         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1276         sddl = desc.as_sddl(self.domain_sid)
1277         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1278         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1279         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1280         mod = "(A;;WP;;;PS)"
1281         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1282         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1283         sddl = desc.as_sddl(self.domain_sid)
1284         try:
1285             self.ldb_user.modify_ldif("""
1286 dn: """ + self.get_user_dn(self.user_with_wp) + """
1287 changetype: modify
1288 delete: unicodePwd
1289 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1290 add: unicodePwd
1291 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1292 """)
1293         except LdbError, (num, _):
1294             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1295         else:
1296             # for some reason we get constraint violation instead of insufficient access error
1297             self.fail()
1298
1299     def test_change_password3(self):
1300         """Make sure WP has no influence"""
1301         mod = "(D;;WP;;;PS)"
1302         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1303         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1304         sddl = desc.as_sddl(self.domain_sid)
1305         self.ldb_user.modify_ldif("""
1306 dn: """ + self.get_user_dn(self.user_with_wp) + """
1307 changetype: modify
1308 delete: unicodePwd
1309 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1310 add: unicodePwd
1311 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1312 """)
1313
1314     def test_change_password5(self):
1315         """Make sure rights have no influence on dBCSPwd"""
1316         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1317         sddl = desc.as_sddl(self.domain_sid)
1318         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1319         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1320         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1321         mod = "(D;;WP;;;PS)"
1322         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1323         try:
1324             self.ldb_user.modify_ldif("""
1325 dn: """ + self.get_user_dn(self.user_with_wp) + """
1326 changetype: modify
1327 delete: dBCSPwd
1328 dBCSPwd: XXXXXXXXXXXXXXXX
1329 add: dBCSPwd
1330 dBCSPwd: YYYYYYYYYYYYYYYY
1331 """)
1332         except LdbError, (num, _):
1333             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1334         else:
1335             self.fail()
1336
1337     def test_change_password6(self):
1338         """Test uneven delete/adds"""
1339         try:
1340             self.ldb_user.modify_ldif("""
1341 dn: """ + self.get_user_dn(self.user_with_wp) + """
1342 changetype: modify
1343 delete: userPassword
1344 userPassword: thatsAcomplPASS1
1345 delete: userPassword
1346 userPassword: thatsAcomplPASS1
1347 add: userPassword
1348 userPassword: thatsAcomplPASS2
1349 """)
1350         except LdbError, (num, _):
1351             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1352         else:
1353             self.fail()
1354         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1355         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1356         try:
1357             self.ldb_user.modify_ldif("""
1358 dn: """ + self.get_user_dn(self.user_with_wp) + """
1359 changetype: modify
1360 delete: userPassword
1361 userPassword: thatsAcomplPASS1
1362 delete: userPassword
1363 userPassword: thatsAcomplPASS1
1364 add: userPassword
1365 userPassword: thatsAcomplPASS2
1366 """)
1367             # This fails on Windows 2000 domain level with constraint violation
1368         except LdbError, (num, _):
1369             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1370                             num == ERR_UNWILLING_TO_PERFORM)
1371         else:
1372             self.fail()
1373
1374
1375     def test_change_password7(self):
1376         """Try a password change operation without any CARs given"""
1377         #users have change password by default - remove for negative testing
1378         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1379         sddl = desc.as_sddl(self.domain_sid)
1380         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1381         #first change our own password
1382         self.ldb_user2.modify_ldif("""
1383 dn: """ + self.get_user_dn(self.user_with_pc) + """
1384 changetype: modify
1385 delete: unicodePwd
1386 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1387 add: unicodePwd
1388 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1389 """)
1390         #then someone else's
1391         self.ldb_user2.modify_ldif("""
1392 dn: """ + self.get_user_dn(self.user_with_wp) + """
1393 changetype: modify
1394 delete: unicodePwd
1395 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1396 add: unicodePwd
1397 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1398 """)
1399
1400     def test_reset_password1(self):
1401         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1402         try:
1403             self.ldb_user.modify_ldif("""
1404 dn: """ + self.get_user_dn(self.user_with_wp) + """
1405 changetype: modify
1406 replace: unicodePwd
1407 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1408 """)
1409         except LdbError, (num, _):
1410             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1411         else:
1412             self.fail()
1413         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1414         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1415         self.ldb_user.modify_ldif("""
1416 dn: """ + self.get_user_dn(self.user_with_wp) + """
1417 changetype: modify
1418 replace: unicodePwd
1419 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1420 """)
1421
1422     def test_reset_password2(self):
1423         """Try a user password reset operation (userPassword) before and after granting CAR"""
1424         try:
1425             self.ldb_user.modify_ldif("""
1426 dn: """ + self.get_user_dn(self.user_with_wp) + """
1427 changetype: modify
1428 replace: userPassword
1429 userPassword: thatsAcomplPASS1
1430 """)
1431         except LdbError, (num, _):
1432             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1433         else:
1434             self.fail()
1435         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1436         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1437         try:
1438             self.ldb_user.modify_ldif("""
1439 dn: """ + self.get_user_dn(self.user_with_wp) + """
1440 changetype: modify
1441 replace: userPassword
1442 userPassword: thatsAcomplPASS1
1443 """)
1444             # This fails on Windows 2000 domain level with constraint violation
1445         except LdbError, (num, _):
1446             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1447
1448     def test_reset_password3(self):
1449         """Grant WP and see what happens (unicodePwd)"""
1450         mod = "(A;;WP;;;PS)"
1451         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1452         try:
1453             self.ldb_user.modify_ldif("""
1454 dn: """ + self.get_user_dn(self.user_with_wp) + """
1455 changetype: modify
1456 replace: unicodePwd
1457 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1458 """)
1459         except LdbError, (num, _):
1460             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1461         else:
1462             self.fail()
1463
1464     def test_reset_password4(self):
1465         """Grant WP and see what happens (userPassword)"""
1466         mod = "(A;;WP;;;PS)"
1467         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1468         try:
1469             self.ldb_user.modify_ldif("""
1470 dn: """ + self.get_user_dn(self.user_with_wp) + """
1471 changetype: modify
1472 replace: userPassword
1473 userPassword: thatsAcomplPASS1
1474 """)
1475         except LdbError, (num, _):
1476             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1477         else:
1478             self.fail()
1479
1480     def test_reset_password5(self):
1481         """Explicitly deny WP but grant CAR (unicodePwd)"""
1482         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1483         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1484         self.ldb_user.modify_ldif("""
1485 dn: """ + self.get_user_dn(self.user_with_wp) + """
1486 changetype: modify
1487 replace: unicodePwd
1488 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1489 """)
1490
1491     def test_reset_password6(self):
1492         """Explicitly deny WP but grant CAR (userPassword)"""
1493         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1494         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1495         try:
1496             self.ldb_user.modify_ldif("""
1497 dn: """ + self.get_user_dn(self.user_with_wp) + """
1498 changetype: modify
1499 replace: userPassword
1500 userPassword: thatsAcomplPASS1
1501 """)
1502             # This fails on Windows 2000 domain level with constraint violation
1503         except LdbError, (num, _):
1504             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1505
1506 class AclExtendedTests(AclTests):
1507
1508     def setUp(self):
1509         super(AclExtendedTests, self).setUp()
1510         #regular user, will be the creator
1511         self.u1 = "ext_u1"
1512         #regular user
1513         self.u2 = "ext_u2"
1514         #admin user
1515         self.u3 = "ext_u3"
1516         self.ldb_admin.newuser(self.u1, self.user_pass)
1517         self.ldb_admin.newuser(self.u2, self.user_pass)
1518         self.ldb_admin.newuser(self.u3, self.user_pass)
1519         self.ldb_admin.add_remove_group_members("Domain Admins", self.u3,
1520                                                 add_members_operation=True)
1521         self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
1522         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
1523         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
1524         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
1525         self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))
1526
1527     def tearDown(self):
1528         super(AclExtendedTests, self).tearDown()
1529         delete_force(self.ldb_admin, self.get_user_dn(self.u1))
1530         delete_force(self.ldb_admin, self.get_user_dn(self.u2))
1531         delete_force(self.ldb_admin, self.get_user_dn(self.u3))
1532         delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
1533         delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)
1534
1535     def test_ntSecurityDescriptor(self):
1536         #create empty ou
1537         self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
1538         #give u1 Create children access
1539         mod = "(A;;CC;;;%s)" % str(self.user_sid1)
1540         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1541         mod = "(A;;LC;;;%s)" % str(self.user_sid2)
1542         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1543         #create a group under that, grant RP to u2
1544         self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1", grouptype=4)
1545         mod = "(A;;RP;;;%s)" % str(self.user_sid2)
1546         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1547         #u2 must not read the descriptor
1548         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1549                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1550         self.assertNotEqual(res,[])
1551         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1552         #grant RC to u2 - still no access
1553         mod = "(A;;RC;;;%s)" % str(self.user_sid2)
1554         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1555         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1556                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1557         self.assertNotEqual(res,[])
1558         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1559         #u3 is member of administrators group, should be able to read sd
1560         res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1561                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1562         self.assertEqual(len(res),1)
1563         self.assertTrue("nTSecurityDescriptor" in res[0].keys())
1564
1565 class AclSPNTests(AclTests):
1566
1567     def setUp(self):
1568         super(AclSPNTests, self).setUp()
1569         self.dcname = "TESTSRV8"
1570         self.rodcname = "TESTRODC8"
1571         self.computername = "testcomp8"
1572         self.test_user = "spn_test_user8"
1573         self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
1574         self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
1575         self.site = "Default-First-Site-Name"
1576         self.rodcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.rodcname,
1577                                           targetdir=None, domain=None)
1578         self.dcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.dcname,
1579                                         targetdir=None, domain=None)
1580         self.ldb_admin.newuser(self.test_user, self.user_pass)
1581         self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
1582         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
1583         self.create_computer(self.computername, self.dcctx.dnsdomain)
1584         self.create_rodc(self.rodcctx)
1585         self.create_dc(self.dcctx)
1586
1587     def tearDown(self):
1588         super(AclSPNTests, self).tearDown()
1589         self.rodcctx.cleanup_old_join()
1590         self.dcctx.cleanup_old_join()
1591         delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
1592         delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
1593
1594     def replace_spn(self, _ldb, dn, spn):
1595         print "Setting spn %s on %s" % (spn, dn)
1596         res = self.ldb_admin.search(dn, expression="(objectClass=*)",
1597                                     scope=SCOPE_BASE, attrs=["servicePrincipalName"])
1598         if "servicePrincipalName" in res[0].keys():
1599             flag = FLAG_MOD_REPLACE
1600         else:
1601             flag = FLAG_MOD_ADD
1602
1603         msg = Message()
1604         msg.dn = Dn(self.ldb_admin, dn)
1605         msg["servicePrincipalName"] = MessageElement(spn, flag,
1606                                                          "servicePrincipalName")
1607         _ldb.modify(msg)
1608
1609     def create_computer(self, computername, domainname):
1610         dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
1611         samaccountname = computername + "$"
1612         dnshostname = "%s.%s" % (computername, domainname)
1613         self.ldb_admin.add({
1614             "dn": dn,
1615             "objectclass": "computer",
1616             "sAMAccountName": samaccountname,
1617             "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
1618             "dNSHostName": dnshostname})
1619
1620     # same as for join_RODC, but do not set any SPNs
1621     def create_rodc(self, ctx):
1622          ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
1623
1624          ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
1625                                   "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
1626                                   "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
1627                                   "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
1628                                   "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
1629          ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
1630
1631          mysid = ctx.get_mysid()
1632          admin_dn = "<SID=%s>" % mysid
1633          ctx.managedby = admin_dn
1634
1635          ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
1636                               samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1637                               samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
1638
1639          ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
1640          ctx.secure_channel_type = misc.SEC_CHAN_RODC
1641          ctx.RODC = True
1642          ctx.replica_flags  =  (drsuapi.DRSUAPI_DRS_INIT_SYNC |
1643                                 drsuapi.DRSUAPI_DRS_PER_SYNC |
1644                                 drsuapi.DRSUAPI_DRS_GET_ANC |
1645                                 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
1646                                 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
1647
1648          ctx.join_add_objects()
1649
1650     def create_dc(self, ctx):
1651         ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
1652         ctx.secure_channel_type = misc.SEC_CHAN_BDC
1653         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
1654                              drsuapi.DRSUAPI_DRS_INIT_SYNC |
1655                              drsuapi.DRSUAPI_DRS_PER_SYNC |
1656                              drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
1657                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
1658
1659         ctx.join_add_objects()
1660
1661     def dc_spn_test(self, ctx):
1662         netbiosdomain = self.dcctx.get_domain_name()
1663         try:
1664             self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1665         except LdbError, (num, _):
1666             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1667
1668         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1669         self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
1670         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1671         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
1672         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1673                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1674         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
1675         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1676                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1677         self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1678                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1679         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
1680         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1681                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1682         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
1683         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
1684         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1685                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1686         self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
1687         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
1688                          (ctx.myname, ctx.dnsdomain))
1689         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
1690                          (ctx.myname))
1691         self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1692                          (ctx.myname, ctx.dnsdomain))
1693         self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1694                          (ctx.myname, ctx.dnsdomain))
1695         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
1696                          (ctx.ntds_guid, ctx.dnsdomain))
1697
1698         #the following spns do not match the restrictions and should fail
1699         try:
1700             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
1701                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1702         except LdbError, (num, _):
1703             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1704         try:
1705             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
1706                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1707         except LdbError, (num, _):
1708             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1709         try:
1710             self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1711         except LdbError, (num, _):
1712             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1713         try:
1714             self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1715                              (ctx.myname, ctx.dnsdomain, netbiosdomain))
1716         except LdbError, (num, _):
1717             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1718         try:
1719             self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
1720                              (ctx.ntds_guid, ctx.dnsdomain))
1721         except LdbError, (num, _):
1722             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1723
1724     def test_computer_spn(self):
1725         # with WP, any value can be set
1726         netbiosdomain = self.dcctx.get_domain_name()
1727         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1728                          (self.computername, netbiosdomain))
1729         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
1730         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1731                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1732         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1733                          (self.computername, self.dcctx.dnsdomain))
1734         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1735                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1736         self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
1737                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1738         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1739         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1740                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1741         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
1742                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1743         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1744                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1745         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
1746         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
1747                          (self.computername, self.dcctx.dnsdomain))
1748         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1749                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1750         self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
1751                          (self.computername, self.dcctx.dnsdomain))
1752         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
1753                          (self.computername, self.dcctx.dnsdomain))
1754         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
1755                          (self.computername))
1756         self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1757                          (self.computername, self.dcctx.dnsdomain))
1758         self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1759                          (self.computername, self.dcctx.dnsdomain))
1760         self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1761
1762         #user has neither WP nor Validated-SPN, access denied expected
1763         try:
1764             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1765         except LdbError, (num, _):
1766             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1767
1768         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1769         self.sd_utils.dacl_add_ace(self.computerdn, mod)
1770         #grant Validated-SPN and check which values are accepted
1771         #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
1772
1773         # for regular computer objects we shouldalways get constraint violation
1774
1775         # This does not pass against Windows, although it should according to docs
1776         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
1777         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
1778                              (self.computername, self.dcctx.dnsdomain))
1779
1780         try:
1781             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1782         except LdbError, (num, _):
1783             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1784         try:
1785             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
1786                              (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1787         except LdbError, (num, _):
1788             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1789         try:
1790             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
1791                              (self.computername, self.dcctx.dnsdomain))
1792         except LdbError, (num, _):
1793             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1794         try:
1795             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
1796                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1797         except LdbError, (num, _):
1798             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1799         try:
1800             self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
1801                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1802         except LdbError, (num, _):
1803             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1804         try:
1805             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1806         except LdbError, (num, _):
1807             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1808         try:
1809             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1810                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1811         except LdbError, (num, _):
1812             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1813
1814     def test_spn_rwdc(self):
1815         self.dc_spn_test(self.dcctx)
1816
1817     def test_spn_rodc(self):
1818         self.dc_spn_test(self.rodcctx)
1819
1820
1821 # Important unit running information
1822
1823 ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
1824
1825 runner = SubunitTestRunner()
1826 rc = 0
1827 if not runner.run(unittest.makeSuite(AclAddTests)).wasSuccessful():
1828     rc = 1
1829 if not runner.run(unittest.makeSuite(AclModifyTests)).wasSuccessful():
1830     rc = 1
1831 if not runner.run(unittest.makeSuite(AclDeleteTests)).wasSuccessful():
1832     rc = 1
1833 if not runner.run(unittest.makeSuite(AclRenameTests)).wasSuccessful():
1834     rc = 1
1835
1836 # Get the old "dSHeuristics" if it was set
1837 dsheuristics = ldb.get_dsheuristics()
1838 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
1839 ldb.set_dsheuristics("000000001")
1840 # Get the old "minPwdAge"
1841 minPwdAge = ldb.get_minPwdAge()
1842 # Set it temporarely to "0"
1843 ldb.set_minPwdAge("0")
1844 if not runner.run(unittest.makeSuite(AclCARTests)).wasSuccessful():
1845     rc = 1
1846 if not runner.run(unittest.makeSuite(AclSearchTests)).wasSuccessful():
1847     rc = 1
1848 # Reset the "dSHeuristics" as they were before
1849 ldb.set_dsheuristics(dsheuristics)
1850 # Reset the "minPwdAge" as it was before
1851 ldb.set_minPwdAge(minPwdAge)
1852
1853 if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful():
1854     rc = 1
1855 if not runner.run(unittest.makeSuite(AclSPNTests)).wasSuccessful():
1856     rc = 1
1857 sys.exit(rc)