s4-ldb: improve detection of whether the server has a GC port
[ira/wip.git] / source4 / lib / ldb / tests / python / ldap.py
index 49aea28f5dd5d90e508533a6411926f30d90343f..408246b45c4e39b959faefb928b68d7b05979fc0 100755 (executable)
@@ -8,6 +8,7 @@ import sys
 import time
 import random
 import base64
+import os
 
 sys.path.append("bin/python")
 sys.path.append("../lib/subunit/python")
@@ -22,6 +23,7 @@ from ldb import ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_OTHER, ERR_INVALID_DN_SYNTAX
 from ldb import ERR_NO_SUCH_ATTRIBUTE, ERR_INSUFFICIENT_ACCESS_RIGHTS
 from ldb import ERR_OBJECT_CLASS_VIOLATION, ERR_NOT_ALLOWED_ON_RDN
 from ldb import ERR_NAMING_VIOLATION, ERR_CONSTRAINT_VIOLATION
+from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
 from ldb import Message, MessageElement, Dn
 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
 from samba import Ldb, param, dom_sid_to_rid
@@ -108,6 +110,7 @@ class BasicTests(unittest.TestCase):
         self.delete_force(self.ldb, "cn=ldaptestuser3,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer2," + self.base_dn)
+        self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
@@ -222,13 +225,13 @@ class BasicTests(unittest.TestCase):
 
         m = Message()
         m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
-        m["sAMAccountName"] = MessageElement("testgroup", FLAG_MOD_REPLACE,
+        m["sAMAccountName"] = MessageElement("testgroupXX", FLAG_MOD_REPLACE,
           "sAMAccountName")
         ldb.modify(m)
 
         m = Message()
         m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
-        m["sAMAccountName"] = MessageElement("testgroup2", FLAG_MOD_ADD,
+        m["sAMAccountName"] = MessageElement("testgroupXX2", FLAG_MOD_ADD,
           "sAMAccountName")
         try:
             ldb.modify(m)
@@ -484,6 +487,33 @@ class BasicTests(unittest.TestCase):
 
         self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
 
+
+        # this test needs to be disabled until we really understand
+        # what the rDN length constraints are
+    def DISABLED_test_largeRDN(self):
+        """Testing large rDN (limit 64 characters)"""
+        rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012";
+        self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+        ldif = """
+dn: %s,%s""" % (rdn,self.base_dn) + """
+objectClass: container
+"""
+        self.ldb.add_ldif(ldif)
+        self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+
+        rdn = "CN=a0123456789012345678901234567890123456789012345678901234567890120";
+        self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+        try:
+            ldif = """
+dn: %s,%s""" % (rdn,self.base_dn) + """
+objectClass: container
+"""
+            self.ldb.add_ldif(ldif)
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+
     def test_rename(self):
         """Tests the rename operation"""
         print "Tests the rename operations"""
@@ -503,6 +533,28 @@ class BasicTests(unittest.TestCase):
 
         self.delete_force(self.ldb, "cn=ldaptestuser3,cn=users," + self.base_dn)
 
+    def test_rename_twice(self):
+        """Tests the rename operation twice - this corresponds to a past bug"""
+        print "Tests the rename twice operation"""
+
+        self.ldb.add({
+             "dn": "cn=ldaptestuser5,cn=users," + self.base_dn,
+             "objectclass": ["user", "person"] })
+
+        ldb.rename("cn=ldaptestuser5,cn=users," + self.base_dn, "cn=ldaptestUSER5,cn=users," + self.base_dn)
+        self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
+        self.ldb.add({
+             "dn": "cn=ldaptestuser5,cn=users," + self.base_dn,
+             "objectclass": ["user", "person"] })
+        ldb.rename("cn=ldaptestuser5,cn=Users," + self.base_dn, "cn=ldaptestUSER5,cn=users," + self.base_dn)
+        res = ldb.search(expression="cn=ldaptestuser5")
+        print "Found %u records" % len(res)
+        self.assertEquals(len(res), 1, "Wrong number of hits for cn=ldaptestuser5")
+        res = ldb.search(expression="(&(cn=ldaptestuser5)(objectclass=user))")
+        print "Found %u records" % len(res)
+        self.assertEquals(len(res), 1, "Wrong number of hits for (&(cn=ldaptestuser5)(objectclass=user))")
+        self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
+
     def test_parentGUID(self):
         """Test parentGUID behaviour"""
         print "Testing parentGUID behaviour\n"
@@ -713,6 +765,16 @@ class BasicTests(unittest.TestCase):
         """Test the primary group token behaviour (hidden-generated-readonly attribute on groups)"""
         print "Testing primary group token behaviour\n"
 
+        try:
+            ldb.add({
+                "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
+                "objectclass": "group",
+                "primaryGroupToken": "100"})
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_UNDEFINED_ATTRIBUTE_TYPE)
+        self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
+
         ldb.add({
             "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
             "objectclass": ["user", "person"]})
@@ -739,18 +801,47 @@ class BasicTests(unittest.TestCase):
        rid = dom_sid_to_rid(ldb.schema_format_value("objectSID", res1[0]["objectSID"][0]))
         self.assertEquals(primary_group_token, rid)
 
-# TODO Has to wait until we support read-only generated attributes correctly
-#        m = Message()
-#        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
-#        m["primaryGroupToken"] = "100"
-#        try:
-#                ldb.modify(m)
-#                self.fail()
-#        except LdbError, (num, msg):
+        m = Message()
+        m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
+        m["primaryGroupToken"] = "100"
+        try:
+            ldb.modify(m)
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
 
         self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
 
+    def test_wkguid(self):
+        """Test Well known GUID behaviours (including DN+Binary)"""
+        print "Test Well known GUID behaviours (including DN+Binary)"""
+
+        res = self.ldb.search(base=("<WKGUID=ab1d30f3768811d1aded00c04fd8d5cd,%s>" % self.base_dn), scope=SCOPE_BASE, attrs=[])
+        self.assertEquals(len(res), 1)
+        
+        res2 = self.ldb.search(scope=SCOPE_BASE, attrs=["wellKnownObjects"], expression=("wellKnownObjects=B:32:ab1d30f3768811d1aded00c04fd8d5cd:%s" % res[0].dn))
+        self.assertEquals(len(res2), 1)
+
+        # Prove that the matching rule is over the whole DN+Binary
+        res2 = self.ldb.search(scope=SCOPE_BASE, attrs=["wellKnownObjects"], expression=("wellKnownObjects=B:32:ab1d30f3768811d1aded00c04fd8d5cd"))
+        self.assertEquals(len(res2), 0)
+        # Prove that the matching rule is over the whole DN+Binary
+        res2 = self.ldb.search(scope=SCOPE_BASE, attrs=["wellKnownObjects"], expression=("wellKnownObjects=%s") % res[0].dn)
+        self.assertEquals(len(res2), 0)
+
+    def test_subschemasubentry(self):
+        """Test subSchemaSubEntry appears when requested, but not when not requested"""
+        print "Test subSchemaSubEntry"""
+
+        res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["subSchemaSubEntry"])
+        self.assertEquals(len(res), 1)
+        self.assertEquals(res[0]["subSchemaSubEntry"][0], "CN=Aggregate,"+self.schema_dn)
+
+        res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["*"])
+        self.assertEquals(len(res), 1)
+        self.assertTrue("subScheamSubEntry" not in res[0])
+
     def test_all(self):
         """Basic tests"""
 
