ldb: Add tests for when we should expect a full scan
[vlendec/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
index 3dfd1ccf652a0dc979f78e91e4d1393fb7ab9895..85816040ac76f1fdade38fa5acbdbbce5888faa5 100644 (file)
@@ -94,31 +94,65 @@ int ltdb_err_map(enum TDB_ERROR tdb_code)
 /*
   lock the database for read - use by ltdb_search and ltdb_sequence_number
 */
-int ltdb_lock_read(struct ldb_module *module)
+static int ltdb_lock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       int ret = 0;
+       int tdb_ret = 0;
+       int ret;
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
 
-       if (ltdb->in_transaction == 0 &&
+       if (tdb_transaction_active(ltdb->tdb) == false &&
            ltdb->read_lock_count == 0) {
-               ret = tdb_lockall_read(ltdb->tdb);
+               tdb_ret = tdb_lockall_read(ltdb->tdb);
        }
-       if (ret == 0) {
+       if (tdb_ret == 0) {
                ltdb->read_lock_count++;
+               return LDB_SUCCESS;
+       }
+       ret = ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
        }
+       ldb_debug_set(ldb_module_get_ctx(module),
+                     LDB_DEBUG_FATAL,
+                     "Failure during ltdb_lock_read(): %s -> %s",
+                     tdb_errorstr(ltdb->tdb),
+                     ldb_strerror(ret));
        return ret;
 }
 
 /*
   unlock the database after a ltdb_lock_read()
 */
-int ltdb_unlock_read(struct ldb_module *module)
+static int ltdb_unlock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       if (!tdb_transaction_active(ltdb->tdb) && ltdb->read_lock_count == 1) {
                tdb_unlockall_read(ltdb->tdb);
+               ltdb->read_lock_count--;
                return 0;
        }
        ltdb->read_lock_count--;
@@ -126,6 +160,36 @@ int ltdb_unlock_read(struct ldb_module *module)
 }
 
 
+/* 
+ * Determine if this key could hold a record.  We allow the new GUID
+ * index, the old DN index and a possible future ID=
+ */
+bool ltdb_key_is_record(TDB_DATA key)
+{
+       if (key.dsize < 4) {
+               return false;
+       }
+
+       if (memcmp(key.dptr, "DN=", 3) == 0) {
+               return true;
+       }
+       
+       if (memcmp(key.dptr, "ID=", 3) == 0) {
+               return true;
+       }
+
+       if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
+               return false;
+       }
+
+       if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
+                  sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
+               return true;
+       }
+       
+       return false;
+}
+
 /*
   form a TDB_DATA for a record key
   caller frees
@@ -133,9 +197,9 @@ int ltdb_unlock_read(struct ldb_module *module)
   note that the key for a record can depend on whether the
   dn refers to a case sensitive index record or not
 */
-TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
+TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
+                    struct ldb_dn *dn)
 {
-       struct ldb_context *ldb = ldb_module_get_ctx(module);
        TDB_DATA key;
        char *key_str = NULL;
        const char *dn_folded = NULL;
@@ -157,7 +221,7 @@ TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
                goto failed;
        }
 
-       key_str = talloc_strdup(ldb, "DN=");
+       key_str = talloc_strdup(mem_ctx, "DN=");
        if (!key_str) {
                goto failed;
        }
@@ -179,6 +243,121 @@ failed:
        return key;
 }
 
