PY3 = sys.version_info > (3, 0)
+TDB_PREFIX = "tdb://"
+MDB_PREFIX = "mdb://"
+
def tempdir():
import tempfile
encoded2 = ldb.binary_encode('test\\x')
self.assertEqual(encoded2, encoded)
-class SimpleLdb(TestCase):
+
+class LdbBaseTest(TestCase):
+ def setUp(self):
+ super(LdbBaseTest, self).setUp()
+ try:
+ if self.prefix is None:
+ self.prefix = TDB_PREFIX
+ except AttributeError:
+ self.prefix = TDB_PREFIX
+
+ def tearDown(self):
+ super(LdbBaseTest, self).tearDown()
+
+ def url(self):
+ return self.prefix + self.filename
+
+ def flags(self):
+ if self.prefix == MDB_PREFIX:
+ return ldb.FLG_NOSYNC
+ else:
+ return 0
+
+
+class SimpleLdb(LdbBaseTest):
def setUp(self):
super(SimpleLdb, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "test.ldb")
- self.ldb = ldb.Ldb(self.filename)
+ self.ldb = ldb.Ldb(self.url(), flags=self.flags())
def tearDown(self):
shutil.rmtree(self.testdir)
# Ensure the LDB is closed now, so we close the FD
del(self.ldb)
-
def test_connect(self):
- ldb.Ldb(self.filename)
+ ldb.Ldb(self.url(), flags=self.flags())
def test_connect_none(self):
ldb.Ldb()
def test_connect_later(self):
x = ldb.Ldb()
- x.connect(self.filename)
+ x.connect(self.url(), flags=self.flags())
def test_repr(self):
x = ldb.Ldb()
self.assertEqual([], x.modules())
def test_modules_tdb(self):
- x = ldb.Ldb(self.filename)
+ x = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
def test_firstmodule_none(self):
self.assertEqual(x.firstmodule, None)
def test_firstmodule_tdb(self):
- x = ldb.Ldb(self.filename)
+ x = ldb.Ldb(self.url(), flags=self.flags())
mod = x.firstmodule
self.assertEqual(repr(mod), "<ldb module 'tdb'>")
def test_search(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search()), 0)
def test_search_controls(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
def test_search_attrs(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
def test_search_string_dn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
def test_search_attr_string(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertRaises(TypeError, l.search, attrs="dc")
self.assertRaises(TypeError, l.search, attrs=b"dc")
def test_opaque(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.set_opaque("my_opaque", l)
self.assertTrue(l.get_opaque("my_opaque") is not None)
self.assertEqual(None, l.get_opaque("unknown"))
def test_search_scope_base_empty_db(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
ldb.SCOPE_BASE)), 0)
def test_search_scope_onelevel_empty_db(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
ldb.SCOPE_ONELEVEL)), 0)
def test_delete(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
def test_delete_w_unhandled_ctrl(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo1")
m["b"] = [b"a"]
l.delete(m.dn)
def test_contains(self):
- name = self.filename
- l = ldb.Ldb(name)
+ name = self.url()
+ l = ldb.Ldb(name, flags=self.flags())
self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
- l = ldb.Ldb(name)
+ l = ldb.Ldb(name, flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo3")
m["b"] = ["a"]
l.delete(m.dn)
def test_get_config_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_config_basedn())
def test_get_root_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_root_basedn())
def test_get_schema_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_schema_basedn())
def test_get_default_basedn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(None, l.get_default_basedn())
def test_add(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = b"bla"
l.delete(ldb.Dn(l, "dc=foo4"))
def test_search_iterator(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
s = l.search_iterator()
s.abandon()
try:
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = "bla"
l.delete(ldb.Dn(l, "dc=foo4"))
def test_add_w_unhandled_ctrl(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = b"bla"
self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
def test_add_dict(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = {"dn": ldb.Dn(l, "dc=foo5"),
"bla": b"bla"}
self.assertEqual(len(l.search()), 0)
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_dict_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = {"dn": ldb.Dn(l, "dc=foo5"),
"bla": "bla"}
self.assertEqual(len(l.search()), 0)
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_dict_string_dn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = {"dn": "dc=foo6", "bla": b"bla"}
self.assertEqual(len(l.search()), 0)
l.add(m)
l.delete(ldb.Dn(l, "dc=foo6"))
def test_add_dict_bytes_dn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = {"dn": b"dc=foo6", "bla": b"bla"}
self.assertEqual(len(l.search()), 0)
l.add(m)
l.delete(ldb.Dn(l, "dc=foo6"))
def test_rename(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo7")
m["bla"] = b"bla"
l.delete(ldb.Dn(l, "dc=bar"))
def test_rename_string_dns(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo8")
m["bla"] = b"bla"
l.delete(ldb.Dn(l, "dc=bar"))
def test_empty_dn(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertEqual(0, len(l.search()))
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=empty")
self.assertEqual(0, len(rm[0]))
def test_modify_delete(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modifydelete")
m["bla"] = [b"1234"]
l.delete(ldb.Dn(l, "dc=modifydelete"))
def test_modify_delete_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modifydelete")
m.text["bla"] = ["1234"]
l.delete(ldb.Dn(l, "dc=modifydelete"))
def test_modify_add(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m["bla"] = [b"1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_add_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m.text["bla"] = ["1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_replace(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modify2")
m["bla"] = [b"1234", b"456"]
l.delete(ldb.Dn(l, "dc=modify2"))
def test_modify_replace_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modify2")
m.text["bla"] = ["1234", "456"]
l.delete(ldb.Dn(l, "dc=modify2"))
def test_modify_flags_change(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m["bla"] = [b"1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_flags_change_text(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m.text["bla"] = ["1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_transaction_commit(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.transaction_start()
m = ldb.Message(ldb.Dn(l, "dc=foo9"))
m["foo"] = [b"bar"]
l.delete(m.dn)
def test_transaction_cancel(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.transaction_start()
m = ldb.Message(ldb.Dn(l, "dc=foo10"))
m["foo"] = [b"bar"]
def test_set_debug(self):
def my_report_fn(level, text):
pass
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.set_debug(my_report_fn)
def test_zero_byte_string(self):
"""Testing we do not get trapped in the \0 byte in a property string."""
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
l.add({
"dn" : b"dc=somedn",
"objectclass" : b"user",
self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
- l = ldb.Ldb(self.filename)
+ l = ldb.Ldb(self.url(), flags=self.flags())
self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
-class SearchTests(TestCase):
+class SearchTests(LdbBaseTest):
def tearDown(self):
shutil.rmtree(self.testdir)
super(SearchTests, self).tearDown()
super(SearchTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "search_test.ldb")
- self.l = ldb.Ldb(self.filename, options=["modules:rdn_name"])
+ self.l = ldb.Ldb(self.url(),
+ flags=self.flags(),
+ options=["modules:rdn_name"])
self.l.add({"dn": "@ATTRIBUTES",
"DC": "CASE_INSENSITIVE"})
self.assertEqual(len(res11), 1)
-
class IndexedSearchTests(SearchTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
self.IDXGUID = True
self.IDXONE = True
+
class GUIDIndexedDNFilterSearchTests(SearchTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
self.IDXONE = True
-class AddModifyTests(TestCase):
+class AddModifyTests(LdbBaseTest):
def tearDown(self):
shutil.rmtree(self.testdir)
super(AddModifyTests, self).tearDown()
super(AddModifyTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "add_test.ldb")
- self.l = ldb.Ldb(self.filename, options=["modules:rdn_name"])
+ self.l = ldb.Ldb(self.url(),
+ flags=self.flags(),
+ options=["modules:rdn_name"])
self.l.add({"dn": "DC=SAMBA,DC=ORG",
"name": b"samba.org",
"objectUUID": b"0123456789abcdef"})
"x": "z", "y": "a",
"objectUUID": b"0123456789abcde3"})
+
class IndexedAddModifyTests(AddModifyTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
super(TransIndexedAddModifyTests, self).tearDown()
-
class DnTests(TestCase):
def setUp(self):
l = ldb.Ldb(self.filename)
self.assertEqual(["init"], ops)
-class LdbResultTests(TestCase):
+class LdbResultTests(LdbBaseTest):
def setUp(self):
super(LdbResultTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "test.ldb")
- self.l = ldb.Ldb(self.filename)
+ self.l = ldb.Ldb(self.url(), flags=self.flags())
self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
del(self.l)
gc.collect()
- child_ldb = ldb.Ldb(self.filename)
+ child_ldb = ldb.Ldb(self.url(), flags=self.flags())
# start a transaction
child_ldb.transaction_start()
del(self.l)
gc.collect()
- child_ldb = ldb.Ldb(self.filename)
+ child_ldb = ldb.Ldb(self.url(), flags=self.flags())
# start a transaction
child_ldb.transaction_start()