ldb: Move where we update the pack format version
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
index a2eb570917d6fb37eb1123a32b3e8f70bdaffc0d..eb84a790e004d89f1020ff34e0034de53a9a1764 100644 (file)
@@ -148,6 +148,7 @@ ldb_schema_set_override_GUID_index() must be called.
 #include "../ldb_tdb/ldb_tdb.h"
 #include "ldb_private.h"
 #include "lib/util/binsearch.h"
+#include "lib/util/attr.h"
 
 struct dn_list {
        unsigned int count;
@@ -162,6 +163,11 @@ struct dn_list {
 };
 
 struct ldb_kv_idxptr {
+       /*
+        * In memory tdb to cache the index updates performed during a
+        * transaction.  This improves the performance of operations like
+        * re-index and join
+        */
        struct tdb_context *itdb;
        int error;
 };
@@ -324,8 +330,7 @@ static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
   alignment
  */
 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
-                                          TDB_DATA rec,
-                                          bool check_parent)
+                                          TDB_DATA rec)
 {
        struct dn_list *list;
        if (rec.dsize != sizeof(void *)) {
@@ -344,15 +349,14 @@ static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
                                       talloc_get_name(list));
                return NULL;
        }
-       if (check_parent && list->dn && talloc_parent(list->dn) != list) {
-               ldb_asprintf_errstring(ldb_module_get_ctx(module),
-                                      "Bad parent '%s' for idxptr",
-                                      talloc_get_name(talloc_parent(list->dn)));
-               return NULL;
-       }
        return list;
 }
 
+enum dn_list_will_be_read_only {
+       DN_LIST_MUTABLE = 0,
+       DN_LIST_WILL_BE_READ_ONLY = 1,
+};
+
 /*
   return the @IDX list in an index entry for a dn as a
   struct dn_list
@@ -360,42 +364,122 @@ static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
 static int ldb_kv_dn_list_load(struct ldb_module *module,
                               struct ldb_kv_private *ldb_kv,
                               struct ldb_dn *dn,
-                              struct dn_list *list)
+                              struct dn_list *list,
+                              enum dn_list_will_be_read_only read_only)
 {
        struct ldb_message *msg;
        int ret, version;
        struct ldb_message_element *el;
-       TDB_DATA rec;
+       TDB_DATA rec = {0};
        struct dn_list *list2;
-       TDB_DATA key;
+       bool from_primary_cache = false;
+       TDB_DATA key = {0};
 
        list->dn = NULL;
        list->count = 0;
+       list->strict = false;
 
-       /* see if we have any in-memory index entries */
-       if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
+       /*
+        * See if we have an in memory index cache
+        */
+       if (ldb_kv->idxptr == NULL) {
                goto normal_index;
        }
 
        key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
        key.dsize = strlen((char *)key.dptr);
 
-       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       /*
+        * Have we cached this index record?
+        * If we have a nested transaction cache try that first.
+        * then try the transaction cache.
+        * if the record is not cached it will need to be read from disk.
+        */
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
+       }
+       if (rec.dptr == NULL) {
+               from_primary_cache = true;
+               rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       }
        if (rec.dptr == NULL) {
                goto normal_index;
        }
 
        /* we've found an in-memory index entry */
