ldb: make key/value backends expose if there is an active transaction
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
index 3541f9109fa16daf97f078a79cb6643d4eb653e9..31b4a7d9abb4c3e1f2f9856a1aa45a3c910d0258 100644 (file)
@@ -50,7 +50,8 @@
  */
 
 #include "ldb_tdb.h"
-#include <lib/tdb_compat/tdb_compat.h>
+#include "ldb_private.h"
+#include <tdb.h>
 
 /*
   prevent memory errors on callbacks
@@ -93,31 +94,43 @@ int ltdb_err_map(enum TDB_ERROR tdb_code)
 /*
   lock the database for read - use by ltdb_search and ltdb_sequence_number
 */
-int ltdb_lock_read(struct ldb_module *module)
+static int ltdb_lock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       int ret = 0;
+       int tdb_ret = 0;
+       int ret;
 
        if (ltdb->in_transaction == 0 &&
            ltdb->read_lock_count == 0) {
-               ret = tdb_lockall_read(ltdb->tdb);
+               tdb_ret = tdb_lockall_read(ltdb->tdb);
        }
-       if (ret == 0) {
+       if (tdb_ret == 0) {
                ltdb->read_lock_count++;
+               return LDB_SUCCESS;
        }
+       ret = ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_debug_set(ldb_module_get_ctx(module),
+                     LDB_DEBUG_FATAL,
+                     "Failure during ltdb_lock_read(): %s -> %s",
+                     tdb_errorstr(ltdb->tdb),
+                     ldb_strerror(ret));
        return ret;
 }
 
 /*
   unlock the database after a ltdb_lock_read()
 */
-int ltdb_unlock_read(struct ldb_module *module)
+static int ltdb_unlock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
                tdb_unlockall_read(ltdb->tdb);
+               ltdb->read_lock_count--;
                return 0;
        }
        ltdb->read_lock_count--;
@@ -125,6 +138,36 @@ int ltdb_unlock_read(struct ldb_module *module)
 }
 
 
+/* 
+ * Determine if this key could hold a record.  We allow the new GUID
+ * index, the old DN index and a possible future ID=
+ */
+bool ltdb_key_is_record(TDB_DATA key)
+{
+       if (key.dsize < 4) {
+               return false;
+       }
+
+       if (memcmp(key.dptr, "DN=", 3) == 0) {
+               return true;
+       }
+       
+       if (memcmp(key.dptr, "ID=", 3) == 0) {
+               return true;
+       }
+
+       if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
+               return false;
+       }
+
+       if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
+                  sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
+               return true;
+       }
+       
+       return false;
+}
+
 /*
   form a TDB_DATA for a record key
   caller frees
@@ -132,9 +175,9 @@ int ltdb_unlock_read(struct ldb_module *module)
   note that the key for a record can depend on whether the
   dn refers to a case sensitive index record or not
 */
-TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
+TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
+                    struct ldb_dn *dn)
 {
-       struct ldb_context *ldb = ldb_module_get_ctx(module);
        TDB_DATA key;
        char *key_str = NULL;
        const char *dn_folded = NULL;
@@ -156,7 +199,7 @@ TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
                goto failed;
        }
 
-       key_str = talloc_strdup(ldb, "DN=");
+       key_str = talloc_strdup(mem_ctx, "DN=");
        if (!key_str) {
                goto failed;
        }
@@ -178,6 +221,121 @@ failed:
        return key;
 }
 
