ldb: Add tests for when we should expect a full scan
[vlendec/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
index d2d8ee8b151965faaa988e5f888dda7e992bd875..85816040ac76f1fdade38fa5acbdbbce5888faa5 100644 (file)
@@ -94,14 +94,25 @@ int ltdb_err_map(enum TDB_ERROR tdb_code)
 /*
   lock the database for read - use by ltdb_search and ltdb_sequence_number
 */
-int ltdb_lock_read(struct ldb_module *module)
+static int ltdb_lock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        int tdb_ret = 0;
        int ret;
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
 
-       if (ltdb->in_transaction == 0 &&
+       if (tdb_transaction_active(ltdb->tdb) == false &&
            ltdb->read_lock_count == 0) {
                tdb_ret = tdb_lockall_read(ltdb->tdb);
        }
@@ -124,11 +135,22 @@ int ltdb_lock_read(struct ldb_module *module)
 /*
   unlock the database after a ltdb_lock_read()
 */
-int ltdb_unlock_read(struct ldb_module *module)
+static int ltdb_unlock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       if (!tdb_transaction_active(ltdb->tdb) && ltdb->read_lock_count == 1) {
                tdb_unlockall_read(ltdb->tdb);
                ltdb->read_lock_count--;
                return 0;
@@ -221,62 +243,58 @@ failed:
        return key;
 }
 
-TDB_DATA ltdb_guid_to_key(struct ldb_module *module,
-                         struct ltdb_private *ltdb,
-                         TALLOC_CTX *mem_ctx,
-                         const struct ldb_val *GUID_val)
+/* The caller is to provide a correctly sized key */
+int ltdb_guid_to_key(struct ldb_module *module,
+                    struct ltdb_private *ltdb,
+                    const struct ldb_val *GUID_val,
+                    TDB_DATA *key)
 {
-       TDB_DATA key;
-       const char *GUID_prefix = "GUID=";
-       const int GUID_prefix_len = strlen(GUID_prefix);
-
-       key.dptr = talloc_size(mem_ctx,
-                              GUID_val->length+GUID_prefix_len);
+       const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
+       const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
 
-       if (key.dptr == NULL) {
-               errno = ENOMEM;
-               key.dptr = NULL;
-               key.dsize = 0;
-               return key;
+       if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
-       memcpy(key.dptr, "GUID=", GUID_prefix_len);
-       memcpy(&key.dptr[GUID_prefix_len],
-              GUID_val->data, GUID_val->length);
 
-       key.dsize = talloc_get_size(key.dptr);
-       return key;
+       memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
+       memcpy(&key->dptr[GUID_prefix_len],
+              GUID_val->data, GUID_val->length);
+       return LDB_SUCCESS;
 }
 
-TDB_DATA ltdb_idx_to_key(struct ldb_module *module,
-                        struct ltdb_private *ltdb,
-                        TALLOC_CTX *mem_ctx,
-                        const struct ldb_val *idx_val)
+/*
+ * The caller is to provide a correctly sized key, used only in
+ * the GUID index mode
+ */
+int ltdb_idx_to_key(struct ldb_module *module,
+                   struct ltdb_private *ltdb,
+                   TALLOC_CTX *mem_ctx,
+                   const struct ldb_val *idx_val,
+                   TDB_DATA *key)
 {
-       TDB_DATA key;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_dn *dn;
 
        if (ltdb->cache->GUID_index_attribute != NULL) {
-               return ltdb_guid_to_key(module, ltdb, mem_ctx, idx_val);
+               return ltdb_guid_to_key(module, ltdb,
+                                       idx_val, key);
        }
 
        dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
        if (dn == NULL) {
-               errno = EINVAL;
-               key.dptr = NULL;
-               key.dsize = 0;
-               return key;
+               /*
+                * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
+                * to the caller, as this in an invalid index value
+                */
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        /* form the key */
-       key = ltdb_key_dn(module, mem_ctx, dn);
+       *key = ltdb_key_dn(module, mem_ctx, dn);
        TALLOC_FREE(dn);
-       if (!key.dptr) {
-               errno = ENOMEM;
-               key.dptr = NULL;
-               key.dsize = 0;
-               return key;
+       if (!key->dptr) {
+               return ldb_module_oom(module);
        }
-       return key;
+       return LDB_SUCCESS;
 }
 
 /*
@@ -294,6 +312,7 @@ TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        TDB_DATA key;
        const struct ldb_val *guid_val;
+       int ret;
 
        if (ltdb->cache->GUID_index_attribute == NULL) {
                return ltdb_key_dn(module, mem_ctx, msg->dn);
@@ -306,14 +325,37 @@ TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
        guid_val = ldb_msg_find_ldb_val(msg,
                                       ltdb->cache->GUID_index_attribute);
        if (guid_val == NULL) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Did not find GUID attribute %s "
+                                      "in %s, required for TDB record "
+                                      "key in " LTDB_IDXGUID " mode.",
+                                      ltdb->cache->GUID_index_attribute,
+                                      ldb_dn_get_linearized(msg->dn));
                errno = EINVAL;
                key.dptr = NULL;
                key.dsize = 0;
                return key;
        }
 
-       return ltdb_guid_to_key(module, ltdb, mem_ctx, guid_val);
+       /* In this case, allocate with talloc */
+       key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
+       if (key.dptr == NULL) {
+               errno = ENOMEM;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       key.dsize = talloc_get_size(key.dptr);
+
+       ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
 
+       if (ret != LDB_SUCCESS) {
+               errno = EINVAL;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       return key;
 }
 
 /*
@@ -359,7 +401,7 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 
        /* only allow modifies inside a transaction, otherwise the
         * ldb is unsafe */
-       if (ltdb->in_transaction == 0) {
+       if (ltdb->kv_ops->transaction_active(ltdb) == false) {
                ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -371,7 +413,7 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
                if (ltdb->warn_reindex) {
                        ldb_debug(ldb_module_get_ctx(module),
                                LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
-                               tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
+                               ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
                }
                ret = ltdb_reindex(module);
        }
@@ -390,9 +432,41 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
                ret = ltdb_cache_reload(module);
        }
 
+       if (ret != LDB_SUCCESS) {
+               ltdb->reindex_failed = true;
+       }
+
        return ret;
 }
 
+static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
+                         struct ldb_val ldb_data, int flags)
+{
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_store(ltdb->tdb, key, data, flags);
+}
+
+static int ltdb_error(struct ltdb_private *ltdb)
+{
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char *ltdb_errorstr(struct ltdb_private *ltdb)
+{
+       return tdb_errorstr(ltdb->tdb);
+}
+
 /*
   store a record into the db
 */
@@ -400,7 +474,8 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       TDB_DATA tdb_key, tdb_data;
+       TDB_DATA tdb_key;
+       struct ldb_val ldb_key;
        struct ldb_val ldb_data;
        int ret = LDB_SUCCESS;
        TALLOC_CTX *tdb_key_ctx = talloc_new(module);
@@ -426,13 +501,13 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
                return LDB_ERR_OTHER;
        }
 
-       tdb_data.dptr = ldb_data.data;
-       tdb_data.dsize = ldb_data.length;
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
 
-       ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
+       ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
        if (ret != 0) {
                bool is_special = ldb_dn_is_special(msg->dn);
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               ret = ltdb->kv_ops->error(ltdb);
 
                /*
                 * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
@@ -598,6 +673,16 @@ static int ltdb_add(struct ltdb_context *ctx)
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        int ret = LDB_SUCCESS;
 
+       if (ltdb->max_key_length != 0 &&
+           ltdb->cache->GUID_index_attribute == NULL &&
+           !ldb_dn_is_special(req->op.add.message->dn))
+       {
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Must operate ldb_mdb in GUID "
+                                 "index mode, but " LTDB_IDXGUID " not set.");
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
        ret = ltdb_check_special_dn(module, req->op.add.message);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -615,6 +700,19 @@ static int ltdb_add(struct ltdb_context *ctx)
        return ret;
 }
 
+static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
+{
+       TDB_DATA tdb_key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_delete(ltdb->tdb, tdb_key);
+}
+
 /*
   delete a record from the database, not updating indexes (used for deleting
   index records)
@@ -624,6 +722,7 @@ int ltdb_delete_noindex(struct ldb_module *module,
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ldb_val ldb_key;
        TDB_DATA tdb_key;
        int ret;
        TALLOC_CTX *tdb_key_ctx = talloc_new(module);
@@ -642,11 +741,14 @@ int ltdb_delete_noindex(struct ldb_module *module,
                return LDB_ERR_OTHER;
        }
 
-       ret = tdb_delete(ltdb->tdb, tdb_key);
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
+
+       ret = ltdb->kv_ops->delete(ltdb, ldb_key);
        TALLOC_FREE(tdb_key_ctx);
 
        if (ret != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               ret = ltdb->kv_ops->error(ltdb);
        }
 
        return ret;
@@ -886,7 +988,6 @@ static int msg_delete_element(struct ldb_module *module,
        return LDB_ERR_NO_SUCH_ATTRIBUTE;
 }
 
-
 /*
   modify a record - internal interface
 
@@ -1366,52 +1467,169 @@ static int ltdb_rename(struct ltdb_context *ctx)
        return ret;
 }
 
+static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_start(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_cancel(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_prepare_commit(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
+{
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       return tdb_transaction_commit(ltdb->tdb);
+}
+
 static int ltdb_start_trans(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(ltdb->module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
        /* Do not take out the transaction lock on a read-only DB */
        if (ltdb->read_only) {
                return LDB_ERR_UNWILLING_TO_PERFORM;
        }
 
-       if (tdb_transaction_start(ltdb->tdb) != 0) {
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ltdb->kv_ops->begin_write(ltdb) != 0) {
+               return ltdb->kv_ops->error(ltdb);
        }
 
-       ltdb->in_transaction++;
 
        ltdb_index_transaction_start(module);
 
+       ltdb->reindex_failed = false;
+
        return LDB_SUCCESS;
 }
 
+/*
+ * Forward declaration to allow prepare_commit to in fact abort the
+ * transaction
+ */
+static int ltdb_del_trans(struct ldb_module *module);
+
 static int ltdb_prepare_commit(struct ldb_module *module)
 {
        int ret;
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       pid_t pid = getpid();
+
+       if (ltdb->pid != pid) {
+               ldb_asprintf_errstring(
+                       ldb_module_get_ctx(module),
+                       __location__": Reusing ldb opend by pid %d in "
+                       "process %d\n",
+                       ltdb->pid,
+                       pid);
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
 
-       if (ltdb->in_transaction != 1) {
-               return LDB_SUCCESS;
+       if (!ltdb->kv_ops->transaction_active(ltdb)) {
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "ltdb_prepare_commit() called "
+                                 "without transaction active");
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * Check if the last re-index failed.
+        *
+        * This can happen if for example a duplicate value was marked
+        * unique.  We must not write a partial re-index into the DB.
+        */
+       if (ltdb->reindex_failed) {
+               /*
+                * We must instead abort the transaction so we get the
+                * old values and old index back
+                */
+               ltdb_del_trans(module);
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Failure during re-index, so "
+                                 "transaction must be aborted.");
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        ret = ltdb_index_transaction_commit(module);
        if (ret != LDB_SUCCESS) {
-               tdb_transaction_cancel(ltdb->tdb);
-               ltdb->in_transaction--;
+               ltdb->kv_ops->abort_write(ltdb);
                return ret;
        }
 
-       if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
-               ltdb->in_transaction--;
+       if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
                ldb_debug_set(ldb_module_get_ctx(module),
                              LDB_DEBUG_FATAL,
                              "Failure during "
-                             "tdb_transaction_prepare_commit(): %s -> %s",
-                             tdb_errorstr(ltdb->tdb),
+                             "prepare_write): %s -> %s",
+                             ltdb->kv_ops->errorstr(ltdb),
                              ldb_strerror(ret));
                return ret;
        }
@@ -1434,14 +1652,13 @@ static int ltdb_end_trans(struct ldb_module *module)
                }
        }
 
-       ltdb->in_transaction--;
        ltdb->prepared_commit = false;
 
-       if (tdb_transaction_commit(ltdb->tdb) != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ltdb->kv_ops->finish_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
                ldb_asprintf_errstring(ldb_module_get_ctx(module),
                                       "Failure during tdb_transaction_commit(): %s -> %s",
-                                      tdb_errorstr(ltdb->tdb),
+                                      ltdb->kv_ops->errorstr(ltdb),
                                       ldb_strerror(ret));
                return ret;
        }
@@ -1454,14 +1671,13 @@ static int ltdb_del_trans(struct ldb_module *module)
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
-       ltdb->in_transaction--;
 
        if (ltdb_index_transaction_cancel(module) != 0) {
-               tdb_transaction_cancel(ltdb->tdb);
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               ltdb->kv_ops->abort_write(ltdb);
+               return ltdb->kv_ops->error(ltdb);
        }
 
-       tdb_transaction_cancel(ltdb->tdb);
+       ltdb->kv_ops->abort_write(ltdb);
        return LDB_SUCCESS;
 }
 
@@ -1474,6 +1690,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
        struct ldb_context *ldb;
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        TALLOC_CTX *tmp_ctx = NULL;
        struct ldb_seqnum_request *seq;
        struct ldb_seqnum_result *res;
@@ -1492,7 +1710,7 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
-       if (ltdb_lock_read(module) != 0) {
+       if (ltdb->kv_ops->lock_read(module) != 0) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -1554,7 +1772,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
 done:
        talloc_free(tmp_ctx);
-       ltdb_unlock_read(module);
+
+       ltdb->kv_ops->unlock_read(module);
        return ret;
 }
 
@@ -1651,6 +1870,181 @@ static void ltdb_handle_extended(struct ltdb_context *ctx)
        ltdb_request_extended_done(ctx, ext, ret);
 }
 
+struct kv_ctx {
+       ldb_kv_traverse_fn kv_traverse_fn;
+       void *ctx;
+       struct ltdb_private *ltdb;
+       int (*parser)(struct ldb_val key,
+                     struct ldb_val data,
+                     void *private_data);
+};
+
+static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+       return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .kv_traverse_fn = fn,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       if (tdb_transaction_active(ltdb->tdb)) {
+               return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       } else {
+               return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       }
+}
+
+static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
+                                     struct ldb_val ldb_key,
+                                     struct ldb_val ldb_key2,
+                                     struct ldb_val ldb_data, void *state)
+{
+       int tdb_ret;
+       struct ldb_context *ldb;
+       struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
+       struct ldb_module *module = ctx->module;
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA key2 = {
+               .dptr = ldb_key2.data,
+               .dsize = ldb_key2.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+
+       ldb = ldb_module_get_ctx(module);
+
+       tdb_ret = tdb_delete(ltdb->tdb, key);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to delete %*.*s "
+                         "for rekey as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to rekey %*.*s as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       return tdb_ret;
+}
+
+static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
+                                        void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+
+       return kv_ctx->parser(key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
+                                struct ldb_val ldb_key,
+                                int (*parser)(struct ldb_val key,
+                                              struct ldb_val data,
+                                              void *private_data),
+                                void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .parser = parser,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       int ret;
+
+       if (tdb_transaction_active(ltdb->tdb) == false &&
+           ltdb->read_lock_count == 0) {
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
+                              &kv_ctx);
+       if (ret == 0) {
+               return LDB_SUCCESS;
+       }
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
+{
+       return tdb_name(ltdb->tdb);
+}
+
+static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
+{
+       int seq = tdb_get_seqnum(ltdb->tdb);
+       bool has_changed = (seq != ltdb->tdb_seqnum);
+
+       ltdb->tdb_seqnum = seq;
+
+       return has_changed;
+}
+
+static bool ltdb_transaction_active(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_active(ltdb->tdb);
+}
+
+static const struct kv_db_ops key_value_ops = {
+       .store = ltdb_tdb_store,
+       .delete = ltdb_tdb_delete,
+       .iterate = ltdb_tdb_traverse_fn,
+       .update_in_iterate = ltdb_tdb_update_in_iterate,
+       .fetch_and_parse = ltdb_tdb_parse_record,
+       .lock_read = ltdb_lock_read,
+       .unlock_read = ltdb_unlock_read,
+       .begin_write = ltdb_tdb_transaction_start,
+       .prepare_write = ltdb_tdb_transaction_prepare_commit,
+       .finish_write = ltdb_tdb_transaction_commit,
+       .abort_write = ltdb_tdb_transaction_cancel,
+       .error = ltdb_error,
+       .errorstr = ltdb_errorstr,
+       .name = ltdb_tdb_name,
+       .has_changed = ltdb_tdb_changed,
+       .transaction_active = ltdb_transaction_active,
+};
+
 static void ltdb_callback(struct tevent_context *ev,
                          struct tevent_timer *te,
                          struct timeval t,
@@ -1799,6 +2193,21 @@ static int ltdb_init_rootdse(struct ldb_module *module)
        return LDB_SUCCESS;
 }
 
+
+static int generic_lock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->lock_read(module);
+}
+
+static int generic_unlock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->unlock_read(module);
+}
+
 static const struct ldb_module_ops ltdb_ops = {
        .name              = "tdb",
        .init_context      = ltdb_init_rootdse,
@@ -1812,18 +2221,90 @@ static const struct ldb_module_ops ltdb_ops = {
        .end_transaction   = ltdb_end_trans,
        .prepare_commit    = ltdb_prepare_commit,
        .del_transaction   = ltdb_del_trans,
-       .read_lock         = ltdb_lock_read,
-       .read_unlock       = ltdb_unlock_read,
+       .read_lock         = generic_lock_read,
+       .read_unlock       = generic_unlock_read,
 };
 
+int init_store(struct ltdb_private *ltdb,
+                     const char *name,
+                     struct ldb_context *ldb,
+                     const char *options[],
+                     struct ldb_module **_module)
+{
+       if (getenv("LDB_WARN_UNINDEXED")) {
+               ltdb->warn_unindexed = true;
+       }
+
+       if (getenv("LDB_WARN_REINDEX")) {
+               ltdb->warn_reindex = true;
+       }
+
+       ltdb->sequence_number = 0;
+
+       ltdb->pid = getpid();
+
+       ltdb->module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
+       if (!ltdb->module) {
+               ldb_oom(ldb);
+               talloc_free(ltdb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_module_set_private(ltdb->module, ltdb);
+       talloc_steal(ltdb->module, ltdb);
+
+       if (ltdb_cache_load(ltdb->module) != 0) {
+               ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
+                                      "records for backend '%s'", name);
+               talloc_free(ltdb->module);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       *_module = ltdb->module;
+       /*
+        * Set or override the maximum key length
+        *
+        * The ldb_mdb code will have set this to 511, but our tests
+        * set this even smaller (to make the tests more practical).
+        *
+        * This must only be used for the selftest as the length
+        * becomes encoded in the index keys.
+        */
+       {
+               const char *len_str =
+                       ldb_options_find(ldb, options,
+                                        "max_key_len_for_self_test");
+               if (len_str != NULL) {
+                       unsigned len = strtoul(len_str, NULL, 0);
+                       ltdb->max_key_length = len;
+               }
+       }
+
+       /*
+        * Override full DB scans
+        *
+        * A full DB scan is expensive on a large database.  This
+        * option is for testing to show that the full DB scan is not
+        * triggered.
+        */
+       {
+               const char *len_str =
+                       ldb_options_find(ldb, options,
+                                        "disable_full_db_scan_for_self_test");
+               if (len_str != NULL) {
+                       ltdb->disable_full_db_scan = true;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
+
 /*
   connect to the database
 */
-static int ltdb_connect(struct ldb_context *ldb, const char *url,
-                       unsigned int flags, const char *options[],
-                       struct ldb_module **_module)
+int ltdb_connect(struct ldb_context *ldb, const char *url,
+                unsigned int flags, const char *options[],
+                struct ldb_module **_module)
 {
-       struct ldb_module *module;
        const char *path;
        int tdb_flags, open_flags;
        struct ltdb_private *ltdb;
@@ -1832,7 +2313,6 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
         * We hold locks, so we must use a private event context
         * on each returned handle
         */
-
        ldb_set_require_private_event_context(ldb);
 
        /* parse the url */
@@ -1847,7 +2327,7 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                path = url;
        }
 
-       tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
+       tdb_flags = TDB_DEFAULT | TDB_SEQNUM | TDB_DISALLOW_NESTING;
 
        /* check for the 'nosync' option */
        if (flags & LDB_FLG_NOSYNC) {
@@ -1889,6 +2369,9 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                open_flags = O_CREAT | O_RDWR;
        }
 
+       ltdb->kv_ops = &key_value_ops;
+
+       errno = 0;
        /* note that we use quite a large default hash size */
        ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
                                   tdb_flags, open_flags,
@@ -1905,38 +2388,5 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (getenv("LDB_WARN_UNINDEXED")) {
-               ltdb->warn_unindexed = true;
-       }
-
-       if (getenv("LDB_WARN_REINDEX")) {
-               ltdb->warn_reindex = true;
-       }
-
-       ltdb->sequence_number = 0;
-
-       module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
-       if (!module) {
-               ldb_oom(ldb);
-               talloc_free(ltdb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       ldb_module_set_private(module, ltdb);
-       talloc_steal(module, ltdb);
-
-       if (ltdb_cache_load(module) != 0) {
-               ldb_asprintf_errstring(ldb,
-                                      "Unable to load ltdb cache records of tdb '%s'", path);
-               talloc_free(module);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       *_module = module;
-       return LDB_SUCCESS;
-}
-
-int ldb_tdb_init(const char *version)
-{
-       LDB_MODULE_CHECK_VERSION(version);
-       return ldb_register_backend("tdb", ltdb_connect, false);
+       return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
 }