ldb: make key/value backends expose if there is an active transaction
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
index 2ac1967ee15bd45014786cc710acb93e1f3c5157..31b4a7d9abb4c3e1f2f9856a1aa45a3c910d0258 100644 (file)
@@ -94,26 +94,37 @@ int ltdb_err_map(enum TDB_ERROR tdb_code)
 /*
   lock the database for read - use by ltdb_search and ltdb_sequence_number
 */
-int ltdb_lock_read(struct ldb_module *module)
+static int ltdb_lock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       int ret = 0;
+       int tdb_ret = 0;
+       int ret;
 
        if (ltdb->in_transaction == 0 &&
            ltdb->read_lock_count == 0) {
-               ret = tdb_lockall_read(ltdb->tdb);
+               tdb_ret = tdb_lockall_read(ltdb->tdb);
        }
-       if (ret == 0) {
+       if (tdb_ret == 0) {
                ltdb->read_lock_count++;
+               return LDB_SUCCESS;
+       }
+       ret = ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
        }
+       ldb_debug_set(ldb_module_get_ctx(module),
+                     LDB_DEBUG_FATAL,
+                     "Failure during ltdb_lock_read(): %s -> %s",
+                     tdb_errorstr(ltdb->tdb),
+                     ldb_strerror(ret));
        return ret;
 }
 
 /*
   unlock the database after a ltdb_lock_read()
 */
-int ltdb_unlock_read(struct ldb_module *module)
+static int ltdb_unlock_read(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
@@ -127,6 +138,36 @@ int ltdb_unlock_read(struct ldb_module *module)
 }
 
 
+/* 
+ * Determine if this key could hold a record.  We allow the new GUID
+ * index, the old DN index and a possible future ID=
+ */
+bool ltdb_key_is_record(TDB_DATA key)
+{
+       if (key.dsize < 4) {
+               return false;
+       }
+
+       if (memcmp(key.dptr, "DN=", 3) == 0) {
+               return true;
+       }
+       
+       if (memcmp(key.dptr, "ID=", 3) == 0) {
+               return true;
+       }
+
+       if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
+               return false;
+       }
+
+       if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
+                  sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
+               return true;
+       }
+       
+       return false;
+}
+
 /*
   form a TDB_DATA for a record key
   caller frees
@@ -134,9 +175,9 @@ int ltdb_unlock_read(struct ldb_module *module)
   note that the key for a record can depend on whether the
   dn refers to a case sensitive index record or not
 */
-TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
+TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
+                    struct ldb_dn *dn)
 {
-       struct ldb_context *ldb = ldb_module_get_ctx(module);
        TDB_DATA key;
        char *key_str = NULL;
        const char *dn_folded = NULL;
@@ -158,7 +199,7 @@ TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
                goto failed;
        }
 
-       key_str = talloc_strdup(ldb, "DN=");
+       key_str = talloc_strdup(mem_ctx, "DN=");
        if (!key_str) {
                goto failed;
        }
@@ -180,6 +221,121 @@ failed:
        return key;
 }
 
