from samba.tests.subunitrun import SubunitOptions, TestProgram
import samba.getopt as options
-from samba.join import dc_join
+from samba.join import DCJoinContext
from ldb import (
SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
else:
ldaphost = host
start = host.rindex("://")
- host = host.lstrip(start+3)
+ host = host.lstrip(start + 3)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
- creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target
# add admins to the Domain Admins group
self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner],
- add_members_operation=True)
+ add_members_operation=True)
self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner],
- add_members_operation=True)
+ add_members_operation=True)
self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
def tearDown(self):
super(AclAddTests, self).tearDown()
delete_force(self.ldb_admin, "CN=%s,%s,%s" %
- (self.test_user1, self.ou2, self.base_dn))
+ (self.test_user1, self.ou2, self.base_dn))
delete_force(self.ldb_admin, "CN=%s,%s,%s" %
- (self.test_group1, self.ou2, self.base_dn))
+ (self.test_group1, self.ou2, self.base_dn))
delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
# Make sure top OU is deleted (and so everything under it)
def assert_top_ou_deleted(self):
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s,%s)" % (
+ expression="(distinguishedName=%s,%s)" % (
"OU=test_add_ou1", self.base_dn))
self.assertEqual(len(res), 0)
self.fail()
# Make sure we HAVE created the one of two objects -- user
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s,%s)" %
- ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
- self.base_dn))
+ expression="(distinguishedName=%s,%s)" %
+ ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
+ self.base_dn))
self.assertNotEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s,%s)" %
- ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
- self.base_dn) )
+ expression="(distinguishedName=%s,%s)" %
+ ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
+ self.base_dn))
self.assertEqual(len(res), 0)
def test_add_u4(self):
self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
- grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
+ grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
# Make sure we have successfully created the two objects -- user and group
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
self.assertTrue(len(res) > 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
+ expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
self.assertTrue(len(res) > 0)
def test_add_anonymous(self):
self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
- self.user_sid = self.sd_utils.get_object_sid( self.get_user_dn(self.user_with_wp))
+ self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
self.ldb_admin.newuser("test_modify_user2", self.user_pass)
displayName: test_changed"""
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
+ expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
self.assertEqual(res[0]["displayName"][0], "test_changed")
# Second test object -- Group
print("Testing modify on Group object")
displayName: test_changed"""
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" %
- self.get_user_dn("test_modify_user1"))
+ expression="(distinguishedName=%s)" %
+ self.get_user_dn("test_modify_user1"))
self.assertEqual(res[0]["displayName"][0], "test_changed")
# Modify on attribute you do not have rights for granted
ldif = """
displayName: test_changed"""
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" %
- str("CN=test_modify_group1,CN=Users," + self.base_dn))
+ expression="(distinguishedName=%s)" %
+ str("CN=test_modify_group1,CN=Users," + self.base_dn))
self.assertEqual(res[0]["displayName"][0], "test_changed")
# Modify on attribute you do not have rights for granted
ldif = """
displayName: test_changed"""
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
- + self.base_dn))
+ expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
+ + self.base_dn))
self.assertEqual(res[0]["displayName"][0], "test_changed")
# Modify on attribute you do not have rights for granted
ldif = """
# Modify on attribute you have rights for
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
- % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
+ % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"])
self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
def test_modify_u5(self):
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
changetype: modify
add: Member
-Member: """ + self.get_user_dn(self.user_with_sm)
+Member: """ + self.get_user_dn(self.user_with_sm)
#the user has no rights granted, this should fail
try:
self.ldb_user2.modify_ldif(ldif)
mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
self.ldb_user2.modify_ldif(ldif)
- res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
- % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
+ res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
+ % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
#but not other users
ldif = """
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
changetype: modify
add: Member
-Member: """ + self.get_user_dn(self.user_with_sm) + """
+Member: """ + self.get_user_dn(self.user_with_sm) + """
Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
#grant self-membership, should be able to add himself but not others at the same time
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
changetype: modify
add: Member
-Member: """ + self.get_user_dn(self.user_with_wp)
+Member: """ + self.get_user_dn(self.user_with_wp)
self.ldb_user.modify_ldif(ldif)
- res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
- % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
+ res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
+ % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
ldif = """
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
add: Member
Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
self.ldb_user.modify_ldif(ldif)
- res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
- % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
+ res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
+ % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
def test_modify_anonymous(self):
self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
- self.full_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
+ self.full_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
def create_clean_ou(self, object_dn):
""" Base repeating setup for unittests to follow """
res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
- expression="distinguishedName=%s" % object_dn)
+ expression="distinguishedName=%s" % object_dn)
# Make sure top testing OU has been deleted before starting the test
self.assertEqual(len(res), 0)
self.ldb_admin.create_ou(object_dn)
self.fail()
try:
res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
except LdbError as e17:
(num, _) = e17.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 2)
- ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
+ ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
#these users should see all ous
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 6)
- res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
self.assertEquals(sorted(res_list), sorted(self.full_list))
res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 6)
- res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
self.assertEquals(sorted(res_list), sorted(self.full_list))
def test_search2(self):
res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
#this user should see all ous
- res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
self.assertEquals(sorted(res_list), sorted(self.full_list))
#these users should see ou1, 2, 5 and 6 but not 3 and 4
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
- ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ scope=SCOPE_SUBTREE)
+ ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 4)
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
def test_search3(self):
print("Testing correct behavior on nonaccessible search base")
try:
- self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_BASE)
+ self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
+ scope=SCOPE_BASE)
except LdbError as e18:
(num, _) = e18.args
self.assertEquals(num, ERR_NO_SUCH_OBJECT)
mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
- ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
+ ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
- ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
+ ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
#should not see ou3 and ou4, but should see ou5 and ou6
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 4)
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 4)
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
def test_search4(self):
self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
- ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
- Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
+ ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
+ Dn(self.ldb_admin, "OU=ou1," + self.base_dn)]
res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 2)
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 2)
- res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
+ res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
def test_search5(self):
self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
# assert user can only see dn
res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
ok_list = ['dn']
self.assertEquals(len(res), 1)
res_list = list(res[0].keys())
self.assertEquals(res_list, ok_list)
res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_BASE, attrs=["ou"])
+ scope=SCOPE_BASE, attrs=["ou"])
self.assertEquals(len(res), 1)
res_list = list(res[0].keys())
self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
ok_list = ['dn', 'ou']
self.assertEquals(len(res), 1)
res_list = list(res[0].keys())
self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
res_list = list(res[0].keys())
self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
#nothing should be returned as ou is not accessible
self.assertEquals(len(res), 0)
mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
- scope=SCOPE_SUBTREE)
+ scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 1)
ok_list = ['dn', 'ou']
res_list = list(res[0].keys())
res_list = list(res[0].keys())
self.assertEquals(sorted(res_list), sorted(ok_list))
+ def assert_search_on_attr(self, dn, samdb, attr, expected_list):
+
+ expected_num = len(expected_list)
+ res = samdb.search(dn, expression="(%s=*)" % attr, scope=SCOPE_SUBTREE)
+ self.assertEquals(len(res), expected_num)
+
+ res_list = [ x["dn"] for x in res if x["dn"] in expected_list ]
+ self.assertEquals(sorted(res_list), sorted(expected_list))
+
+ def test_search7(self):
+ """Checks object search visibility when users don't have full rights"""
+ self.create_clean_ou("OU=ou1," + self.base_dn)
+ mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid),
+ str(self.group_sid))
+ self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
+ tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
+ self.domain_sid)
+ self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
+ self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
+ sd=tmp_desc)
+ self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
+ sd=tmp_desc)
+ self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn,
+ sd=tmp_desc)
+ self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn,
+ sd=tmp_desc)
+
+ ou2_dn = Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
+ ou1_dn = Dn(self.ldb_admin, "OU=ou1," + self.base_dn)
+
+ # even though unprivileged users can't read these attributes for OU2,
+ # the object should still be visible in searches, because they have
+ # 'List Contents' rights still. This isn't really disclosive because
+ # ALL objects have these attributes
+ visible_attrs = ["objectClass", "distinguishedName", "name",
+ "objectGUID"]
+ two_objects = [ou2_dn, ou1_dn]
+
+ for attr in visible_attrs:
+ # a regular user should just see the 2 objects
+ self.assert_search_on_attr(str(ou1_dn), self.ldb_user3, attr,
+ expected_list=two_objects)
+
+ # whereas the following users have LC rights for all the objects,
+ # so they should see them all
+ self.assert_search_on_attr(str(ou1_dn), self.ldb_user, attr,
+ expected_list=self.full_list)
+ self.assert_search_on_attr(str(ou1_dn), self.ldb_user2, attr,
+ expected_list=self.full_list)
+
+ # however when searching on the following attributes, objects will not
+ # be visible unless the user has Read Property rights
+ hidden_attrs = ["objectCategory", "instanceType", "ou", "uSNChanged",
+ "uSNCreated", "whenCreated"]
+ one_object = [ou1_dn]
+
+ for attr in hidden_attrs:
+ self.assert_search_on_attr(str(ou1_dn), self.ldb_user3, attr,
+ expected_list=one_object)
+ self.assert_search_on_attr(str(ou1_dn), self.ldb_user, attr,
+ expected_list=one_object)
+ self.assert_search_on_attr(str(ou1_dn), self.ldb_user2, attr,
+ expected_list=one_object)
+
+ # admin has RP rights so can still see all the objects
+ self.assert_search_on_attr(str(ou1_dn), self.ldb_admin, attr,
+ expected_list=self.full_list)
+
#tests on ldap delete operations
class AclDeleteTests(AclTests):
# Try to delete User object
self.ldb_user.delete(user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
def test_delete_u3(self):
# Try to delete User object
self.ldb_user.delete(user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
def test_delete_anonymous(self):
self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
try:
self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
- "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
+ "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
except LdbError as e21:
(num, _) = e21.args
self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
# Rename 'User object' having WP to AU
self.ldb_user.rename(user_dn, rename_user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % rename_user_dn)
+ expression="(distinguishedName=%s)" % rename_user_dn)
self.assertNotEqual(len(res), 0)
def test_rename_u3(self):
# Rename 'User object' having WP to AU
self.ldb_user.rename(user_dn, rename_user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % rename_user_dn)
+ expression="(distinguishedName=%s)" % rename_user_dn)
self.assertNotEqual(len(res), 0)
def test_rename_u4(self):
# Rename 'User object' having SD and CC to AU
self.ldb_user.rename(user_dn, rename_user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % rename_user_dn)
+ expression="(distinguishedName=%s)" % rename_user_dn)
self.assertNotEqual(len(res), 0)
def test_rename_u5(self):
# Rename 'User object' having SD and CC to AU
self.ldb_user.rename(user_dn, rename_user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % rename_user_dn)
+ expression="(distinguishedName=%s)" % rename_user_dn)
self.assertNotEqual(len(res), 0)
def test_rename_u6(self):
# Rename 'User object' having SD and CC to AU
self.ldb_user.rename(user_dn, rename_user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % rename_user_dn)
+ expression="(distinguishedName=%s)" % rename_user_dn)
self.assertNotEqual(len(res), 0)
def test_rename_u7(self):
# Rename 'User object' having SD and CC to AU
self.ldb_user.rename(user_dn, rename_user_dn)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % user_dn)
+ expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
- expression="(distinguishedName=%s)" % rename_user_dn)
+ expression="(distinguishedName=%s)" % rename_user_dn)
self.assertNotEqual(len(res), 0)
def test_rename_u8(self):
#u3 is member of administrators group, should be able to read sd
res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
SCOPE_BASE, None, ["nTSecurityDescriptor"])
- self.assertEqual(len(res),1)
+ self.assertEqual(len(res), 1)
self.assertTrue("nTSecurityDescriptor" in res[0].keys())
class AclUndeleteTests(AclTests):
self.testuser5 = "to_be_undeleted5"
self.testuser6 = "to_be_undeleted6"
- self.new_dn_ou = "CN="+ self.testuser4 + "," + self.ou1 + self.base_dn
+ self.new_dn_ou = "CN=" + self.testuser4 + "," + self.ou1 + self.base_dn
# Create regular user
self.testuser1_dn = self.get_user_dn(self.testuser1)
self.ldb_admin.newuser(self.regular_user, self.user_pass)
self.ldb_admin.add_remove_group_members("Domain Admins", [self.regular_user],
- add_members_operation=True)
+ add_members_operation=True)
self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
self.sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
guid = res[0]["objectGUID"][0]
self.ldb_admin.delete(self.get_user_dn(new_user))
res = self.ldb_admin.search(base="<GUID=%s>" % self.GUID_string(guid),
- scope=SCOPE_BASE, controls=["show_deleted:1"])
+ scope=SCOPE_BASE, controls=["show_deleted:1"])
self.assertEquals(len(res), 1)
return str(res[0].dn)
self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
self.site = "Default-First-Site-Name"
- self.rodcctx = dc_join(server=host, creds=creds, lp=lp,
- site=self.site, netbios_name=self.rodcname, targetdir=None,
- domain=None)
- self.dcctx = dc_join(server=host, creds=creds, lp=lp, site=self.site,
- netbios_name=self.dcname, targetdir=None, domain=None)
+ self.rodcctx = DCJoinContext(server=host, creds=creds, lp=lp,
+ site=self.site, netbios_name=self.rodcname,
+ targetdir=None, domain=None)
+ self.dcctx = DCJoinContext(server=host, creds=creds, lp=lp,
+ site=self.site, netbios_name=self.dcname,
+ targetdir=None, domain=None)
self.ldb_admin.newuser(self.test_user, self.user_pass)
self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
msg = Message()
msg.dn = Dn(self.ldb_admin, dn)
msg["servicePrincipalName"] = MessageElement(spn, flag,
- "servicePrincipalName")
+ "servicePrincipalName")
_ldb.modify(msg)
def create_computer(self, computername, domainname):
# same as for join_RODC, but do not set any SPNs
def create_rodc(self, ctx):
- ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
- ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
- ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
-
- ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
- "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
- "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
- ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
-
- mysid = ctx.get_mysid()
- admin_dn = "<SID=%s>" % mysid
- ctx.managedby = admin_dn
-
- ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
- samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
- samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
-
- ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
- ctx.secure_channel_type = misc.SEC_CHAN_RODC
- ctx.RODC = True
- ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
- drsuapi.DRSUAPI_DRS_PER_SYNC |
- drsuapi.DRSUAPI_DRS_GET_ANC |
- drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
- drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
-
- ctx.join_add_objects()
+ ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+ ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+ ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
+
+ ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
+ ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
+
+ mysid = ctx.get_mysid()
+ admin_dn = "<SID=%s>" % mysid
+ ctx.managedby = admin_dn
+
+ ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
+ samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+ samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
+
+ ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
+ ctx.secure_channel_type = misc.SEC_CHAN_RODC
+ ctx.RODC = True
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+
+ ctx.join_add_objects()
def create_dc(self, ctx):
- ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
- ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
+ ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
+ ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
ctx.secure_channel_type = misc.SEC_CHAN_BDC
ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
# This does not pass against Windows, although it should according to docs
self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
- (self.computername, self.dcctx.dnsdomain))
+ (self.computername, self.dcctx.dnsdomain))
try:
self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))