PEP8: fix E265: block comment should start with '# '
[amitay/samba.git] / source4 / dsdb / tests / python / dirsync.py
index 3cb5d2210b895c5a3303ed8ff32b0d86cbdf4989..0f49fd7e6304de6208cde39056f1e3568f3e99a5 100755 (executable)
@@ -64,7 +64,7 @@ if not "://" in host:
 else:
     ldaphost = host
     start = host.rindex("://")
-    host = host.lstrip(start+3)
+    host = host.lstrip(start + 3)
 
 lp = sambaopts.get_loadparm()
 creds = credopts.get_credentials(lp)
@@ -83,7 +83,7 @@ class DirsyncBaseTests(samba.tests.TestCase):
         self.user_pass = samba.generate_random_password(12, 16)
         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
-        #used for anonymous login
+        # used for anonymous login
         print("baseDN: %s" % self.base_dn)
 
     def get_user_dn(self, name):
@@ -98,12 +98,12 @@ class DirsyncBaseTests(samba.tests.TestCase):
         creds_tmp.set_workstation(creds.get_workstation())
         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                       | gensec.FEATURE_SEAL)
-        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)  # kinit is too expensive to use in a tight loop
         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
         return ldb_target
 
 
-#tests on ldap add operations
+# tests on ldap add operations
 class SimpleDirsyncTests(DirsyncBaseTests):
 
     def setUp(self):
@@ -126,7 +126,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
 
         # add admins to the Domain Admins group
         self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
-                       add_members_operation=True)
+                                                add_members_operation=True)
 
     def tearDown(self):
         super(SimpleDirsyncTests, self).tearDown()
@@ -141,7 +141,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         except Exception:
             pass
 
-    #def test_dirsync_errors(self):
+    # def test_dirsync_errors(self):
 
     def test_dirsync_supported(self):
         """Test the basic of the dirsync is supported"""
@@ -151,8 +151,8 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
         try:
             self.ldb_simple.search(self.base_dn,
-                expression="samaccountname=*",
-                controls=["dirsync:1:0:1"])
+                                   expression="samaccountname=*",
+                                   controls=["dirsync:1:0:1"])
         except LdbError as l:
             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
 
@@ -176,48 +176,48 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
         try:
             self.ldb_simple.search(self.base_dn,
-                expression="samaccountname=*",
-                controls=["dirsync:1:0:1"])
+                                   expression="samaccountname=*",
+                                   controls=["dirsync:1:0:1"])
         except LdbError as l:
             print(l)
             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
 
         try:
             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
-                expression="samaccountname=*",
-                controls=["dirsync:1:0:1"])
+                                   expression="samaccountname=*",
+                                   controls=["dirsync:1:0:1"])
         except LdbError as l:
             print(l)
             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
 
         try:
             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
-                expression="samaccountname=*",
-                controls=["dirsync:1:1:1"])
+                                   expression="samaccountname=*",
+                                   controls=["dirsync:1:1:1"])
         except LdbError as l:
             print(l)
             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
 
         try:
             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
-                expression="samaccountname=*",
-                controls=["dirsync:1:0:1"])
+                                    expression="samaccountname=*",
+                                    controls=["dirsync:1:0:1"])
         except LdbError as l:
             print(l)
             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
 
         try:
             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
-                expression="samaccountname=*",
-                controls=["dirsync:1:0:1"])
+                                  expression="samaccountname=*",
+                                  controls=["dirsync:1:0:1"])
         except LdbError as l:
             print(l)
             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
 
         try:
             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
-                expression="samaccountname=*",
-                controls=["dirsync:1:1:1"])
+                                  expression="samaccountname=*",
+                                  controls=["dirsync:1:1:1"])
         except LdbError as l:
             print(l)
             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
@@ -278,8 +278,8 @@ class SimpleDirsyncTests(DirsyncBaseTests):
                                     controls=["dirsync:1:0:1"])
         count = len(res.msgs[0])
         res2 = self.ldb_admin.search(self.base_dn,
-                                    expression="samaccountname=Administrator",
-                                    controls=["dirsync:1:0:1"])
+                                     expression="samaccountname=Administrator",
+                                     controls=["dirsync:1:0:1"])
         count2 = len(res2.msgs[0])
         self.assertEqual(count, count2)
 
