if session_info is not None:
self.set_session_info(self, session_info)
+ assert misc.ldb_register_samba_handlers(self) == 0
+
if lp is not None:
self.set_loadparm(self, lp)
set_session_info = misc.ldb_set_session_info
set_loadparm = misc.ldb_set_loadparm
- def searchone(self, basedn, attribute, expression=None, scope=ldb.SCOPE_BASE):
- """Search for one attribute as a string."""
+ def searchone(self, basedn, attribute, expression=None,
+ scope=ldb.SCOPE_BASE):
+ """Search for one attribute as a string.
+
+ :param basedn: BaseDN for the search.
+ :param attribute: Name of the attribute
+ :param expression: Optional search expression.
+ :param scope: Search scope (defaults to base).
+ :return: Value of attribute as a string or None if it wasn't found.
+ """
res = self.search(basedn, scope, expression, [attribute])
if len(res) != 1 or res[0][attribute] is None:
return None
return values.pop()
def erase(self):
- """Erase an ldb, removing all records."""
+ """Erase this ldb, removing all records."""
# delete the specials
for attr in ["@INDEXLIST", "@ATTRIBUTES", "@SUBCLASSES", "@MODULES",
"@OPTIONS", "@PARTITION", "@KLUDGEACL"]:
try:
- self.delete(ldb.Dn(self, attr))
+ self.delete(attr)
except ldb.LdbError, (LDB_ERR_NO_SUCH_OBJECT, _):
# Ignore missing dn errors
pass
- basedn = ldb.Dn(self, "")
+ basedn = ""
# and the rest
for msg in self.search(basedn, ldb.SCOPE_SUBTREE,
- "(&(|(objectclass=*)(dn=*))(!(dn=@BASEINFO)))",
+ "(&(|(objectclass=*)(distinguishedName=*))(!(distinguishedName=@BASEINFO)))",
["dn"]):
- self.delete(msg.dn)
+ try:
+ self.delete(msg.dn)
+ except ldb.LdbError, (LDB_ERR_NO_SUCH_OBJECT, _):
+ # Ignor eno such object errors
+ pass
- res = self.search(basedn, ldb.SCOPE_SUBTREE, "(&(|(objectclass=*)(dn=*))(!(dn=@BASEINFO)))", ["dn"])
+ res = self.search(basedn, ldb.SCOPE_SUBTREE, "(&(|(objectclass=*)(distinguishedName=*))(!(distinguisedName=@BASEINFO)))", ["dn"])
assert len(res) == 0
+ def erase_partitions(self):
+ """Erase an ldb, removing all records."""
+ res = self.search("", ldb.SCOPE_BASE, "(objectClass=*)",
+ ["namingContexts"])
+ assert len(res) == 1
+ if not "namingContexts" in res[0]:
+ return
+ for basedn in res[0]["namingContexts"]:
+ previous_remaining = 1
+ current_remaining = 0
+
+ k = 0
+ while ++k < 10 and (previous_remaining != current_remaining):
+ # and the rest
+ res2 = self.search(basedn, ldb.SCOPE_SUBTREE, "(|(objectclass=*)(distinguishedName=*))", ["distinguishedName"])
+ previous_remaining = current_remaining
+ current_remaining = len(res2)
+ for msg in res2:
+ self.delete(msg.dn)
+
+ def load_ldif_file_add(self, ldif_path):
+ """Load a LDIF file.
+
+ :param ldif_path: Path to LDIF file.
+ """
+ self.add_ldif(open(ldif_path, 'r').read())
+
+ def add_ldif(self, ldif):
+ """Add data based on a LDIF string.
+
+ :param ldif: LDIF text.
+ """
+ for changetype, msg in self.parse_ldif(ldif):
+ assert changetype == ldb.CHANGETYPE_NONE
+ self.add(msg)
+
+ def modify_ldif(self, ldif):
+ """Modify database based on a LDIF string.
+
+ :param ldif: LDIF text.
+ """
+ for changetype, msg in self.parse_ldif(ldif):
+ self.modify(msg)
+
def substitute_var(text, values):
"""substitute strings of the form ${NAME} in str, replacing