+/* The caller is to provide a correctly sized key */
+int ltdb_guid_to_key(struct ldb_module *module,
+                    struct ltdb_private *ltdb,
+                    const struct ldb_val *GUID_val,
+                    TDB_DATA *key)
+{
+       const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
+       const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
+
+       if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
+       memcpy(&key->dptr[GUID_prefix_len],
+              GUID_val->data, GUID_val->length);
+       return LDB_SUCCESS;
+}
+
+/*
+ * The caller is to provide a correctly sized key, used only in
+ * the GUID index mode
+ */
+int ltdb_idx_to_key(struct ldb_module *module,
+                   struct ltdb_private *ltdb,
+                   TALLOC_CTX *mem_ctx,
+                   const struct ldb_val *idx_val,
+                   TDB_DATA *key)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct ldb_dn *dn;
+
+       if (ltdb->cache->GUID_index_attribute != NULL) {
+               return ltdb_guid_to_key(module, ltdb,
+                                       idx_val, key);
+       }
+
+       dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
+       if (dn == NULL) {
+               /*
+                * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
+                * to the caller, as this in an invalid index value
+                */
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       /* form the key */
+       *key = ltdb_key_dn(module, mem_ctx, dn);
+       TALLOC_FREE(dn);
+       if (!key->dptr) {
+               return ldb_module_oom(module);
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+  form a TDB_DATA for a record key
+  caller frees mem_ctx, which may or may not have the key
+  as a child.
+
+  note that the key for a record can depend on whether a
+  GUID index is in use, or the DN is used as the key
+*/
+TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
+                     const struct ldb_message *msg)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       TDB_DATA key;
+       const struct ldb_val *guid_val;
+       int ret;
+
+       if (ltdb->cache->GUID_index_attribute == NULL) {
+               return ltdb_key_dn(module, mem_ctx, msg->dn);
+       }
+
+       if (ldb_dn_is_special(msg->dn)) {
+               return ltdb_key_dn(module, mem_ctx, msg->dn);
+       }
+
+       guid_val = ldb_msg_find_ldb_val(msg,
+                                      ltdb->cache->GUID_index_attribute);
+       if (guid_val == NULL) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Did not find GUID attribute %s "
+                                      "in %s, required for TDB record "
+                                      "key in " LTDB_IDXGUID " mode.",
+                                      ltdb->cache->GUID_index_attribute,
+                                      ldb_dn_get_linearized(msg->dn));
+               errno = EINVAL;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+
+       /* In this case, allocate with talloc */
+       key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
+       if (key.dptr == NULL) {
+               errno = ENOMEM;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       key.dsize = talloc_get_size(key.dptr);
+
+       ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
+
+       if (ret != LDB_SUCCESS) {
+               errno = EINVAL;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       return key;
+}
+
 /*
   check special dn's have valid attributes
   currently only @ATTRIBUTES is checked
@@ -222,7 +401,7 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 
        /* only allow modifies inside a transaction, otherwise the
         * ldb is unsafe */
-       if (ltdb->in_transaction == 0) {
+       if (ltdb->kv_ops->transaction_active(ltdb) == false) {
                ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -234,7 +413,7 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
                if (ltdb->warn_reindex) {
                        ldb_debug(ldb_module_get_ctx(module),
                                LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
-                               tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
+                               ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
                }
                ret = ltdb_reindex(module);
        }
@@ -253,9 +432,41 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
                ret = ltdb_cache_reload(module);
        }
 
+       if (ret != LDB_SUCCESS) {
+               ltdb->reindex_failed = true;
+       }
+
        return ret;
 }
 
+static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
+                         struct ldb_val ldb_data, int flags)
+{
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_store(ltdb->tdb, key, data, flags);
+}
+
+static int ltdb_error(struct ltdb_private *ltdb)
+{
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char *ltdb_errorstr(struct ltdb_private *ltdb)
+{
+       return tdb_errorstr(ltdb->tdb);
+}
+
 /*
   store a record into the db
 */
@@ -263,33 +474,55 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       TDB_DATA tdb_key, tdb_data;
+       TDB_DATA tdb_key;
+       struct ldb_val ldb_key;
        struct ldb_val ldb_data;
        int ret = LDB_SUCCESS;
+       TALLOC_CTX *tdb_key_ctx = talloc_new(module);
+
+       if (tdb_key_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
 
-       tdb_key = ltdb_key(module, msg->dn);
+       tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
        if (tdb_key.dptr == NULL) {
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
        ret = ldb_pack_data(ldb_module_get_ctx(module),
                            msg, &ldb_data);
        if (ret == -1) {
-               talloc_free(tdb_key.dptr);
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
-       tdb_data.dptr = ldb_data.data;
-       tdb_data.dsize = ldb_data.length;
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
 
-       ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
+       ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
        if (ret != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               bool is_special = ldb_dn_is_special(msg->dn);
+               ret = ltdb->kv_ops->error(ltdb);
+
+               /*
+                * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
+                * the GUID, so re-map
+                */
+               if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
+                   && !is_special
+                   && ltdb->cache->GUID_index_attribute != NULL) {
+                       ret = LDB_ERR_CONSTRAINT_VIOLATION;
+               }
                goto done;
        }
 
 done:
-       talloc_free(tdb_key.dptr);
+       TALLOC_FREE(tdb_key_ctx);
        talloc_free(ldb_data.data);
 
        return ret;
@@ -325,12 +558,13 @@ static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
 }
 
 static int ltdb_add_internal(struct ldb_module *module,
+                            struct ltdb_private *ltdb,
                             const struct ldb_message *msg,
                             bool check_single_value)
 {
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        int ret = LDB_SUCCESS;
-       unsigned int i, j;
+       unsigned int i;
 
        for (i=0;i<msg->num_elements;i++) {
                struct ldb_message_element *el = &msg->elements[i];
@@ -355,23 +589,50 @@ static int ltdb_add_internal(struct ldb_module *module,
                        continue;
                }
 
-               if (check_single_value) {
-                       /* TODO: This is O(n^2) - replace with more efficient check */
-                       for (j=0; j<el->num_values; j++) {
-                               if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                                       ldb_asprintf_errstring(ldb,
-                                                              "attribute '%s': value #%u on '%s' "
-                                                              "provided more than once in ADD object",
-                                                              el->name, j, 
-                                                              ldb_dn_get_linearized(msg->dn));
-                                       return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
-                               }
+               if (check_single_value &&
+                   !(el->flags &
+                     LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                       struct ldb_val *duplicate = NULL;
+
+                       ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
+                                                        el, &duplicate, 0);
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
+                       }
+                       if (duplicate != NULL) {
+                               ldb_asprintf_errstring(
+                                       ldb,
+                                       "attribute '%s': value '%.*s' on '%s' "
+                                       "provided more than once in ADD object",
+                                       el->name,
+                                       (int)duplicate->length,
+                                       duplicate->data,
+                                       ldb_dn_get_linearized(msg->dn));
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                        }
                }
        }
 
        ret = ltdb_store(module, msg, TDB_INSERT);
        if (ret != LDB_SUCCESS) {
+               /*
+                * Try really hard to get the right error code for
+                * a re-add situation, as this can matter!
+                */
+               if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
+                       int ret2;
+                       struct ldb_dn *dn2 = NULL;
+                       TALLOC_CTX *mem_ctx = talloc_new(module);
+                       if (mem_ctx == NULL) {
+                               return ldb_module_operr(module);
+                       }
+                       ret2 = ltdb_search_base(module, module,
+                                               msg->dn, &dn2);
+                       TALLOC_FREE(mem_ctx);
+                       if (ret2 == LDB_SUCCESS) {
+                               ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       }
+               }
                if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
                        ldb_asprintf_errstring(ldb,
                                               "Entry %s already exists",
@@ -380,8 +641,19 @@ static int ltdb_add_internal(struct ldb_module *module,
                return ret;
        }
 
-       ret = ltdb_index_add_new(module, msg);
+       ret = ltdb_index_add_new(module, ltdb, msg);
        if (ret != LDB_SUCCESS) {
+               /*
+                * If we failed to index, delete the message again.
+                *
+                * This is particularly important for the GUID index
+                * case, which will only fail for a duplicate DN
+                * in the index add.
+                *
+                * Note that the caller may not cancel the transation
+                * and this means the above add might really show up!
+                */
+               ltdb_delete_noindex(module, msg);
                return ret;
        }
 
@@ -397,8 +669,20 @@ static int ltdb_add(struct ltdb_context *ctx)
 {
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        int ret = LDB_SUCCESS;
 
+       if (ltdb->max_key_length != 0 &&
+           ltdb->cache->GUID_index_attribute == NULL &&
+           !ldb_dn_is_special(req->op.add.message->dn))
+       {
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Must operate ldb_mdb in GUID "
+                                 "index mode, but " LTDB_IDXGUID " not set.");
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
        ret = ltdb_check_special_dn(module, req->op.add.message);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -410,32 +694,61 @@ static int ltdb_add(struct ltdb_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_add_internal(module, req->op.add.message, true);
+       ret = ltdb_add_internal(module, ltdb,
+                               req->op.add.message, true);
 
        return ret;
 }
 
+static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
+{
+       TDB_DATA tdb_key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_delete(ltdb->tdb, tdb_key);
+}
+
 /*
   delete a record from the database, not updating indexes (used for deleting
   index records)
 */
-int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
+int ltdb_delete_noindex(struct ldb_module *module,
+                       const struct ldb_message *msg)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ldb_val ldb_key;
        TDB_DATA tdb_key;
        int ret;
+       TALLOC_CTX *tdb_key_ctx = talloc_new(module);
+
+       if (tdb_key_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
 
-       tdb_key = ltdb_key(module, dn);
+       tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
        if (!tdb_key.dptr) {
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
-       ret = tdb_delete(ltdb->tdb, tdb_key);
-       talloc_free(tdb_key.dptr);
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
+
+       ret = ltdb->kv_ops->delete(ltdb, ldb_key);
+       TALLOC_FREE(tdb_key_ctx);
 
        if (ret != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               ret = ltdb->kv_ops->error(ltdb);
        }
 
        return ret;
@@ -459,7 +772,7 @@ static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
                goto done;
        }
 
-       ret = ltdb_delete_noindex(module, dn);
+       ret = ltdb_delete_noindex(module, msg);
        if (ret != LDB_SUCCESS) {
                goto done;
        }
@@ -570,11 +883,23 @@ static int ltdb_msg_add_element(struct ldb_message *msg,
   delete all elements having a specified attribute name
 */
 static int msg_delete_attribute(struct ldb_module *module,
+                               struct ltdb_private *ltdb,
                                struct ldb_message *msg, const char *name)
 {
        unsigned int i;
        int ret;
        struct ldb_message_element *el;
+       bool is_special = ldb_dn_is_special(msg->dn);
+
+       if (!is_special
+           && ltdb->cache->GUID_index_attribute != NULL
+           && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "Must not modify GUID "
+                                      "attribute %s (used as DB index)",
+                                      ltdb->cache->GUID_index_attribute);
+               return LDB_ERR_CONSTRAINT_VIOLATION;
+       }
 
        el = ldb_msg_find_element(msg, name);
        if (el == NULL) {
@@ -582,7 +907,7 @@ static int msg_delete_attribute(struct ldb_module *module,
        }
        i = el - msg->elements;
 
-       ret = ltdb_index_del_element(module, msg->dn, el);
+       ret = ltdb_index_del_element(module, ltdb, msg, el);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
@@ -604,6 +929,7 @@ static int msg_delete_attribute(struct ldb_module *module,
   return LDB Error on failure
 */
 static int msg_delete_element(struct ldb_module *module,
+                             struct ltdb_private *ltdb,
                              struct ldb_message *msg,
                              const char *name,
                              const struct ldb_val *val)
@@ -636,10 +962,11 @@ static int msg_delete_element(struct ldb_module *module,
                }
                if (matched) {
                        if (el->num_values == 1) {
-                               return msg_delete_attribute(module, msg, name);
+                               return msg_delete_attribute(module,
+                                                           ltdb, msg, name);
                        }
 
-                       ret = ltdb_index_del_value(module, msg->dn, el, i);
+                       ret = ltdb_index_del_value(module, ltdb, msg, el, i);
                        if (ret != LDB_SUCCESS) {
                                return ret;
                        }
@@ -661,7 +988,6 @@ static int msg_delete_element(struct ldb_module *module,
        return LDB_ERR_NO_SUCH_ATTRIBUTE;
 }
 
-
 /*
   modify a record - internal interface
 
@@ -678,55 +1004,43 @@ int ltdb_modify_internal(struct ldb_module *module,
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       TDB_DATA tdb_key, tdb_data;
-       struct ldb_val ldb_data;
        struct ldb_message *msg2;
-       unsigned int i, j, k;
+       unsigned int i, j;
        int ret = LDB_SUCCESS, idx;
        struct ldb_control *control_permissive = NULL;
+       TALLOC_CTX *mem_ctx = talloc_new(req);
 
+       if (mem_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+       
        if (req) {
                control_permissive = ldb_request_get_control(req,
                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
        }
 
-       tdb_key = ltdb_key(module, msg->dn);
-       if (!tdb_key.dptr) {
-               return LDB_ERR_OTHER;
-       }
-
-       tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
-       if (!tdb_data.dptr) {
-               talloc_free(tdb_key.dptr);
-               return ltdb_err_map(tdb_error(ltdb->tdb));
-       }
-
-       msg2 = ldb_msg_new(tdb_key.dptr);
+       msg2 = ldb_msg_new(mem_ctx);
        if (msg2 == NULL) {
-               free(tdb_data.dptr);
                ret = LDB_ERR_OTHER;
                goto done;
        }
 
-       ldb_data.data = tdb_data.dptr;
-       ldb_data.length = tdb_data.dsize;
-
-       ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
-       free(tdb_data.dptr);
-       if (ret == -1) {
-               ret = LDB_ERR_OTHER;
+       ret = ltdb_search_dn1(module, msg->dn,
+                             msg2,
+                             LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
+       if (ret != LDB_SUCCESS) {
                goto done;
        }
 
-       if (!msg2->dn) {
-               msg2->dn = msg->dn;
-       }
-
        for (i=0; i<msg->num_elements; i++) {
                struct ldb_message_element *el = &msg->elements[i], *el2;
                struct ldb_val *vals;
                const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
                const char *dn;
+               uint32_t options = 0;
+               if (control_permissive != NULL) {
+                       options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
+               }
 
                switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
                case LDB_FLAG_MOD_ADD:
@@ -775,7 +1089,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                        ret = LDB_ERR_OTHER;
                                        goto done;
                                }
-                               ret = ltdb_index_add_element(module, msg2->dn,
+                               ret = ltdb_index_add_element(module, ltdb,
+                                                            msg2,
                                                             el);
                                if (ret != LDB_SUCCESS) {
                                        goto done;
@@ -795,34 +1110,45 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                                /* Check that values don't exist yet on multi-
                                   valued attributes or aren't provided twice */
-                               /* TODO: This is O(n^2) - replace with more efficient check */
-                               if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
-                                       for (j = 0; j < el->num_values; j++) {
-                                               if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
-                                                       if (control_permissive) {
-                                                               /* remove this one as if it was never added */
-                                                               el->num_values--;
-                                                               for (k = j; k < el->num_values; k++) {
-                                                                       el->values[k] = el->values[k + 1];
-                                                               }
-                                                               j--; /* rewind */
-
-                                                               continue;
-                                                       }
-
-                                                       ldb_asprintf_errstring(ldb,
-                                                                              "attribute '%s': value #%u on '%s' already exists",
-                                                                              el->name, j, ldb_dn_get_linearized(msg2->dn));
-                                                       ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
-                                                       goto done;
-                                               }
-                                               if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                                                       ldb_asprintf_errstring(ldb,
-                                                                              "attribute '%s': value #%u on '%s' provided more than once in ADD",
-                                                                              el->name, j, ldb_dn_get_linearized(msg2->dn));
-                                                       ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
-                                                       goto done;
-                                               }
+                               if (!(el->flags &
+                                     LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                                       struct ldb_val *duplicate = NULL;
+                                       ret = ldb_msg_find_common_values(ldb,
+                                                                        msg2,
+                                                                        el,
+                                                                        el2,
+                                                                        options);
+
+                                       if (ret ==
+                                           LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
+                                               ldb_asprintf_errstring(ldb,
+                                                       "attribute '%s': value "
+                                                       "#%u on '%s' already "
+                                                       "exists", el->name, j,
+                                                       ldb_dn_get_linearized(msg2->dn));
+                                               goto done;
+                                       } else if (ret != LDB_SUCCESS) {
+                                               goto done;
+                                       }
+
+                                       ret = ldb_msg_find_duplicate_val(
+                                               ldb, msg2, el, &duplicate, 0);
+                                       if (ret != LDB_SUCCESS) {
+                                               goto done;
+                                       }
+                                       if (duplicate != NULL) {
+                                               ldb_asprintf_errstring(
+                                                       ldb,
+                                                       "attribute '%s': value "
+                                                       "'%.*s' on '%s' "
+                                                       "provided more than "
+                                                       "once in ADD",
+                                                       el->name,
+                                                       (int)duplicate->length,
+                                                       duplicate->data,
+                                                       ldb_dn_get_linearized(msg->dn));
+                                               ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                                               goto done;
                                        }
                                }
 
@@ -845,7 +1171,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                el2->values = vals;
                                el2->num_values += el->num_values;
 
-                               ret = ltdb_index_add_element(module, msg2->dn, el);
+                               ret = ltdb_index_add_element(module, ltdb,
+                                                            msg2, el);
                                if (ret != LDB_SUCCESS) {
                                        goto done;
                                }
@@ -868,16 +1195,26 @@ int ltdb_modify_internal(struct ldb_module *module,
                         * in Samba, or someone else who can claim to
                         * know what they are doing. 
                         */
-                       if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) { 
-                               /* TODO: This is O(n^2) - replace with more efficient check */
-                               for (j=0; j<el->num_values; j++) {
-                                       if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                                               ldb_asprintf_errstring(ldb,
-                                                                      "attribute '%s': value #%u on '%s' provided more than once in REPLACE",
-                                                                      el->name, j, ldb_dn_get_linearized(msg2->dn));
-                                               ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
-                                               goto done;
-                                       }
+                       if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                               struct ldb_val *duplicate = NULL;
+
+                               ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
+                                                                &duplicate, 0);
+                               if (ret != LDB_SUCCESS) {
+                                       goto done;
+                               }
+                               if (duplicate != NULL) {
+                                       ldb_asprintf_errstring(
+                                               ldb,
+                                               "attribute '%s': value '%.*s' "
+                                               "on '%s' provided more than "
+                                               "once in REPLACE",
+                                               el->name,
+                                               (int)duplicate->length,
+                                               duplicate->data,
+                                               ldb_dn_get_linearized(msg2->dn));
+                                       ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                                       goto done;
                                }
                        }
 
@@ -899,7 +1236,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                }
 
                                /* Delete the attribute if it exists in the DB */
-                               if (msg_delete_attribute(module, msg2,
+                               if (msg_delete_attribute(module, ltdb,
+                                                        msg2,
                                                         el->name) != 0) {
                                        ret = LDB_ERR_OTHER;
                                        goto done;
@@ -912,7 +1250,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                goto done;
                        }
 
-                       ret = ltdb_index_add_element(module, msg2->dn, el);
+                       ret = ltdb_index_add_element(module, ltdb,
+                                                    msg2, el);
                        if (ret != LDB_SUCCESS) {
                                goto done;
                        }
@@ -928,7 +1267,9 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                        if (msg->elements[i].num_values == 0) {
                                /* Delete the whole attribute */
-                               ret = msg_delete_attribute(module, msg2,
+                               ret = msg_delete_attribute(module,
+                                                          ltdb,
+                                                          msg2,
                                                           msg->elements[i].name);
                                if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
                                    control_permissive) {
@@ -945,6 +1286,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                                /* Delete specified values from an attribute */
                                for (j=0; j < msg->elements[i].num_values; j++) {
                                        ret = msg_delete_element(module,
+                                                                ltdb,
                                                                 msg2,
                                                                 msg->elements[i].name,
                                                                 &msg->elements[i].values[j]);
@@ -983,7 +1325,7 @@ int ltdb_modify_internal(struct ldb_module *module,
        }
 
 done:
-       talloc_free(tdb_key.dptr);
+       TALLOC_FREE(mem_ctx);
        return ret;
 }
 
@@ -1024,6 +1366,7 @@ static int ltdb_rename(struct ltdb_context *ctx)
        struct ldb_message *msg;
        int ret = LDB_SUCCESS;
        TDB_DATA tdb_key, tdb_key_old;
+       struct ldb_dn *db_dn;
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
@@ -1046,33 +1389,54 @@ static int ltdb_rename(struct ltdb_context *ctx)
 
        /* We need to, before changing the DB, check if the new DN
         * exists, so we can return this error to the caller with an
-        * unmodified DB */
-       tdb_key = ltdb_key(module, req->op.rename.newdn);
+        * unmodified DB
+        *
+        * Even in GUID index mode we use ltdb_key_dn() as we are
+        * trying to figure out if this is just a case rename
+        */
+       tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
        if (!tdb_key.dptr) {
                talloc_free(msg);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       tdb_key_old = ltdb_key(module, req->op.rename.olddn);
+       tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
        if (!tdb_key_old.dptr) {
                talloc_free(msg);
                talloc_free(tdb_key.dptr);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
-       if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
-               if (tdb_exists(ltdb->tdb, tdb_key)) {
-                       talloc_free(tdb_key_old.dptr);
-                       talloc_free(tdb_key.dptr);
-                       ldb_asprintf_errstring(ldb_module_get_ctx(module),
-                                              "Entry %s already exists",
-                                              ldb_dn_get_linearized(req->op.rename.newdn));
-                       /* finding the new record already in the DB is an error */
-                       talloc_free(msg);
-                       return LDB_ERR_ENTRY_ALREADY_EXISTS;
+       /*
+        * Only declare a conflict if the new DN already exists,
+        * and it isn't a case change on the old DN
+        */
+       if (tdb_key_old.dsize != tdb_key.dsize
+           || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
+               ret = ltdb_search_base(module, msg,
+                                      req->op.rename.newdn,
+                                      &db_dn);
+               if (ret == LDB_SUCCESS) {
+                       ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
+               } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       ret = LDB_SUCCESS;
                }
        }
+
+       /* finding the new record already in the DB is an error */
+
+       if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Entry %s already exists",
+                                      ldb_dn_get_linearized(req->op.rename.newdn));
+       }
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tdb_key_old.dptr);
+               talloc_free(tdb_key.dptr);
+               talloc_free(msg);
+               return ret;
+       }
+
        talloc_free(tdb_key_old.dptr);
        talloc_free(tdb_key.dptr);
 
@@ -1096,53 +1460,177 @@ static int ltdb_rename(struct ltdb_context *ctx)
         * deleted attributes. We could go through all elements but that's
         * maybe not the most efficient way
         */
-       ret = ltdb_add_internal(module, msg, false);
+       ret = ltdb_add_internal(module, ltdb, msg, false);
 
        talloc_free(msg);
 
        return ret;
 }
 
+static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_start(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_cancel(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_prepare_commit(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_commit(ltdb->tdb);
+}
+
 static int ltdb_start_trans(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
-       if (tdb_transaction_start(ltdb->tdb) != 0) {
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       /* Do not take out the transaction lock on a read-only DB */
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
+       if (ltdb->kv_ops->begin_write(ltdb) != 0) {
+               return ltdb->kv_ops->error(ltdb);
        }
 
-       ltdb->in_transaction++;
 
        ltdb_index_transaction_start(module);
 
+       ltdb->reindex_failed = false;
+
        return LDB_SUCCESS;
 }
 
+/*
+ * Forward declaration to allow prepare_commit to in fact abort the
+ * transaction
+ */
+static int ltdb_del_trans(struct ldb_module *module);
+
 static int ltdb_prepare_commit(struct ldb_module *module)
 {
        int ret;
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
 
-       if (ltdb->in_transaction != 1) {
-               return LDB_SUCCESS;
+       if (!ltdb->kv_ops->transaction_active(ltdb)) {
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "ltdb_prepare_commit() called "
+                                 "without transaction active");
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * Check if the last re-index failed.
+        *
+        * This can happen if for example a duplicate value was marked
+        * unique.  We must not write a partial re-index into the DB.
+        */
+       if (ltdb->reindex_failed) {
+               /*
+                * We must instead abort the transaction so we get the
+                * old values and old index back
+                */
+               ltdb_del_trans(module);
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Failure during re-index, so "
+                                 "transaction must be aborted.");
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        ret = ltdb_index_transaction_commit(module);
        if (ret != LDB_SUCCESS) {
-               tdb_transaction_cancel(ltdb->tdb);
-               ltdb->in_transaction--;
+               ltdb->kv_ops->abort_write(ltdb);
                return ret;
        }
 
-       if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
-               ltdb->in_transaction--;
-               ldb_asprintf_errstring(ldb_module_get_ctx(module),
-                                      "Failure during tdb_transaction_prepare_commit(): %s -> %s",
-                                      tdb_errorstr(ltdb->tdb),
-                                      ldb_strerror(ret));
+       if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
+               ldb_debug_set(ldb_module_get_ctx(module),
+                             LDB_DEBUG_FATAL,
+                             "Failure during "
+                             "prepare_write): %s -> %s",
+                             ltdb->kv_ops->errorstr(ltdb),
+                             ldb_strerror(ret));
                return ret;
        }
 
@@ -1164,14 +1652,13 @@ static int ltdb_end_trans(struct ldb_module *module)
                }
        }
 
-       ltdb->in_transaction--;
        ltdb->prepared_commit = false;
 
-       if (tdb_transaction_commit(ltdb->tdb) != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ltdb->kv_ops->finish_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
                ldb_asprintf_errstring(ldb_module_get_ctx(module),
                                       "Failure during tdb_transaction_commit(): %s -> %s",
-                                      tdb_errorstr(ltdb->tdb),
+                                      ltdb->kv_ops->errorstr(ltdb),
                                       ldb_strerror(ret));
                return ret;
        }
@@ -1184,14 +1671,13 @@ static int ltdb_del_trans(struct ldb_module *module)
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
-       ltdb->in_transaction--;
 
        if (ltdb_index_transaction_cancel(module) != 0) {
-               tdb_transaction_cancel(ltdb->tdb);
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               ltdb->kv_ops->abort_write(ltdb);
+               return ltdb->kv_ops->error(ltdb);
        }
 
-       tdb_transaction_cancel(ltdb->tdb);
+       ltdb->kv_ops->abort_write(ltdb);
        return LDB_SUCCESS;
 }
 
@@ -1204,6 +1690,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
        struct ldb_context *ldb;
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        TALLOC_CTX *tmp_ctx = NULL;
        struct ldb_seqnum_request *seq;
        struct ldb_seqnum_result *res;
@@ -1222,7 +1710,7 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
-       if (ltdb_lock_read(module) != 0) {
+       if (ltdb->kv_ops->lock_read(module) != 0) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -1284,7 +1772,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
 done:
        talloc_free(tmp_ctx);
-       ltdb_unlock_read(module);
+
+       ltdb->kv_ops->unlock_read(module);
        return ret;
 }
 
@@ -1381,6 +1870,181 @@ static void ltdb_handle_extended(struct ltdb_context *ctx)
        ltdb_request_extended_done(ctx, ext, ret);
 }
 
+struct kv_ctx {
+       ldb_kv_traverse_fn kv_traverse_fn;
+       void *ctx;
+       struct ltdb_private *ltdb;
+       int (*parser)(struct ldb_val key,
+                     struct ldb_val data,
+                     void *private_data);
+};
+
+static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+       return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .kv_traverse_fn = fn,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       if (tdb_transaction_active(ltdb->tdb)) {
+               return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       } else {
+               return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       }
+}
+
+static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
+                                     struct ldb_val ldb_key,
+                                     struct ldb_val ldb_key2,
+                                     struct ldb_val ldb_data, void *state)
+{
+       int tdb_ret;
+       struct ldb_context *ldb;
+       struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
+       struct ldb_module *module = ctx->module;
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA key2 = {
+               .dptr = ldb_key2.data,
+               .dsize = ldb_key2.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+
+       ldb = ldb_module_get_ctx(module);
+
+       tdb_ret = tdb_delete(ltdb->tdb, key);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to delete %*.*s "
+                         "for rekey as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to rekey %*.*s as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       return tdb_ret;
+}
+
+static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
+                                        void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+
+       return kv_ctx->parser(key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
+                                struct ldb_val ldb_key,
+                                int (*parser)(struct ldb_val key,
+                                              struct ldb_val data,
+                                              void *private_data),
+                                void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .parser = parser,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       int ret;
+
+       if (tdb_transaction_active(ltdb->tdb) == false &&
+           ltdb->read_lock_count == 0) {
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
+                              &kv_ctx);
+       if (ret == 0) {
+               return LDB_SUCCESS;
+       }
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
+{
+       return tdb_name(ltdb->tdb);
+}
+
+static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
+{
+       int seq = tdb_get_seqnum(ltdb->tdb);
+       bool has_changed = (seq != ltdb->tdb_seqnum);
+
+       ltdb->tdb_seqnum = seq;
+
+       return has_changed;
+}
+
+static bool ltdb_transaction_active(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_active(ltdb->tdb);
+}
+
+static const struct kv_db_ops key_value_ops = {
+       .store = ltdb_tdb_store,
+       .delete = ltdb_tdb_delete,
+       .iterate = ltdb_tdb_traverse_fn,
+       .update_in_iterate = ltdb_tdb_update_in_iterate,
+       .fetch_and_parse = ltdb_tdb_parse_record,
+       .lock_read = ltdb_lock_read,
+       .unlock_read = ltdb_unlock_read,
+       .begin_write = ltdb_tdb_transaction_start,
+       .prepare_write = ltdb_tdb_transaction_prepare_commit,
+       .finish_write = ltdb_tdb_transaction_commit,
+       .abort_write = ltdb_tdb_transaction_cancel,
+       .error = ltdb_error,
+       .errorstr = ltdb_errorstr,
+       .name = ltdb_tdb_name,
+       .has_changed = ltdb_tdb_changed,
+       .transaction_active = ltdb_transaction_active,
+};
+
 static void ltdb_callback(struct tevent_context *ev,
                          struct tevent_timer *te,
                          struct timeval t,
@@ -1476,7 +2140,7 @@ static int ltdb_handle_request(struct ldb_module *module,
                return LDB_ERR_TIME_LIMIT_EXCEEDED;
        }
 
-       ev = ldb_get_event_context(ldb);
+       ev = ldb_handle_get_event_context(req->handle);
 
        ac = talloc_zero(ldb, struct ltdb_context);
        if (ac == NULL) {
@@ -1529,6 +2193,21 @@ static int ltdb_init_rootdse(struct ldb_module *module)
        return LDB_SUCCESS;
 }
 
+
+static int generic_lock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->lock_read(module);
+}
+
+static int generic_unlock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->unlock_read(module);
+}
+
 static const struct ldb_module_ops ltdb_ops = {
        .name              = "tdb",
        .init_context      = ltdb_init_rootdse,
@@ -1542,20 +2221,100 @@ static const struct ldb_module_ops ltdb_ops = {
        .end_transaction   = ltdb_end_trans,
        .prepare_commit    = ltdb_prepare_commit,
        .del_transaction   = ltdb_del_trans,
+       .read_lock         = generic_lock_read,
+       .read_unlock       = generic_unlock_read,
 };
 
+int init_store(struct ltdb_private *ltdb,
+                     const char *name,
+                     struct ldb_context *ldb,
+                     const char *options[],
+                     struct ldb_module **_module)
+{
+       if (getenv("LDB_WARN_UNINDEXED")) {
+               ltdb->warn_unindexed = true;
+       }
+
+       if (getenv("LDB_WARN_REINDEX")) {
+               ltdb->warn_reindex = true;
+       }
+
+       ltdb->sequence_number = 0;
+
+       ltdb->pid = getpid();
+
+       ltdb->module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
+       if (!ltdb->module) {
+               ldb_oom(ldb);
+               talloc_free(ltdb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_module_set_private(ltdb->module, ltdb);
+       talloc_steal(ltdb->module, ltdb);
+
+       if (ltdb_cache_load(ltdb->module) != 0) {
+               ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
+                                      "records for backend '%s'", name);
+               talloc_free(ltdb->module);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       *_module = ltdb->module;
+       /*
+        * Set or override the maximum key length
+        *
+        * The ldb_mdb code will have set this to 511, but our tests
+        * set this even smaller (to make the tests more practical).
+        *
+        * This must only be used for the selftest as the length
+        * becomes encoded in the index keys.
+        */
+       {
+               const char *len_str =
+                       ldb_options_find(ldb, options,
+                                        "max_key_len_for_self_test");
+               if (len_str != NULL) {
+                       unsigned len = strtoul(len_str, NULL, 0);
+                       ltdb->max_key_length = len;
+               }
+       }
+
+       /*
+        * Override full DB scans
+        *
+        * A full DB scan is expensive on a large database.  This
+        * option is for testing to show that the full DB scan is not
+        * triggered.
+        */
+       {
+               const char *len_str =
+                       ldb_options_find(ldb, options,
+                                        "disable_full_db_scan_for_self_test");
+               if (len_str != NULL) {
+                       ltdb->disable_full_db_scan = true;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
+
 /*
   connect to the database
 */
-static int ltdb_connect(struct ldb_context *ldb, const char *url,
-                       unsigned int flags, const char *options[],
-                       struct ldb_module **_module)
+int ltdb_connect(struct ldb_context *ldb, const char *url,
+                unsigned int flags, const char *options[],
+                struct ldb_module **_module)
 {
-       struct ldb_module *module;
        const char *path;
        int tdb_flags, open_flags;
        struct ltdb_private *ltdb;
 
+       /*
+        * We hold locks, so we must use a private event context
+        * on each returned handle
+        */
+       ldb_set_require_private_event_context(ldb);
+
        /* parse the url */
        if (strchr(url, ':')) {
                if (strncmp(url, "tdb://", 6) != 0) {
@@ -1568,7 +2327,7 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                path = url;
        }
 
-       tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
+       tdb_flags = TDB_DEFAULT | TDB_SEQNUM | TDB_DISALLOW_NESTING;
 
        /* check for the 'nosync' option */
        if (flags & LDB_FLG_NOSYNC) {
@@ -1580,20 +2339,39 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                tdb_flags |= TDB_NOMMAP;
        }
 
+       ltdb = talloc_zero(ldb, struct ltdb_private);
+       if (!ltdb) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
        if (flags & LDB_FLG_RDONLY) {
-               open_flags = O_RDONLY;
+               /*
+                * This is weird, but because we can only have one tdb
+                * in this process, and the other one could be
+                * read-write, we can't use the tdb readonly.  Plus a
+                * read only tdb prohibits the all-record lock.
+                */
+               open_flags = O_RDWR;
+
+               ltdb->read_only = true;
+
        } else if (flags & LDB_FLG_DONT_CREATE_DB) {
+               /*
+                * This is used by ldbsearch to prevent creation of the database
+                * if the name is wrong
+                */
                open_flags = O_RDWR;
        } else {
+               /*
+                * This is the normal case
+                */
                open_flags = O_CREAT | O_RDWR;
        }
 
-       ltdb = talloc_zero(ldb, struct ltdb_private);
-       if (!ltdb) {
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+       ltdb->kv_ops = &key_value_ops;
 
+       errno = 0;
        /* note that we use quite a large default hash size */
        ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
                                   tdb_flags, open_flags,
@@ -1610,38 +2388,5 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (getenv("LDB_WARN_UNINDEXED")) {
-               ltdb->warn_unindexed = true;
-       }
-
-       if (getenv("LDB_WARN_REINDEX")) {
-               ltdb->warn_reindex = true;
-       }
-
-       ltdb->sequence_number = 0;
-
-       module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
-       if (!module) {
-               ldb_oom(ldb);
-               talloc_free(ltdb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       ldb_module_set_private(module, ltdb);
-       talloc_steal(module, ltdb);
-
-       if (ltdb_cache_load(module) != 0) {
-               ldb_asprintf_errstring(ldb,
-                                      "Unable to load ltdb cache records of tdb '%s'", path);
-               talloc_free(module);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       *_module = module;
-       return LDB_SUCCESS;
-}
-
-int ldb_tdb_init(const char *version)
-{
-       LDB_MODULE_CHECK_VERSION(version);
-       return ldb_register_backend("tdb", ltdb_connect, false);
+       return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
 }