@@ -295,14 +295,14 @@ class SimpleDirsyncTests(DirsyncBaseTests):
                                     attrs=["parentGUID"],
                                     controls=["dirsync:1:0:1"])
         self.assertEqual(len(res.msgs), 0)
-        ouname="OU=testou,%s" % self.base_dn
+        ouname = "OU=testou,%s" % self.base_dn
         self.ouname = ouname
         self.ldb_admin.create_ou(ouname)
         delta = Message()
         delta.dn = Dn(self.ldb_admin, str(ouname))
         delta["cn"] = MessageElement("test ou",
                                      FLAG_MOD_ADD,
-                                     "cn" )
+                                     "cn")
         self.ldb_admin.modify(delta)
         res = self.ldb_admin.search(self.base_dn,
                                     expression="name=testou",
@@ -399,7 +399,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         control2 = str(":".join(ctl))
 
         # Let's create an OU
-        ouname="OU=testou2,%s" % self.base_dn
+        ouname = "OU=testou2,%s" % self.base_dn
         self.ouname = ouname
         self.ldb_admin.create_ou(ouname)
         res = self.ldb_admin.search(self.base_dn,
@@ -417,7 +417,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
 
         delta["cn"] = MessageElement("test ou",
                                      FLAG_MOD_ADD,
-                                     "cn" )
+                                     "cn")
         self.ldb_admin.modify(delta)
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
@@ -431,7 +431,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         delta.dn = Dn(self.ldb_admin, str(ouname))
         delta["cn"] = MessageElement([],
                                      FLAG_MOD_DELETE,
-                                     "cn" )
+                                     "cn")
         self.ldb_admin.modify(delta)
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
@@ -463,8 +463,8 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         # Let's search for members
         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
         res = self.ldb_simple.search(self.base_dn,
-                                    expression="(name=Administrators)",
-                                    controls=["dirsync:1:1:1"])
+                                     expression="(name=Administrators)",
+                                     controls=["dirsync:1:1:1"])
 
         self.assertTrue(len(res[0].get("member")) > 0)
         size = len(res[0].get("member"))
@@ -475,11 +475,11 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         ctl[3] = "10000"
         control1 = str(":".join(ctl))
         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
-                       add_members_operation=True)
+                                                add_members_operation=True)
 
         res = self.ldb_simple.search(self.base_dn,
-                                    expression="(name=Administrators)",
-                                    controls=[control1])
+                                     expression="(name=Administrators)",
+                                     controls=[control1])
 
         self.assertEqual(len(res[0].get("member")), size + 1)
         ctl = str(res.controls[0]).split(":")
@@ -490,24 +490,24 @@ class SimpleDirsyncTests(DirsyncBaseTests):
 
         # remove the user from the group
         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
-                       add_members_operation=False)
+                                                add_members_operation=False)
 
         res = self.ldb_simple.search(self.base_dn,
-                                    expression="(name=Administrators)",
-                                    controls=[control1])
+                                     expression="(name=Administrators)",
+                                     controls=[control1])
 
-        self.assertEqual(len(res[0].get("member")), size )
+        self.assertEqual(len(res[0].get("member")), size)
 
         self.ldb_admin.newgroup("testgroup")
         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
-                       add_members_operation=True)
+                                                add_members_operation=True)
 
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(name=testgroup)",
                                     controls=["dirsync:1:0:1"])
 
         self.assertEqual(len(res[0].get("member")), 1)
-        self.assertTrue(res[0].get("member") != "" )
+        self.assertTrue(res[0].get("member") != "")
 
         ctl = str(res.controls[0]).split(":")
         ctl[1] = "1"
@@ -530,7 +530,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         control1 = str(":".join(ctl))
 
         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
-                       add_members_operation=False)
+                                                add_members_operation=False)
 
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(name=testgroup)",
@@ -545,7 +545,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
     def test_dirsync_deleted_items(self):
         """Check that dirsync returnd deleted objects too"""
         # Let's create an OU
