import gc
import time
import ldb
+import shutil
PY3 = sys.version_info > (3, 0)
-def filename():
+def tempdir():
import tempfile
try:
dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
except KeyError:
dir_prefix = None
- return tempfile.mktemp(dir=dir_prefix)
+ return tempfile.mkdtemp(dir=dir_prefix)
class NoContextTests(TestCase):
class SimpleLdb(TestCase):
+ def setUp(self):
+ super(SimpleLdb, self).setUp()
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "test.ldb")
+ self.ldb = ldb.Ldb(self.filename)
+
+ def tearDown(self):
+ shutil.rmtree(self.testdir)
+ super(SimpleLdb, self).setUp()
+
def test_connect(self):
- ldb.Ldb(filename())
+ ldb.Ldb(self.filename)
def test_connect_none(self):
ldb.Ldb()
def test_connect_later(self):
x = ldb.Ldb()
- x.connect(filename())
+ x.connect(self.filename)
def test_repr(self):
x = ldb.Ldb()
self.assertEqual([], x.modules())
def test_modules_tdb(self):
- x = ldb.Ldb(filename())
+ x = ldb.Ldb(self.filename)
self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
def test_firstmodule_none(self):
self.assertEqual(x.firstmodule, None)
def test_firstmodule_tdb(self):
- x = ldb.Ldb(filename())
+ x = ldb.Ldb(self.filename)
mod = x.firstmodule
self.assertEqual(repr(mod), "<ldb module 'tdb'>")
def test_search(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(len(l.search()), 0)
def test_search_controls(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
def test_search_attrs(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
def test_search_string_dn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
def test_search_attr_string(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertRaises(TypeError, l.search, attrs="dc")
self.assertRaises(TypeError, l.search, attrs=b"dc")
def test_opaque(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
l.set_opaque("my_opaque", l)
self.assertTrue(l.get_opaque("my_opaque") is not None)
self.assertEqual(None, l.get_opaque("unknown"))
def test_search_scope_base_empty_db(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
ldb.SCOPE_BASE)), 0)
def test_search_scope_onelevel_empty_db(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
ldb.SCOPE_ONELEVEL)), 0)
def test_delete(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
def test_delete_w_unhandled_ctrl(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo1")
m["b"] = [b"a"]
l.delete(m.dn)
def test_contains(self):
- name = filename()
+ name = self.filename
l = ldb.Ldb(name)
self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
l = ldb.Ldb(name)
l.delete(m.dn)
def test_get_config_basedn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(None, l.get_config_basedn())
def test_get_root_basedn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(None, l.get_root_basedn())
def test_get_schema_basedn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(None, l.get_schema_basedn())
def test_get_default_basedn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(None, l.get_default_basedn())
def test_add(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = b"bla"
l.delete(ldb.Dn(l, "dc=foo4"))
def test_search_iterator(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
s = l.search_iterator()
s.abandon()
try:
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_text(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = "bla"
l.delete(ldb.Dn(l, "dc=foo4"))
def test_add_w_unhandled_ctrl(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = b"bla"
self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
def test_add_dict(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = {"dn": ldb.Dn(l, "dc=foo5"),
"bla": b"bla"}
self.assertEqual(len(l.search()), 0)
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_dict_text(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = {"dn": ldb.Dn(l, "dc=foo5"),
"bla": "bla"}
self.assertEqual(len(l.search()), 0)
l.delete(ldb.Dn(l, "dc=foo5"))
def test_add_dict_string_dn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = {"dn": "dc=foo6", "bla": b"bla"}
self.assertEqual(len(l.search()), 0)
l.add(m)
l.delete(ldb.Dn(l, "dc=foo6"))
def test_add_dict_bytes_dn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = {"dn": b"dc=foo6", "bla": b"bla"}
self.assertEqual(len(l.search()), 0)
l.add(m)
l.delete(ldb.Dn(l, "dc=foo6"))
def test_rename(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo7")
m["bla"] = b"bla"
l.delete(ldb.Dn(l, "dc=bar"))
def test_rename_string_dns(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=foo8")
m["bla"] = b"bla"
l.delete(ldb.Dn(l, "dc=bar"))
def test_empty_dn(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertEqual(0, len(l.search()))
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=empty")
self.assertEqual(0, len(rm[0]))
def test_modify_delete(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modifydelete")
m["bla"] = [b"1234"]
l.delete(ldb.Dn(l, "dc=modifydelete"))
def test_modify_delete_text(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modifydelete")
m.text["bla"] = ["1234"]
l.delete(ldb.Dn(l, "dc=modifydelete"))
def test_modify_add(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m["bla"] = [b"1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_add_text(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m.text["bla"] = ["1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_replace(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modify2")
m["bla"] = [b"1234", b"456"]
l.delete(ldb.Dn(l, "dc=modify2"))
def test_modify_replace_text(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=modify2")
m.text["bla"] = ["1234", "456"]
l.delete(ldb.Dn(l, "dc=modify2"))
def test_modify_flags_change(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m["bla"] = [b"1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_modify_flags_change_text(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
m = ldb.Message()
m.dn = ldb.Dn(l, "dc=add")
m.text["bla"] = ["1234"]
l.delete(ldb.Dn(l, "dc=add"))
def test_transaction_commit(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
l.transaction_start()
m = ldb.Message(ldb.Dn(l, "dc=foo9"))
m["foo"] = [b"bar"]
l.delete(m.dn)
def test_transaction_cancel(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
l.transaction_start()
m = ldb.Message(ldb.Dn(l, "dc=foo10"))
m["foo"] = [b"bar"]
def test_set_debug(self):
def my_report_fn(level, text):
pass
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
l.set_debug(my_report_fn)
def test_zero_byte_string(self):
"""Testing we do not get trapped in the \0 byte in a property string."""
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
l.add({
"dn" : b"dc=somedn",
"objectclass" : b"user",
self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
- l = ldb.Ldb(filename())
+ l = ldb.Ldb(self.filename)
self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
def setUp(self):
super(DnTests, self).setUp()
- self.ldb = ldb.Ldb(filename())
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "test.ldb")
+ self.ldb = ldb.Ldb(self.filename)
+
+ def tearDown(self):
+ shutil.rmtree(self.testdir)
+ super(DnTests, self).setUp()
def test_set_dn_invalid(self):
x = ldb.Message()
def setUp(self):
super(LdbMsgTests, self).setUp()
self.msg = ldb.Message()
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "test.ldb")
+
+ def tearDown(self):
+ shutil.rmtree(self.testdir)
+ super(LdbMsgTests, self).tearDown()
def test_init_dn(self):
self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
def test_iter_items(self):
self.assertEqual(0, len(self.msg.items()))
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "dc=foo28")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "dc=foo28")
self.assertEqual(1, len(self.msg.items()))
def test_repr(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "dc=foo29")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "dc=foo29")
self.msg["dc"] = b"foo"
if PY3:
self.assertIn(repr(self.msg), [
self.assertEqual(["bar"], list(self.msg.text["foo"]))
def test_keys(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
self.msg["foo"] = [b"bla"]
self.msg["bar"] = [b"bla"]
self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
def test_keys_text(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
self.msg["foo"] = ["bla"]
self.msg["bar"] = ["bla"]
self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
def test_dn(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
self.assertEqual("@BASEINFO", self.msg.dn.__str__())
def test_get_dn(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
def test_dn_text(self):
- self.msg.text.dn = ldb.Dn(ldb.Ldb(filename()), "@BASEINFO")
+ self.msg.text.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
self.assertEqual("@BASEINFO", str(self.msg.dn))
self.assertEqual("@BASEINFO", str(self.msg.text.dn))
def test_get_dn_text(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
def test_get_invalid(self):
- self.msg.dn = ldb.Dn(ldb.Ldb(filename()), "@BASEINFO")
+ self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
self.assertRaises(TypeError, self.msg.get, 42)
def test_get_other(self):
self.assertEqual(msg1, msg2)
def test_equal_simplel(self):
- db = ldb.Ldb(filename())
+ db = ldb.Ldb(self.filename)
msg1 = ldb.Message()
msg1.dn = ldb.Dn(db, "foo=bar")
msg2 = ldb.Message()
class ModuleTests(TestCase):
+ def setUp(self):
+ super(ModuleTests, self).setUp()
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "test.ldb")
+ self.ldb = ldb.Ldb(self.filename)
+
+ def tearDown(self):
+ shutil.rmtree(self.testdir)
+ super(ModuleTests, self).setUp()
+
def test_register_module(self):
class ExampleModule:
name = "example"
def request(self, *args, **kwargs):
pass
- name = filename()
ldb.register_module(ExampleModule)
- if os.path.exists(name):
- os.unlink(name)
- l = ldb.Ldb(name)
+ l = ldb.Ldb(self.filename)
l.add({"dn": "@MODULES", "@LIST": "bla"})
self.assertEqual([], ops)
- l = ldb.Ldb(name)
+ l = ldb.Ldb(self.filename)
self.assertEqual(["init"], ops)
class LdbResultTests(TestCase):
def setUp(self):
super(LdbResultTests, self).setUp()
- name = filename()
- self.name = name
- if os.path.exists(name):
- os.unlink(name)
- self.l = ldb.Ldb(name)
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "test.ldb")
+ self.l = ldb.Ldb(self.filename)
self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10"})
def tearDown(self):
+ shutil.rmtree(self.testdir)
super(LdbResultTests, self).tearDown()
- if os.path.exists(self.name):
- os.unlink(self.name)
def test_return_type(self):
res = self.l.search()
del(self.l)
gc.collect()
- child_ldb = ldb.Ldb(self.name)
+ child_ldb = ldb.Ldb(self.filename)
# start a transaction
child_ldb.transaction_start()