ecda3c5db61e31111dd23dd6964dde8ab70d912c
[nivanova/samba-autobuild/.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9 sys.path.insert(0, "bin/python")
10 import samba
11 samba.ensure_external_module("testtools", "testtools")
12 samba.ensure_external_module("subunit", "subunit/python")
13
14 import samba.getopt as options
15 from samba.join import dc_join
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
24 from samba.dcerpc import security, drsuapi, misc
25
26 from samba.auth import system_session
27 from samba import gensec, sd_utils
28 from samba.samdb import SamDB
29 from samba.credentials import Credentials, DONT_USE_KERBEROS
30 import samba.tests
31 from samba.tests import delete_force
32 from subunit.run import SubunitTestRunner
33 import unittest
34 import samba.dsdb
35
# Command-line handling: Samba option groups plus one positional <host>.
parser = optparse.OptionParser("acl.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# `ldaphost` is always a URL; `host` is reduced to the bare host part.
host = args[0]
if "://" not in host:
    ldaphost = "ldap://%s" % host
else:
    ldaphost = host
    start = host.rindex("://")
    # Drop the "<scheme>://" prefix.  str.lstrip() takes a set of characters,
    # not an offset, so the previous host.lstrip(start+3) raised TypeError.
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Request sealing in addition to whatever gensec features are already set.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
61
62 #
63 # Tests start here
64 #
65
class AclTests(samba.tests.TestCase):
    """Shared fixture and helpers for the LDAP access-check test classes."""

    def setUp(self):
        """Record domain details and prepare blank credentials for anonymous binds."""
        super(AclTests, self).setUp()
        # `ldb` is the module-level administrative connection
        # (NOTE(review): defined outside the part of the file visible here).
        self.ldb_admin = ldb
        self.base_dn = ldb.domain_dn()
        self.domain_sid = security.dom_sid(ldb.get_domain_sid())
        self.user_pass = "samba123@"
        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(ldb)
        # used for anonymous login: empty user name and password, everything
        # else copied from the command-line credentials
        anonymous_creds = Credentials()
        anonymous_creds.set_username("")
        anonymous_creds.set_password("")
        anonymous_creds.set_domain(creds.get_domain())
        anonymous_creds.set_realm(creds.get_realm())
        anonymous_creds.set_workstation(creds.get_workstation())
        self.creds_tmp = anonymous_creds
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of object `name` inside CN=Users of the test domain."""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Open and return a new SamDB connection to `ldaphost` as the given user."""
        user_creds = Credentials()
        user_creds.set_username(target_username)
        user_creds.set_password(target_password)
        user_creds.set_domain(creds.get_domain())
        user_creds.set_realm(creds.get_realm())
        user_creds.set_workstation(creds.get_workstation())
        sealed_features = user_creds.get_gensec_features() | gensec.FEATURE_SEAL
        user_creds.set_gensec_features(sealed_features)
        # kinit is too expensive to use in a tight loop
        user_creds.set_kerberos_state(DONT_USE_KERBEROS)
        return SamDB(url=ldaphost, credentials=user_creds, lp=lp)

    # Test if we have any additional groups for users than default ones
    def assert_user_no_group_member(self, username):
        """Fail unless the user's entry carries no memberOf attribute."""
        dn_filter = "(distinguishedName=%s)" % self.get_user_dn(username)
        res = self.ldb_admin.search(self.base_dn, expression=dn_filter)
        try:
            first_group = res[0]["memberOf"][0]
        except KeyError:
            # No memberOf attribute at all: this is the expected outcome.
            return
        # A memberOf value exists, so the check fails either way.
        self.assertEqual(first_group, "")
        self.fail()
110
111 #tests on ldap add operations
class AclAddTests(AclTests):
    """Access checks applied to LDAP add operations.

    Every test builds the OU pair self.ou1 / self.ou2 below the base DN and
    then tries to create a user and a group inside the inner OU using
    different credentials.
    """

    def setUp(self):
        """Create the three test accounts and open one connection per account."""
        super(AclAddTests, self).setUp()
        # Domain admin that will be creator of OU parent-child structure
        self.usr_admin_owner = "acl_add_user1"
        # Second domain admin that will not be creator of OU parent-child structure
        self.usr_admin_not_owner = "acl_add_user2"
        # Regular user
        self.regular_user = "acl_add_user3"
        self.test_user1 = "test_add_user1"
        self.test_group1 = "test_add_group1"
        self.ou1 = "OU=test_add_ou1"
        self.ou2 = "OU=test_add_ou2,%s" % self.ou1
        for account in (self.usr_admin_owner, self.usr_admin_not_owner,
                        self.regular_user):
            self.ldb_admin.newuser(account, self.user_pass)

        # add admins to the Domain Admins group
        for admin in (self.usr_admin_owner, self.usr_admin_not_owner):
            self.ldb_admin.add_remove_group_members("Domain Admins", [admin],
                                                    add_members_operation=True)

        self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
        self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)

    def tearDown(self):
        """Remove everything the tests may have created, children before parents."""
        super(AclAddTests, self).tearDown()
        leftovers = [
            "CN=%s,%s,%s" % (self.test_user1, self.ou2, self.base_dn),
            "CN=%s,%s,%s" % (self.test_group1, self.ou2, self.base_dn),
            "%s,%s" % (self.ou2, self.base_dn),
            "%s,%s" % (self.ou1, self.base_dn),
            self.get_user_dn(self.usr_admin_owner),
            self.get_user_dn(self.usr_admin_not_owner),
            self.get_user_dn(self.regular_user),
            self.get_user_dn("test_add_anonymous"),
        ]
        for dn in leftovers:
            delete_force(self.ldb_admin, dn)

    def _object_exists(self, relative_dn):
        """Return True if `relative_dn` below the base DN is found by the admin connection."""
        res = self.ldb_admin.search(
            self.base_dn,
            expression="(distinguishedName=%s,%s)" % (relative_dn, self.base_dn))
        return len(res) > 0

    def _assert_add_fails(self, expected_error, add_operation, *args, **kwargs):
        """Call `add_operation` and fail unless it raises LdbError with `expected_error`."""
        try:
            add_operation(*args, **kwargs)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected_error)
        else:
            self.fail()

    # Make sure top OU is deleted (and so everything under it)
    def assert_top_ou_deleted(self):
        """Fail if the top-level test OU still exists."""
        self.assertFalse(self._object_exists(self.ou1))

    def test_add_u1(self):
        """Testing OU with the rights of Domain Admin not creator of the OU """
        self.assert_top_ou_deleted()
        top_ou_dn = "%s,%s" % (self.ou1, self.base_dn)
        self.ldb_owner.create_ou(top_ou_dn)
        self.ldb_owner.create_ou("%s,%s" % (self.ou2, self.base_dn))
        # Change descriptor for top level OU
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
        mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(top_ou_dn, mod)
        # Test user and group creation with another domain admin's credentials
        self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_notowner.newgroup(self.test_group1, groupou=self.ou2,
                                   grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we HAVE created the two objects -- user and group
        # !!! We should not be able to do that, but however because of ACE ordering our inherited Deny ACE
        # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
        self.assertTrue(self._object_exists("CN=%s,%s" % (self.test_user1, self.ou2)))
        self.assertTrue(self._object_exists("CN=%s,%s" % (self.test_group1, self.ou2)))

    def test_add_u2(self):
        """Testing OU with the regular user that has no rights granted over the OU """
        self.assert_top_ou_deleted()
        # Create a parent-child OU structure with domain admin credentials
        self.ldb_owner.create_ou("%s,%s" % (self.ou1, self.base_dn))
        self.ldb_owner.create_ou("%s,%s" % (self.ou2, self.base_dn))
        # Test user and group creation with regular user credentials.
        # The two attempts are checked separately: with both calls in a single
        # try block the group creation was never reached once the user
        # creation had been rejected.
        self._assert_add_fails(ERR_INSUFFICIENT_ACCESS_RIGHTS,
                               self.ldb_user.newuser,
                               self.test_user1, self.user_pass, userou=self.ou2)
        self._assert_add_fails(ERR_INSUFFICIENT_ACCESS_RIGHTS,
                               self.ldb_user.newgroup,
                               self.test_group1, groupou=self.ou2,
                               grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we HAVEN'T created any of two objects -- user or group
        self.assertFalse(self._object_exists("CN=%s,%s" % (self.test_user1, self.ou2)))
        self.assertFalse(self._object_exists("CN=%s,%s" % (self.test_group1, self.ou2)))

    def test_add_u3(self):
        """Testing OU with the rights of regular user granted the right 'Create User child objects' """
        self.assert_top_ou_deleted()
        top_ou_dn = "%s,%s" % (self.ou1, self.base_dn)
        # Change descriptor for top level OU
        self.ldb_owner.create_ou(top_ou_dn)
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
        mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(top_ou_dn, mod)
        # The child OU is created only after the ACE has been added to the parent.
        self.ldb_owner.create_ou("%s,%s" % (self.ou2, self.base_dn))
        # Test user and group creation with granted user only to one of the objects
        self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
        self._assert_add_fails(ERR_INSUFFICIENT_ACCESS_RIGHTS,
                               self.ldb_user.newgroup,
                               self.test_group1, groupou=self.ou2,
                               grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we HAVE created the one of two objects -- user
        self.assertTrue(self._object_exists("CN=%s,%s" % (self.test_user1, self.ou2)))
        self.assertFalse(self._object_exists("CN=%s,%s" % (self.test_group1, self.ou2)))

    def test_add_u4(self):
        """ 4 Testing OU with the rights of Domain Admin creator of the OU"""
        self.assert_top_ou_deleted()
        self.ldb_owner.create_ou("%s,%s" % (self.ou1, self.base_dn))
        self.ldb_owner.create_ou("%s,%s" % (self.ou2, self.base_dn))
        self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
        self.ldb_owner.newgroup(self.test_group1, groupou=self.ou2,
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        # Make sure we have successfully created the two objects -- user and group
        self.assertTrue(self._object_exists("CN=%s,%s" % (self.test_user1, self.ou2)))
        self.assertTrue(self._object_exists("CN=%s,%s" % (self.test_group1, self.ou2)))

    def test_add_anonymous(self):
        """Test add operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self._assert_add_fails(ERR_OPERATIONS_ERROR,
                               anonymous.newuser,
                               "test_add_anonymous", self.user_pass)
256
257 #tests on ldap modify operations
class AclModifyTests(AclTests):
    """Access checks applied to LDAP modify operations."""

    def setUp(self):
        """Create the acting users, their connections and the shared target objects."""
        super(AclModifyTests, self).setUp()
        self.user_with_wp = "acl_mod_user1"
        self.user_with_sm = "acl_mod_user2"
        self.user_with_group_sm = "acl_mod_user3"
        for account in (self.user_with_wp, self.user_with_sm,
                        self.user_with_group_sm):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
        self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
        for group in ("test_modify_group2", "test_modify_group3"):
            self.ldb_admin.newgroup(group, grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        self.ldb_admin.newuser("test_modify_user2", self.user_pass)

    def tearDown(self):
        """Delete every object setUp or any of the tests may have created."""
        super(AclModifyTests, self).tearDown()
        leftovers = [
            self.get_user_dn("test_modify_user1"),
            "CN=test_modify_group1,CN=Users," + self.base_dn,
            "CN=test_modify_group2,CN=Users," + self.base_dn,
            "CN=test_modify_group3,CN=Users," + self.base_dn,
            "OU=test_modify_ou1," + self.base_dn,
            self.get_user_dn(self.user_with_wp),
            self.get_user_dn(self.user_with_sm),
            self.get_user_dn(self.user_with_group_sm),
            self.get_user_dn("test_modify_user2"),
            self.get_user_dn("test_anonymous"),
        ]
        for dn in leftovers:
            delete_force(self.ldb_admin, dn)

    # ---- private helpers -------------------------------------------------

    @staticmethod
    def _modify_ldif(dn, operation, attribute, *values):
        """Build LDIF for one modify of `attribute` on `dn`.

        The text starts with an empty line and has no trailing newline.
        """
        lines = ["", "dn: " + dn, "changetype: modify",
                 "%s: %s" % (operation, attribute)]
        for value in values:
            lines.append("%s: %s" % (attribute, value))
        return "\n".join(lines)

    def _assert_ldb_error(self, expected_error, operation, *args):
        """Call `operation` and fail unless it raises LdbError with `expected_error`."""
        try:
            operation(*args)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected_error)
        else:
            self.fail()

    def _assert_modify_denied(self, connection, ldif):
        """This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS."""
        self._assert_ldb_error(ERR_INSUFFICIENT_ACCESS_RIGHTS,
                               connection.modify_ldif, ldif)

    def _first_value(self, dn, attribute, attrs=None):
        """Look `dn` up as admin and return the first value of `attribute`."""
        search_args = {"expression": "(distinguishedName=%s)" % dn}
        if attrs is not None:
            search_args["attrs"] = attrs
        res = self.ldb_admin.search(self.base_dn, **search_args)
        return res[0][attribute][0]

    def _replace_display_name_and_verify(self, dn):
        """Replace displayName on `dn` as self.ldb_user and check the stored value."""
        ldif = self._modify_ldif(dn, "replace", "displayName", "test_changed")
        self.ldb_user.modify_ldif(ldif)
        self.assertEqual(self._first_value(dn, "displayName"), "test_changed")

    def _assert_url_replace_denied(self, dn):
        """Replacing url on `dn` as self.ldb_user must be refused."""
        ldif = self._modify_ldif(dn, "replace", "url", "www.samba.org")
        self._assert_modify_denied(self.ldb_user, ldif)

    def _new_test_user(self):
        """Create the user target as admin and return its DN."""
        print("Testing modify on User object")
        self.ldb_admin.newuser("test_modify_user1", self.user_pass)
        return self.get_user_dn("test_modify_user1")

    def _new_test_group(self):
        """Create the group target as admin and return its DN."""
        print("Testing modify on Group object")
        self.ldb_admin.newgroup("test_modify_group1",
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        return "CN=test_modify_group1,CN=Users," + self.base_dn

    def _new_test_ou(self):
        """Create the OU target as admin and return its DN."""
        print("Testing modify on OU object")
        ou_dn = "OU=test_modify_ou1," + self.base_dn
        self.ldb_admin.create_ou(ou_dn)
        return ou_dn

    # ---- tests -----------------------------------------------------------

    def test_modify_u1(self):
        """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        # First test object -- User
        user_dn = self._new_test_user()
        self.sd_utils.dacl_add_ace(user_dn, mod)
        self._replace_display_name_and_verify(user_dn)
        # Second test object -- Group
        group_dn = self._new_test_group()
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._replace_display_name_and_verify(group_dn)
        # Third test object -- Organizational Unit
        ou_dn = self._new_test_ou()
        self.sd_utils.dacl_add_ace(ou_dn, mod)
        self._replace_display_name_and_verify(ou_dn)

    def test_modify_u2(self):
        """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
        mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
        # First test object -- User
        user_dn = self._new_test_user()
        self.sd_utils.dacl_add_ace(user_dn, mod)
        # Modify on attribute you have rights for
        self._replace_display_name_and_verify(user_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_replace_denied(user_dn)
        # Second test object -- Group
        group_dn = self._new_test_group()
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._replace_display_name_and_verify(group_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_replace_denied(group_dn)
        # Modify on attribute you do not have rights for granted while also
        # modifying something you do have rights for
        combined_ldif = (self._modify_ldif(group_dn, "replace", "url", "www.samba.org")
                         + "\nreplace: displayName\ndisplayName: test_changed")
        self._assert_modify_denied(self.ldb_user, combined_ldif)
        # Third test object -- Organizational Unit
        ou_dn = self._new_test_ou()
        self.sd_utils.dacl_add_ace(ou_dn, mod)
        self._replace_display_name_and_verify(ou_dn)
        # Modify on attribute you do not have rights for granted
        self._assert_url_replace_denied(ou_dn)

    def test_modify_u3(self):
        """7 Modify one attribute as you have no what so ever rights granted"""
        # First test object -- User
        self._assert_url_replace_denied(self._new_test_user())
        # Second test object -- Group
        self._assert_url_replace_denied(self._new_test_group())
        # Third test object -- Organizational Unit
        self._assert_url_replace_denied(self._new_test_ou())

    def test_modify_u4(self):
        """11 Grant WP to PRINCIPAL_SELF and test modify"""
        own_dn = self.get_user_dn(self.user_with_wp)
        ldif = self._modify_ldif(own_dn, "add", "adminDescription", "blah blah blah")
        # Before the ACE is added the user may not change its own entry.
        self._assert_modify_denied(self.ldb_user, ldif)

        mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
        self.sd_utils.dacl_add_ace(own_dn, mod)
        # Modify on attribute you have rights for
        self.ldb_user.modify_ldif(ldif)
        stored = self._first_value(own_dn, "adminDescription",
                                   attrs=["adminDescription"])
        self.assertEqual(stored, "blah blah blah")

    def test_modify_u5(self):
        """12 test self membership"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_sm)
        ldif = self._modify_ldif(group_dn, "add", "Member", own_dn)
        # the user has no rights granted, this should fail
        self._assert_modify_denied(self.ldb_user2, ldif)

        # grant self-membership, should be able to add himself
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self.ldb_user2.modify_ldif(ldif)
        self.assertEqual(self._first_value(group_dn, "Member", attrs=["Member"]),
                         own_dn)
        # but not other users
        ldif = self._modify_ldif(group_dn, "add", "Member",
                                 "CN=test_modify_user2,CN=Users," + self.base_dn)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u6(self):
        """13 test self membership: adding oneself together with another user"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_sm)
        ldif = self._modify_ldif(group_dn, "add", "Member", own_dn,
                                 "CN=test_modify_user2,CN=Users," + self.base_dn)

        # grant self-membership, should be able to add himself but not others
        # at the same time
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        self._assert_modify_denied(self.ldb_user2, ldif)

    def test_modify_u7(self):
        """13 User with WP modifying Member"""
        group_dn = "CN=test_modify_group2,CN=Users," + self.base_dn
        own_dn = self.get_user_dn(self.user_with_wp)
        other_dn = "CN=test_modify_user2,CN=Users," + self.base_dn
        # a second user is given write property permission
        user_sid = self.sd_utils.get_object_sid(own_dn)
        mod = "(A;;WP;;;%s)" % str(user_sid)
        self.sd_utils.dacl_add_ace(group_dn, mod)
        # Add oneself to the group ...
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "add", "Member", own_dn))
        self.assertEqual(self._first_value(group_dn, "Member", attrs=["Member"]),
                         own_dn)
        # ... delete the Member attribute ...
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "delete", "Member"))
        # ... and add a different user.
        self.ldb_user.modify_ldif(self._modify_ldif(group_dn, "add", "Member", other_dn))
        self.assertEqual(self._first_value(group_dn, "Member", attrs=["Member"]),
                         other_dn)

    def test_modify_anonymous(self):
        """Test modify operation with anonymous user"""
        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
        self.ldb_admin.newuser("test_anonymous", "samba123@")
        m = Message()
        m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))

        m["description"] = MessageElement("sambauser2",
                                          FLAG_MOD_ADD,
                                          "description")
        self._assert_ldb_error(ERR_OPERATIONS_ERROR, anonymous.modify, m)
617
618 #enable these when we have search implemented
619 class AclSearchTests(AclTests):
620
621     def setUp(self):
622         super(AclSearchTests, self).setUp()
623         self.u1 = "search_u1"
624         self.u2 = "search_u2"
625         self.u3 = "search_u3"
626         self.group1 = "group1"
627         self.ldb_admin.newuser(self.u1, self.user_pass)
628         self.ldb_admin.newuser(self.u2, self.user_pass)
629         self.ldb_admin.newuser(self.u3, self.user_pass)
630         self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
631         self.ldb_admin.add_remove_group_members(self.group1, [self.u2],
632                                                 add_members_operation=True)
633         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
634         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
635         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
636         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
637                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
638                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
639                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
640                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
641                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
642         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
643         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
644
645     def create_clean_ou(self, object_dn):
646         """ Base repeating setup for unittests to follow """
647         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
648                 expression="distinguishedName=%s" % object_dn)
649         # Make sure top testing OU has been deleted before starting the test
650         self.assertEqual(len(res), 0)
651         self.ldb_admin.create_ou(object_dn)
652         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
653         # Make sure there are inheritable ACEs initially
654         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
655         # Find and remove all inherit ACEs
656         res = re.findall("\(.*?\)", desc_sddl)
657         res = [x for x in res if ("CI" in x) or ("OI" in x)]
658         for x in res:
659             desc_sddl = desc_sddl.replace(x, "")
660         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
661         # can propagate from above
662         # remove SACL, we are not interested
663         desc_sddl = desc_sddl.replace(":AI", ":AIP")
664         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
665         # Verify all inheritable ACEs are gone
666         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
667         self.assertFalse("CI" in desc_sddl)
668         self.assertFalse("OI" in desc_sddl)
669
670     def tearDown(self):
671         super(AclSearchTests, self).tearDown()
672         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
673         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
674         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
675         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
676         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
677         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
678         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
679         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
680         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
681         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
682         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
683         delete_force(self.ldb_admin, self.get_user_dn("group1"))
684
685     def test_search_anonymous1(self):
686         """Verify access of rootDSE with the correct request"""
687         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
688         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
689         self.assertEquals(len(res), 1)
690         #verify some of the attributes
691         #dont care about values
692         self.assertTrue("ldapServiceName" in res[0])
693         self.assertTrue("namingContexts" in res[0])
694         self.assertTrue("isSynchronized" in res[0])
695         self.assertTrue("dsServiceName" in res[0])
696         self.assertTrue("supportedSASLMechanisms" in res[0])
697         self.assertTrue("isGlobalCatalogReady" in res[0])
698         self.assertTrue("domainControllerFunctionality" in res[0])
699         self.assertTrue("serverName" in res[0])
700
701     def test_search_anonymous2(self):
702         """Make sure we cannot access anything else"""
703         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
704         try:
705             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
706         except LdbError, (num, _):
707             self.assertEquals(num, ERR_OPERATIONS_ERROR)
708         else:
709             self.fail()
710         try:
711             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
712         except LdbError, (num, _):
713             self.assertEquals(num, ERR_OPERATIONS_ERROR)
714         else:
715             self.fail()
716         try:
717             res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
718                                         scope=SCOPE_SUBTREE)
719         except LdbError, (num, _):
720             self.assertEquals(num, ERR_OPERATIONS_ERROR)
721         else:
722             self.fail()
723
724     def test_search_anonymous3(self):
725         """Set dsHeuristics and repeat"""
726         self.ldb_admin.set_dsheuristics("0000002")
727         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
728         mod = "(A;CI;LC;;;AN)"
729         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
730         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
731         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
732         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
733                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
734         self.assertEquals(len(res), 1)
735         self.assertTrue("dn" in res[0])
736         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
737                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
738         res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
739                                scope=SCOPE_SUBTREE)
740         self.assertEquals(len(res), 1)
741         self.assertTrue("dn" in res[0])
742         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
743
744     def test_search1(self):
745         """Make sure users can see us if given LC to user and group"""
746         self.create_clean_ou("OU=ou1," + self.base_dn)
747         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
748         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
749         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
750                                                  self.domain_sid)
751         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
752         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
753         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
754         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
755         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
756
757         #regular users must see only ou1 and ou2
758         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
759                                     scope=SCOPE_SUBTREE)
760         self.assertEquals(len(res), 2)
761         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
762                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
763
764         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
765         self.assertEquals(sorted(res_list), sorted(ok_list))
766
767         #these users should see all ous
768         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
769                                     scope=SCOPE_SUBTREE)
770         self.assertEquals(len(res), 6)
771         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
772         self.assertEquals(sorted(res_list), sorted(self.full_list))
773
774         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
775                                     scope=SCOPE_SUBTREE)
776         self.assertEquals(len(res), 6)
777         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
778         self.assertEquals(sorted(res_list), sorted(self.full_list))
779
780     def test_search2(self):
781         """Make sure users can't see us if access is explicitly denied"""
782         self.create_clean_ou("OU=ou1," + self.base_dn)
783         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
784         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
785         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
786         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
787         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
788         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
789         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
790         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
791                                     scope=SCOPE_SUBTREE)
792         #this user should see all ous
793         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
794         self.assertEquals(sorted(res_list), sorted(self.full_list))
795
796         #these users should see ou1, 2, 5 and 6 but not 3 and 4
797         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
798                                     scope=SCOPE_SUBTREE)
799         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
800                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
801                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
802                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
803         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
804         self.assertEquals(sorted(res_list), sorted(ok_list))
805
806         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
807                                     scope=SCOPE_SUBTREE)
808         self.assertEquals(len(res), 4)
809         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
810         self.assertEquals(sorted(res_list), sorted(ok_list))
811
812     def test_search3(self):
813         """Make sure users can't see ous if access is explicitly denied - 2"""
814         self.create_clean_ou("OU=ou1," + self.base_dn)
815         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
816         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
817         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
818                                                  self.domain_sid)
819         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
820         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
821         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
822         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
823         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
824
825         print "Testing correct behavior on nonaccessible search base"
826         try:
827              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
828                                    scope=SCOPE_BASE)
829         except LdbError, (num, _):
830             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
831         else:
832             self.fail()
833
834         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
835         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
836
837         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
838                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
839
840         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
841                                     scope=SCOPE_SUBTREE)
842         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
843         self.assertEquals(sorted(res_list), sorted(ok_list))
844
845         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
846                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
847                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
848                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
849
850         #should not see ou3 and ou4, but should see ou5 and ou6
851         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
852                                     scope=SCOPE_SUBTREE)
853         self.assertEquals(len(res), 4)
854         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
855         self.assertEquals(sorted(res_list), sorted(ok_list))
856
857         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
858                                     scope=SCOPE_SUBTREE)
859         self.assertEquals(len(res), 4)
860         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
861         self.assertEquals(sorted(res_list), sorted(ok_list))
862
863     def test_search4(self):
864         """There is no difference in visibility if the user is also creator"""
865         self.create_clean_ou("OU=ou1," + self.base_dn)
866         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
867         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
868         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
869                                                  self.domain_sid)
870         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
871         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
872         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
873         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
874         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
875
876         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
877                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
878         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
879                                     scope=SCOPE_SUBTREE)
880         self.assertEquals(len(res), 2)
881         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
882         self.assertEquals(sorted(res_list), sorted(ok_list))
883
884         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
885                                     scope=SCOPE_SUBTREE)
886         self.assertEquals(len(res), 2)
887         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
888         self.assertEquals(sorted(res_list), sorted(ok_list))
889
890     def test_search5(self):
891         """Make sure users can see only attributes they are allowed to see"""
892         self.create_clean_ou("OU=ou1," + self.base_dn)
893         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
894         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
895         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
896                                                  self.domain_sid)
897         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
898         # assert user can only see dn
899         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
900                                     scope=SCOPE_SUBTREE)
901         ok_list = ['dn']
902         self.assertEquals(len(res), 1)
903         res_list = res[0].keys()
904         self.assertEquals(res_list, ok_list)
905
906         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
907                                     scope=SCOPE_BASE, attrs=["ou"])
908
909         self.assertEquals(len(res), 1)
910         res_list = res[0].keys()
911         self.assertEquals(res_list, ok_list)
912
913         #give read property on ou and assert user can only see dn and ou
914         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
915         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
916         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
917         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
918                                     scope=SCOPE_SUBTREE)
919         ok_list = ['dn', 'ou']
920         self.assertEquals(len(res), 1)
921         res_list = res[0].keys()
922         self.assertEquals(sorted(res_list), sorted(ok_list))
923
924         #give read property on Public Information and assert user can see ou and other members
925         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
926         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
927         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
928         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
929                                     scope=SCOPE_SUBTREE)
930
931         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
932         res_list = res[0].keys()
933         self.assertEquals(sorted(res_list), sorted(ok_list))
934
935     def test_search6(self):
936         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
937         self.create_clean_ou("OU=ou1," + self.base_dn)
938         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
939         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
940         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
941                                                  self.domain_sid)
942         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
943         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
944
945         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
946                                     scope=SCOPE_SUBTREE)
947         #nothing should be returned as ou is not accessible
948         self.assertEquals(len(res), 0)
949
950         #give read property on ou and assert user can only see dn and ou
951         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
952         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
953         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
954                                     scope=SCOPE_SUBTREE)
955         self.assertEquals(len(res), 1)
956         ok_list = ['dn', 'ou']
957         res_list = res[0].keys()
958         self.assertEquals(sorted(res_list), sorted(ok_list))
959
960         #give read property on Public Information and assert user can see ou and other members
961         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
962         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
963         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
964                                    scope=SCOPE_SUBTREE)
965         self.assertEquals(len(res), 1)
966         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
967         res_list = res[0].keys()
968         self.assertEquals(sorted(res_list), sorted(ok_list))
969
970 #tests on ldap delete operations
971 class AclDeleteTests(AclTests):
972
973     def setUp(self):
974         super(AclDeleteTests, self).setUp()
975         self.regular_user = "acl_delete_user1"
976         # Create regular user
977         self.ldb_admin.newuser(self.regular_user, self.user_pass)
978         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
979
980     def tearDown(self):
981         super(AclDeleteTests, self).tearDown()
982         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
983         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
984         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
985
986     def test_delete_u1(self):
987         """User is prohibited by default to delete another User object"""
988         # Create user that we try to delete
989         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
990         # Here delete User object should ALWAYS through exception
991         try:
992             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
993         except LdbError, (num, _):
994             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
995         else:
996             self.fail()
997
998     def test_delete_u2(self):
999         """User's group has RIGHT_DELETE to another User object"""
1000         user_dn = self.get_user_dn("test_delete_user1")
1001         # Create user that we try to delete
1002         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1003         mod = "(A;;SD;;;AU)"
1004         self.sd_utils.dacl_add_ace(user_dn, mod)
1005         # Try to delete User object
1006         self.ldb_user.delete(user_dn)
1007         res = self.ldb_admin.search(self.base_dn,
1008                 expression="(distinguishedName=%s)" % user_dn)
1009         self.assertEqual(len(res), 0)
1010
1011     def test_delete_u3(self):
1012         """User indentified by SID has RIGHT_DELETE to another User object"""
1013         user_dn = self.get_user_dn("test_delete_user1")
1014         # Create user that we try to delete
1015         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1016         mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1017         self.sd_utils.dacl_add_ace(user_dn, mod)
1018         # Try to delete User object
1019         self.ldb_user.delete(user_dn)
1020         res = self.ldb_admin.search(self.base_dn,
1021                 expression="(distinguishedName=%s)" % user_dn)
1022         self.assertEqual(len(res), 0)
1023
1024     def test_delete_anonymous(self):
1025         """Test add operation with anonymous user"""
1026         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
1027         self.ldb_admin.newuser("test_anonymous", "samba123@")
1028
1029         try:
1030             anonymous.delete(self.get_user_dn("test_anonymous"))
1031         except LdbError, (num, _):
1032             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1033         else:
1034             self.fail()
1035
1036 #tests on ldap rename operations
1037 class AclRenameTests(AclTests):
1038
1039     def setUp(self):
1040         super(AclRenameTests, self).setUp()
1041         self.regular_user = "acl_rename_user1"
1042         self.ou1 = "OU=test_rename_ou1"
1043         self.ou2 = "OU=test_rename_ou2"
1044         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1045         self.testuser1 = "test_rename_user1"
1046         self.testuser2 = "test_rename_user2"
1047         self.testuser3 = "test_rename_user3"
1048         self.testuser4 = "test_rename_user4"
1049         self.testuser5 = "test_rename_user5"
1050         # Create regular user
1051         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1052         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1053
1054     def tearDown(self):
1055         super(AclRenameTests, self).tearDown()
1056         # Rename OU3
1057         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1058         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1059         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1060         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1061         # Rename OU2
1062         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1063         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1064         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1065         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1066         # Rename OU1
1067         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1068         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1069         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1070         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1071         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1072         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1073
1074     def test_rename_u1(self):
1075         """Regular user fails to rename 'User object' within single OU"""
1076         # Create OU structure
1077         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1078         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1079         try:
1080             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1081                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1082         except LdbError, (num, _):
1083             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1084         else:
1085             self.fail()
1086
1087     def test_rename_u2(self):
1088         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1089         ou_dn = "OU=test_rename_ou1," + self.base_dn
1090         user_dn = "CN=test_rename_user1," + ou_dn
1091         rename_user_dn = "CN=test_rename_user5," + ou_dn
1092         # Create OU structure
1093         self.ldb_admin.create_ou(ou_dn)
1094         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1095         mod = "(A;;WP;;;AU)"
1096         self.sd_utils.dacl_add_ace(user_dn, mod)
1097         # Rename 'User object' having WP to AU
1098         self.ldb_user.rename(user_dn, rename_user_dn)
1099         res = self.ldb_admin.search(self.base_dn,
1100                 expression="(distinguishedName=%s)" % user_dn)
1101         self.assertEqual(len(res), 0)
1102         res = self.ldb_admin.search(self.base_dn,
1103                 expression="(distinguishedName=%s)" % rename_user_dn)
1104         self.assertNotEqual(len(res), 0)
1105
1106     def test_rename_u3(self):
1107         """Test rename with rights granted to 'User object' SID"""
1108         ou_dn = "OU=test_rename_ou1," + self.base_dn
1109         user_dn = "CN=test_rename_user1," + ou_dn
1110         rename_user_dn = "CN=test_rename_user5," + ou_dn
1111         # Create OU structure
1112         self.ldb_admin.create_ou(ou_dn)
1113         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1114         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1115         mod = "(A;;WP;;;%s)" % str(sid)
1116         self.sd_utils.dacl_add_ace(user_dn, mod)
1117         # Rename 'User object' having WP to AU
1118         self.ldb_user.rename(user_dn, rename_user_dn)
1119         res = self.ldb_admin.search(self.base_dn,
1120                 expression="(distinguishedName=%s)" % user_dn)
1121         self.assertEqual(len(res), 0)
1122         res = self.ldb_admin.search(self.base_dn,
1123                 expression="(distinguishedName=%s)" % rename_user_dn)
1124         self.assertNotEqual(len(res), 0)
1125
1126     def test_rename_u4(self):
1127         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1128         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1129         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1130         user_dn = "CN=test_rename_user2," + ou1_dn
1131         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1132         # Create OU structure
1133         self.ldb_admin.create_ou(ou1_dn)
1134         self.ldb_admin.create_ou(ou2_dn)
1135         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1136         mod = "(A;;WPSD;;;AU)"
1137         self.sd_utils.dacl_add_ace(user_dn, mod)
1138         mod = "(A;;CC;;;AU)"
1139         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1140         # Rename 'User object' having SD and CC to AU
1141         self.ldb_user.rename(user_dn, rename_user_dn)
1142         res = self.ldb_admin.search(self.base_dn,
1143                 expression="(distinguishedName=%s)" % user_dn)
1144         self.assertEqual(len(res), 0)
1145         res = self.ldb_admin.search(self.base_dn,
1146                 expression="(distinguishedName=%s)" % rename_user_dn)
1147         self.assertNotEqual(len(res), 0)
1148
1149     def test_rename_u5(self):
1150         """Test rename with rights granted to 'User object' SID"""
1151         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1152         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1153         user_dn = "CN=test_rename_user2," + ou1_dn
1154         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1155         # Create OU structure
1156         self.ldb_admin.create_ou(ou1_dn)
1157         self.ldb_admin.create_ou(ou2_dn)
1158         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1159         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1160         mod = "(A;;WPSD;;;%s)" % str(sid)
1161         self.sd_utils.dacl_add_ace(user_dn, mod)
1162         mod = "(A;;CC;;;%s)" % str(sid)
1163         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1164         # Rename 'User object' having SD and CC to AU
1165         self.ldb_user.rename(user_dn, rename_user_dn)
1166         res = self.ldb_admin.search(self.base_dn,
1167                 expression="(distinguishedName=%s)" % user_dn)
1168         self.assertEqual(len(res), 0)
1169         res = self.ldb_admin.search(self.base_dn,
1170                 expression="(distinguishedName=%s)" % rename_user_dn)
1171         self.assertNotEqual(len(res), 0)
1172
1173     def test_rename_u6(self):
1174         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1175         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1176         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1177         user_dn = "CN=test_rename_user2," + ou1_dn
1178         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1179         # Create OU structure
1180         self.ldb_admin.create_ou(ou1_dn)
1181         self.ldb_admin.create_ou(ou2_dn)
1182         #mod = "(A;CI;DCWP;;;AU)"
1183         mod = "(A;;DC;;;AU)"
1184         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1185         mod = "(A;;CC;;;AU)"
1186         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1187         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1188         mod = "(A;;WP;;;AU)"
1189         self.sd_utils.dacl_add_ace(user_dn, mod)
1190         # Rename 'User object' having SD and CC to AU
1191         self.ldb_user.rename(user_dn, rename_user_dn)
1192         res = self.ldb_admin.search(self.base_dn,
1193                 expression="(distinguishedName=%s)" % user_dn)
1194         self.assertEqual(len(res), 0)
1195         res = self.ldb_admin.search(self.base_dn,
1196                 expression="(distinguishedName=%s)" % rename_user_dn)
1197         self.assertNotEqual(len(res), 0)
1198
1199     def test_rename_u7(self):
1200         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1201         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1202         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1203         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1204         user_dn = "CN=test_rename_user2," + ou1_dn
1205         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1206         # Create OU structure
1207         self.ldb_admin.create_ou(ou1_dn)
1208         self.ldb_admin.create_ou(ou2_dn)
1209         self.ldb_admin.create_ou(ou3_dn)
1210         mod = "(A;CI;WPDC;;;AU)"
1211         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1212         mod = "(A;;CC;;;AU)"
1213         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1214         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1215         # Rename 'User object' having SD and CC to AU
1216         self.ldb_user.rename(user_dn, rename_user_dn)
1217         res = self.ldb_admin.search(self.base_dn,
1218                 expression="(distinguishedName=%s)" % user_dn)
1219         self.assertEqual(len(res), 0)
1220         res = self.ldb_admin.search(self.base_dn,
1221                 expression="(distinguishedName=%s)" % rename_user_dn)
1222         self.assertNotEqual(len(res), 0)
1223
1224     def test_rename_u8(self):
1225         """Test rename on an object with and without modify access on the RDN attribute"""
1226         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1227         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1228         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1229         # Create OU structure
1230         self.ldb_admin.create_ou(ou1_dn)
1231         self.ldb_admin.create_ou(ou2_dn)
1232         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1233         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1234         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1235         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1236         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1237         try:
1238             self.ldb_user.rename(ou2_dn, ou3_dn)
1239         except LdbError, (num, _):
1240             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1241         else:
1242             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1243             self.fail()
1244         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1245         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1246         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1247         self.ldb_user.rename(ou2_dn, ou3_dn)
1248         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1249         self.assertEqual(len(res), 0)
1250         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1251         self.assertNotEqual(len(res), 0)
1252
1253 #tests on Control Access Rights
1254 class AclCARTests(AclTests):
1255
1256     def setUp(self):
1257         super(AclCARTests, self).setUp()
1258         self.user_with_wp = "acl_car_user1"
1259         self.user_with_pc = "acl_car_user2"
1260         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1261         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
1262         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1263         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1264
1265     def tearDown(self):
1266         super(AclCARTests, self).tearDown()
1267         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1268         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1269
1270     def test_change_password1(self):
1271         """Try a password change operation without any CARs given"""
1272         #users have change password by default - remove for negative testing
1273         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1274         sddl = desc.as_sddl(self.domain_sid)
1275         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1276         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1277         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1278         try:
1279             self.ldb_user.modify_ldif("""
1280 dn: """ + self.get_user_dn(self.user_with_wp) + """
1281 changetype: modify
1282 delete: unicodePwd
1283 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1284 add: unicodePwd
1285 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1286 """)
1287         except LdbError, (num, _):
1288             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1289         else:
1290             # for some reason we get constraint violation instead of insufficient access error
1291             self.fail()
1292
1293     def test_change_password2(self):
1294         """Make sure WP has no influence"""
1295         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1296         sddl = desc.as_sddl(self.domain_sid)
1297         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1298         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1299         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1300         mod = "(A;;WP;;;PS)"
1301         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1302         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1303         sddl = desc.as_sddl(self.domain_sid)
1304         try:
1305             self.ldb_user.modify_ldif("""
1306 dn: """ + self.get_user_dn(self.user_with_wp) + """
1307 changetype: modify
1308 delete: unicodePwd
1309 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1310 add: unicodePwd
1311 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1312 """)
1313         except LdbError, (num, _):
1314             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1315         else:
1316             # for some reason we get constraint violation instead of insufficient access error
1317             self.fail()
1318
1319     def test_change_password3(self):
1320         """Make sure WP has no influence"""
1321         mod = "(D;;WP;;;PS)"
1322         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1323         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1324         sddl = desc.as_sddl(self.domain_sid)
1325         self.ldb_user.modify_ldif("""
1326 dn: """ + self.get_user_dn(self.user_with_wp) + """
1327 changetype: modify
1328 delete: unicodePwd
1329 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1330 add: unicodePwd
1331 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1332 """)
1333
1334     def test_change_password5(self):
1335         """Make sure rights have no influence on dBCSPwd"""
1336         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1337         sddl = desc.as_sddl(self.domain_sid)
1338         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1339         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1340         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1341         mod = "(D;;WP;;;PS)"
1342         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1343         try:
1344             self.ldb_user.modify_ldif("""
1345 dn: """ + self.get_user_dn(self.user_with_wp) + """
1346 changetype: modify
1347 delete: dBCSPwd
1348 dBCSPwd: XXXXXXXXXXXXXXXX
1349 add: dBCSPwd
1350 dBCSPwd: YYYYYYYYYYYYYYYY
1351 """)
1352         except LdbError, (num, _):
1353             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1354         else:
1355             self.fail()
1356
1357     def test_change_password6(self):
1358         """Test uneven delete/adds"""
1359         try:
1360             self.ldb_user.modify_ldif("""
1361 dn: """ + self.get_user_dn(self.user_with_wp) + """
1362 changetype: modify
1363 delete: userPassword
1364 userPassword: thatsAcomplPASS1
1365 delete: userPassword
1366 userPassword: thatsAcomplPASS1
1367 add: userPassword
1368 userPassword: thatsAcomplPASS2
1369 """)
1370         except LdbError, (num, _):
1371             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1372         else:
1373             self.fail()
1374         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1375         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1376         try:
1377             self.ldb_user.modify_ldif("""
1378 dn: """ + self.get_user_dn(self.user_with_wp) + """
1379 changetype: modify
1380 delete: userPassword
1381 userPassword: thatsAcomplPASS1
1382 delete: userPassword
1383 userPassword: thatsAcomplPASS1
1384 add: userPassword
1385 userPassword: thatsAcomplPASS2
1386 """)
1387             # This fails on Windows 2000 domain level with constraint violation
1388         except LdbError, (num, _):
1389             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1390                             num == ERR_UNWILLING_TO_PERFORM)
1391         else:
1392             self.fail()
1393
1394
1395     def test_change_password7(self):
1396         """Try a password change operation without any CARs given"""
1397         #users have change password by default - remove for negative testing
1398         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1399         sddl = desc.as_sddl(self.domain_sid)
1400         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1401         #first change our own password
1402         self.ldb_user2.modify_ldif("""
1403 dn: """ + self.get_user_dn(self.user_with_pc) + """
1404 changetype: modify
1405 delete: unicodePwd
1406 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1407 add: unicodePwd
1408 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1409 """)
1410         #then someone else's
1411         self.ldb_user2.modify_ldif("""
1412 dn: """ + self.get_user_dn(self.user_with_wp) + """
1413 changetype: modify
1414 delete: unicodePwd
1415 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1416 add: unicodePwd
1417 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1418 """)
1419
1420     def test_reset_password1(self):
1421         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1422         try:
1423             self.ldb_user.modify_ldif("""
1424 dn: """ + self.get_user_dn(self.user_with_wp) + """
1425 changetype: modify
1426 replace: unicodePwd
1427 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1428 """)
1429         except LdbError, (num, _):
1430             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1431         else:
1432             self.fail()
1433         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1434         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1435         self.ldb_user.modify_ldif("""
1436 dn: """ + self.get_user_dn(self.user_with_wp) + """
1437 changetype: modify
1438 replace: unicodePwd
1439 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1440 """)
1441
1442     def test_reset_password2(self):
1443         """Try a user password reset operation (userPassword) before and after granting CAR"""
1444         try:
1445             self.ldb_user.modify_ldif("""
1446 dn: """ + self.get_user_dn(self.user_with_wp) + """
1447 changetype: modify
1448 replace: userPassword
1449 userPassword: thatsAcomplPASS1
1450 """)
1451         except LdbError, (num, _):
1452             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1453         else:
1454             self.fail()
1455         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1456         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1457         try:
1458             self.ldb_user.modify_ldif("""
1459 dn: """ + self.get_user_dn(self.user_with_wp) + """
1460 changetype: modify
1461 replace: userPassword
1462 userPassword: thatsAcomplPASS1
1463 """)
1464             # This fails on Windows 2000 domain level with constraint violation
1465         except LdbError, (num, _):
1466             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1467
1468     def test_reset_password3(self):
1469         """Grant WP and see what happens (unicodePwd)"""
1470         mod = "(A;;WP;;;PS)"
1471         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1472         try:
1473             self.ldb_user.modify_ldif("""
1474 dn: """ + self.get_user_dn(self.user_with_wp) + """
1475 changetype: modify
1476 replace: unicodePwd
1477 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1478 """)
1479         except LdbError, (num, _):
1480             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1481         else:
1482             self.fail()
1483
1484     def test_reset_password4(self):
1485         """Grant WP and see what happens (userPassword)"""
1486         mod = "(A;;WP;;;PS)"
1487         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1488         try:
1489             self.ldb_user.modify_ldif("""
1490 dn: """ + self.get_user_dn(self.user_with_wp) + """
1491 changetype: modify
1492 replace: userPassword
1493 userPassword: thatsAcomplPASS1
1494 """)
1495         except LdbError, (num, _):
1496             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1497         else:
1498             self.fail()
1499
1500     def test_reset_password5(self):
1501         """Explicitly deny WP but grant CAR (unicodePwd)"""
1502         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1503         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1504         self.ldb_user.modify_ldif("""
1505 dn: """ + self.get_user_dn(self.user_with_wp) + """
1506 changetype: modify
1507 replace: unicodePwd
1508 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1509 """)
1510
1511     def test_reset_password6(self):
1512         """Explicitly deny WP but grant CAR (userPassword)"""
1513         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1514         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1515         try:
1516             self.ldb_user.modify_ldif("""
1517 dn: """ + self.get_user_dn(self.user_with_wp) + """
1518 changetype: modify
1519 replace: userPassword
1520 userPassword: thatsAcomplPASS1
1521 """)
1522             # This fails on Windows 2000 domain level with constraint violation
1523         except LdbError, (num, _):
1524             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1525
class AclExtendedTests(AclTests):
    """Checks who is allowed to read nTSecurityDescriptor on a freshly created group."""

    def setUp(self):
        super(AclExtendedTests, self).setUp()
        # u1: regular user, will be the creator of the test group
        # u2: regular user
        # u3: user that is added to "Domain Admins" below
        self.u1 = "ext_u1"
        self.u2 = "ext_u2"
        self.u3 = "ext_u3"
        for account in (self.u1, self.u2, self.u3):
            self.ldb_admin.newuser(account, self.user_pass)
        self.ldb_admin.add_remove_group_members("Domain Admins", [self.u3],
                                                add_members_operation=True)
        self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
        self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
        self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
        self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))

    def tearDown(self):
        super(AclExtendedTests, self).tearDown()
        # Users first, then the group, then the OU that contained it
        for account in (self.u1, self.u2, self.u3):
            delete_force(self.ldb_admin, self.get_user_dn(account))
        delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
        delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)

    def test_ntSecurityDescriptor(self):
        """u2 must not see the descriptor with RP or RC; Domain Admins member u3 must"""
        ou_dn = "OU=ext_ou1," + self.base_dn
        group_dn = "CN=ext_group1,OU=ext_ou1," + self.base_dn

        def fetch_sd(connection):
            # Base-scope read of the group asking only for its descriptor
            return connection.search(group_dn, SCOPE_BASE, None,
                                     ["nTSecurityDescriptor"])

        # Start from an empty OU
        self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
        # u1 gets create-child (CC) on the OU, u2 gets list-children (LC)
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;CC;;;%s)" % str(self.user_sid1))
        self.sd_utils.dacl_add_ace(ou_dn, "(A;;LC;;;%s)" % str(self.user_sid2))
        # u1 creates the group, then u2 is granted read-property (RP) on it
        self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1",
                                grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RP;;;%s)" % str(self.user_sid2))
        # RP alone: the entry is visible to u2 but the descriptor is not
        res = fetch_sd(self.ldb_user2)
        self.assertNotEqual(len(res), 0)
        self.assertFalse("nTSecurityDescriptor" in res[0].keys())
        # Adding read-control (RC) for u2 is still expected to give no access
        self.sd_utils.dacl_add_ace(group_dn, "(A;;RC;;;%s)" % str(self.user_sid2))
        res = fetch_sd(self.ldb_user2)
        self.assertNotEqual(len(res), 0)
        self.assertFalse("nTSecurityDescriptor" in res[0].keys())
        # u3 was added to Domain Admins in setUp and should be able to read the sd
        res = fetch_sd(self.ldb_user3)
        self.assertEqual(len(res),1)
        self.assertTrue("nTSecurityDescriptor" in res[0].keys())