+/* The caller is to provide a correctly sized key */
+int ltdb_guid_to_key(struct ldb_module *module,
+                    struct ltdb_private *ltdb,
+                    const struct ldb_val *GUID_val,
+                    TDB_DATA *key)
+{
+       const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
+       const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
+
+       if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
+       memcpy(&key->dptr[GUID_prefix_len],
+              GUID_val->data, GUID_val->length);
+       return LDB_SUCCESS;
+}
+
+/*
+ * The caller is to provide a correctly sized key, used only in
+ * the GUID index mode
+ */
+int ltdb_idx_to_key(struct ldb_module *module,
+                   struct ltdb_private *ltdb,
+                   TALLOC_CTX *mem_ctx,
+                   const struct ldb_val *idx_val,
+                   TDB_DATA *key)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct ldb_dn *dn;
+
+       if (ltdb->cache->GUID_index_attribute != NULL) {
+               return ltdb_guid_to_key(module, ltdb,
+                                       idx_val, key);
+       }
+
+       dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
+       if (dn == NULL) {
+               /*
+                * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
+                * to the caller, as this in an invalid index value
+                */
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       /* form the key */
+       *key = ltdb_key_dn(module, mem_ctx, dn);
+       TALLOC_FREE(dn);
+       if (!key->dptr) {
+               return ldb_module_oom(module);
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+  form a TDB_DATA for a record key
+  caller frees mem_ctx, which may or may not have the key
+  as a child.
+
+  note that the key for a record can depend on whether a
+  GUID index is in use, or the DN is used as the key
+*/
+TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
+                     const struct ldb_message *msg)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       TDB_DATA key;
+       const struct ldb_val *guid_val;
+       int ret;
+
+       if (ltdb->cache->GUID_index_attribute == NULL) {
+               return ltdb_key_dn(module, mem_ctx, msg->dn);
+       }
+
+       if (ldb_dn_is_special(msg->dn)) {
+               return ltdb_key_dn(module, mem_ctx, msg->dn);
+       }
+
+       guid_val = ldb_msg_find_ldb_val(msg,
+                                      ltdb->cache->GUID_index_attribute);
+       if (guid_val == NULL) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Did not find GUID attribute %s "
+                                      "in %s, required for TDB record "
+                                      "key in " LTDB_IDXGUID " mode.",
+                                      ltdb->cache->GUID_index_attribute,
+                                      ldb_dn_get_linearized(msg->dn));
+               errno = EINVAL;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+
+       /* In this case, allocate with talloc */
+       key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
+       if (key.dptr == NULL) {
+               errno = ENOMEM;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       key.dsize = talloc_get_size(key.dptr);
+
+       ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
+
+       if (ret != LDB_SUCCESS) {
+               errno = EINVAL;
+               key.dptr = NULL;
+               key.dsize = 0;
+               return key;
+       }
+       return key;
+}
+
 /*
   check special dn's have valid attributes
   currently only @ATTRIBUTES is checked
@@ -235,7 +391,7 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
                if (ltdb->warn_reindex) {
                        ldb_debug(ldb_module_get_ctx(module),
                                LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
-                               tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
+                               ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
                }
                ret = ltdb_reindex(module);
        }
@@ -254,9 +410,41 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
                ret = ltdb_cache_reload(module);
        }
 
+       if (ret != LDB_SUCCESS) {
+               ltdb->reindex_failed = true;
+       }
+
        return ret;
 }
 
+static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
+                         struct ldb_val ldb_data, int flags)
+{
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_store(ltdb->tdb, key, data, flags);
+}
+
+static int ltdb_error(struct ltdb_private *ltdb)
+{
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char *ltdb_errorstr(struct ltdb_private *ltdb)
+{
+       return tdb_errorstr(ltdb->tdb);
+}
+
 /*
   store a record into the db
 */
@@ -264,33 +452,55 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       TDB_DATA tdb_key, tdb_data;
+       TDB_DATA tdb_key;
+       struct ldb_val ldb_key;
        struct ldb_val ldb_data;
        int ret = LDB_SUCCESS;
+       TALLOC_CTX *tdb_key_ctx = talloc_new(module);
 
