else:
ldaphost = host
start = host.rindex("://")
- host = host.lstrip(start+3)
+ host = host.lstrip(start + 3)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
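Review bot: `host.lstrip(start + 3)` passes an integer to `str.lstrip`, which accepts only a string of characters to strip (or `None`), so this branch raises `TypeError` whenever the host argument contains `://`; the spacing fix leaves the call itself unchanged. If the intent is to drop the scheme prefix, slicing does that: `host = host[start + 3:]`. The enclosing `if`/`else` starts above the hunk, so whether `host` is read again afterwards cannot be confirmed from the visible lines.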
self.user_pass = samba.generate_random_password(12, 16)
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
- #used for anonymous login
+ # used for anonymous login
print("baseDN: %s" % self.base_dn)
def get_user_dn(self, name):
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
- creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target
-#tests on ldap add operations
+# tests on ldap add operations
class SimpleDirsyncTests(DirsyncBaseTests):
def setUp(self):
# add admins to the Domain Admins group
self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
- add_members_operation=True)
+ add_members_operation=True)
def tearDown(self):
super(SimpleDirsyncTests, self).tearDown()
except Exception:
pass
- #def test_dirsync_errors(self):
+ # def test_dirsync_errors(self):
def test_dirsync_supported(self):
"""Test the basic of the dirsync is supported"""
res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
try:
self.ldb_simple.search(self.base_dn,
- expression="samaccountname=*",
- controls=["dirsync:1:0:1"])
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
except LdbError as l:
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
try:
self.ldb_simple.search(self.base_dn,
- expression="samaccountname=*",
- controls=["dirsync:1:0:1"])
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
- expression="samaccountname=*",
- controls=["dirsync:1:0:1"])
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
- expression="samaccountname=*",
- controls=["dirsync:1:1:1"])
+ expression="samaccountname=*",
+ controls=["dirsync:1:1:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
try:
self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
- expression="samaccountname=*",
- controls=["dirsync:1:0:1"])
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
- expression="samaccountname=*",
- controls=["dirsync:1:0:1"])
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
- expression="samaccountname=*",
- controls=["dirsync:1:1:1"])
+ expression="samaccountname=*",
+ controls=["dirsync:1:1:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
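Review bot: Each `try`/`except LdbError` block in this run asserts on the error text only inside the handler, so a search that succeeds (the server granting a dirsync request it should refuse) lets the test pass without executing any assertion. For an access-control check that is the failure mode that matters most, and the re-indented calls keep it. Adding an `else:` branch to each block closes the gap, e.g. `else: self.fail("dirsync search should have been rejected")`, and the check itself reads more clearly as `self.assertIn("LDAP_INSUFFICIENT_ACCESS_RIGHTS", str(l))`. The enclosing test method starts above the visible lines, so this applies to every block from the first `self.ldb_simple.search` through the final `LDAP_UNWILLING_TO_PERFORM` case.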
controls=["dirsync:1:0:1"])
count = len(res.msgs[0])
res2 = self.ldb_admin.search(self.base_dn,
- expression="samaccountname=Administrator",
- controls=["dirsync:1:0:1"])
+ expression="samaccountname=Administrator",
+ controls=["dirsync:1:0:1"])
count2 = len(res2.msgs[0])
self.assertEqual(count, count2)
attrs=["parentGUID"],
controls=["dirsync:1:0:1"])
self.assertEqual(len(res.msgs), 0)
- ouname="OU=testou,%s" % self.base_dn
+ ouname = "OU=testou,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
delta = Message()
delta.dn = Dn(self.ldb_admin, str(ouname))
delta["cn"] = MessageElement("test ou",
FLAG_MOD_ADD,
- "cn" )
+ "cn")
self.ldb_admin.modify(delta)
res = self.ldb_admin.search(self.base_dn,
expression="name=testou",
control2 = str(":".join(ctl))
# Let's create an OU
- ouname="OU=testou2,%s" % self.base_dn
+ ouname = "OU=testou2,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
res = self.ldb_admin.search(self.base_dn,
delta["cn"] = MessageElement("test ou",
FLAG_MOD_ADD,
- "cn" )
+ "cn")
self.ldb_admin.modify(delta)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
delta.dn = Dn(self.ldb_admin, str(ouname))
delta["cn"] = MessageElement([],
FLAG_MOD_DELETE,
- "cn" )
+ "cn")
self.ldb_admin.modify(delta)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
# Let's search for members
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
res = self.ldb_simple.search(self.base_dn,
- expression="(name=Administrators)",
- controls=["dirsync:1:1:1"])
+ expression="(name=Administrators)",
+ controls=["dirsync:1:1:1"])
self.assertTrue(len(res[0].get("member")) > 0)
size = len(res[0].get("member"))
ctl[3] = "10000"
control1 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
- add_members_operation=True)
+ add_members_operation=True)
res = self.ldb_simple.search(self.base_dn,
- expression="(name=Administrators)",
- controls=[control1])
+ expression="(name=Administrators)",
+ controls=[control1])
self.assertEqual(len(res[0].get("member")), size + 1)
ctl = str(res.controls[0]).split(":")
# remove the user from the group
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
- add_members_operation=False)
+ add_members_operation=False)
res = self.ldb_simple.search(self.base_dn,
- expression="(name=Administrators)",
- controls=[control1])
+ expression="(name=Administrators)",
+ controls=[control1])
- self.assertEqual(len(res[0].get("member")), size )
+ self.assertEqual(len(res[0].get("member")), size)
self.ldb_admin.newgroup("testgroup")
self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
- add_members_operation=True)
+ add_members_operation=True)
res = self.ldb_admin.search(self.base_dn,
expression="(name=testgroup)",
controls=["dirsync:1:0:1"])
self.assertEqual(len(res[0].get("member")), 1)
- self.assertTrue(res[0].get("member") != "" )
+ self.assertTrue(res[0].get("member") != "")
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
control1 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
- add_members_operation=False)
+ add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
expression="(name=testgroup)",
def test_dirsync_deleted_items(self):
"""Check that dirsync returnd deleted objects too"""
# Let's create an OU
- ouname="OU=testou3,%s" % self.base_dn
+ ouname = "OU=testou3,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
res = self.ldb_admin.search(self.base_dn,
guid = None
for e in res:
if str(e["name"]) == "testou3":
- guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+ guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
expression="(objectClass=organizationalUnit)",
controls=[control1])
self.assertEqual(len(res), 1)
- guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+ guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
self.assertEqual(guid2, guid)
self.assertTrue(res[0].get("isDeleted"))
self.assertTrue(res[0].get("name") != None)
ctl = str(res.controls[0]).split(":")
cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
- controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+ controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=controls)
expression="(name=Administrators)",
controls=["dirsync:1:%d:1" % flag_incr_linked])
- self.assertTrue(res[0].get("member;range=1-1") != None )
+ self.assertTrue(res[0].get("member;range=1-1") != None)
self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
size = len(res[0].get("member;range=1-1"))
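Review bot: The cleaned-up line still compares with `!= None` inside `assertTrue`, which a PEP 8 pass would normally also convert (E711) and which reports only "False is not true" on failure. `self.assertIsNotNone(res[0].get("member;range=1-1"))` states the intent and gives a useful message. The same pattern remains in `self.assertTrue(res[0].get("name") != None)` and `self.assertTrue(guid != None)` in other hunks of this change.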
ctl[3] = "10000"
control1 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
- add_members_operation=True)
+ add_members_operation=True)
self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
- add_members_operation=True)
+ add_members_operation=True)
res = self.ldb_admin.search(self.base_dn,
# remove the user from the group
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
- add_members_operation=False)
+ add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
- self.assertEqual(res[0].get("member;range=1-1"), None )
+ self.assertEqual(res[0].get("member;range=1-1"), None)
self.assertEqual(len(res[0].get("member;range=0-0")), 1)
ctl = str(res.controls[0]).split(":")
control2 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
- add_members_operation=False)
+ add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control2])
- self.assertEqual(res[0].get("member;range=1-1"), None )
+ self.assertEqual(res[0].get("member;range=1-1"), None)
self.assertEqual(len(res[0].get("member;range=0-0")), 1)
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
- self.assertEqual(res[0].get("member;range=1-1"), None )
+ self.assertEqual(res[0].get("member;range=1-1"), None)
self.assertEqual(len(res[0].get("member;range=0-0")), 2)
def test_dirsync_deleted_items(self):
"""Check that dirsync returnd deleted objects too"""
# Let's create an OU
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
- ouname="OU=testou3,%s" % self.base_dn
+ ouname = "OU=testou3,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
# Specify LDAP_DIRSYNC_OBJECT_SECURITY
res = self.ldb_simple.search(self.base_dn,
- expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
- controls=["dirsync:1:1:1"])
+ expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+ controls=["dirsync:1:1:1"])
guid = None
for e in res:
if str(e["name"]) == "testou3":
- guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+ guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
self.assertTrue(guid != None)
ctl = str(res.controls[0]).split(":")
delete_force(self.ldb_admin, ouname)
res = self.ldb_simple.search(self.base_dn,
- expression="(objectClass=organizationalUnit)",
- controls=[control1])
+ expression="(objectClass=organizationalUnit)",
+ controls=[control1])
self.assertEqual(len(res), 1)
- guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+ guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
self.assertEqual(guid2, guid)
self.assertEqual(str(res[0].dn), "")