1585
1586
1587 class AclSPNTests(AclTests):
1588
1589     def setUp(self):
1590         super(AclSPNTests, self).setUp()
1591         self.dcname = "TESTSRV8"
1592         self.rodcname = "TESTRODC8"
1593         self.computername = "testcomp8"
1594         self.test_user = "spn_test_user8"
1595         self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
1596         self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
1597         self.site = "Default-First-Site-Name"
1598         self.rodcctx = dc_join(server=host, creds=creds, lp=lp,
1599             site=self.site, netbios_name=self.rodcname, targetdir=None,
1600             domain=None)
1601         self.dcctx = dc_join(server=host, creds=creds, lp=lp, site=self.site,
1602                 netbios_name=self.dcname, targetdir=None, domain=None)
1603         self.ldb_admin.newuser(self.test_user, self.user_pass)
1604         self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
1605         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
1606         self.create_computer(self.computername, self.dcctx.dnsdomain)
1607         self.create_rodc(self.rodcctx)
1608         self.create_dc(self.dcctx)
1609
1610     def tearDown(self):
1611         super(AclSPNTests, self).tearDown()
1612         self.rodcctx.cleanup_old_join()
1613         self.dcctx.cleanup_old_join()
1614         delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
1615         delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
1616
1617     def replace_spn(self, _ldb, dn, spn):
1618         print "Setting spn %s on %s" % (spn, dn)
1619         res = self.ldb_admin.search(dn, expression="(objectClass=*)",
1620                                     scope=SCOPE_BASE, attrs=["servicePrincipalName"])
1621         if "servicePrincipalName" in res[0].keys():
1622             flag = FLAG_MOD_REPLACE
1623         else:
1624             flag = FLAG_MOD_ADD
1625
1626         msg = Message()
1627         msg.dn = Dn(self.ldb_admin, dn)
1628         msg["servicePrincipalName"] = MessageElement(spn, flag,
1629                                                          "servicePrincipalName")
1630         _ldb.modify(msg)
1631
1632     def create_computer(self, computername, domainname):
1633         dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
1634         samaccountname = computername + "$"
1635         dnshostname = "%s.%s" % (computername, domainname)
1636         self.ldb_admin.add({
1637             "dn": dn,
1638             "objectclass": "computer",
1639             "sAMAccountName": samaccountname,
1640             "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
1641             "dNSHostName": dnshostname})
1642
1643     # same as for join_RODC, but do not set any SPNs
1644     def create_rodc(self, ctx):
1645          ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1646          ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
1647
1648          ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
1649                                   "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
1650                                   "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
1651                                   "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
1652                                   "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
1653          ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
1654
1655          mysid = ctx.get_mysid()
1656          admin_dn = "<SID=%s>" % mysid
1657          ctx.managedby = admin_dn
1658
1659          ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
1660                               samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1661                               samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
1662
1663          ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
1664          ctx.secure_channel_type = misc.SEC_CHAN_RODC
1665          ctx.RODC = True
1666          ctx.replica_flags  =  (drsuapi.DRSUAPI_DRS_INIT_SYNC |
1667                                 drsuapi.DRSUAPI_DRS_PER_SYNC |
1668                                 drsuapi.DRSUAPI_DRS_GET_ANC |
1669                                 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
1670                                 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
1671
1672          ctx.join_add_objects()
1673
1674     def create_dc(self, ctx):
1675         ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1676         ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
1677         ctx.secure_channel_type = misc.SEC_CHAN_BDC
1678         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
1679                              drsuapi.DRSUAPI_DRS_INIT_SYNC |
1680                              drsuapi.DRSUAPI_DRS_PER_SYNC |
1681                              drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
1682                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
1683
1684         ctx.join_add_objects()
1685
1686     def dc_spn_test(self, ctx):
1687         netbiosdomain = self.dcctx.get_domain_name()
1688         try:
1689             self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1690         except LdbError, (num, _):
1691             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1692
1693         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1694         self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
1695         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1696         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
1697         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1698                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1699         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
1700         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1701                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1702         self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1703                          (ctx.myname, ctx.dnsdomain, ctx.dnsforest))
1704         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
1705         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1706                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1707         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
1708         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
1709         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1710                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1711         self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
1712         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
1713                          (ctx.myname, ctx.dnsdomain))
1714         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
1715                          (ctx.myname))
1716         self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1717                          (ctx.myname, ctx.dnsdomain))
1718         self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1719                          (ctx.myname, ctx.dnsdomain))
1720         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
1721                          (ctx.ntds_guid, ctx.dnsdomain))
1722
1723         #the following spns do not match the restrictions and should fail
1724         try:
1725             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
1726                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1727         except LdbError, (num, _):
1728             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1729         try:
1730             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
1731                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1732         except LdbError, (num, _):
1733             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1734         try:
1735             self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1736         except LdbError, (num, _):
1737             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1738         try:
1739             self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1740                              (ctx.myname, ctx.dnsdomain, netbiosdomain))
1741         except LdbError, (num, _):
1742             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1743         try:
1744             self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
1745                              (ctx.ntds_guid, ctx.dnsdomain))
1746         except LdbError, (num, _):
1747             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1748
1749     def test_computer_spn(self):
1750         # with WP, any value can be set
1751         netbiosdomain = self.dcctx.get_domain_name()
1752         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1753                          (self.computername, netbiosdomain))
1754         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
1755         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1756                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1757         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1758                          (self.computername, self.dcctx.dnsdomain))
1759         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1760                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1761         self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
1762                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
1763         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1764         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1765                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1766         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
1767                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1768         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1769                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1770         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
1771         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
1772                          (self.computername, self.dcctx.dnsdomain))
1773         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1774                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1775         self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
1776                          (self.computername, self.dcctx.dnsdomain))
1777         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
1778                          (self.computername, self.dcctx.dnsdomain))
1779         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
1780                          (self.computername))
1781         self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1782                          (self.computername, self.dcctx.dnsdomain))
1783         self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1784                          (self.computername, self.dcctx.dnsdomain))
1785         self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1786
1787         #user has neither WP nor Validated-SPN, access denied expected
1788         try:
1789             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1790         except LdbError, (num, _):
1791             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1792
1793         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1794         self.sd_utils.dacl_add_ace(self.computerdn, mod)
1795         #grant Validated-SPN and check which values are accepted
1796         #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
1797
        # for regular computer objects we should always get constraint violation