-       tdb_key = ltdb_key(module, msg->dn);
+       if (tdb_key_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
+       tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
        if (tdb_key.dptr == NULL) {
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
        ret = ldb_pack_data(ldb_module_get_ctx(module),
                            msg, &ldb_data);
        if (ret == -1) {
-               talloc_free(tdb_key.dptr);
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
-       tdb_data.dptr = ldb_data.data;
-       tdb_data.dsize = ldb_data.length;
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
 
-       ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
+       ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
        if (ret != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               bool is_special = ldb_dn_is_special(msg->dn);
+               ret = ltdb->kv_ops->error(ltdb);
+
+               /*
+                * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
+                * the GUID, so re-map
+                */
+               if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
+                   && !is_special
+                   && ltdb->cache->GUID_index_attribute != NULL) {
+                       ret = LDB_ERR_CONSTRAINT_VIOLATION;
+               }
                goto done;
        }
 
 done:
-       talloc_free(tdb_key.dptr);
+       TALLOC_FREE(tdb_key_ctx);
        talloc_free(ldb_data.data);
 
        return ret;
@@ -326,6 +536,7 @@ static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
 }
 
 static int ltdb_add_internal(struct ldb_module *module,
+                            struct ltdb_private *ltdb,
                             const struct ldb_message *msg,
                             bool check_single_value)
 {
@@ -382,6 +593,24 @@ static int ltdb_add_internal(struct ldb_module *module,
 
        ret = ltdb_store(module, msg, TDB_INSERT);
        if (ret != LDB_SUCCESS) {
+               /*
+                * Try really hard to get the right error code for
+                * a re-add situation, as this can matter!
+                */
+               if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
+                       int ret2;
+                       struct ldb_dn *dn2 = NULL;
+                       TALLOC_CTX *mem_ctx = talloc_new(module);
+                       if (mem_ctx == NULL) {
+                               return ldb_module_operr(module);
+                       }
+                       ret2 = ltdb_search_base(module, module,
+                                               msg->dn, &dn2);
+                       TALLOC_FREE(mem_ctx);
+                       if (ret2 == LDB_SUCCESS) {
+                               ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       }
+               }
                if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
                        ldb_asprintf_errstring(ldb,
                                               "Entry %s already exists",
@@ -390,8 +619,19 @@ static int ltdb_add_internal(struct ldb_module *module,
                return ret;
        }
 
-       ret = ltdb_index_add_new(module, msg);
+       ret = ltdb_index_add_new(module, ltdb, msg);
        if (ret != LDB_SUCCESS) {
+               /*
+                * If we failed to index, delete the message again.
+                *
+                * This is particularly important for the GUID index
+                * case, which will only fail for a duplicate DN
+                * in the index add.
+                *
+                * Note that the caller may not cancel the transation
+                * and this means the above add might really show up!
+                */
+               ltdb_delete_noindex(module, msg);
                return ret;
        }
 
@@ -407,8 +647,20 @@ static int ltdb_add(struct ltdb_context *ctx)
 {
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        int ret = LDB_SUCCESS;
 
+       if (ltdb->max_key_length != 0 &&
+           ltdb->cache->GUID_index_attribute == NULL &&
+           !ldb_dn_is_special(req->op.add.message->dn))
+       {
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Must operate ldb_mdb in GUID "
+                                 "index mode, but " LTDB_IDXGUID " not set.");
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
        ret = ltdb_check_special_dn(module, req->op.add.message);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -420,32 +672,61 @@ static int ltdb_add(struct ltdb_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_add_internal(module, req->op.add.message, true);
+       ret = ltdb_add_internal(module, ltdb,
+                               req->op.add.message, true);
 
        return ret;
 }
 
+static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
+{
+       TDB_DATA tdb_key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       bool transaction_active = tdb_transaction_active(ltdb->tdb);
+       if (transaction_active == false){
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+       return tdb_delete(ltdb->tdb, tdb_key);
+}
+
 /*
   delete a record from the database, not updating indexes (used for deleting
   index records)
 */
-int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
+int ltdb_delete_noindex(struct ldb_module *module,
+                       const struct ldb_message *msg)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ldb_val ldb_key;
        TDB_DATA tdb_key;
        int ret;
+       TALLOC_CTX *tdb_key_ctx = talloc_new(module);
 
-       tdb_key = ltdb_key(module, dn);
+       if (tdb_key_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
+       tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
        if (!tdb_key.dptr) {
+               TALLOC_FREE(tdb_key_ctx);
                return LDB_ERR_OTHER;
        }
 
-       ret = tdb_delete(ltdb->tdb, tdb_key);
-       talloc_free(tdb_key.dptr);
+       ldb_key.data = tdb_key.dptr;
+       ldb_key.length = tdb_key.dsize;
+
+       ret = ltdb->kv_ops->delete(ltdb, ldb_key);
+       TALLOC_FREE(tdb_key_ctx);
 
        if (ret != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               ret = ltdb->kv_ops->error(ltdb);
        }
 
        return ret;
@@ -469,7 +750,7 @@ static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
                goto done;
        }
 
-       ret = ltdb_delete_noindex(module, dn);
+       ret = ltdb_delete_noindex(module, msg);
        if (ret != LDB_SUCCESS) {
                goto done;
        }
@@ -580,11 +861,23 @@ static int ltdb_msg_add_element(struct ldb_message *msg,
   delete all elements having a specified attribute name
 */
 static int msg_delete_attribute(struct ldb_module *module,
+                               struct ltdb_private *ltdb,
                                struct ldb_message *msg, const char *name)
 {
        unsigned int i;
        int ret;
        struct ldb_message_element *el;
+       bool is_special = ldb_dn_is_special(msg->dn);
+
+       if (!is_special
+           && ltdb->cache->GUID_index_attribute != NULL
+           && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "Must not modify GUID "
+                                      "attribute %s (used as DB index)",
+                                      ltdb->cache->GUID_index_attribute);
+               return LDB_ERR_CONSTRAINT_VIOLATION;
+       }
 
        el = ldb_msg_find_element(msg, name);
        if (el == NULL) {
@@ -592,7 +885,7 @@ static int msg_delete_attribute(struct ldb_module *module,
        }
        i = el - msg->elements;
 
-       ret = ltdb_index_del_element(module, msg->dn, el);
+       ret = ltdb_index_del_element(module, ltdb, msg, el);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
@@ -614,6 +907,7 @@ static int msg_delete_attribute(struct ldb_module *module,
   return LDB Error on failure
 */
 static int msg_delete_element(struct ldb_module *module,
+                             struct ltdb_private *ltdb,
                              struct ldb_message *msg,
                              const char *name,
                              const struct ldb_val *val)
@@ -646,10 +940,11 @@ static int msg_delete_element(struct ldb_module *module,
                }
                if (matched) {
                        if (el->num_values == 1) {
-                               return msg_delete_attribute(module, msg, name);
+                               return msg_delete_attribute(module,
+                                                           ltdb, msg, name);
                        }
 
-                       ret = ltdb_index_del_value(module, msg->dn, el, i);
+                       ret = ltdb_index_del_value(module, ltdb, msg, el, i);
                        if (ret != LDB_SUCCESS) {
                                return ret;
                        }
@@ -671,7 +966,6 @@ static int msg_delete_element(struct ldb_module *module,
        return LDB_ERR_NO_SUCH_ATTRIBUTE;
 }
 
-
 /*
   modify a record - internal interface
 
@@ -688,50 +982,34 @@ int ltdb_modify_internal(struct ldb_module *module,
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       TDB_DATA tdb_key, tdb_data;
-       struct ldb_val ldb_data;
        struct ldb_message *msg2;
        unsigned int i, j;
        int ret = LDB_SUCCESS, idx;
        struct ldb_control *control_permissive = NULL;
+       TALLOC_CTX *mem_ctx = talloc_new(req);
 
+       if (mem_ctx == NULL) {
+               return ldb_module_oom(module);
+       }
+       
        if (req) {
                control_permissive = ldb_request_get_control(req,
                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
        }
 
-       tdb_key = ltdb_key(module, msg->dn);
-       if (!tdb_key.dptr) {
-               return LDB_ERR_OTHER;
-       }
-
-       tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
-       if (!tdb_data.dptr) {
-               talloc_free(tdb_key.dptr);
-               return ltdb_err_map(tdb_error(ltdb->tdb));
-       }
-
-       msg2 = ldb_msg_new(tdb_key.dptr);
+       msg2 = ldb_msg_new(mem_ctx);
        if (msg2 == NULL) {
-               free(tdb_data.dptr);
                ret = LDB_ERR_OTHER;
                goto done;
        }
 
-       ldb_data.data = tdb_data.dptr;
-       ldb_data.length = tdb_data.dsize;
-
-       ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
-       free(tdb_data.dptr);
-       if (ret == -1) {
-               ret = LDB_ERR_OTHER;
+       ret = ltdb_search_dn1(module, msg->dn,
+                             msg2,
+                             LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
+       if (ret != LDB_SUCCESS) {
                goto done;
        }
 
-       if (!msg2->dn) {
-               msg2->dn = msg->dn;
-       }
-
        for (i=0; i<msg->num_elements; i++) {
                struct ldb_message_element *el = &msg->elements[i], *el2;
                struct ldb_val *vals;
@@ -789,7 +1067,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                        ret = LDB_ERR_OTHER;
                                        goto done;
                                }
-                               ret = ltdb_index_add_element(module, msg2->dn,
+                               ret = ltdb_index_add_element(module, ltdb,
+                                                            msg2,
                                                             el);
                                if (ret != LDB_SUCCESS) {
                                        goto done;
@@ -870,7 +1149,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                el2->values = vals;
                                el2->num_values += el->num_values;
 
-                               ret = ltdb_index_add_element(module, msg2->dn, el);
+                               ret = ltdb_index_add_element(module, ltdb,
+                                                            msg2, el);
                                if (ret != LDB_SUCCESS) {
                                        goto done;
                                }
@@ -934,7 +1214,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                }
 
                                /* Delete the attribute if it exists in the DB */
-                               if (msg_delete_attribute(module, msg2,
+                               if (msg_delete_attribute(module, ltdb,
+                                                        msg2,
                                                         el->name) != 0) {
                                        ret = LDB_ERR_OTHER;
                                        goto done;
@@ -947,7 +1228,8 @@ int ltdb_modify_internal(struct ldb_module *module,
                                goto done;
                        }
 
-                       ret = ltdb_index_add_element(module, msg2->dn, el);
+                       ret = ltdb_index_add_element(module, ltdb,
+                                                    msg2, el);
                        if (ret != LDB_SUCCESS) {
                                goto done;
                        }
@@ -963,7 +1245,9 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                        if (msg->elements[i].num_values == 0) {
                                /* Delete the whole attribute */
-                               ret = msg_delete_attribute(module, msg2,
+                               ret = msg_delete_attribute(module,
+                                                          ltdb,
+                                                          msg2,
                                                           msg->elements[i].name);
                                if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
                                    control_permissive) {
@@ -980,6 +1264,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                                /* Delete specified values from an attribute */
                                for (j=0; j < msg->elements[i].num_values; j++) {
                                        ret = msg_delete_element(module,
+                                                                ltdb,
                                                                 msg2,
                                                                 msg->elements[i].name,
                                                                 &msg->elements[i].values[j]);
@@ -1018,7 +1303,7 @@ int ltdb_modify_internal(struct ldb_module *module,
        }
 
 done:
-       talloc_free(tdb_key.dptr);
+       TALLOC_FREE(mem_ctx);
        return ret;
 }
 
@@ -1059,6 +1344,7 @@ static int ltdb_rename(struct ltdb_context *ctx)
        struct ldb_message *msg;
        int ret = LDB_SUCCESS;
        TDB_DATA tdb_key, tdb_key_old;
+       struct ldb_dn *db_dn;
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
@@ -1081,33 +1367,54 @@ static int ltdb_rename(struct ltdb_context *ctx)
 
        /* We need to, before changing the DB, check if the new DN
         * exists, so we can return this error to the caller with an
-        * unmodified DB */
-       tdb_key = ltdb_key(module, req->op.rename.newdn);
+        * unmodified DB
+        *
+        * Even in GUID index mode we use ltdb_key_dn() as we are
+        * trying to figure out if this is just a case rename
+        */
+       tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
        if (!tdb_key.dptr) {
                talloc_free(msg);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       tdb_key_old = ltdb_key(module, req->op.rename.olddn);
+       tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
        if (!tdb_key_old.dptr) {
                talloc_free(msg);
                talloc_free(tdb_key.dptr);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
-       if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
-               if (tdb_exists(ltdb->tdb, tdb_key)) {
-                       talloc_free(tdb_key_old.dptr);
-                       talloc_free(tdb_key.dptr);
-                       ldb_asprintf_errstring(ldb_module_get_ctx(module),
-                                              "Entry %s already exists",
-                                              ldb_dn_get_linearized(req->op.rename.newdn));
-                       /* finding the new record already in the DB is an error */
-                       talloc_free(msg);
-                       return LDB_ERR_ENTRY_ALREADY_EXISTS;
+       /*
+        * Only declare a conflict if the new DN already exists,
+        * and it isn't a case change on the old DN
+        */
+       if (tdb_key_old.dsize != tdb_key.dsize
+           || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
+               ret = ltdb_search_base(module, msg,
+                                      req->op.rename.newdn,
+                                      &db_dn);
+               if (ret == LDB_SUCCESS) {
+                       ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
+               } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       ret = LDB_SUCCESS;
                }
        }
+
+       /* finding the new record already in the DB is an error */
+
+       if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Entry %s already exists",
+                                      ldb_dn_get_linearized(req->op.rename.newdn));
+       }
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tdb_key_old.dptr);
+               talloc_free(tdb_key.dptr);
+               talloc_free(msg);
+               return ret;
+       }
+
        talloc_free(tdb_key_old.dptr);
        talloc_free(tdb_key.dptr);
 
@@ -1131,29 +1438,62 @@ static int ltdb_rename(struct ltdb_context *ctx)
         * deleted attributes. We could go through all elements but that's
         * maybe not the most efficient way
         */
-       ret = ltdb_add_internal(module, msg, false);
+       ret = ltdb_add_internal(module, ltdb, msg, false);
 
        talloc_free(msg);
 
        return ret;
 }
 
+static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_start(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_cancel(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_prepare_commit(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_commit(ltdb->tdb);
+}
+
 static int ltdb_start_trans(struct ldb_module *module)
 {
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
-       if (tdb_transaction_start(ltdb->tdb) != 0) {
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+       /* Do not take out the transaction lock on a read-only DB */
+       if (ltdb->read_only) {
+               return LDB_ERR_UNWILLING_TO_PERFORM;
+       }
+
+       if (ltdb->kv_ops->begin_write(ltdb) != 0) {
+               return ltdb->kv_ops->error(ltdb);
        }
 
        ltdb->in_transaction++;
 
        ltdb_index_transaction_start(module);
 
+       ltdb->reindex_failed = false;
+
        return LDB_SUCCESS;
 }
 
+/*
+ * Forward declaration to allow prepare_commit to in fact abort the
+ * transaction
+ */
+static int ltdb_del_trans(struct ldb_module *module);
+
 static int ltdb_prepare_commit(struct ldb_module *module)
 {
        int ret;
@@ -1164,20 +1504,40 @@ static int ltdb_prepare_commit(struct ldb_module *module)
                return LDB_SUCCESS;
        }
 
+       /*
+        * Check if the last re-index failed.
+        *
+        * This can happen if for example a duplicate value was marked
+        * unique.  We must not write a partial re-index into the DB.
+        */
+       if (ltdb->reindex_failed) {
+               /*
+                * We must instead abort the transaction so we get the
+                * old values and old index back
+                */
+               ltdb_del_trans(module);
+               ldb_set_errstring(ldb_module_get_ctx(module),
+                                 "Failure during re-index, so "
+                                 "transaction must be aborted.");
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
        ret = ltdb_index_transaction_commit(module);
        if (ret != LDB_SUCCESS) {
-               tdb_transaction_cancel(ltdb->tdb);
+               ltdb->kv_ops->abort_write(ltdb);
                ltdb->in_transaction--;
                return ret;
        }
 
-       if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
                ltdb->in_transaction--;
-               ldb_asprintf_errstring(ldb_module_get_ctx(module),
-                                      "Failure during tdb_transaction_prepare_commit(): %s -> %s",
-                                      tdb_errorstr(ltdb->tdb),
-                                      ldb_strerror(ret));
+               ldb_debug_set(ldb_module_get_ctx(module),
+                             LDB_DEBUG_FATAL,
+                             "Failure during "
+                             "prepare_write): %s -> %s",
+                             ltdb->kv_ops->errorstr(ltdb),
+                             ldb_strerror(ret));
                return ret;
        }
 
@@ -1202,11 +1562,11 @@ static int ltdb_end_trans(struct ldb_module *module)
        ltdb->in_transaction--;
        ltdb->prepared_commit = false;
 
-       if (tdb_transaction_commit(ltdb->tdb) != 0) {
-               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+       if (ltdb->kv_ops->finish_write(ltdb) != 0) {
+               ret = ltdb->kv_ops->error(ltdb);
                ldb_asprintf_errstring(ldb_module_get_ctx(module),
                                       "Failure during tdb_transaction_commit(): %s -> %s",
-                                      tdb_errorstr(ltdb->tdb),
+                                      ltdb->kv_ops->errorstr(ltdb),
                                       ldb_strerror(ret));
                return ret;
        }
@@ -1222,11 +1582,11 @@ static int ltdb_del_trans(struct ldb_module *module)
        ltdb->in_transaction--;
 
        if (ltdb_index_transaction_cancel(module) != 0) {
-               tdb_transaction_cancel(ltdb->tdb);
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               ltdb->kv_ops->abort_write(ltdb);
+               return ltdb->kv_ops->error(ltdb);
        }
 
-       tdb_transaction_cancel(ltdb->tdb);
+       ltdb->kv_ops->abort_write(ltdb);
        return LDB_SUCCESS;
 }
 
@@ -1239,6 +1599,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
        struct ldb_context *ldb;
        struct ldb_module *module = ctx->module;
        struct ldb_request *req = ctx->req;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        TALLOC_CTX *tmp_ctx = NULL;
        struct ldb_seqnum_request *seq;
        struct ldb_seqnum_result *res;
@@ -1257,7 +1619,7 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
-       if (ltdb_lock_read(module) != 0) {
+       if (ltdb->kv_ops->lock_read(module) != 0) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -1319,7 +1681,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
 done:
        talloc_free(tmp_ctx);
-       ltdb_unlock_read(module);
+
+       ltdb->kv_ops->unlock_read(module);
        return ret;
 }
 
@@ -1416,6 +1779,176 @@ static void ltdb_handle_extended(struct ltdb_context *ctx)
        ltdb_request_extended_done(ctx, ext, ret);
 }
 
+struct kv_ctx {
+       ldb_kv_traverse_fn kv_traverse_fn;
+       void *ctx;
+       struct ltdb_private *ltdb;
+       int (*parser)(struct ldb_val key,
+                     struct ldb_val data,
+                     void *private_data);
+};
+
+static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+       return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .kv_traverse_fn = fn,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       if (ltdb->in_transaction != 0) {
+               return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       } else {
+               return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+       }
+}
+
+static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
+                                     struct ldb_val ldb_key,
+                                     struct ldb_val ldb_key2,
+                                     struct ldb_val ldb_data, void *state)
+{
+       int tdb_ret;
+       struct ldb_context *ldb;
+       struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
+       struct ldb_module *module = ctx->module;
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       TDB_DATA key2 = {
+               .dptr = ldb_key2.data,
+               .dsize = ldb_key2.length
+       };
+       TDB_DATA data = {
+               .dptr = ldb_data.data,
+               .dsize = ldb_data.length
+       };
+
+       ldb = ldb_module_get_ctx(module);
+
+       tdb_ret = tdb_delete(ltdb->tdb, key);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to delete %*.*s "
+                         "for rekey as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
+       if (tdb_ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "Failed to rekey %*.*s as %*.*s: %s",
+                         (int)key.dsize, (int)key.dsize,
+                         (const char *)key.dptr,
+                         (int)key2.dsize, (int)key2.dsize,
+                         (const char *)key.dptr,
+                         tdb_errorstr(ltdb->tdb));
+               ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+               return -1;
+       }
+       return tdb_ret;
+}
+
+static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
+                                        void *ctx)
+{
+       struct kv_ctx *kv_ctx = ctx;
+       struct ldb_val key = {
+               .length = tdb_key.dsize,
+               .data = tdb_key.dptr,
+       };
+       struct ldb_val data = {
+               .length = tdb_data.dsize,
+               .data = tdb_data.dptr,
+       };
+
+       return kv_ctx->parser(key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
+                                struct ldb_val ldb_key,
+                                int (*parser)(struct ldb_val key,
+                                              struct ldb_val data,
+                                              void *private_data),
+                                void *ctx)
+{
+       struct kv_ctx kv_ctx = {
+               .parser = parser,
+               .ctx = ctx,
+               .ltdb = ltdb
+       };
+       TDB_DATA key = {
+               .dptr = ldb_key.data,
+               .dsize = ldb_key.length
+       };
+       int ret;
+
+       ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
+                              &kv_ctx);
+       if (ret == 0) {
+               return LDB_SUCCESS;
+       }
+       return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
+static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
+{
+       return tdb_name(ltdb->tdb);
+}
+
+static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
+{
+       int seq = tdb_get_seqnum(ltdb->tdb);
+       bool has_changed = (seq != ltdb->tdb_seqnum);
+
+       ltdb->tdb_seqnum = seq;
+
+       return has_changed;
+}
+
+static bool ltdb_transaction_active(struct ltdb_private *ltdb)
+{
+       return tdb_transaction_active(ltdb->tdb);
+}
+
+static const struct kv_db_ops key_value_ops = {
+       .store = ltdb_tdb_store,
+       .delete = ltdb_tdb_delete,
+       .iterate = ltdb_tdb_traverse_fn,
+       .update_in_iterate = ltdb_tdb_update_in_iterate,
+       .fetch_and_parse = ltdb_tdb_parse_record,
+       .lock_read = ltdb_lock_read,
+       .unlock_read = ltdb_unlock_read,
+       .begin_write = ltdb_tdb_transaction_start,
+       .prepare_write = ltdb_tdb_transaction_prepare_commit,
+       .finish_write = ltdb_tdb_transaction_commit,
+       .abort_write = ltdb_tdb_transaction_cancel,
+       .error = ltdb_error,
+       .errorstr = ltdb_errorstr,
+       .name = ltdb_tdb_name,
+       .has_changed = ltdb_tdb_changed,
+       .transaction_active = ltdb_transaction_active,
+};
+
 static void ltdb_callback(struct tevent_context *ev,
                          struct tevent_timer *te,
                          struct timeval t,
@@ -1564,6 +2097,21 @@ static int ltdb_init_rootdse(struct ldb_module *module)
        return LDB_SUCCESS;
 }
 
+
+static int generic_lock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->lock_read(module);
+}
+
+static int generic_unlock_read(struct ldb_module *module)
+{
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       return ltdb->kv_ops->unlock_read(module);
+}
+
 static const struct ldb_module_ops ltdb_ops = {
        .name              = "tdb",
        .init_context      = ltdb_init_rootdse,
@@ -1577,18 +2125,74 @@ static const struct ldb_module_ops ltdb_ops = {
        .end_transaction   = ltdb_end_trans,
        .prepare_commit    = ltdb_prepare_commit,
        .del_transaction   = ltdb_del_trans,
-       .read_lock         = ltdb_lock_read,
-       .read_unlock       = ltdb_unlock_read,
+       .read_lock         = generic_lock_read,
+       .read_unlock       = generic_unlock_read,
 };
 
+int init_store(struct ltdb_private *ltdb,
+                     const char *name,
+                     struct ldb_context *ldb,
+                     const char *options[],
+                     struct ldb_module **_module)
+{
+       struct ldb_module *module;
+
+       if (getenv("LDB_WARN_UNINDEXED")) {
+               ltdb->warn_unindexed = true;
+       }
+
+       if (getenv("LDB_WARN_REINDEX")) {
+               ltdb->warn_reindex = true;
+       }
+
+       ltdb->sequence_number = 0;
+
+       module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
+       if (!module) {
+               ldb_oom(ldb);
+               talloc_free(ltdb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_module_set_private(module, ltdb);
+       talloc_steal(module, ltdb);
+
+       if (ltdb_cache_load(module) != 0) {
+               ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
+                                      "records for backend '%s'", name);
+               talloc_free(module);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       *_module = module;
+       /*
+        * Set or override the maximum key length
+        *
+        * The ldb_mdb code will have set this to 511, but our tests
+        * set this even smaller (to make the tests more practical).
+        *
+        * This must only be used for the selftest as the length
+        * becomes encoded in the index keys.
+        */
+       {
+               const char *len_str =
+                       ldb_options_find(ldb, options,
+                                        "max_key_len_for_self_test");
+               if (len_str != NULL) {
+                       unsigned len = strtoul(len_str, NULL, 0);
+                       ltdb->max_key_length = len;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
+
 /*
   connect to the database
 */
-static int ltdb_connect(struct ldb_context *ldb, const char *url,
-                       unsigned int flags, const char *options[],
-                       struct ldb_module **_module)
+int ltdb_connect(struct ldb_context *ldb, const char *url,
+                unsigned int flags, const char *options[],
+                struct ldb_module **_module)
 {
-       struct ldb_module *module;
        const char *path;
        int tdb_flags, open_flags;
        struct ltdb_private *ltdb;
@@ -1597,7 +2201,6 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
         * We hold locks, so we must use a private event context
         * on each returned handle
         */
-
        ldb_set_require_private_event_context(ldb);
 
        /* parse the url */
@@ -1624,19 +2227,37 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                tdb_flags |= TDB_NOMMAP;
        }
 
+       ltdb = talloc_zero(ldb, struct ltdb_private);
+       if (!ltdb) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
        if (flags & LDB_FLG_RDONLY) {
-               open_flags = O_RDONLY;
+               /*
+                * This is weird, but because we can only have one tdb
+                * in this process, and the other one could be
+                * read-write, we can't use the tdb readonly.  Plus a
+                * read only tdb prohibits the all-record lock.
+                */
+               open_flags = O_RDWR;
+
+               ltdb->read_only = true;
+
        } else if (flags & LDB_FLG_DONT_CREATE_DB) {
+               /*
+                * This is used by ldbsearch to prevent creation of the database
+                * if the name is wrong
+                */
                open_flags = O_RDWR;
        } else {
+               /*
+                * This is the normal case
+                */
                open_flags = O_CREAT | O_RDWR;
        }
 
-       ltdb = talloc_zero(ldb, struct ltdb_private);
-       if (!ltdb) {
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+       ltdb->kv_ops = &key_value_ops;
 
        /* note that we use quite a large default hash size */
        ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
@@ -1654,38 +2275,5 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (getenv("LDB_WARN_UNINDEXED")) {
-               ltdb->warn_unindexed = true;
-       }
-
-       if (getenv("LDB_WARN_REINDEX")) {
-               ltdb->warn_reindex = true;
-       }
-
-       ltdb->sequence_number = 0;
-
-       module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
-       if (!module) {
-               ldb_oom(ldb);
-               talloc_free(ltdb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       ldb_module_set_private(module, ltdb);
-       talloc_steal(module, ltdb);
-
-       if (ltdb_cache_load(module) != 0) {
-               ldb_asprintf_errstring(ldb,
-                                      "Unable to load ltdb cache records of tdb '%s'", path);
-               talloc_free(module);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       *_module = module;
-       return LDB_SUCCESS;
-}
-
-int ldb_tdb_init(const char *version)
-{
-       LDB_MODULE_CHECK_VERSION(version);
-       return ldb_register_backend("tdb", ltdb_connect, false);
+       return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
 }