PY3 = sys.version_info > (3, 0)
+TDB_PREFIX = "tdb://"
+MDB_PREFIX = "mdb://"
+
+MDB_INDEX_OBJ = {
+ "dn": "@INDEXLIST",
+ "@IDXONE": [b"1"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]
+}
def tempdir():
import tempfile
encoded2 = ldb.binary_encode('test\\x')
self.assertEqual(encoded2, encoded)
-class SimpleLdb(TestCase):
+
+class LdbBaseTest(TestCase):
+ def setUp(self):
+ super(LdbBaseTest, self).setUp()
+ try:
+ if self.prefix is None:
+ self.prefix = TDB_PREFIX
+ except AttributeError:
+ self.prefix = TDB_PREFIX
+
+ def tearDown(self):
+ super(LdbBaseTest, self).tearDown()
+
+ def url(self):
+ return self.prefix + self.filename
+
+ def flags(self):
+ if self.prefix == MDB_PREFIX:
+ return ldb.FLG_NOSYNC
+ else:
+ return 0
+
+
+class SimpleLdb(LdbBaseTest):
def setUp(self):
super(SimpleLdb, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "test.ldb")
- self.ldb = ldb.Ldb(self.filename)
+ self.ldb = ldb.Ldb(self.url(), flags=self.flags())
+ try:
+ self.ldb.add(self.index)
+ except AttributeError:
+ pass
def tearDown(self):
shutil.rmtree(self.testdir)
super(SimpleLdb, self).tearDown()
+ # Ensure the LDB is closed now, so we close the FD
+ del(self.ldb)
def test_connect(self):
- ldb.Ldb(self.filename)
+ ldb.Ldb(self.url(), flags=self.flags())
def test_connect_none(self):
ldb.Ldb()
def test_connect_later(self):
x = ldb.Ldb()
- x.connect(self.filename)
+ x.connect(self.url(), flags=self.flags())
def test_repr(self):
x = ldb.Ldb()
self.assertEqual([], x.modules())
def test_modules_tdb(self):
- x = ldb.Ldb(self.filename)
+ x = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
def test_firstmodule_none(self):
self.assertEqual(x.firstmodule, None)
def test_firstmodule_tdb(self):
- x = ldb.Ldb(self.filename)
+ x = ldb.Ldb(self.url(), flags=self.flags())
mod = x.firstmodule
self.assertEqual(repr(mod), "<ldb module 'tdb'>")
def test_search(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search()), 0)
def test_search_controls(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
def test_search_attrs(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
def test_search_string_dn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
def test_search_attr_string(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertRaises(TypeError, l.search, attrs="dc")
self.assertRaises(TypeError, l.search, attrs=b"dc")
def test_opaque(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.set_opaque("my_opaque", l)
self.assertTrue(l.get_opaque("my_opaque") is not None)
self.assertEqual(None, l.get_opaque("unknown"))
def test_search_scope_base_empty_db(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
ldb.SCOPE_BASE)), 0)
def test_search_scope_onelevel_empty_db(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
ldb.SCOPE_ONELEVEL)), 0)
def test_delete(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
def test_delete_w_unhandled_ctrl(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo1")
m["b"] = [b"a"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
l.delete(m.dn)
def test_contains(self):
- name = self.filename
- l = ldb.Ldb(name)
+ name = self.url()
+ l = ldb.Ldb(name, flags=self.flags())
self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
- l = ldb.Ldb(name)
+ l = ldb.Ldb(name, flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo3")
m["b"] = ["a"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
try:
self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
l.delete(m.dn)
def test_get_config_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_config_basedn())
def test_get_root_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_root_basedn())
def test_get_schema_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_schema_basedn())
def test_get_default_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_default_basedn())
def test_add(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = b"bla"
+ m["objectUUID"] = b"0123456789abcdef"
self.assertEqual(len(l.search()), 0)
l.add(m)
try:
l.delete(ldb.Dn(l, "dc=foo4"))
def test_search_iterator(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
s = l.search_iterator()
s.abandon()
try:
m1 = ldb.Message()
m1.dn = ldb.Dn(l, "dc=foo4")
m1["bla"] = b"bla"
+ m1["objectUUID"] = b"0123456789abcdef"
l.add(m1)
try:
s = l.search_iterator()
m2 = ldb.Message()
m2.dn = ldb.Dn(l, "dc=foo5")
m2["bla"] = b"bla"
+ m2["objectUUID"] = b"0123456789abcdee"
l.add(m2)
s = l.search_iterator()
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = "bla"
+ m["objectUUID"] = b"0123456789abcdef"
self.assertEqual(len(l.search()), 0)
l.add(m)
try:
l.delete(ldb.Dn(l, "dc=foo4"))
def test_add_w_unhandled_ctrl(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = b"bla"
self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
def test_add_dict(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = {"dn": ldb.Dn(l, "dc=foo5"),
- "bla": b"bla"}
+ "bla": b"bla",
+ "objectUUID": b"0123456789abcdef"}
self.assertEqual(len(l.search()), 0)
l.add(m)
try:
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_dict_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = {"dn": ldb.Dn(l, "dc=foo5"),
- "bla": "bla"}
+ "bla": "bla",
+ "objectUUID": b"0123456789abcdef"}
self.assertEqual(len(l.search()), 0)
l.add(m)
try:
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_dict_string_dn(self):
- l = ldb.Ldb(self.filename)
- m = {"dn": "dc=foo6", "bla": b"bla"}
+ l = ldb.Ldb(self.url(), flags=self.flags())
+ m = {"dn": "dc=foo6", "bla": b"bla",
+ "objectUUID": b"0123456789abcdef"}
self.assertEqual(len(l.search()), 0)
l.add(m)
try:
l.delete(ldb.Dn(l, "dc=foo6"))
def test_add_dict_bytes_dn(self):
- l = ldb.Ldb(self.filename)
- m = {"dn": b"dc=foo6", "bla": b"bla"}
+ l = ldb.Ldb(self.url(), flags=self.flags())
+ m = {"dn": b"dc=foo6", "bla": b"bla",
+ "objectUUID": b"0123456789abcdef"}
self.assertEqual(len(l.search()), 0)
l.add(m)
try:
l.delete(ldb.Dn(l, "dc=foo6"))
def test_rename(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo7")
m["bla"] = b"bla"
+ m["objectUUID"] = b"0123456789abcdef"
self.assertEqual(len(l.search()), 0)
l.add(m)
try:
l.delete(ldb.Dn(l, "dc=bar"))
def test_rename_string_dns(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo8")
m["bla"] = b"bla"
+ m["objectUUID"] = b"0123456789abcdef"
self.assertEqual(len(l.search()), 0)
l.add(m)
self.assertEqual(len(l.search()), 1)
l.delete(ldb.Dn(l, "dc=bar"))
def test_empty_dn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(0, len(l.search()))
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=empty")
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
rm = l.search()
self.assertEqual(1, len(rm))
- self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
+ self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+ set(rm[0].keys()))
rm = l.search(m.dn)
self.assertEqual(1, len(rm))
- self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
+ self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+ set(rm[0].keys()))
rm = l.search(m.dn, attrs=["blah"])
self.assertEqual(1, len(rm))
self.assertEqual(0, len(rm[0]))
def test_modify_delete(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modifydelete")
m["bla"] = [b"1234"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
rm = l.search(m.dn)[0]
self.assertEqual([b"1234"], list(rm["bla"]))
l.modify(m)
rm = l.search(m.dn)
self.assertEqual(1, len(rm))
- self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
+ self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+ set(rm[0].keys()))
rm = l.search(m.dn, attrs=["bla"])
self.assertEqual(1, len(rm))
self.assertEqual(0, len(rm[0]))
l.delete(ldb.Dn(l, "dc=modifydelete"))
def test_modify_delete_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modifydelete")
m.text["bla"] = ["1234"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
rm = l.search(m.dn)[0]
self.assertEqual(["1234"], list(rm.text["bla"]))
l.modify(m)
rm = l.search(m.dn)
self.assertEqual(1, len(rm))
- self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
+ self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+ set(rm[0].keys()))
rm = l.search(m.dn, attrs=["bla"])
self.assertEqual(1, len(rm))
self.assertEqual(0, len(rm[0]))
l.delete(ldb.Dn(l, "dc=modifydelete"))
def test_modify_add(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m["bla"] = [b"1234"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
try:
m = ldb.Message()
self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
l.modify(m)
rm = l.search(m.dn)[0]
- self.assertEqual(2, len(rm))
+ self.assertEqual(3, len(rm))
self.assertEqual([b"1234", b"456"], list(rm["bla"]))
finally:
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_add_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m.text["bla"] = ["1234"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
try:
m = ldb.Message()
self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
l.modify(m)
rm = l.search(m.dn)[0]
- self.assertEqual(2, len(rm))
+ self.assertEqual(3, len(rm))
self.assertEqual(["1234", "456"], list(rm.text["bla"]))
finally:
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_replace(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modify2")
m["bla"] = [b"1234", b"456"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
try:
m = ldb.Message()
self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
l.modify(m)
rm = l.search(m.dn)[0]
- self.assertEqual(2, len(rm))
+ self.assertEqual(3, len(rm))
self.assertEqual([b"789"], list(rm["bla"]))
rm = l.search(m.dn, attrs=["bla"])[0]
self.assertEqual(1, len(rm))
l.delete(ldb.Dn(l, "dc=modify2"))
def test_modify_replace_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modify2")
m.text["bla"] = ["1234", "456"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
try:
m = ldb.Message()
self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
l.modify(m)
rm = l.search(m.dn)[0]
- self.assertEqual(2, len(rm))
+ self.assertEqual(3, len(rm))
self.assertEqual(["789"], list(rm.text["bla"]))
rm = l.search(m.dn, attrs=["bla"])[0]
self.assertEqual(1, len(rm))
l.delete(ldb.Dn(l, "dc=modify2"))
def test_modify_flags_change(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m["bla"] = [b"1234"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
try:
m = ldb.Message()
self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
l.modify(m)
rm = l.search(m.dn)[0]
- self.assertEqual(2, len(rm))
+ self.assertEqual(3, len(rm))
self.assertEqual([b"1234", b"456"], list(rm["bla"]))
# Now create another modify, but switch the flags before we do it
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_flags_change_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m.text["bla"] = ["1234"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
try:
m = ldb.Message()
self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
l.modify(m)
rm = l.search(m.dn)[0]
- self.assertEqual(2, len(rm))
+ self.assertEqual(3, len(rm))
self.assertEqual(["1234", "456"], list(rm.text["bla"]))
# Now create another modify, but switch the flags before we do it
l.delete(ldb.Dn(l, "dc=add"))
def test_transaction_commit(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.transaction_start()
m = ldb.Message(ldb.Dn(l, "dc=foo9"))
m["foo"] = [b"bar"]
+ m["objectUUID"] = b"0123456789abcdef"
l.add(m)
l.transaction_commit()
l.delete(m.dn)
def test_transaction_cancel(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.transaction_start()
m = ldb.Message(ldb.Dn(l, "dc=foo10"))
m["foo"] = [b"bar"]
+ m["objectUUID"] = b"0123456789abcdee"
l.add(m)
l.transaction_cancel()
self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
def test_set_debug(self):
def my_report_fn(level, text):
pass
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.set_debug(my_report_fn)
def test_zero_byte_string(self):
"""Testing we do not get trapped in the \0 byte in a property string."""
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.add({
"dn" : b"dc=somedn",
"objectclass" : b"user",
"cN" : b"LDAPtestUSER",
"givenname" : b"ldap",
"displayname" : b"foo\0bar",
+ "objectUUID" : b"0123456789abcdef"
})
res = l.search(expression="(dn=dc=somedn)")
self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
-class SearchTests(TestCase):
+# Run the SimpleLdb tests against an lmdb backend
+class SimpleLdbLmdb(SimpleLdb):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ self.index = MDB_INDEX_OBJ
+ super(SimpleLdbLmdb, self).setUp()
+
+ def tearDown(self):
+ super(SimpleLdbLmdb, self).tearDown()
+
+class SearchTests(LdbBaseTest):
def tearDown(self):
shutil.rmtree(self.testdir)
super(SearchTests, self).tearDown()
+ # Ensure the LDB is closed now, so we close the FD
+ del(self.l)
+
+
def setUp(self):
super(SearchTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "search_test.ldb")
- self.l = ldb.Ldb(self.filename, options=["modules:rdn_name"])
+ options = ["modules:rdn_name"]
+ if hasattr(self, 'IDXCHECK'):
+ options.append("disable_full_db_scan_for_self_test:1")
+ self.l = ldb.Ldb(self.url(),
+ flags=self.flags(),
+ options=options)
+ try:
+ self.l.add(self.index)
+ except AttributeError:
+ pass
+
+ self.l.add({"dn": "@ATTRIBUTES",
+ "DC": "CASE_INSENSITIVE"})
- self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
+ # Note that we can't use the name objectGUID here, as we
+ # want to stay clear of the objectGUID handler in LDB and
+ # instead use just the 16 bytes raw, which we just keep
+ # to printable chars here for ease of handling.
+
+ self.l.add({"dn": "DC=SAMBA,DC=ORG",
+ "name": b"samba.org",
+ "objectUUID": b"0123456789abcdef"})
self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
"name": b"Admins",
- "x": "z", "y": "a"})
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
"name": b"Users",
- "x": "z", "y": "a"})
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
"name": b"OU #1",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde3"})
self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
"name": b"OU #2",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde4"})
self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
"name": b"OU #3",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde5"})
self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
"name": b"OU #4",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde6"})
self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
"name": b"OU #5",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde7"})
self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
"name": b"OU #6",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde8"})
self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
"name": b"OU #7",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde9"})
self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
"name": b"OU #8",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcde0"})
self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
"name": b"OU #9",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcdea"})
self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcdeb"})
self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "y", "y": "a"})
+ "x": "y", "y": "a",
+ "objectUUID": b"0123456789abcdec"})
self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "y", "y": "b"})
+ "x": "y", "y": "b",
+ "objectUUID": b"0123456789abcded"})
self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcdee"})
self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcd01"})
self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcd02"})
self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcd03"})
self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcd04"})
self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcd05"})
self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcd06"})
self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "b"})
+ "x": "x", "y": "b",
+ "objectUUID": b"0123456789abcd07"})
self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "c"})
+ "x": "x", "y": "c",
+ "objectUUID": b"0123456789abcd08"})
self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
"name": b"OU #10",
- "x": "x", "y": "c"})
+ "x": "x", "y": "c",
+ "objectUUID": b"0123456789abcd09"})
def test_base(self):
"""Testing a search"""
scope=ldb.SCOPE_BASE)
self.assertEqual(len(res11), 1)
+ def test_base_lower(self):
+ """Testing a search"""
+
+ res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
+ scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res11), 1)
+
def test_base_or(self):
"""Testing a search"""
def test_check_base_error(self):
"""Testing a search"""
- self.l.add({"dn": "@OPTIONS", "checkBaseOnSearch": b"TRUE"})
+ checkbaseonsearch = {"dn": "@OPTIONS",
+ "checkBaseOnSearch": b"TRUE"}
+ try:
+ self.l.add(checkbaseonsearch)
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
+ m = ldb.Message.from_dict(self.l,
+ checkbaseonsearch)
+ self.l.modify(m)
try:
res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
expression="(&(x=y)(|(y=b)(y=c)))")
self.assertEqual(len(res11), 1)
+ def test_subtree_and2_lower(self):
+ """Testing a search"""
+
+ res11 = self.l.search(base="DC=samba,DC=org",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(&(x=y)(|(y=b)(y=c)))")
+ self.assertEqual(len(res11), 1)
+
def test_subtree_or(self):
"""Testing a search"""
expression="(|(x=y)(y=b))")
self.assertEqual(len(res11), 20)
+ def test_one_or2_lower(self):
+ """Testing a search"""
+
+ res11 = self.l.search(base="DC=samba,DC=org",
+ scope=ldb.SCOPE_ONELEVEL,
+ expression="(|(x=y)(y=b))")
+ self.assertEqual(len(res11), 20)
+
+ def test_one_unindexable(self):
+ """Testing a search"""
+
+ try:
+ res11 = self.l.search(base="DC=samba,DC=org",
+ scope=ldb.SCOPE_ONELEVEL,
+ expression="(y=b*)")
+ if hasattr(self, 'IDX') and \
+ not hasattr(self, 'IDXONE') and \
+ hasattr(self, 'IDXCHECK'):
+ self.fail("Should have failed as un-indexed search")
+
+ self.assertEqual(len(res11), 9)
+
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ estr = err.args[1]
+ self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
+ self.assertIn(estr, "ldb FULL SEARCH disabled")
+
+ def test_one_unindexable_presence(self):
+ """Testing a search"""
+
+ try:
+ res11 = self.l.search(base="DC=samba,DC=org",
+ scope=ldb.SCOPE_ONELEVEL,
+ expression="(y=*)")
+ if hasattr(self, 'IDX') and \
+ not hasattr(self, 'IDXONE') and \
+ hasattr(self, 'IDXCHECK'):
+ self.fail("Should have failed as un-indexed search")
+
+ self.assertEqual(len(res11), 24)
+
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ estr = err.args[1]
+ self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
+ self.assertIn(estr, "ldb FULL SEARCH disabled")
+
+
def test_subtree_and_or(self):
"""Testing a search"""
expression="(&(ou=ouX)(y=a))")
self.assertEqual(len(res11), 0)
+ def test_subtree_and_idx_record(self):
+ """Testing a search against the index record"""
+
+ res11 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(@IDXDN=DC=SAMBA,DC=ORG)")
+ self.assertEqual(len(res11), 0)
+
+ def test_subtree_and_idxone_record(self):
+ """Testing a search against the index record"""
+
+ res11 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(@IDXONE=DC=SAMBA,DC=ORG)")
+ self.assertEqual(len(res11), 0)
+
+ def test_subtree_unindexable(self):
+ """Testing a search"""
+
+ try:
+ res11 = self.l.search(base="DC=samba,DC=org",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(y=b*)")
+ if hasattr(self, 'IDX') and \
+ hasattr(self, 'IDXCHECK'):
+ self.fail("Should have failed as un-indexed search")
+
+ self.assertEqual(len(res11), 9)
+
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ estr = err.args[1]
+ self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
+ self.assertIn(estr, "ldb FULL SEARCH disabled")
+
+ def test_subtree_unindexable_presence(self):
+ """Testing a search"""
+
+ try:
+ res11 = self.l.search(base="DC=samba,DC=org",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(y=*)")
+ if hasattr(self, 'IDX') and \
+ hasattr(self, 'IDXCHECK'):
+ self.fail("Should have failed as un-indexed search")
+
+ self.assertEqual(len(res11), 24)
+
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ estr = err.args[1]
+ self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
+ self.assertIn(estr, "ldb FULL SEARCH disabled")
+
+
+ def test_dn_filter_one(self):
+ """Testing that a dn= filter succeeds
+ (or fails with disallowDNFilter
+ set and IDXGUID or (IDX and not IDXONE) mode)
+ when the scope is SCOPE_ONELEVEL.
+
+ This should be made more consistent, but for now lock in
+ the behaviour
+
+ """
+
+ res11 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_ONELEVEL,
+ expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
+ if hasattr(self, 'disallowDNFilter') and \
+ hasattr(self, 'IDX') and \
+ (hasattr(self, 'IDXGUID') or \
+ ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
+ self.assertEqual(len(res11), 0)
+ else:
+ self.assertEqual(len(res11), 1)
+
+ def test_dn_filter_subtree(self):
+ """Testing that a dn= filter succeeds
+ (or fails with disallowDNFilter set)
+ when the scope is SCOPE_SUBTREE"""
+
+ res11 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
+ if hasattr(self, 'disallowDNFilter') \
+ and hasattr(self, 'IDX'):
+ self.assertEqual(len(res11), 0)
+ else:
+ self.assertEqual(len(res11), 1)
+
+ def test_dn_filter_base(self):
+ """Testing that (incorrectly) a dn= filter works
+ when the scope is SCOPE_BASE"""
+
+ res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_BASE,
+ expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
+
+ # At some point we should fix this, but it isn't trivial
+ self.assertEqual(len(res11), 1)
+
+
+# Run the search tests against an lmdb backend
+class SearchTestsLmdb(SearchTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ self.index = MDB_INDEX_OBJ
+ super(SearchTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(SearchTestsLmdb, self).tearDown()
+
class IndexedSearchTests(SearchTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
def setUp(self):
super(IndexedSearchTests, self).setUp()
+ self.l.add({"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"]})
+ self.IDX = True
+
+class IndexedCheckSearchTests(IndexedSearchTests):
+ """Test searches using the index, to ensure the index doesn't
+ break things (full scan disabled)"""
+ def setUp(self):
+ self.IDXCHECK = True
+ super(IndexedCheckSearchTests, self).setUp()
+
+class IndexedSearchDnFilterTests(SearchTests):
+ """Test searches using the index, to ensure the index doesn't
+ break things"""
+ def setUp(self):
+ super(IndexedSearchDnFilterTests, self).setUp()
+ self.l.add({"dn": "@OPTIONS",
+ "disallowDNFilter": "TRUE"})
+ self.disallowDNFilter = True
+
+ self.l.add({"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"]})
+ self.IDX = True
+
+class IndexedAndOneLevelSearchTests(SearchTests):
+ """Test searches using the index including @IDXONE, to ensure
+ the index doesn't break things"""
+ def setUp(self):
+ super(IndexedAndOneLevelSearchTests, self).setUp()
self.l.add({"dn": "@INDEXLIST",
"@IDXATTR": [b"x", b"y", b"ou"],
"@IDXONE": [b"1"]})
+ self.IDX = True
+ self.IDXONE = True
+class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
+ """Test searches using the index including @IDXONE, to ensure
+ the index doesn't break things (full scan disabled)"""
+ def setUp(self):
+ self.IDXCHECK = True
+ super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
+class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
+ """Test searches using the index including @IDXONE, to ensure
+ the index doesn't break things"""
+ def setUp(self):
+ super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
+ self.l.add({"dn": "@OPTIONS",
+ "disallowDNFilter": "TRUE"})
+ self.disallowDNFilter = True
-class DnTests(TestCase):
+ self.l.add({"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"],
+ "@IDXONE": [b"1"]})
+ self.IDX = True
+ self.IDXONE = True
+class GUIDIndexedSearchTests(SearchTests):
+ """Test searches using the index, to ensure the index doesn't
+ break things"""
def setUp(self):
- super(DnTests, self).setUp()
+ self.index = {"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]}
+ super(GUIDIndexedSearchTests, self).setUp()
+
+ self.IDXGUID = True
+ self.IDXONE = True
+
+
+class GUIDIndexedDNFilterSearchTests(SearchTests):
+ """Test searches using the index, to ensure the index doesn't
+ break things"""
+ def setUp(self):
+ self.index = {"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]}
+ super(GUIDIndexedDNFilterSearchTests, self).setUp()
+ self.l.add({"dn": "@OPTIONS",
+ "disallowDNFilter": "TRUE"})
+ self.disallowDNFilter = True
+ self.IDX = True
+ self.IDXGUID = True
+
+class GUIDAndOneLevelIndexedSearchTests(SearchTests):
+ """Test searches using the index including @IDXONE, to ensure
+ the index doesn't break things"""
+ def setUp(self):
+ self.index = {"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]}
+ super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
+ self.l.add({"dn": "@OPTIONS",
+ "disallowDNFilter": "TRUE"})
+ self.disallowDNFilter = True
+ self.IDX = True
+ self.IDXGUID = True
+ self.IDXONE = True
+
+class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ super(GUIDIndexedSearchTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(GUIDIndexedSearchTestsLmdb, self).tearDown()
+
+
+class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
+
+
+class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
+
+
+class AddModifyTests(LdbBaseTest):
+ def tearDown(self):
+ shutil.rmtree(self.testdir)
+ super(AddModifyTests, self).tearDown()
+
+ # Ensure the LDB is closed now, so we close the FD
+ del(self.l)
+
+ def setUp(self):
+ super(AddModifyTests, self).setUp()
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "add_test.ldb")
+ self.l = ldb.Ldb(self.url(),
+ flags=self.flags(),
+ options=["modules:rdn_name"])
+ try:
+ self.l.add(self.index)
+ except AttributeError:
+ pass
+
+ self.l.add({"dn": "DC=SAMBA,DC=ORG",
+ "name": b"samba.org",
+ "objectUUID": b"0123456789abcdef"})
+ self.l.add({"dn": "@ATTRIBUTES",
+ "objectUUID": "UNIQUE_INDEX"})
+
+ def test_add_dup(self):
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
+ try:
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
+ self.fail("Should have failed adding dupliate entry")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
+
+ def test_add_del_add(self):
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
+ self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
+
+ def test_add_move_add(self):
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
+ self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
+ "OU=DUP2,DC=SAMBA,DC=ORG")
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
+
+ def test_add_move_fail_move_move(self):
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
+ self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
+
+ res2 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(objectUUID=0123456789abcde1)")
+ self.assertEqual(len(res2), 1)
+ self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
+
+ res3 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(objectUUID=0123456789abcde2)")
+ self.assertEqual(len(res3), 1)
+ self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
+
+ try:
+ self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
+ "OU=DUP2,DC=SAMBA,DC=ORG")
+ self.fail("Should have failed on duplicate DN")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
+
+ self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
+ "OU=DUP3,DC=SAMBA,DC=ORG")
+
+ self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
+ "OU=DUP2,DC=SAMBA,DC=ORG")
+
+ res2 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(objectUUID=0123456789abcde1)")
+ self.assertEqual(len(res2), 1)
+ self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
+
+ res3 = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(objectUUID=0123456789abcde2)")
+ self.assertEqual(len(res3), 1)
+ self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
+
+ def test_move_missing(self):
+ try:
+ self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
+ "OU=DUP2,DC=SAMBA,DC=ORG")
+ self.fail("Should have failed on missing")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
+
+ def test_move_missing2(self):
+ self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
+
+ try:
+ self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
+ "OU=DUP2,DC=SAMBA,DC=ORG")
+ self.fail("Should have failed on missing")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
+
+ def test_move_fail_move_add(self):
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
+ self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
+ try:
+ self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
+ "OU=DUP2,DC=SAMBA,DC=ORG")
+ self.fail("Should have failed on duplicate DN")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
+
+ self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
+ "OU=DUP3,DC=SAMBA,DC=ORG")
+
+ self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde3"})
+
+
+class AddModifyTestsLmdb(AddModifyTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ self.index = MDB_INDEX_OBJ
+ super(AddModifyTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(AddModifyTestsLmdb, self).tearDown()
+
+class IndexedAddModifyTests(AddModifyTests):
+ """Test searches using the index, to ensure the index doesn't
+ break things"""
+ def setUp(self):
+ if not hasattr(self, 'index'):
+ self.index = {"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
+ "@IDXONE": [b"1"]}
+ super(IndexedAddModifyTests, self).setUp()
+
+ def test_duplicate_GUID(self):
+ try:
+ self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcdef"})
+ self.fail("Should have failed adding dupliate GUID")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
+
+ def test_duplicate_name_dup_GUID(self):
+ self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"a123456789abcdef"})
+ try:
+ self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"a123456789abcdef"})
+ self.fail("Should have failed adding dupliate GUID")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
+
+ def test_duplicate_name_dup_GUID2(self):
+ self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"abc3456789abcdef"})
+ try:
+ self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"aaa3456789abcdef"})
+ self.fail("Should have failed adding dupliate DN")
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
+
+ # Checking the GUID didn't stick in the index
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"aaa3456789abcdef"})
+
+ def test_add_dup_guid_add(self):
+ self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
+ try:
+ self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde1"})
+ self.fail("Should have failed on duplicate GUID")
+
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
+
+ self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
+ "name": b"Admins",
+ "x": "z", "y": "a",
+ "objectUUID": b"0123456789abcde2"})
+
+class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
+ """Test searches using the index, to ensure the index doesn't
+ break things"""
+ def setUp(self):
+ self.index = {"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"],
+ "@IDXONE": [b"1"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]}
+ super(GUIDIndexedAddModifyTests, self).setUp()
+
+
+class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
+ """Test GUID index behaviour insdie the transaction"""
+ def setUp(self):
+ super(GUIDTransIndexedAddModifyTests, self).setUp()
+ self.l.transaction_start()
+
+ def tearDown(self):
+ self.l.transaction_commit()
+ super(GUIDTransIndexedAddModifyTests, self).tearDown()
+
+class TransIndexedAddModifyTests(IndexedAddModifyTests):
+ """Test index behaviour insdie the transaction"""
+ def setUp(self):
+ super(TransIndexedAddModifyTests, self).setUp()
+ self.l.transaction_start()
+
+ def tearDown(self):
+ self.l.transaction_commit()
+ super(TransIndexedAddModifyTests, self).tearDown()
+
+class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ super(GuidIndexedAddModifyTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
+
+class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
+
+class BadIndexTests(LdbBaseTest):
+ def setUp(self):
+ super(BadIndexTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "test.ldb")
- self.ldb = ldb.Ldb(self.filename)
+ self.ldb = ldb.Ldb(self.url(), flags=self.flags())
+ if hasattr(self, 'IDXGUID'):
+ self.ldb.add({"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]})
+ else:
+ self.ldb.add({"dn": "@INDEXLIST",
+ "@IDXATTR": [b"x", b"y", b"ou"]})
+
+ super(BadIndexTests, self).setUp()
+
+ def test_unique(self):
+ self.ldb.add({"dn": "x=x,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde1",
+ "y": "1"})
+ self.ldb.add({"dn": "x=y,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde2",
+ "y": "1"})
+ self.ldb.add({"dn": "x=z,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde3",
+ "y": "1"})
+
+ res = self.ldb.search(expression="(y=1)",
+ base="dc=samba,dc=org")
+ self.assertEquals(len(res), 3)
+
+ # Now set this to unique index, but forget to check the result
+ try:
+ self.ldb.add({"dn": "@ATTRIBUTES",
+ "y": "UNIQUE_INDEX"})
+ self.fail()
+ except ldb.LdbError:
+ pass
+
+ # We must still have a working index
+ res = self.ldb.search(expression="(y=1)",
+ base="dc=samba,dc=org")
+ self.assertEquals(len(res), 3)
+
+ def test_unique_transaction(self):
+ self.ldb.add({"dn": "x=x,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde1",
+ "y": "1"})
+ self.ldb.add({"dn": "x=y,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde2",
+ "y": "1"})
+ self.ldb.add({"dn": "x=z,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde3",
+ "y": "1"})
+
+ res = self.ldb.search(expression="(y=1)",
+ base="dc=samba,dc=org")
+ self.assertEquals(len(res), 3)
+
+ self.ldb.transaction_start()
+
+ # Now set this to unique index, but forget to check the result
+ try:
+ self.ldb.add({"dn": "@ATTRIBUTES",
+ "y": "UNIQUE_INDEX"})
+ except ldb.LdbError:
+ pass
+
+ try:
+ self.ldb.transaction_commit()
+ self.fail()
+
+ except ldb.LdbError as err:
+ enum = err.args[0]
+ self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
+
+ # We must still have a working index
+ res = self.ldb.search(expression="(y=1)",
+ base="dc=samba,dc=org")
+
+ self.assertEquals(len(res), 3)
+
+ def test_casefold(self):
+ self.ldb.add({"dn": "x=x,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde1",
+ "y": "a"})
+ self.ldb.add({"dn": "x=y,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde2",
+ "y": "A"})
+ self.ldb.add({"dn": "x=z,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde3",
+ "y": ["a", "A"]})
+
+ res = self.ldb.search(expression="(y=a)",
+ base="dc=samba,dc=org")
+ self.assertEquals(len(res), 2)
+
+ self.ldb.add({"dn": "@ATTRIBUTES",
+ "y": "CASE_INSENSITIVE"})
+
+ # We must still have a working index
+ res = self.ldb.search(expression="(y=a)",
+ base="dc=samba,dc=org")
+
+ if hasattr(self, 'IDXGUID'):
+ self.assertEquals(len(res), 3)
+ else:
+ # We should not return this entry twice, but sadly
+ # we have not yet fixed
+ # https://bugzilla.samba.org/show_bug.cgi?id=13361
+ self.assertEquals(len(res), 4)
+
+ def test_casefold_transaction(self):
+ self.ldb.add({"dn": "x=x,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde1",
+ "y": "a"})
+ self.ldb.add({"dn": "x=y,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde2",
+ "y": "A"})
+ self.ldb.add({"dn": "x=z,dc=samba,dc=org",
+ "objectUUID": b"0123456789abcde3",
+ "y": ["a", "A"]})
+
+ res = self.ldb.search(expression="(y=a)",
+ base="dc=samba,dc=org")
+ self.assertEquals(len(res), 2)
+
+ self.ldb.transaction_start()
+
+ self.ldb.add({"dn": "@ATTRIBUTES",
+ "y": "CASE_INSENSITIVE"})
+
+ self.ldb.transaction_commit()
+
+ # We must still have a working index
+ res = self.ldb.search(expression="(y=a)",
+ base="dc=samba,dc=org")
+
+ if hasattr(self, 'IDXGUID'):
+ self.assertEquals(len(res), 3)
+ else:
+ # We should not return this entry twice, but sadly
+ # we have not yet fixed
+ # https://bugzilla.samba.org/show_bug.cgi?id=13361
+ self.assertEquals(len(res), 4)
+
+
+ def tearDown(self):
+ super(BadIndexTests, self).tearDown()
+
+
+class GUIDBadIndexTests(BadIndexTests):
+ """Test Bad index things with GUID index mode"""
+ def setUp(self):
+ self.IDXGUID = True
+
+ super(GUIDBadIndexTests, self).setUp()
+
+class DnTests(TestCase):
+
+ def setUp(self):
+ super(DnTests, self).setUp()
+ self.ldb = ldb.Ldb()
def tearDown(self):
- shutil.rmtree(self.testdir)
super(DnTests, self).tearDown()
+ del(self.ldb)
def test_set_dn_invalid(self):
x = ldb.Message()
dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
+ self.assertTrue(dn1.is_child_of(dn1))
self.assertTrue(dn2.is_child_of(dn1))
self.assertTrue(dn4.is_child_of(dn1))
self.assertTrue(dn4.is_child_of(dn3))
+ self.assertTrue(dn4.is_child_of(dn4))
self.assertFalse(dn3.is_child_of(dn2))
self.assertFalse(dn1.is_child_of(dn4))
dn3 = ldb.Dn(self.ldb, dn3_str)
dn4 = ldb.Dn(self.ldb, dn4_str)
+ self.assertTrue(dn1.is_child_of(dn1_str))
self.assertTrue(dn2.is_child_of(dn1_str))
self.assertTrue(dn4.is_child_of(dn1_str))
self.assertTrue(dn4.is_child_of(dn3_str))
+ self.assertTrue(dn4.is_child_of(dn4_str))
self.assertFalse(dn3.is_child_of(dn2_str))
self.assertFalse(dn1.is_child_of(dn4_str))
def setUp(self):
super(LdbMsgTests, self).setUp()
self.msg = ldb.Message()
- self.testdir = tempdir()
- self.filename = os.path.join(self.testdir, "test.ldb")
-
- def tearDown(self):
- shutil.rmtree(self.testdir)
- super(LdbMsgTests, self).tearDown()
def test_init_dn(self):
self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
def test_iter_items(self):
self.assertEqual(0, len(self.msg.items()))
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "dc=foo28")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
self.assertEqual(1, len(self.msg.items()))
def test_repr(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "dc=foo29")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
self.msg["dc"] = b"foo"
if PY3:
self.assertIn(repr(self.msg), [
self.assertEqual(["bar"], list(self.msg.text["foo"]))
def test_keys(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
self.msg["foo"] = [b"bla"]
self.msg["bar"] = [b"bla"]
- self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
+ self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
def test_keys_text(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
self.msg["foo"] = ["bla"]
self.msg["bar"] = ["bla"]
- self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
+ self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
def test_dn(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
self.assertEqual("@BASEINFO", self.msg.dn.__str__())
def test_get_dn(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
def test_dn_text(self):
- self.msg.text.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
+ self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
self.assertEqual("@BASEINFO", str(self.msg.dn))
self.assertEqual("@BASEINFO", str(self.msg.text.dn))
def test_get_dn_text(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
def test_get_invalid(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
self.assertRaises(TypeError, self.msg.get, 42)
def test_get_other(self):
self.assertEqual(msg1, msg2)
def test_equal_simplel(self):
- db = ldb.Ldb(self.filename)
+ db = ldb.Ldb()
msg1 = ldb.Message()
msg1.dn = ldb.Dn(db, "foo=bar")
msg2 = ldb.Message()
l = ldb.Ldb(self.filename)
self.assertEqual(["init"], ops)
-class LdbResultTests(TestCase):
+class LdbResultTests(LdbBaseTest):
def setUp(self):
super(LdbResultTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "test.ldb")
- self.l = ldb.Ldb(self.filename)
- self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
- self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
- self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
- self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1"})
- self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2"})
- self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3"})
- self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4"})
- self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5"})
- self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6"})
- self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7"})
- self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8"})
- self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9"})
- self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10"})
+ self.l = ldb.Ldb(self.url(), flags=self.flags())
+ try:
+ self.l.add(self.index)
+ except AttributeError:
+ pass
+ self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
+ "objectUUID": b"0123456789abcde0"})
+ self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
+ "objectUUID": b"0123456789abcde1"})
+ self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
+ "objectUUID": b"0123456789abcde2"})
+ self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
+ "objectUUID": b"0123456789abcde3"})
+ self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
+ "objectUUID": b"0123456789abcde4"})
+ self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
+ "objectUUID": b"0123456789abcde5"})
+ self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
+ "objectUUID": b"0123456789abcde6"})
+ self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
+ "objectUUID": b"0123456789abcde7"})
+ self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
+ "objectUUID": b"0123456789abcde8"})
+ self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
+ "objectUUID": b"0123456789abcde9"})
+ self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
+ "objectUUID": b"0123456789abcdea"})
+ self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
+ "objectUUID": b"0123456789abcdeb"})
+ self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
+ "objectUUID": b"0123456789abcdec"})
def tearDown(self):
shutil.rmtree(self.testdir)
super(LdbResultTests, self).tearDown()
+ # Ensure the LDB is closed now, so we close the FD
+ del(self.l)
def test_return_type(self):
res = self.l.search()
del(self.l)
gc.collect()
- child_ldb = ldb.Ldb(self.filename)
+ child_ldb = ldb.Ldb(self.url(), flags=self.flags())
# start a transaction
child_ldb.transaction_start()
# write to it
child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
- "name": b"samba.org"})
+ "name": b"samba.org",
+ "objectUUID": b"o123456789acbdef"})
os.write(w1, b"added")
del(self.l)
gc.collect()
- child_ldb = ldb.Ldb(self.filename)
+ child_ldb = ldb.Ldb(self.url(), flags=self.flags())
# start a transaction
child_ldb.transaction_start()
# write to it
child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
- "name": b"samba.org"})
+ "name": b"samba.org",
+ "objectUUID": b"o123456789acbdef"})
os.write(w1, b"added")
self.assertEqual(got_pid, pid)
+class LdbResultTestsLmdb(LdbResultTests):
+
+ def setUp(self):
+ self.prefix = MDB_PREFIX
+ self.index = MDB_INDEX_OBJ
+ super(LdbResultTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(LdbResultTestsLmdb, self).tearDown()
+
+
class BadTypeTests(TestCase):
def test_control(self):
l = ldb.Ldb()