1799
1800         # This does not pass against Windows, although it should according to docs
1801         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
1802         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
1803                              (self.computername, self.dcctx.dnsdomain))
1804
1805         try:
1806             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1807         except LdbError, (num, _):
1808             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1809         try:
1810             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
1811                              (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1812         except LdbError, (num, _):
1813             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1814         try:
1815             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
1816                              (self.computername, self.dcctx.dnsdomain))
1817         except LdbError, (num, _):
1818             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1819         try:
1820             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
1821                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1822         except LdbError, (num, _):
1823             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1824         try:
1825             self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
1826                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
1827         except LdbError, (num, _):
1828             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1829         try:
1830             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1831         except LdbError, (num, _):
1832             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1833         try:
1834             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1835                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1836         except LdbError, (num, _):
1837             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1838
1839     def test_spn_rwdc(self):
1840         self.dc_spn_test(self.dcctx)
1841
1842     def test_spn_rodc(self):
1843         self.dc_spn_test(self.rodcctx)
1844
1845
# Important unit running information
#
# Script entry point: open the directory connection, run every ACL test
# class in a fixed order through the subunit runner, and exit with status 1
# if any of them reported a failure (0 otherwise).

ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)

runner = SubunitTestRunner()
rc = 0

# These test classes run against the directory's current settings.
for suite_class in (AclAddTests, AclModifyTests, AclDeleteTests,
                    AclRenameTests):
    if not runner.run(unittest.makeSuite(suite_class)).wasSuccessful():
        rc = 1

# Get the old "dSHeuristics" if it was set
dsheuristics = ldb.get_dsheuristics()
# Get the old "minPwdAge"
minPwdAge = ldb.get_minPwdAge()
try:
    # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
    ldb.set_dsheuristics("000000001")
    # Set "minPwdAge" temporarily to "0"
    ldb.set_minPwdAge("0")
    for suite_class in (AclCARTests, AclSearchTests):
        if not runner.run(unittest.makeSuite(suite_class)).wasSuccessful():
            rc = 1
finally:
    # Always put the directory-wide settings back, even if an exception
    # escaped above; otherwise the server would be left with the temporary
    # values. The nested finally makes sure the second restore is attempted
    # even when the first one raises.
    try:
        # Reset the "dSHeuristics" as they were before
        ldb.set_dsheuristics(dsheuristics)
    finally:
        # Reset the "minPwdAge" as it was before
        ldb.set_minPwdAge(minPwdAge)

# These test classes run after the original settings have been restored.
for suite_class in (AclExtendedTests, AclSPNTests):
    if not runner.run(unittest.makeSuite(suite_class)).wasSuccessful():
        rc = 1

sys.exit(rc)