@@ -990,17 +1081,17 @@ servicePrincipalName: host/ldaptest2computer29
 #        res = ldb.search(expression="(&(anr==testy ldap)(objectClass=user))")
 #        self.assertEquals(len(res), 1, "Found only %d for (&(anr==testy ldap)(objectClass=user))" % len(res))
 
-        self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
-        self.assertEquals(res[0]["cn"][0], "ldaptestuser")
-        self.assertEquals(res[0]["name"][0], "ldaptestuser")
+#        self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
+#        self.assertEquals(res[0]["cn"][0], "ldaptestuser")
+#        self.assertEquals(res[0]["name"][0], "ldaptestuser")
 
         # Testing ldb.search for (&(anr==testy ldap)(objectClass=user))
 #        res = ldb.search(expression="(&(anr==testy ldap)(objectClass=user))")
 #        self.assertEquals(len(res), 1, "Could not find (&(anr==testy ldap)(objectClass=user))")
 
-        self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
-        self.assertEquals(res[0]["cn"][0], "ldaptestuser")
-        self.assertEquals(res[0]["name"][0], "ldaptestuser")
+#        self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
+#        self.assertEquals(res[0]["cn"][0], "ldaptestuser")
+#        self.assertEquals(res[0]["name"][0], "ldaptestuser")
 
         # Testing ldb.search for (&(anr=testy ldap user)(objectClass=user))
         res = ldb.search(expression="(&(anr=testy ldap user)(objectClass=user))")
@@ -1275,10 +1366,11 @@ member: cn=ldaptestuser2,cn=users,""" + self.base_dn + """
 
         print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in with 'phantom root' control"
 