+/* The caller is to provide a correctly sized key */
+int ltdb_guid_to_key(struct ldb_module *module,
+                    struct ltdb_private *ltdb,
+                    const struct ldb_val *GUID_val,
+                    TDB_DATA *key)
+{
+       const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
+       const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
+
+       if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
+       memcpy(&key->dptr[GUID_prefix_len],
+              GUID_val->data, GUID_val->length);
+       return LDB_SUCCESS;
+}
+
+/*
+ * The caller is to provide a correctly sized key, used only in
+ * the GUID index mode
+ */
+int ltdb_idx_to_key(struct ldb_module *module,
+                   struct ltdb_private *ltdb,
+                   TALLOC_CTX *mem_ctx,
+                   const struct ldb_val *idx_val,
+                   TDB_DATA *key)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct ldb_dn *dn;
+
+       if (ltdb->cache->GUID_index_attribute != NULL) {
+               return ltdb_guid_to_key(module, ltdb,
+                                       idx_val, key);
+       }
+
+       dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
+       if (dn == NULL) {
+               /*
+                * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
+                * to the caller, as this in an invalid index value
+                */
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       /* form the key */
+       *key = ltdb_key_dn(module, mem_ctx, dn);
+       TALLOC_FREE(dn);
+       if (!key->dptr) {
+               return ldb_module_oom(module);
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+  form a TDB_DATA for a record key
+  caller frees mem_ctx, which may or may not have the key
+  as a child.
+
+  note that the key for a record can depend on whether a
+  GUID index is in use, or the DN is used as the key
+*/
+TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
+                     const struct ldb_message *msg)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       TDB_DATA key;
+       const struct ldb_val *guid_val;
+       int ret;
+
+       if (ltdb->cache->GUID_index_attribute == NULL) {
+               return ltdb_key_dn(module, mem_ctx, msg->dn);
+       }
+
+       if (ldb_dn_is_special(msg->dn)) {
+               return ltdb_key_dn(module, mem_ctx, msg->dn);
+       }
+
+       guid_val = ldb_msg_find_ldb_val(msg,
+                                      ltdb->cache->GUID_index_attribute);
+       if (guid_val == NULL) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Did not find GUID attribute %s "
+                                      "in %s, required for TDB record "
+                                      "key in " LTDB_IDXGUID " mode.",
+                                      ltdb->cache->GUID_index_attribute,
+                                      ldb_dn_get_linearized(msg->dn));
+               errno = EINVAL;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+
+       /* In this case, allocate with talloc */
+       key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
+       if (key.dptr == NULL) {
+               errno = ENOMEM;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       key.dsize = talloc_get_size(key.dptr);
+
+       ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
+
+       if (ret != LDB_SUCCESS) {
+               errno = EINVAL;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       return key;
+}
+
 /*
   check special dn's have valid attributes
   currently only @ATTRIBUTES is checked
@@ -228,7 +386,13 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 
        if (ldb_dn_is_special(dn) &&
            (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
-            ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
+            ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
+       {
+               if (ltdb->warn_reindex) {
+                       ldb_debug(ldb_module_get_ctx(module),
+                               LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
+                               ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
+               }
                ret = ltdb_reindex(module);
        }
 
@@ -246,9 +410,41 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
                ret = ltdb_cache_reload(module);
        }
 
+       if (ret != LDB_SUCCESS) {
+               ltdb->reindex_failed = true;
+       }
+
        return ret;
 }
 
+static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
+                         struct ldb_val ldb_data, int flags)
+{
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_store(ltdb->tdb, key, data, flags);
+}
+
+static int ltdb_error(struct ltdb_private *ltdb)
+{
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char *ltdb_errorstr(struct ltdb_private *ltdb)
+{
+       return tdb_errorstr(ltdb->tdb);
+}
+
 /*
   store a record into the db
 */
@@ -256,29 +452,56 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       TDB_DATA tdb_key, tdb_data;
+       TDB_DATA tdb_key;
+       struct ldb_val ldb_key;
+       struct ldb_val ldb_data;
        int ret = LDB_SUCCESS;
+       TALLOC_CTX *tdb_key_ctx = talloc_new(module);
 
-       tdb_key = ltdb_key(module, msg->dn);
+       if (tdb_key_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
+       tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
        if (tdb_key.dptr == NULL) {
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
-       ret = ltdb_pack_data(module, msg, &tdb_data);
+       ret = ldb_pack_data(ldb_module_get_ctx(module),
+                           msg, &ldb_data);
        if (ret == -1) {
-               talloc_free(tdb_key.dptr);
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
-       ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
+
+       ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
        if (ret != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               bool is_special = ldb_dn_is_special(msg->dn);
+               ret = ltdb->kv_ops->error(ltdb);
+
+               /*
+                * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
+                * the GUID, so re-map
+                */
+               if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
+                   && !is_special
+                   && ltdb->cache->GUID_index_attribute != NULL) {
+                       ret = LDB_ERR_CONSTRAINT_VIOLATION;
+               }
                goto done;
        }
 
 done:
-       talloc_free(tdb_key.dptr);
-       talloc_free(tdb_data.dptr);
+       TALLOC_FREE(tdb_key_ctx);
+       talloc_free(ldb_data.data);
 
        return ret;
 }
@@ -313,6 +536,7 @@ static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
 }
 
 static int ltdb_add_internal(struct ldb_module *module,
+                            struct ltdb_private *ltdb,
                             const struct ldb_message *msg,
                             bool check_single_value)
 {
@@ -336,10 +560,57 @@ static int ltdb_add_internal(struct ldb_module *module,
                                               el->name, ldb_dn_get_linearized(msg->dn));
                        return LDB_ERR_CONSTRAINT_VIOLATION;
                }
+
+               /* Do not check "@ATTRIBUTES" for duplicated values */
+               if (ldb_dn_is_special(msg->dn) &&
+                   ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
+                       continue;
+               }
+
+               if (check_single_value &&
+                   !(el->flags &
+                     LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                       struct ldb_val *duplicate = NULL;
+
+                       ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
+                                                        el, &duplicate, 0);
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
+                       }
+                       if (duplicate != NULL) {
+                               ldb_asprintf_errstring(
+                                       ldb,
+                                       "attribute '%s': value '%.*s' on '%s' "
+                                       "provided more than once in ADD object",
+                                       el->name,
+                                       (int)duplicate->length,
+                                       duplicate->data,
+                                       ldb_dn_get_linearized(msg->dn));
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+               }
        }
 
        ret = ltdb_store(module, msg, TDB_INSERT);
        if (ret != LDB_SUCCESS) {
+               /*
+                * Try really hard to get the right error code for
+                * a re-add situation, as this can matter!
+                */
+               if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
+                       int ret2;
+                       struct ldb_dn *dn2 = NULL;
+                       TALLOC_CTX *mem_ctx = talloc_new(module);
+                       if (mem_ctx == NULL) {
+                               return ldb_module_operr(module);
+                       }
+                       ret2 = ltdb_search_base(module, module,
+                                               msg->dn, &dn2);
+                       TALLOC_FREE(mem_ctx);
+                       if (ret2 == LDB_SUCCESS) {
+                               ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       }
+               }
                if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
                        ldb_asprintf_errstring(ldb,
                                               "Entry %s already exists",
@@ -348,8 +619,19 @@ static int ltdb_add_internal(struct ldb_module *module,
                return ret;
        }
 
-       ret = ltdb_index_add_new(module, msg);
+       ret = ltdb_index_add_new(module, ltdb, msg);
        if (ret != LDB_SUCCESS) {
+               /*
+                * If we failed to index, delete the message again.
+                *
+                * This is particularly important for the GUID index
+                * case, which will only fail for a duplicate DN
+                * in the index add.
+                *
+                * Note that the caller may not cancel the transation
+                * and this means the above add might really show up!
+                */
+               ltdb_delete_noindex(module, msg);
                return ret;
        }
 
@@ -365,8 +647,20 @@ static int ltdb_add(struct ltdb_context *ctx)
 {
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        int ret = LDB_SUCCESS;
 
+       if (ltdb->max_key_length != 0 &&
+           ltdb->cache->GUID_index_attribute == NULL &&
+           !ldb_dn_is_special(req->op.add.message->dn))
+       {
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Must operate ldb_mdb in GUID "
+                                 "index mode, but " LTDB_IDXGUID " not set.");
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
        ret = ltdb_check_special_dn(module, req->op.add.message);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -378,32 +672,61 @@ static int ltdb_add(struct ltdb_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_add_internal(module, req->op.add.message, true);
+       ret = ltdb_add_internal(module, ltdb,
+                               req->op.add.message, true);
 
        return ret;
 }
 
+static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
+{
+       TDB_DATA tdb_key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_delete(ltdb->tdb, tdb_key);
+}
+
 /*
   delete a record from the database, not updating indexes (used for deleting
   index records)
 */
-int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
+int ltdb_delete_noindex(struct ldb_module *module,
+                       const struct ldb_message *msg)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ldb_val ldb_key;
        TDB_DATA tdb_key;
        int ret;
+       TALLOC_CTX *tdb_key_ctx = talloc_new(module);
 
-       tdb_key = ltdb_key(module, dn);
+       if (tdb_key_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
+       tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
        if (!tdb_key.dptr) {
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
-       ret = tdb_delete(ltdb->tdb, tdb_key);
-       talloc_free(tdb_key.dptr);
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
+
+       ret = ltdb->kv_ops->delete(ltdb, ldb_key);
+       TALLOC_FREE(tdb_key_ctx);
 
        if (ret != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               ret = ltdb->kv_ops->error(ltdb);
        }
 
        return ret;
@@ -421,13 +744,13 @@ static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
 
        /* in case any attribute of the message was indexed, we need
           to fetch the old record */
-       ret = ltdb_search_dn1(module, dn, msg);
+       ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
        if (ret != LDB_SUCCESS) {
                /* not finding the old record is an error */
                goto done;
        }
 
-       ret = ltdb_delete_noindex(module, dn);
+       ret = ltdb_delete_noindex(module, msg);
        if (ret != LDB_SUCCESS) {
                goto done;
        }
@@ -494,8 +817,7 @@ static int find_element(const struct ldb_message *msg, const char *name)
 
   returns 0 on success, -1 on failure (and sets errno)
 */
-static int ltdb_msg_add_element(struct ldb_context *ldb,
-                               struct ldb_message *msg,
+static int ltdb_msg_add_element(struct ldb_message *msg,
                                struct ldb_message_element *el)
 {
        struct ldb_message_element *e2;
@@ -539,12 +861,23 @@ static int ltdb_msg_add_element(struct ldb_context *ldb,
   delete all elements having a specified attribute name
 */
 static int msg_delete_attribute(struct ldb_module *module,
-                               struct ldb_context *ldb,
+                               struct ltdb_private *ltdb,
                                struct ldb_message *msg, const char *name)
 {
        unsigned int i;
        int ret;
        struct ldb_message_element *el;
+       bool is_special = ldb_dn_is_special(msg->dn);
+
+       if (!is_special
+           && ltdb->cache->GUID_index_attribute != NULL
+           && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "Must not modify GUID "
+                                      "attribute %s (used as DB index)",
+                                      ltdb->cache->GUID_index_attribute);
+               return LDB_ERR_CONSTRAINT_VIOLATION;
+       }
 
        el = ldb_msg_find_element(msg, name);
        if (el == NULL) {
@@ -552,7 +885,7 @@ static int msg_delete_attribute(struct ldb_module *module,
        }
        i = el - msg->elements;
 
-       ret = ltdb_index_del_element(module, msg->dn, el);
+       ret = ltdb_index_del_element(module, ltdb, msg, el);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
@@ -574,6 +907,7 @@ static int msg_delete_attribute(struct ldb_module *module,
   return LDB Error on failure
 */
 static int msg_delete_element(struct ldb_module *module,
+                             struct ltdb_private *ltdb,
                              struct ldb_message *msg,
                              const char *name,
                              const struct ldb_val *val)
@@ -606,10 +940,11 @@ static int msg_delete_element(struct ldb_module *module,
                }
                if (matched) {
                        if (el->num_values == 1) {
-                               return msg_delete_attribute(module, ldb, msg, name);
+                               return msg_delete_attribute(module,
+                                                           ltdb, msg, name);
                        }
 
-                       ret = ltdb_index_del_value(module, msg->dn, el, i);
+                       ret = ltdb_index_del_value(module, ltdb, msg, el, i);
                        if (ret != LDB_SUCCESS) {
                                return ret;
                        }
@@ -631,7 +966,6 @@ static int msg_delete_element(struct ldb_module *module,
        return LDB_ERR_NO_SUCH_ATTRIBUTE;
 }
 
-
 /*
   modify a record - internal interface
 
@@ -648,51 +982,43 @@ int ltdb_modify_internal(struct ldb_module *module,
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       TDB_DATA tdb_key, tdb_data;
        struct ldb_message *msg2;
-       unsigned int i, j, k;
+       unsigned int i, j;
        int ret = LDB_SUCCESS, idx;
        struct ldb_control *control_permissive = NULL;
+       TALLOC_CTX *mem_ctx = talloc_new(req);
 
+       if (mem_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+       
        if (req) {
                control_permissive = ldb_request_get_control(req,
                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
        }
 
-       tdb_key = ltdb_key(module, msg->dn);
-       if (!tdb_key.dptr) {
-               return LDB_ERR_OTHER;
-       }
-
-       tdb_data = tdb_fetch_compat(ltdb->tdb, tdb_key);
-       if (!tdb_data.dptr) {
-               talloc_free(tdb_key.dptr);
-               return ltdb_err_map(tdb_error(ltdb->tdb));
-       }
-
-       msg2 = ldb_msg_new(tdb_key.dptr);
+       msg2 = ldb_msg_new(mem_ctx);
        if (msg2 == NULL) {
-               free(tdb_data.dptr);
                ret = LDB_ERR_OTHER;
                goto done;
        }
 
-       ret = ltdb_unpack_data(module, &tdb_data, msg2);
-       free(tdb_data.dptr);
-       if (ret == -1) {
-               ret = LDB_ERR_OTHER;
+       ret = ltdb_search_dn1(module, msg->dn,
+                             msg2,
+                             LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
+       if (ret != LDB_SUCCESS) {
                goto done;
        }
 
-       if (!msg2->dn) {
-               msg2->dn = msg->dn;
-       }
-
        for (i=0; i<msg->num_elements; i++) {
                struct ldb_message_element *el = &msg->elements[i], *el2;
                struct ldb_val *vals;
                const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
                const char *dn;
+               uint32_t options = 0;
+               if (control_permissive != NULL) {
+                       options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
+               }
 
                switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
                case LDB_FLAG_MOD_ADD:
@@ -737,11 +1063,12 @@ int ltdb_modify_internal(struct ldb_module *module,
                        /* Checks if element already exists */
                        idx = find_element(msg2, el->name);
                        if (idx == -1) {
-                               if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
+                               if (ltdb_msg_add_element(msg2, el) != 0) {
                                        ret = LDB_ERR_OTHER;
                                        goto done;
                                }
-                               ret = ltdb_index_add_element(module, msg2->dn,
+                               ret = ltdb_index_add_element(module, ltdb,
+                                                            msg2,
                                                             el);
                                if (ret != LDB_SUCCESS) {
                                        goto done;
@@ -761,29 +1088,43 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                                /* Check that values don't exist yet on multi-
                                   valued attributes or aren't provided twice */
-                               for (j = 0; j < el->num_values; j++) {
-                                       if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
-                                               if (control_permissive) {
-                                                       /* remove this one as if it was never added */
-                                                       el->num_values--;
-                                                       for (k = j; k < el->num_values; k++) {
-                                                               el->values[k] = el->values[k + 1];
-                                                       }
-                                                       j--; /* rewind */
-
-                                                       continue;
-                                               }
-
+                               if (!(el->flags &
+                                     LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                                       struct ldb_val *duplicate = NULL;
+                                       ret = ldb_msg_find_common_values(ldb,
+                                                                        msg2,
+                                                                        el,
+                                                                        el2,
+                                                                        options);
+
+                                       if (ret ==
+                                           LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
                                                ldb_asprintf_errstring(ldb,
-                                                                      "attribute '%s': value #%u on '%s' already exists",
-                                                                      el->name, j, ldb_dn_get_linearized(msg2->dn));
-                                               ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                                                       "attribute '%s': value "
+                                                       "#%u on '%s' already "
+                                                       "exists", el->name, j,
+                                                       ldb_dn_get_linearized(msg2->dn));
+                                               goto done;
+                                       } else if (ret != LDB_SUCCESS) {
                                                goto done;
                                        }
-                                       if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                                               ldb_asprintf_errstring(ldb,
-                                                                      "attribute '%s': value #%u on '%s' provided more than once",
-                                                                      el->name, j, ldb_dn_get_linearized(msg2->dn));
+
+                                       ret = ldb_msg_find_duplicate_val(
+                                               ldb, msg2, el, &duplicate, 0);
+                                       if (ret != LDB_SUCCESS) {
+                                               goto done;
+                                       }
+                                       if (duplicate != NULL) {
+                                               ldb_asprintf_errstring(
+                                                       ldb,
+                                                       "attribute '%s': value "
+                                                       "'%.*s' on '%s' "
+                                                       "provided more than "
+                                                       "once in ADD",
+                                                       el->name,
+                                                       (int)duplicate->length,
+                                                       duplicate->data,
+                                                       ldb_dn_get_linearized(msg->dn));
                                                ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                                                goto done;
                                        }
@@ -808,7 +1149,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                el2->values = vals;
                                el2->num_values += el->num_values;
 
-                               ret = ltdb_index_add_element(module, msg2->dn, el);
+                               ret = ltdb_index_add_element(module, ltdb,
+                                                            msg2, el);
                                if (ret != LDB_SUCCESS) {
                                        goto done;
                                }
@@ -825,12 +1167,30 @@ int ltdb_modify_internal(struct ldb_module *module,
                                goto done;
                        }
 
-                       /* TODO: This is O(n^2) - replace with more efficient check */
-                       for (j=0; j<el->num_values; j++) {
-                               if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                                       ldb_asprintf_errstring(ldb,
-                                                              "attribute '%s': value #%u on '%s' provided more than once",
-                                                              el->name, j, ldb_dn_get_linearized(msg2->dn));
+                       /*
+                        * We don't need to check this if we have been
+                        * pre-screened by the repl_meta_data module
+                        * in Samba, or someone else who can claim to
+                        * know what they are doing. 
+                        */
+                       if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                               struct ldb_val *duplicate = NULL;
+
+                               ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
+                                                                &duplicate, 0);
+                               if (ret != LDB_SUCCESS) {
+                                       goto done;
+                               }
+                               if (duplicate != NULL) {
+                                       ldb_asprintf_errstring(
+                                               ldb,
+                                               "attribute '%s': value '%.*s' "
+                                               "on '%s' provided more than "
+                                               "once in REPLACE",
+                                               el->name,
+                                               (int)duplicate->length,
+                                               duplicate->data,
+                                               ldb_dn_get_linearized(msg2->dn));
                                        ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                                        goto done;
                                }
@@ -854,7 +1214,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                }
 
                                /* Delete the attribute if it exists in the DB */
-                               if (msg_delete_attribute(module, ldb, msg2,
+                               if (msg_delete_attribute(module, ltdb,
+                                                        msg2,
                                                         el->name) != 0) {
                                        ret = LDB_ERR_OTHER;
                                        goto done;
@@ -862,12 +1223,13 @@ int ltdb_modify_internal(struct ldb_module *module,
                        }
 
                        /* Recreate it with the new values */
-                       if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
+                       if (ltdb_msg_add_element(msg2, el) != 0) {
                                ret = LDB_ERR_OTHER;
                                goto done;
                        }
 
-                       ret = ltdb_index_add_element(module, msg2->dn, el);
+                       ret = ltdb_index_add_element(module, ltdb,
+                                                    msg2, el);
                        if (ret != LDB_SUCCESS) {
                                goto done;
                        }
@@ -883,7 +1245,9 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                        if (msg->elements[i].num_values == 0) {
                                /* Delete the whole attribute */
-                               ret = msg_delete_attribute(module, ldb, msg2,
+                               ret = msg_delete_attribute(module,
+                                                          ltdb,
+                                                          msg2,
                                                           msg->elements[i].name);
                                if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
                                    control_permissive) {
@@ -900,13 +1264,14 @@ int ltdb_modify_internal(struct ldb_module *module,
                                /* Delete specified values from an attribute */
                                for (j=0; j < msg->elements[i].num_values; j++) {
                                        ret = msg_delete_element(module,
+                                                                ltdb,
                                                                 msg2,
                                                                 msg->elements[i].name,
                                                                 &msg->elements[i].values[j]);
                                        if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
                                            control_permissive) {
                                                ret = LDB_SUCCESS;
-                                       } else {
+                                       } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                                ldb_asprintf_errstring(ldb,
                                                                       "attribute '%s': no matching attribute value while deleting attribute on '%s'",
                                                                       msg->elements[i].name, dn);
@@ -938,7 +1303,7 @@ int ltdb_modify_internal(struct ldb_module *module,
        }
 
 done:
-       talloc_free(tdb_key.dptr);
+       TALLOC_FREE(mem_ctx);
        return ret;
 }
 
@@ -973,9 +1338,13 @@ static int ltdb_modify(struct ltdb_context *ctx)
 static int ltdb_rename(struct ltdb_context *ctx)
 {
        struct ldb_module *module = ctx->module;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        struct ldb_request *req = ctx->req;
        struct ldb_message *msg;
        int ret = LDB_SUCCESS;
+       TDB_DATA tdb_key, tdb_key_old;
+       struct ldb_dn *db_dn;
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
@@ -988,25 +1357,80 @@ static int ltdb_rename(struct ltdb_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /* in case any attribute of the message was indexed, we need
-          to fetch the old record */
-       ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
+       /* we need to fetch the old record to re-add under the new name */
+       ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
+                             LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
        if (ret != LDB_SUCCESS) {
                /* not finding the old record is an error */
                return ret;
        }
 
+       /* We need to, before changing the DB, check if the new DN
+        * exists, so we can return this error to the caller with an
+        * unmodified DB
+        *
+        * Even in GUID index mode we use ltdb_key_dn() as we are
+        * trying to figure out if this is just a case rename
+        */
+       tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
+       if (!tdb_key.dptr) {
+               talloc_free(msg);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
+       if (!tdb_key_old.dptr) {
+               talloc_free(msg);
+               talloc_free(tdb_key.dptr);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * Only declare a conflict if the new DN already exists,
+        * and it isn't a case change on the old DN
+        */
+       if (tdb_key_old.dsize != tdb_key.dsize
+           || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
+               ret = ltdb_search_base(module, msg,
+                                      req->op.rename.newdn,
+                                      &db_dn);
+               if (ret == LDB_SUCCESS) {
+                       ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
+               } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       ret = LDB_SUCCESS;
+               }
+       }
+
+       /* finding the new record already in the DB is an error */
+
+       if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Entry %s already exists",
+                                      ldb_dn_get_linearized(req->op.rename.newdn));
+       }
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tdb_key_old.dptr);
+               talloc_free(tdb_key.dptr);
+               talloc_free(msg);
+               return ret;
+       }
+
+       talloc_free(tdb_key_old.dptr);
+       talloc_free(tdb_key.dptr);
+
        /* Always delete first then add, to avoid conflicts with
         * unique indexes. We rely on the transaction to make this
         * atomic
         */
        ret = ltdb_delete_internal(module, msg->dn);
        if (ret != LDB_SUCCESS) {
+               talloc_free(msg);
                return ret;
        }
 
        msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
        if (msg->dn == NULL) {
+               talloc_free(msg);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -1014,29 +1438,65 @@ static int ltdb_rename(struct ltdb_context *ctx)
         * deleted attributes. We could go through all elements but that's
         * maybe not the most efficient way
         */
-       ret = ltdb_add_internal(module, msg, false);
+       ret = ltdb_add_internal(module, ltdb, msg, false);
+
+       talloc_free(msg);
 
        return ret;
 }
 
+static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_start(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_cancel(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_prepare_commit(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_commit(ltdb->tdb);
+}
+
 static int ltdb_start_trans(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
-       if (tdb_transaction_start(ltdb->tdb) != 0) {
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+       /* Do not take out the transaction lock on a read-only DB */
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
+       if (ltdb->kv_ops->begin_write(ltdb) != 0) {
+               return ltdb->kv_ops->error(ltdb);
        }
 
        ltdb->in_transaction++;
 
        ltdb_index_transaction_start(module);
 
+       ltdb->reindex_failed = false;
+
        return LDB_SUCCESS;
 }
 
+/*
+ * Forward declaration to allow prepare_commit to in fact abort the
+ * transaction
+ */
+static int ltdb_del_trans(struct ldb_module *module);
+
 static int ltdb_prepare_commit(struct ldb_module *module)
 {
+       int ret;
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
@@ -1044,15 +1504,41 @@ static int ltdb_prepare_commit(struct ldb_module *module)
                return LDB_SUCCESS;
        }
 
-       if (ltdb_index_transaction_commit(module) != 0) {
-               tdb_transaction_cancel(ltdb->tdb);
+       /*
+        * Check if the last re-index failed.
+        *
+        * This can happen if for example a duplicate value was marked
+        * unique.  We must not write a partial re-index into the DB.
+        */
+       if (ltdb->reindex_failed) {
+               /*
+                * We must instead abort the transaction so we get the
+                * old values and old index back
+                */
+               ltdb_del_trans(module);
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Failure during re-index, so "
+                                 "transaction must be aborted.");
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_index_transaction_commit(module);
+       if (ret != LDB_SUCCESS) {
+               ltdb->kv_ops->abort_write(ltdb);
                ltdb->in_transaction--;
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               return ret;
        }
 
-       if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
+       if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
                ltdb->in_transaction--;
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               ldb_debug_set(ldb_module_get_ctx(module),
+                             LDB_DEBUG_FATAL,
+                             "Failure during "
+                             "prepare_write): %s -> %s",
+                             ltdb->kv_ops->errorstr(ltdb),
+                             ldb_strerror(ret));
+               return ret;
        }
 
        ltdb->prepared_commit = true;
@@ -1062,11 +1548,12 @@ static int ltdb_prepare_commit(struct ldb_module *module)
 
 static int ltdb_end_trans(struct ldb_module *module)
 {
+       int ret;
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
        if (!ltdb->prepared_commit) {
-               int ret = ltdb_prepare_commit(module);
+               ret = ltdb_prepare_commit(module);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -1075,8 +1562,13 @@ static int ltdb_end_trans(struct ldb_module *module)
        ltdb->in_transaction--;
        ltdb->prepared_commit = false;
 
-       if (tdb_transaction_commit(ltdb->tdb) != 0) {
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ltdb->kv_ops->finish_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Failure during tdb_transaction_commit(): %s -> %s",
+                                      ltdb->kv_ops->errorstr(ltdb),
+                                      ldb_strerror(ret));
+               return ret;
        }
 
        return LDB_SUCCESS;
@@ -1090,11 +1582,11 @@ static int ltdb_del_trans(struct ldb_module *module)
        ltdb->in_transaction--;
 
        if (ltdb_index_transaction_cancel(module) != 0) {
-               tdb_transaction_cancel(ltdb->tdb);
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               ltdb->kv_ops->abort_write(ltdb);
+               return ltdb->kv_ops->error(ltdb);
        }
 
-       tdb_transaction_cancel(ltdb->tdb);
+       ltdb->kv_ops->abort_write(ltdb);
        return LDB_SUCCESS;
 }
 
@@ -1107,6 +1599,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
        struct ldb_context *ldb;
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        TALLOC_CTX *tmp_ctx = NULL;
        struct ldb_seqnum_request *seq;
        struct ldb_seqnum_result *res;
@@ -1125,7 +1619,7 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
-       if (ltdb_lock_read(module) != 0) {
+       if (ltdb->kv_ops->lock_read(module) != 0) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -1153,7 +1647,7 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
                goto done;
        }
 
-       ret = ltdb_search_dn1(module, dn, msg);
+       ret = ltdb_search_dn1(module, dn, msg, 0);
        if (ret != LDB_SUCCESS) {
                goto done;
        }
@@ -1187,7 +1681,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
 done:
        talloc_free(tmp_ctx);
-       ltdb_unlock_read(module);
+
+       ltdb->kv_ops->unlock_read(module);
        return ret;
 }
 
@@ -1284,6 +1779,176 @@ static void ltdb_handle_extended(struct ltdb_context *ctx)
        ltdb_request_extended_done(ctx, ext, ret);
 }
 
+struct kv_ctx {
+       ldb_kv_traverse_fn kv_traverse_fn;
+       void *ctx;
+       struct ltdb_private *ltdb;
+       int (*parser)(struct ldb_val key,
+                     struct ldb_val data,
+                     void *private_data);
+};
+
+static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+       return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .kv_traverse_fn = fn,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       if (ltdb->in_transaction != 0) {
+               return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       } else {
+               return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       }
+}
+
+static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
+                                     struct ldb_val ldb_key,
+                                     struct ldb_val ldb_key2,
+                                     struct ldb_val ldb_data, void *state)
+{
+       int tdb_ret;
+       struct ldb_context *ldb;
+       struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
+       struct ldb_module *module = ctx->module;
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA key2 = {
+               .dptr = ldb_key2.data,
+               .dsize = ldb_key2.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+
+       ldb = ldb_module_get_ctx(module);
+
+       tdb_ret = tdb_delete(ltdb->tdb, key);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to delete %*.*s "
+                         "for rekey as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to rekey %*.*s as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       return tdb_ret;
+}
+
+static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
+                                        void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+
+       return kv_ctx->parser(key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
+                                struct ldb_val ldb_key,
+                                int (*parser)(struct ldb_val key,
+                                              struct ldb_val data,
+                                              void *private_data),
+                                void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .parser = parser,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       int ret;
+
+       ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
+                              &kv_ctx);
+       if (ret == 0) {
+               return LDB_SUCCESS;
+       }
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
+{
+       return tdb_name(ltdb->tdb);
+}
+
+static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
+{
+       int seq = tdb_get_seqnum(ltdb->tdb);
+       bool has_changed = (seq != ltdb->tdb_seqnum);
+
+       ltdb->tdb_seqnum = seq;
+
+       return has_changed;
+}
+
+static bool ltdb_transaction_active(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_active(ltdb->tdb);
+}
+
+static const struct kv_db_ops key_value_ops = {
+       .store = ltdb_tdb_store,
+       .delete = ltdb_tdb_delete,
+       .iterate = ltdb_tdb_traverse_fn,
+       .update_in_iterate = ltdb_tdb_update_in_iterate,
+       .fetch_and_parse = ltdb_tdb_parse_record,
+       .lock_read = ltdb_lock_read,
+       .unlock_read = ltdb_unlock_read,
+       .begin_write = ltdb_tdb_transaction_start,
+       .prepare_write = ltdb_tdb_transaction_prepare_commit,
+       .finish_write = ltdb_tdb_transaction_commit,
+       .abort_write = ltdb_tdb_transaction_cancel,
+       .error = ltdb_error,
+       .errorstr = ltdb_errorstr,
+       .name = ltdb_tdb_name,
+       .has_changed = ltdb_tdb_changed,
+       .transaction_active = ltdb_transaction_active,
+};
+
 static void ltdb_callback(struct tevent_context *ev,
                          struct tevent_timer *te,
                          struct timeval t,
@@ -1379,7 +2044,7 @@ static int ltdb_handle_request(struct ldb_module *module,
                return LDB_ERR_TIME_LIMIT_EXCEEDED;
        }
 
-       ev = ldb_get_event_context(ldb);
+       ev = ldb_handle_get_event_context(req->handle);
 
        ac = talloc_zero(ldb, struct ltdb_context);
        if (ac == NULL) {
@@ -1398,11 +2063,15 @@ static int ltdb_handle_request(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       tv.tv_sec = req->starttime + req->timeout;
-       ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
-       if (NULL == ac->timeout_event) {
-               talloc_free(ac);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (req->timeout > 0) {
+               tv.tv_sec = req->starttime + req->timeout;
+               tv.tv_usec = 0;
+               ac->timeout_event = tevent_add_timer(ev, ac, tv,
+                                                    ltdb_timeout, ac);
+               if (NULL == ac->timeout_event) {
+                       talloc_free(ac);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
        }
 
        /* set a spy so that we do not try to use the request context
@@ -1428,6 +2097,21 @@ static int ltdb_init_rootdse(struct ldb_module *module)
        return LDB_SUCCESS;
 }
 
+
+static int generic_lock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->lock_read(module);
+}
+
+static int generic_unlock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->unlock_read(module);
+}
+
 static const struct ldb_module_ops ltdb_ops = {
        .name              = "tdb",
        .init_context      = ltdb_init_rootdse,
@@ -1441,20 +2125,84 @@ static const struct ldb_module_ops ltdb_ops = {
        .end_transaction   = ltdb_end_trans,
        .prepare_commit    = ltdb_prepare_commit,
        .del_transaction   = ltdb_del_trans,
+       .read_lock         = generic_lock_read,
+       .read_unlock       = generic_unlock_read,
 };
 
+int init_store(struct ltdb_private *ltdb,
+                     const char *name,
+                     struct ldb_context *ldb,
+                     const char *options[],
+                     struct ldb_module **_module)
+{
+       struct ldb_module *module;
+
+       if (getenv("LDB_WARN_UNINDEXED")) {
+               ltdb->warn_unindexed = true;
+       }
+
+       if (getenv("LDB_WARN_REINDEX")) {
+               ltdb->warn_reindex = true;
+       }
+
+       ltdb->sequence_number = 0;
+
+       module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
+       if (!module) {
+               ldb_oom(ldb);
+               talloc_free(ltdb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_module_set_private(module, ltdb);
+       talloc_steal(module, ltdb);
+
+       if (ltdb_cache_load(module) != 0) {
+               ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
+                                      "records for backend '%s'", name);
+               talloc_free(module);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       *_module = module;
+       /*
+        * Set or override the maximum key length
+        *
+        * The ldb_mdb code will have set this to 511, but our tests
+        * set this even smaller (to make the tests more practical).
+        *
+        * This must only be used for the selftest as the length
+        * becomes encoded in the index keys.
+        */
+       {
+               const char *len_str =
+                       ldb_options_find(ldb, options,
+                                        "max_key_len_for_self_test");
+               if (len_str != NULL) {
+                       unsigned len = strtoul(len_str, NULL, 0);
+                       ltdb->max_key_length = len;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
+
 /*
   connect to the database
 */
-static int ltdb_connect(struct ldb_context *ldb, const char *url,
-                       unsigned int flags, const char *options[],
-                       struct ldb_module **_module)
+int ltdb_connect(struct ldb_context *ldb, const char *url,
+                unsigned int flags, const char *options[],
+                struct ldb_module **_module)
 {
-       struct ldb_module *module;
        const char *path;
        int tdb_flags, open_flags;
        struct ltdb_private *ltdb;
 
+       /*
+        * We hold locks, so we must use a private event context
+        * on each returned handle
+        */
+       ldb_set_require_private_event_context(ldb);
+
        /* parse the url */
        if (strchr(url, ':')) {
                if (strncmp(url, "tdb://", 6) != 0) {
@@ -1479,59 +2227,53 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                tdb_flags |= TDB_NOMMAP;
        }
 
-       if (flags & LDB_FLG_RDONLY) {
-               open_flags = O_RDONLY;
-       } else {
-               open_flags = O_CREAT | O_RDWR;
-       }
-
        ltdb = talloc_zero(ldb, struct ltdb_private);
        if (!ltdb) {
                ldb_oom(ldb);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       if (flags & LDB_FLG_RDONLY) {
+               /*
+                * This is weird, but because we can only have one tdb
+                * in this process, and the other one could be
+                * read-write, we can't use the tdb readonly.  Plus a
+                * read only tdb prohibits the all-record lock.
+                */
+               open_flags = O_RDWR;
+
+               ltdb->read_only = true;
+
+       } else if (flags & LDB_FLG_DONT_CREATE_DB) {
+               /*
+                * This is used by ldbsearch to prevent creation of the database
+                * if the name is wrong
+                */
+               open_flags = O_RDWR;
+       } else {
+               /*
+                * This is the normal case
+                */
+               open_flags = O_CREAT | O_RDWR;
+       }
+
+       ltdb->kv_ops = &key_value_ops;
+
        /* note that we use quite a large default hash size */
        ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
                                   tdb_flags, open_flags,
                                   ldb_get_create_perms(ldb), ldb);
        if (!ltdb->tdb) {
                ldb_asprintf_errstring(ldb,
-                                      "Unable to open tdb '%s'", path);
+                                      "Unable to open tdb '%s': %s", path, strerror(errno));
                ldb_debug(ldb, LDB_DEBUG_ERROR,
-                         "Unable to open tdb '%s'", path);
-               talloc_free(ltdb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       if (getenv("LDB_WARN_UNINDEXED")) {
-               ltdb->warn_unindexed = true;
-       }
-
-       ltdb->sequence_number = 0;
-
-       module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
-       if (!module) {
-               ldb_oom(ldb);
+                         "Unable to open tdb '%s': %s", path, strerror(errno));
                talloc_free(ltdb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       ldb_module_set_private(module, ltdb);
-       talloc_steal(module, ltdb);
-
-       if (ltdb_cache_load(module) != 0) {
-               ldb_asprintf_errstring(ldb,
-                                      "Unable to load ltdb cache records of tdb '%s'", path);
-               talloc_free(module);
+               if (errno == EACCES || errno == EPERM) {
+                       return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
+               }
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       *_module = module;
-       return LDB_SUCCESS;
-}
-
-int ldb_tdb_init(const char *version)
-{
-       LDB_MODULE_CHECK_VERSION(version);
-       return ldb_register_backend("tdb", ltdb_connect, false);
+       return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
 }