-       list2 = ldb_kv_index_idxptr(module, rec, true);
+       list2 = ldb_kv_index_idxptr(module, rec);
        if (list2 == NULL) {
                free(rec.dptr);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        free(rec.dptr);
 
-       *list = *list2;
+       /*
+        * If this is a read only transaction the indexes will not be
+        * changed so we don't need a copy in the event of a rollback
+        *
+        * In this case make an early return
+        */
+       if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * record was read from the sub transaction cache, so we have
+        * already copied the primary cache record
+        */
+       if (!from_primary_cache) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * No index sub transaction active, so no need to cache a copy
+        */
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * There is an active index sub transaction, and the record was
+        * found in the primary index transaction cache.  A copy of the
+        * record needs be taken to prevent the original entry being
+        * altered, until the index sub transaction is committed.
+        */
+
+       {
+               struct ldb_val *dns = NULL;
+               size_t x = 0;
+
+               dns = talloc_array(
+                       list,
+                       struct ldb_val,
+                       list2->count);
+               if (dns == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               for (x = 0; x < list2->count; x++) {
+                       dns[x].length = list2->dn[x].length;
+                       dns[x].data = talloc_memdup(
+                               dns,
+                               list2->dn[x].data,
+                               list2->dn[x].length);
+                       if (dns[x].data == NULL) {
+                               TALLOC_FREE(dns);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+               }
+               list->dn = dns;
+               list->count = list2->count;
+       }
        return LDB_SUCCESS;
 
+       /*
+        * Index record not found in the caches, read it from the
+        * database.
+        */
 normal_index:
        msg = ldb_msg_new(list);
        if (msg == NULL) {
@@ -540,7 +624,7 @@ int ldb_kv_key_dn_from_idx(struct ldb_module *module,
                 * DN key has been truncated, need to inspect the actual
                 * records to locate the actual DN
                 */
-               int i;
+               unsigned int i;
                index = -1;
                for (i=0; i < list->count; i++) {
                        uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
@@ -643,7 +727,7 @@ static int ldb_kv_dn_list_store_full(struct ldb_module *module,
                if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        ret = LDB_SUCCESS;
                }
-               talloc_free(msg);
+               TALLOC_FREE(msg);
                return ret;
        }
 
@@ -651,14 +735,14 @@ static int ldb_kv_dn_list_store_full(struct ldb_module *module,
                ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
                                      LDB_KV_INDEXING_VERSION);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(msg);
+                       TALLOC_FREE(msg);
                        return ldb_module_oom(module);
                }
        } else {
                ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
                                      LDB_KV_GUID_INDEXING_VERSION);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(msg);
+                       TALLOC_FREE(msg);
                        return ldb_module_oom(module);
                }
        }
@@ -668,7 +752,7 @@ static int ldb_kv_dn_list_store_full(struct ldb_module *module,
 
                ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(msg);
+                       TALLOC_FREE(msg);
                        return ldb_module_oom(module);
                }
 
@@ -681,7 +765,7 @@ static int ldb_kv_dn_list_store_full(struct ldb_module *module,
                        el->values = talloc_array(msg,
                                                  struct ldb_val, 1);
                        if (el->values == NULL) {
-                               talloc_free(msg);
+                               TALLOC_FREE(msg);
                                return ldb_module_oom(module);
                        }
 
@@ -689,7 +773,7 @@ static int ldb_kv_dn_list_store_full(struct ldb_module *module,
                                                   list->count,
                                                   LDB_KV_GUID_SIZE);
                        if (v.data == NULL) {
-                               talloc_free(msg);
+                               TALLOC_FREE(msg);
                                return ldb_module_oom(module);
                        }
 
@@ -698,7 +782,7 @@ static int ldb_kv_dn_list_store_full(struct ldb_module *module,
                        for (i = 0; i < list->count; i++) {
                                if (list->dn[i].length !=
                                    LDB_KV_GUID_SIZE) {
-                                       talloc_free(msg);
+                                       TALLOC_FREE(msg);
                                        return ldb_module_operr(module);
                                }
                                memcpy(&v.data[LDB_KV_GUID_SIZE*i],
@@ -711,7 +795,7 @@ static int ldb_kv_dn_list_store_full(struct ldb_module *module,
        }
 
        ret = ldb_kv_store(module, msg, TDB_REPLACE);
-       talloc_free(msg);
+       TALLOC_FREE(msg);
        return ret;
 }
 
@@ -724,13 +808,12 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 {
        struct ldb_kv_private *ldb_kv = talloc_get_type(
            ldb_module_get_private(module), struct ldb_kv_private);
-       TDB_DATA rec, key;
-       int ret;
-       struct dn_list *list2;
+       TDB_DATA rec = {0};
+       TDB_DATA key = {0};
 
-       if (ldb_kv->idxptr == NULL) {
-               return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
-       }
+       int ret = LDB_SUCCESS;
+       struct dn_list *list2 = NULL;
+       struct ldb_kv_idxptr *idxptr = NULL;
 
        key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
        if (key.dptr == NULL) {
@@ -738,20 +821,43 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
        }
        key.dsize = strlen((char *)key.dptr);
 
-       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       /*
+        * If there is an index sub transaction active, update the
+        * sub transaction index cache.  Otherwise update the
+        * primary index cache
+        */
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               idxptr = ldb_kv->nested_idx_ptr;
+       } else {
+               idxptr = ldb_kv->idxptr;
+       }
+       /*
+        * Get the cache entry for the index
+        *
+        * As the value in the cache is a pointer to a dn_list we update
+        * the dn_list directly.
+        *
+        */
+       rec = tdb_fetch(idxptr->itdb, key);
        if (rec.dptr != NULL) {
-               list2 = ldb_kv_index_idxptr(module, rec, false);
+               list2 = ldb_kv_index_idxptr(module, rec);
                if (list2 == NULL) {
                        free(rec.dptr);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
                free(rec.dptr);
-               list2->dn = talloc_steal(list2, list->dn);
-               list2->count = list->count;
+               /* Now put the updated pointer back in the cache */
+               if (list->dn == NULL) {
+                       list2->dn = NULL;
+                       list2->count = 0;
+               } else {
+                       list2->dn = talloc_steal(list2, list->dn);
+                       list2->count = list->count;
+               }
                return LDB_SUCCESS;
        }
 
-       list2 = talloc(ldb_kv->idxptr, struct dn_list);
+       list2 = talloc(idxptr, struct dn_list);
        if (list2 == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -765,10 +871,13 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
        /*
         * This is not a store into the main DB, but into an in-memory
         * TDB, so we don't need a guard on ltdb->read_only
+        *
+        * Also as we directly update the in memory dn_list for existing
+        * cache entries we must be adding a new entry to the cache.
         */
-       ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+       ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
        if (ret != 0) {
-               return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
+               return ltdb_err_map( tdb_error(idxptr->itdb));
        }
        return LDB_SUCCESS;
 }
@@ -776,7 +885,7 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 /*
   traverse function for storing the in-memory index entries on disk
  */
-static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
+static int ldb_kv_index_traverse_store(_UNUSED_ struct tdb_context *tdb,
                                       TDB_DATA key,
                                       TDB_DATA data,
                                       void *state)
@@ -789,7 +898,7 @@ static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
        struct ldb_val v;
        struct dn_list *list;
 
-       list = ldb_kv_index_idxptr(module, data, true);
+       list = ldb_kv_index_idxptr(module, data);
        if (list == NULL) {
                ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
                return -1;
@@ -852,8 +961,11 @@ int ldb_kv_index_transaction_cancel(struct ldb_module *module)
        if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
                tdb_close(ldb_kv->idxptr->itdb);
        }
-       talloc_free(ldb_kv->idxptr);
-       ldb_kv->idxptr = NULL;
+       TALLOC_FREE(ldb_kv->idxptr);
+       if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
+               tdb_close(ldb_kv->nested_idx_ptr->itdb);
+       }
+       TALLOC_FREE(ldb_kv->nested_idx_ptr);
        return LDB_SUCCESS;
 }
 
@@ -1172,7 +1284,8 @@ static int ldb_kv_index_dn_simple(struct ldb_module *module,
         */
        if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
+                                 DN_LIST_WILL_BE_READ_ONLY);
        talloc_free(dn);
        return ret;
 }
@@ -1272,8 +1385,7 @@ static int ldb_kv_index_dn_leaf(struct ldb_module *module,
   list intersection
   list = list & list2
 */
-static bool list_intersect(struct ldb_context *ldb,
-                          struct ldb_kv_private *ldb_kv,
+static bool list_intersect(struct ldb_kv_private *ldb_kv,
                           struct dn_list *list,
                           const struct dn_list *list2)
 {
@@ -1500,10 +1612,10 @@ static int ldb_kv_index_dn_or(struct ldb_module *module,
 /*
   NOT an index results
  */
-static int ldb_kv_index_dn_not(struct ldb_module *module,
-                              struct ldb_kv_private *ldb_kv,
-                              const struct ldb_parse_tree *tree,
-                              struct dn_list *list)
+static int ldb_kv_index_dn_not(_UNUSED_ struct ldb_module *module,
+                              _UNUSED_ struct ldb_kv_private *ldb_kv,
+                              _UNUSED_ const struct ldb_parse_tree *tree,
+                              _UNUSED_ struct dn_list *list)
 {
        /* the only way to do an indexed not would be if we could
           negate the not via another not or if we knew the total
@@ -1620,7 +1732,7 @@ static int ldb_kv_index_dn_and(struct ldb_module *module,
                        list->dn = list2->dn;
                        list->count = list2->count;
                        found = true;
-               } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
+               } else if (!list_intersect(ldb_kv, list, list2)) {
                        talloc_free(list2);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
@@ -1650,8 +1762,8 @@ struct ldb_kv_ordered_index_context {
        struct dn_list *dn_list;
 };
 
-static int traverse_range_index(struct ldb_kv_private *ldb_kv,
-                               struct ldb_val key,
+static int traverse_range_index(_UNUSED_ struct ldb_kv_private *ldb_kv,
+                               _UNUSED_ struct ldb_val key,
                                struct ldb_val data,
                                void *state)
 {
@@ -1978,7 +2090,8 @@ static int ldb_kv_index_dn_attr(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
+                                 DN_LIST_WILL_BE_READ_ONLY);
        talloc_free(key);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -2452,8 +2565,7 @@ int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
                         * and falling back to a full DB scan).
                         */
                        if (ret == LDB_SUCCESS) {
-                               if (!list_intersect(ldb,
-                                                   ldb_kv,
+                               if (!list_intersect(ldb_kv,
                                                    dn_list,
                                                    indexed_search_result)) {
                                        talloc_free(indexed_search_result);
@@ -2568,7 +2680,8 @@ static int ldb_kv_index_add1(struct ldb_module *module,
        }
        talloc_steal(list, dn_key);
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+                                 DN_LIST_MUTABLE);
        if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                talloc_free(list);
                return ret;
@@ -2597,7 +2710,7 @@ static int ldb_kv_index_add1(struct ldb_module *module,
                 *
                 * So need to pull the DN's to check if it's really a duplicate
                 */
-               int i;
+               unsigned int i;
                for (i=0; i < list->count; i++) {
                        uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
                        struct ldb_val key = {
@@ -3083,7 +3196,8 @@ int ldb_kv_index_del_value(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+                                 DN_LIST_MUTABLE);
        if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                /* it wasn't indexed. Did we have an earlier error? If we did then
                   its gone now */
@@ -3218,7 +3332,7 @@ int ldb_kv_index_delete(struct ldb_module *module,
 */
 static int delete_index(struct ldb_kv_private *ldb_kv,
                        struct ldb_val key,
-                       struct ldb_val data,
+                       _UNUSED_ struct ldb_val data,
                        void *state)
 {
        struct ldb_module *module = state;
@@ -3413,7 +3527,7 @@ static int re_index(struct ldb_kv_private *ldb_kv,
 }
 
 static int re_pack(struct ldb_kv_private *ldb_kv,
-                  struct ldb_val key,
+                  _UNUSED_ struct ldb_val key,
                   struct ldb_val val,
                   void *state)
 {
@@ -3457,7 +3571,7 @@ static int re_pack(struct ldb_kv_private *ldb_kv,
         * want to spam the log.
         */
        if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
-               ldb_debug(ldb, LDB_DEBUG_WARNING,
+               ldb_debug(ldb, LDB_DEBUG_ALWAYS_LOG,
                          "Repacking database with format %#010x",
                          ldb_kv->pack_format_version);
                ctx->normal_record_seen = true;
@@ -3481,10 +3595,13 @@ int ldb_kv_repack(struct ldb_module *module)
        struct ldb_kv_repack_context ctx;
        int ret;
 
+       ctx.old_version = ldb_kv->pack_format_version;
        ctx.count = 0;
        ctx.error = LDB_SUCCESS;
        ctx.normal_record_seen = false;
 
+       ldb_kv->pack_format_version = ldb_kv->target_pack_format_version;
+
        /* Iterate all database records and repack them in the new format */
        ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
        if (ret < 0) {
@@ -3531,16 +3648,28 @@ int ldb_kv_reindex(struct ldb_module *module)
         * re-index
         */
        ldb_kv_index_transaction_cancel(module);
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               ldb_kv_index_sub_transaction_cancel(ldb_kv);
+       }
 
        /*
-        * Calculate the size of the index cache that we'll need for
-        * the re-index
+        * Calculate the size of the index cache needed for
+        * the re-index. If specified always use the
+        * ldb_kv->index_transaction_cache_size otherwise use the maximum
+        * of the size estimate or the DEFAULT_INDEX_CACHE_SIZE
         */
-       index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
-       if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
-               index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
+       if (ldb_kv->index_transaction_cache_size > 0) {
+               index_cache_size = ldb_kv->index_transaction_cache_size;
+       } else {
+               index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
+               if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
+                       index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
+               }
        }
 
+       /*
+        * Note that we don't start an index sub transaction for re-indexing
+        */
        ret = ldb_kv_index_transaction_start(module, index_cache_size);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -3601,3 +3730,172 @@ int ldb_kv_reindex(struct ldb_module *module)
        }
        return LDB_SUCCESS;
 }
+
+/*
+ * Copy the contents of the nested transaction index cache record to the
+ * transaction index cache.
+ *
+ * During this 'commit' of the subtransaction to the main transaction
+ * (cache), care must be taken to free any existing index at the top
+ * level because otherwise we would leak memory.
+ */
+static int ldb_kv_sub_transaction_traverse(
+       struct tdb_context *tdb,
+       TDB_DATA key,
+       TDB_DATA data,
+       void *state)
+{
+       struct ldb_module *module = state;
+       struct ldb_kv_private *ldb_kv = talloc_get_type(
+           ldb_module_get_private(module), struct ldb_kv_private);
+       TDB_DATA rec = {0};
+       struct dn_list *index_in_subtransaction = NULL;
+       struct dn_list *index_in_top_level = NULL;
+       int ret = 0;
+
+       /*
+        * This unwraps the pointer in the DB into a pointer in
+        * memory, we are abusing TDB as a hash map, not a linearised
+        * database store
+        */
+       index_in_subtransaction = ldb_kv_index_idxptr(module, data);
+       if (index_in_subtransaction == NULL) {
+               ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+
+       /*
+        * Do we already have an entry in the primary transaction cache
+        * If so free it's dn_list and replace it with the dn_list from
+        * the secondary cache
+        *
+        * The TDB and so the fetched rec contains NO DATA, just a
+        * pointer to data held in memory.
+        */
+       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       if (rec.dptr != NULL) {
+               index_in_top_level = ldb_kv_index_idxptr(module, rec);
+               free(rec.dptr);
+               if (index_in_top_level == NULL) {
+                       abort();
+               }
+               /*
+                * We had this key at the top level.  However we made a copy
+                * at the sub-transaction level so that we could possibly
+                * roll back.  We have to free the top level index memory
+                * otherwise we would leak
+                */
+               if (index_in_top_level->count > 0) {
+                       TALLOC_FREE(index_in_top_level->dn);
+               }
+               index_in_top_level->dn
+                       = talloc_steal(index_in_top_level,
+                                      index_in_subtransaction->dn);
+               index_in_top_level->count = index_in_subtransaction->count;
+               return 0;
+       }
+
+       index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
+       if (index_in_top_level == NULL) {
+               ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+       index_in_top_level->dn
+               = talloc_steal(index_in_top_level,
+                              index_in_subtransaction->dn);
+       index_in_top_level->count = index_in_subtransaction->count;
+
+       rec.dptr = (uint8_t *)&index_in_top_level;
+       rec.dsize = sizeof(void *);
+
+
+       /*
+        * This is not a store into the main DB, but into an in-memory
+        * TDB, so we don't need a guard on ltdb->read_only
+        */
+       ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+       if (ret != 0) {
+               ldb_kv->idxptr->error = ltdb_err_map(
+                   tdb_error(ldb_kv->idxptr->itdb));
+               return -1;
+       }
+       return 0;
+}
+
+/*
+ * Initialise the index cache for a sub transaction.
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+       ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * We use a tiny hash size for the sub-database (11).
+        *
+        * The sub-transaction is only for one record at a time, we
+        * would use a linked list but that would make the code even
+        * more complex when manipulating the index, as it would have
+        * to know if we were in a nested transaction (normal
+        * operations) or the top one (a reindex).
+        */
+       ldb_kv->nested_idx_ptr->itdb =
+               tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
+       if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+ * Clear the contents of the nested transaction index cache when the nested
+ * transaction is cancelled.
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               tdb_close(ldb_kv->nested_idx_ptr->itdb);
+               TALLOC_FREE(ldb_kv->nested_idx_ptr);
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+ * Commit a nested transaction,
+ * Copy the contents of the nested transaction index cache to the
+ * transaction index cache.
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+       int ret = 0;
+
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               return LDB_SUCCESS;
+       }
+       if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+               return LDB_SUCCESS;
+       }
+       tdb_traverse(
+           ldb_kv->nested_idx_ptr->itdb,
+           ldb_kv_sub_transaction_traverse,
+           ldb_kv->module);
+       tdb_close(ldb_kv->nested_idx_ptr->itdb);
+       ldb_kv->nested_idx_ptr->itdb = NULL;
+
+       ret = ldb_kv->nested_idx_ptr->error;
+       if (ret != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+               if (!ldb_errstring(ldb)) {
+                       ldb_set_errstring(ldb, ldb_strerror(ret));
+               }
+               ldb_asprintf_errstring(
+                       ldb,
+                       __location__": Failed to update index records in "
+                       "sub transaction commit: %s",
+                       ldb_errstring(ldb));
+       }
+       TALLOC_FREE(ldb_kv->nested_idx_ptr);
+       return ret;
+}