-        ouname="OU=testou3,%s" % self.base_dn
+        ouname = "OU=testou3,%s" % self.base_dn
         self.ouname = ouname
         self.ldb_admin.create_ou(ouname)
         res = self.ldb_admin.search(self.base_dn,
@@ -554,7 +554,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         guid = None
         for e in res:
             if str(e["name"]) == "testou3":
-                guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+                guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
 
         ctl = str(res.controls[0]).split(":")
         ctl[1] = "1"
@@ -570,7 +570,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
                                     expression="(objectClass=organizationalUnit)",
                                     controls=[control1])
         self.assertEqual(len(res), 1)
-        guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+        guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
         self.assertEqual(guid2, guid)
         self.assertTrue(res[0].get("isDeleted"))
         self.assertTrue(res[0].get("name") != None)
@@ -582,7 +582,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         ctl = str(res.controls[0]).split(":")
         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
-        controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+        controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
                                     controls=controls)
@@ -598,7 +598,7 @@ class ExtendedDirsyncTests(SimpleDirsyncTests):
                                     expression="(name=Administrators)",
                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
 
-        self.assertTrue(res[0].get("member;range=1-1") != None )
+        self.assertTrue(res[0].get("member;range=1-1") != None)
         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
         size = len(res[0].get("member;range=1-1"))
 
@@ -608,9 +608,9 @@ class ExtendedDirsyncTests(SimpleDirsyncTests):
         ctl[3] = "10000"
         control1 = str(":".join(ctl))
         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
-                       add_members_operation=True)
+                                                add_members_operation=True)
         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
-                       add_members_operation=True)
+                                                add_members_operation=True)
 
 
         res = self.ldb_admin.search(self.base_dn,
@@ -626,13 +626,13 @@ class ExtendedDirsyncTests(SimpleDirsyncTests):
 
         # remove the user from the group
         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
-                       add_members_operation=False)
+                                                add_members_operation=False)
 
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(name=Administrators)",
                                     controls=[control1])
 
-        self.assertEqual(res[0].get("member;range=1-1"), None )
+        self.assertEqual(res[0].get("member;range=1-1"), None)
         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
 
         ctl = str(res.controls[0]).split(":")
@@ -642,39 +642,39 @@ class ExtendedDirsyncTests(SimpleDirsyncTests):
         control2 = str(":".join(ctl))
 
         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
-                       add_members_operation=False)
+                                                add_members_operation=False)
 
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(name=Administrators)",
                                     controls=[control2])
 
-        self.assertEqual(res[0].get("member;range=1-1"), None )
+        self.assertEqual(res[0].get("member;range=1-1"), None)
         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
 
         res = self.ldb_admin.search(self.base_dn,
                                     expression="(name=Administrators)",
                                     controls=[control1])
 
-        self.assertEqual(res[0].get("member;range=1-1"), None )
+        self.assertEqual(res[0].get("member;range=1-1"), None)
         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
 
     def test_dirsync_deleted_items(self):
         """Check that dirsync returnd deleted objects too"""
         # Let's create an OU
         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
-        ouname="OU=testou3,%s" % self.base_dn
+        ouname = "OU=testou3,%s" % self.base_dn
         self.ouname = ouname
         self.ldb_admin.create_ou(ouname)
 
         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
         res = self.ldb_simple.search(self.base_dn,
-                                    expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
-                                    controls=["dirsync:1:1:1"])
+                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
+                                     controls=["dirsync:1:1:1"])
 
         guid = None
         for e in res:
             if str(e["name"]) == "testou3":
-                guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+                guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
 
         self.assertTrue(guid != None)
         ctl = str(res.controls[0]).split(":")
@@ -689,10 +689,10 @@ class ExtendedDirsyncTests(SimpleDirsyncTests):
         delete_force(self.ldb_admin, ouname)
 
         res = self.ldb_simple.search(self.base_dn,
-                                    expression="(objectClass=organizationalUnit)",
-                                    controls=[control1])
+                                     expression="(objectClass=organizationalUnit)",
+                                     controls=[control1])
         self.assertEqual(len(res), 1)
-        guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+        guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
         self.assertEqual(guid2, guid)
         self.assertEqual(str(res[0].dn), "")