import time
import random
import base64
+import os
sys.path.append("bin/python")
sys.path.append("../lib/subunit/python")
from ldb import ERR_NO_SUCH_ATTRIBUTE, ERR_INSUFFICIENT_ACCESS_RIGHTS
from ldb import ERR_OBJECT_CLASS_VIOLATION, ERR_NOT_ALLOWED_ON_RDN
from ldb import ERR_NAMING_VIOLATION, ERR_CONSTRAINT_VIOLATION
+from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
from samba import Ldb, param, dom_sid_to_rid
self.delete_force(self.ldb, "cn=ldaptestuser3,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer2," + self.base_dn)
+ self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
m = Message()
m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- m["sAMAccountName"] = MessageElement("testgroup", FLAG_MOD_REPLACE,
+ m["sAMAccountName"] = MessageElement("testgroupXX", FLAG_MOD_REPLACE,
"sAMAccountName")
ldb.modify(m)
m = Message()
m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- m["sAMAccountName"] = MessageElement("testgroup2", FLAG_MOD_ADD,
+ m["sAMAccountName"] = MessageElement("testgroupXX2", FLAG_MOD_ADD,
"sAMAccountName")
try:
ldb.modify(m)
self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
+
+ # this test needs to be disabled until we really understand
+ # what the rDN length constraints are
+ def DISABLED_test_largeRDN(self):
+ """Testing large rDN (limit 64 characters)"""
+ rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012";
+ self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+ ldif = """
+dn: %s,%s""" % (rdn,self.base_dn) + """
+objectClass: container
+"""
+ self.ldb.add_ldif(ldif)
+ self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+
+ rdn = "CN=a0123456789012345678901234567890123456789012345678901234567890120";
+ self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+ try:
+ ldif = """
+dn: %s,%s""" % (rdn,self.base_dn) + """
+objectClass: container
+"""
+ self.ldb.add_ldif(ldif)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ self.delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
+
def test_rename(self):
"""Tests the rename operation"""
print "Tests the rename operations"""
self.delete_force(self.ldb, "cn=ldaptestuser3,cn=users," + self.base_dn)
+ def test_rename_twice(self):
+ """Tests the rename operation twice - this corresponds to a past bug"""
+ print "Tests the rename twice operation"""
+
+ self.ldb.add({
+ "dn": "cn=ldaptestuser5,cn=users," + self.base_dn,
+ "objectclass": ["user", "person"] })
+
+ ldb.rename("cn=ldaptestuser5,cn=users," + self.base_dn, "cn=ldaptestUSER5,cn=users," + self.base_dn)
+ self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
+ self.ldb.add({
+ "dn": "cn=ldaptestuser5,cn=users," + self.base_dn,
+ "objectclass": ["user", "person"] })
+ ldb.rename("cn=ldaptestuser5,cn=Users," + self.base_dn, "cn=ldaptestUSER5,cn=users," + self.base_dn)
+ res = ldb.search(expression="cn=ldaptestuser5")
+ print "Found %u records" % len(res)
+ self.assertEquals(len(res), 1, "Wrong number of hits for cn=ldaptestuser5")
+ res = ldb.search(expression="(&(cn=ldaptestuser5)(objectclass=user))")
+ print "Found %u records" % len(res)
+ self.assertEquals(len(res), 1, "Wrong number of hits for (&(cn=ldaptestuser5)(objectclass=user))")
+ self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
+
def test_parentGUID(self):
"""Test parentGUID behaviour"""
print "Testing parentGUID behaviour\n"
"""Test the primary group token behaviour (hidden-generated-readonly attribute on groups)"""
print "Testing primary group token behaviour\n"
+ try:
+ ldb.add({
+ "dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
+ "objectclass": "group",
+ "primaryGroupToken": "100"})
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_UNDEFINED_ATTRIBUTE_TYPE)
+ self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
+
ldb.add({
"dn": "cn=ldaptestuser,cn=users," + self.base_dn,
"objectclass": ["user", "person"]})
rid = dom_sid_to_rid(ldb.schema_format_value("objectSID", res1[0]["objectSID"][0]))
self.assertEquals(primary_group_token, rid)
-# TODO Has to wait until we support read-only generated attributes correctly
-# m = Message()
-# m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
-# m["primaryGroupToken"] = "100"
-# try:
-# ldb.modify(m)
-# self.fail()
-# except LdbError, (num, msg):
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
+ m["primaryGroupToken"] = "100"
+ try:
+ ldb.modify(m)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
self.delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
+ def test_wkguid(self):
+ """Test Well known GUID behaviours (including DN+Binary)"""
+ print "Test Well known GUID behaviours (including DN+Binary)"""
+
+ res = self.ldb.search(base=("<WKGUID=ab1d30f3768811d1aded00c04fd8d5cd,%s>" % self.base_dn), scope=SCOPE_BASE, attrs=[])
+ self.assertEquals(len(res), 1)
+
+ res2 = self.ldb.search(scope=SCOPE_BASE, attrs=["wellKnownObjects"], expression=("wellKnownObjects=B:32:ab1d30f3768811d1aded00c04fd8d5cd:%s" % res[0].dn))
+ self.assertEquals(len(res2), 1)
+
+ # Prove that the matching rule is over the whole DN+Binary
+ res2 = self.ldb.search(scope=SCOPE_BASE, attrs=["wellKnownObjects"], expression=("wellKnownObjects=B:32:ab1d30f3768811d1aded00c04fd8d5cd"))
+ self.assertEquals(len(res2), 0)
+ # Prove that the matching rule is over the whole DN+Binary
+ res2 = self.ldb.search(scope=SCOPE_BASE, attrs=["wellKnownObjects"], expression=("wellKnownObjects=%s") % res[0].dn)
+ self.assertEquals(len(res2), 0)
+
+ def test_subschemasubentry(self):
+ """Test subSchemaSubEntry appears when requested, but not when not requested"""
+ print "Test subSchemaSubEntry"""
+
+ res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["subSchemaSubEntry"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["subSchemaSubEntry"][0], "CN=Aggregate,"+self.schema_dn)
+
+ res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertTrue("subScheamSubEntry" not in res[0])
+
def test_all(self):
"""Basic tests"""
# res = ldb.search(expression="(&(anr==testy ldap)(objectClass=user))")
# self.assertEquals(len(res), 1, "Found only %d for (&(anr==testy ldap)(objectClass=user))" % len(res))
- self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
- self.assertEquals(res[0]["cn"][0], "ldaptestuser")
- self.assertEquals(res[0]["name"][0], "ldaptestuser")
+# self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
+# self.assertEquals(res[0]["cn"][0], "ldaptestuser")
+# self.assertEquals(res[0]["name"][0], "ldaptestuser")
# Testing ldb.search for (&(anr==testy ldap)(objectClass=user))
# res = ldb.search(expression="(&(anr==testy ldap)(objectClass=user))")
# self.assertEquals(len(res), 1, "Could not find (&(anr==testy ldap)(objectClass=user))")
- self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
- self.assertEquals(res[0]["cn"][0], "ldaptestuser")
- self.assertEquals(res[0]["name"][0], "ldaptestuser")
+# self.assertEquals(str(res[0].dn), ("CN=ldaptestuser,CN=Users," + self.base_dn))
+# self.assertEquals(res[0]["cn"][0], "ldaptestuser")
+# self.assertEquals(res[0]["name"][0], "ldaptestuser")
# Testing ldb.search for (&(anr=testy ldap user)(objectClass=user))
res = ldb.search(expression="(&(anr=testy ldap user)(objectClass=user))")
print "Testing ldb.search for (&(cn=ldaptestuser)(objectCategory=PerSon)) in with 'phantom root' control"
- res3control = gc_ldb.search(self.base_dn, expression="(&(cn=ldaptestuser)(objectCategory=PerSon))", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:2"])
- self.assertEquals(len(res3control), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog")
+ if gc_ldb is not None:
+ res3control = gc_ldb.search(self.base_dn, expression="(&(cn=ldaptestuser)(objectCategory=PerSon))", scope=SCOPE_SUBTREE, attrs=["cn"], controls=["search_options:1:2"])
+ self.assertEquals(len(res3control), 1, "Could not find (&(cn=ldaptestuser)(objectCategory=PerSon)) in Global Catalog")
- self.assertEquals(res[0].dn, res3control[0].dn)
+ self.assertEquals(res[0].dn, res3control[0].dn)
ldb.delete(res[0].dn)
self.delete_force(self.ldb, "cn=ldaptestuser3,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestuser4,cn=ldaptestcontainer2," + self.base_dn)
+ self.delete_force(self.ldb, "cn=ldaptestuser5,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestgroup2,cn=users," + self.base_dn)
self.delete_force(self.ldb, "cn=ldaptestcomputer,cn=computers," + self.base_dn)
self.assertFalse("objectClasses" in res[0])
self.assertFalse("attributeTypes" in res[0])
+
def test_schemaUpdateNow(self):
"""Testing schemaUpdateNow"""
- class_name = "test-class" + time.strftime("%s", time.gmtime())
+ attr_name = "test-Attr" + time.strftime("%s", time.gmtime())
+ attr_ldap_display_name = attr_name.replace("-", "")
+
+ ldif = """
+dn: CN=%s,%s""" % (attr_name, self.schema_dn) + """
+objectClass: top
+objectClass: attributeSchema
+adminDescription: """ + attr_name + """
+adminDisplayName: """ + attr_name + """
+cn: """ + attr_name + """
+attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
+attributeSyntax: 2.5.5.12
+omSyntax: 64
+instanceType: 4
+isSingleValued: TRUE
+systemOnly: FALSE
+"""
+ self.ldb.add_ldif(ldif)
+
+ class_name = "test-Class" + time.strftime("%s", time.gmtime())
class_ldap_display_name = class_name.replace("-", "")
- object_name = "obj" + time.strftime("%s", time.gmtime())
ldif = """
dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
-lDAPDisplayName: """ + class_ldap_display_name + """
objectClass: top
objectClass: classSchema
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-objectCategory: CN=Class-Schema,""" + self.schema_dn + """
-defaultObjectCategory: CN=%s,%s""" % (class_name, self.schema_dn) + """
-distinguishedName: CN=%s,%s""" % (class_name, self.schema_dn) + """
-governsID: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
+governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
instanceType: 4
-name: """ + class_name + """
objectClassCategory: 1
subClassOf: organizationalPerson
systemFlags: 16
rDNAttID: cn
systemMustContain: cn
+systemMustContain: """ + attr_ldap_display_name + """
systemOnly: FALSE
"""
self.ldb.add_ldif(ldif)
+
ldif = """
dn:
changetype: modify
schemaUpdateNow: 1
"""
self.ldb.modify_ldif(ldif)
+
+ object_name = "obj" + time.strftime("%s", time.gmtime())
+
ldif = """
dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
objectClass: organizationalPerson
objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
name: """ + object_name + """
+""" + attr_ldap_display_name + """: test
"""
self.ldb.add_ldif(ldif)
- # Search for created objectClass
+
+ # Search for created attribute
+ res = []
+ res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
+ self.assertTrue("schemaIDGUID" in res[0])
+
+ # Search for created objectclass
res = []
res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
- self.assertNotEqual(res, [])
+ self.assertEquals(len(res), 1)
+ self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
+ self.assertEquals(res[0]["defaultObjectCategory"][0], res[0]["distinguishedName"][0])
+ self.assertTrue("schemaIDGUID" in res[0])
+ # Search for created object
res = []
res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
- self.assertNotEqual(res, [])
+ self.assertEquals(len(res), 1)
# Delete the object
self.delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
if not "://" in host:
- host = "ldap://%s" % host
+ if os.path.isfile(host):
+ host = "tdb://%s" % host
+ else:
+ host = "ldap://%s" % host
ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
-gc_ldb = Ldb("%s:3268" % host, credentials=creds,
- session_info=system_session(), lp=lp)
+if not "tdb://" in host:
+ gc_ldb = Ldb("%s:3268" % host, credentials=creds,
+ session_info=system_session(), lp=lp)
+else:
+ gc_ldb = None
runner = SubunitTestRunner()
rc = 0