-        res3control = gc_ldb.search(self.base_dn, expression="(&(cn=ldaptestuser)(objectCategory=PerSon))", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:2"])
-        self.assertEquals(len(res3control), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog")
+        if gc_ldb is not None:
+            res3control = gc_ldb.search(self.base_dn, expression="(&(cn=ldaptestuser)(objectCategory=PerSon))", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:2"])
+            self.assertEquals(len(res3control), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog")
 
-        self.assertEquals(res[0].dn, res3control[0].dn)
+            self.assertEquals(res[0].dn, res3control[0].dn)
 
         ldb.delete(res[0].dn)
 
@@ -1587,6 +1679,7 @@ member: CN=ldaptestutf8user èùéìòà,CN=Users,""" + self.base_dn + """
         self.delete_force(self.ldb, "cn=ldaptestuser3,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer2," + self.base_dn)
+        self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
         self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
@@ -1848,34 +1941,50 @@ class SchemaTests(unittest.TestCase):
         self.assertFalse("objectClasses" in res[0])
         self.assertFalse("attributeTypes" in res[0])
 
+
     def test_schemaUpdateNow(self):
         """Testing schemaUpdateNow"""
-        class_name = "test-class" + time.strftime("%s", time.gmtime())
+        attr_name = "test-Attr" + time.strftime("%s", time.gmtime())
+        attr_ldap_display_name = attr_name.replace("-", "")
+
+        ldif = """
+dn: CN=%s,%s""" % (attr_name, self.schema_dn) + """
+objectClass: top
+objectClass: attributeSchema
+adminDescription: """ + attr_name + """
+adminDisplayName: """ + attr_name + """
+cn: """ + attr_name + """
+attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
+attributeSyntax: 2.5.5.12
+omSyntax: 64
+instanceType: 4
+isSingleValued: TRUE
+systemOnly: FALSE
+"""
+        self.ldb.add_ldif(ldif)
+
+        class_name = "test-Class" + time.strftime("%s", time.gmtime())
         class_ldap_display_name = class_name.replace("-", "")
-        object_name = "obj" + time.strftime("%s", time.gmtime())
 
         ldif = """
 dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
-lDAPDisplayName: """ + class_ldap_display_name + """
 objectClass: top
 objectClass: classSchema
 adminDescription: """ + class_name + """
 adminDisplayName: """ + class_name + """
 cn: """ + class_name + """
-objectCategory: CN=Class-Schema,""" + self.schema_dn + """
-defaultObjectCategory: CN=%s,%s""" % (class_name, self.schema_dn) + """
-distinguishedName: CN=%s,%s""" % (class_name, self.schema_dn) + """
-governsID: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
+governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
 instanceType: 4
-name: """ + class_name + """
 objectClassCategory: 1
 subClassOf: organizationalPerson
 systemFlags: 16
 rDNAttID: cn
 systemMustContain: cn
+systemMustContain: """ + attr_ldap_display_name + """
 systemOnly: FALSE
 """
         self.ldb.add_ldif(ldif)
+
         ldif = """
 dn:
 changetype: modify
@@ -1883,6 +1992,9 @@ add: schemaUpdateNow
 schemaUpdateNow: 1
 """
         self.ldb.modify_ldif(ldif)
+
+        object_name = "obj" + time.strftime("%s", time.gmtime())
+
         ldif = """
 dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
 objectClass: organizationalPerson
@@ -1894,25 +2006,44 @@ instanceType: 4
 objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
 distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
 name: """ + object_name + """
+""" + attr_ldap_display_name + """: test
 """
         self.ldb.add_ldif(ldif)
-        # Search for created objectClass
+
+        # Search for created attribute
+        res = []
+        res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
+        self.assertEquals(len(res), 1)
+        self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
+        self.assertTrue("schemaIDGUID" in res[0])
+
+        # Search for created objectclass
         res = []
         res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
-        self.assertNotEqual(res, [])
+        self.assertEquals(len(res), 1)
+        self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
+        self.assertEquals(res[0]["defaultObjectCategory"][0], res[0]["distinguishedName"][0])
+        self.assertTrue("schemaIDGUID" in res[0])
 
+        # Search for created object
         res = []
         res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
-        self.assertNotEqual(res, [])
+        self.assertEquals(len(res), 1)
         # Delete the object
         self.delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
 
 if not "://" in host:
-    host = "ldap://%s" % host
+    if os.path.isfile(host):
+        host = "tdb://%s" % host
+    else:
+        host = "ldap://%s" % host
 
 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
-gc_ldb = Ldb("%s:3268" % host, credentials=creds,
-             session_info=system_session(), lp=lp)
+if not "tdb://" in host:
+    gc_ldb = Ldb("%s:3268" % host, credentials=creds,
+                 session_info=system_session(), lp=lp)
+else:
+    gc_ldb = None
 
 runner = SubunitTestRunner()
 rc = 0