lib ldb key value: fix index buffering
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
index 5697c2f9c43dc0ac2e49c8b87a7f68a2357cba5b..2671d55a4d348c5aac989acf2ade60bda0152859 100644 (file)
@@ -163,6 +163,11 @@ struct dn_list {
 };
 
 struct ldb_kv_idxptr {
+       /*
+        * In memory tdb to cache the index updates performed during a
+        * transaction.  This improves the performance of operations like
+        * re-index and join
+        */
        struct tdb_context *itdb;
        int error;
 };
@@ -347,6 +352,11 @@ static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
        return list;
 }
 
+enum dn_list_will_be_read_only {
+       DN_LIST_MUTABLE = 0,
+       DN_LIST_WILL_BE_READ_ONLY = 1,
+};
+
 /*
   return the @IDX list in an index entry for a dn as a
   struct dn_list
@@ -354,27 +364,44 @@ static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
 static int ldb_kv_dn_list_load(struct ldb_module *module,
                               struct ldb_kv_private *ldb_kv,
                               struct ldb_dn *dn,
-                              struct dn_list *list)
+                              struct dn_list *list,
+                              enum dn_list_will_be_read_only read_only)
 {
        struct ldb_message *msg;
        int ret, version;
        struct ldb_message_element *el;
-       TDB_DATA rec;
+       TDB_DATA rec = {0};
        struct dn_list *list2;
-       TDB_DATA key;
+       bool from_primary_cache = false;
+       TDB_DATA key = {0};
 
        list->dn = NULL;
        list->count = 0;
+       list->strict = false;
 
-       /* see if we have any in-memory index entries */
-       if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
+       /*
+        * See if we have an in memory index cache
+        */
+       if (ldb_kv->idxptr == NULL) {
                goto normal_index;
        }
 
        key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
        key.dsize = strlen((char *)key.dptr);
 
-       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       /*
+        * Have we cached this index record?
+        * If we have a nested transaction cache try that first.
+        * then try the transaction cache.
+        * if the record is not cached it will need to be read from disk.
+        */
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
+       }
+       if (rec.dptr == NULL) {
+               from_primary_cache = true;
+               rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       }
        if (rec.dptr == NULL) {
                goto normal_index;
        }
@@ -387,9 +414,72 @@ static int ldb_kv_dn_list_load(struct ldb_module *module,
        }
        free(rec.dptr);
 
-       *list = *list2;
+       /*
+        * If this is a read only transaction the indexes will not be
+        * changed so we don't need a copy in the event of a rollback
+        *
+        * In this case make an early return
+        */
+       if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * record was read from the sub transaction cache, so we have
+        * already copied the primary cache record
+        */
+       if (!from_primary_cache) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * No index sub transaction active, so no need to cache a copy
+        */
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * There is an active index sub transaction, and the record was
+        * found in the primary index transaction cache.  A copy of the
+        * record needs be taken to prevent the original entry being
+        * altered, until the index sub transaction is committed.
+        */
+
+       {
+               struct ldb_val *dns = NULL;
+               size_t x = 0;
+
+               dns = talloc_array(
+                       list,
+                       struct ldb_val,
+                       list2->count);
+               if (dns == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               for (x = 0; x < list2->count; x++) {
+                       dns[x].length = list2->dn[x].length;
+                       dns[x].data = talloc_memdup(
+                               dns,
+                               list2->dn[x].data,
+                               list2->dn[x].length);
+                       if (dns[x].data == NULL) {
+                               TALLOC_FREE(dns);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+               }
+               list->dn = dns;
+               list->count = list2->count;
+       }
        return LDB_SUCCESS;
 
+       /*
+        * Index record not found in the caches, read it from the
+        * database.
+        */
 normal_index:
        msg = ldb_msg_new(list);
        if (msg == NULL) {
@@ -718,13 +808,12 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 {
        struct ldb_kv_private *ldb_kv = talloc_get_type(
            ldb_module_get_private(module), struct ldb_kv_private);
-       TDB_DATA rec, key;
-       int ret;
-       struct dn_list *list2;
+       TDB_DATA rec = {0};
+       TDB_DATA key = {0};
 
-       if (ldb_kv->idxptr == NULL) {
-               return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
-       }
+       int ret = LDB_SUCCESS;
+       struct dn_list *list2 = NULL;
+       struct ldb_kv_idxptr *idxptr = NULL;
 
        key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
        if (key.dptr == NULL) {
@@ -732,7 +821,24 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
        }
        key.dsize = strlen((char *)key.dptr);
 
-       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       /*
+        * If there is an index sub transaction active, update the
+        * sub transaction index cache.  Otherwise update the
+        * primary index cache
+        */
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               idxptr = ldb_kv->nested_idx_ptr;
+       } else {
+               idxptr = ldb_kv->idxptr;
+       }
+       /*
+        * Get the cache entry for the index
+        *
+        * As the value in the cache is a pointer to a dn_list we update
+        * the dn_list directly.
+        *
+        */
+       rec = tdb_fetch(idxptr->itdb, key);
        if (rec.dptr != NULL) {
                list2 = ldb_kv_index_idxptr(module, rec);
                if (list2 == NULL) {
@@ -740,12 +846,18 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
                free(rec.dptr);
-               list2->dn = talloc_steal(list2, list->dn);
-               list2->count = list->count;
+               /* Now put the updated pointer back in the cache */
+               if (list->dn == NULL) {
+                       list2->dn = NULL;
+                       list2->count = 0;
+               } else {
+                       list2->dn = talloc_steal(list2, list->dn);
+                       list2->count = list->count;
+               }
                return LDB_SUCCESS;
        }
 
-       list2 = talloc(ldb_kv->idxptr, struct dn_list);
+       list2 = talloc(idxptr, struct dn_list);
        if (list2 == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -759,10 +871,13 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
        /*
         * This is not a store into the main DB, but into an in-memory
         * TDB, so we don't need a guard on ltdb->read_only
+        *
+        * Also as we directly update the in memory dn_list for existing
+        * cache entries we must be adding a new entry to the cache.
         */
-       ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+       ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
        if (ret != 0) {
-               return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
+               return ltdb_err_map( tdb_error(idxptr->itdb));
        }
        return LDB_SUCCESS;
 }
@@ -846,8 +961,11 @@ int ldb_kv_index_transaction_cancel(struct ldb_module *module)
        if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
                tdb_close(ldb_kv->idxptr->itdb);
        }
-       talloc_free(ldb_kv->idxptr);
-       ldb_kv->idxptr = NULL;
+       TALLOC_FREE(ldb_kv->idxptr);
+       if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
+               tdb_close(ldb_kv->nested_idx_ptr->itdb);
+       }
+       TALLOC_FREE(ldb_kv->nested_idx_ptr);
        return LDB_SUCCESS;
 }
 
@@ -1166,7 +1284,8 @@ static int ldb_kv_index_dn_simple(struct ldb_module *module,
         */
        if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
+                                 DN_LIST_WILL_BE_READ_ONLY);
        talloc_free(dn);
        return ret;
 }
@@ -1971,7 +2090,8 @@ static int ldb_kv_index_dn_attr(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
+                                 DN_LIST_WILL_BE_READ_ONLY);
        talloc_free(key);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -2560,7 +2680,8 @@ static int ldb_kv_index_add1(struct ldb_module *module,
        }
        talloc_steal(list, dn_key);
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+                                 DN_LIST_MUTABLE);
        if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                talloc_free(list);
                return ret;
@@ -3075,7 +3196,8 @@ int ldb_kv_index_del_value(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+                                 DN_LIST_MUTABLE);
        if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                /* it wasn't indexed. Did we have an earlier error? If we did then
                   its gone now */
@@ -3523,6 +3645,9 @@ int ldb_kv_reindex(struct ldb_module *module)
         * re-index
         */
        ldb_kv_index_transaction_cancel(module);
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               ldb_kv_index_sub_transaction_cancel(ldb_kv);
+       }
 
        /*
         * Calculate the size of the index cache that we'll need for
@@ -3533,6 +3658,9 @@ int ldb_kv_reindex(struct ldb_module *module)
                index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
        }
 
+       /*
+        * Note that we don't start an index sub transaction for re-indexing
+        */
        ret = ldb_kv_index_transaction_start(module, index_cache_size);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -3593,3 +3721,172 @@ int ldb_kv_reindex(struct ldb_module *module)
        }
        return LDB_SUCCESS;
 }
+
+/*
+ * Copy the contents of the nested transaction index cache record to the
+ * transaction index cache.
+ *
+ * During this 'commit' of the subtransaction to the main transaction
+ * (cache), care must be taken to free any existing index at the top
+ * level because otherwise we would leak memory.
+ */
+static int ldb_kv_sub_transaction_traverse(
+       struct tdb_context *tdb,
+       TDB_DATA key,
+       TDB_DATA data,
+       void *state)
+{
+       struct ldb_module *module = state;
+       struct ldb_kv_private *ldb_kv = talloc_get_type(
+           ldb_module_get_private(module), struct ldb_kv_private);
+       TDB_DATA rec = {0};
+       struct dn_list *index_in_subtransaction = NULL;
+       struct dn_list *index_in_top_level = NULL;
+       int ret = 0;
+
+       /*
+        * This unwraps the pointer in the DB into a pointer in
+        * memory, we are abusing TDB as a hash map, not a linearised
+        * database store
+        */
+       index_in_subtransaction = ldb_kv_index_idxptr(module, data);
+       if (index_in_subtransaction == NULL) {
+               ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+
+       /*
+        * Do we already have an entry in the primary transaction cache
+        * If so free it's dn_list and replace it with the dn_list from
+        * the secondary cache
+        *
+        * The TDB and so the fetched rec contains NO DATA, just a
+        * pointer to data held in memory.
+        */
+       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       if (rec.dptr != NULL) {
+               index_in_top_level = ldb_kv_index_idxptr(module, rec);
+               free(rec.dptr);
+               if (index_in_top_level == NULL) {
+                       abort();
+               }
+               /*
+                * We had this key at the top level.  However we made a copy
+                * at the sub-transaction level so that we could possibly
+                * roll back.  We have to free the top level index memory
+                * otherwise we would leak
+                */
+               if (index_in_top_level->count > 0) {
+                       TALLOC_FREE(index_in_top_level->dn);
+               }
+               index_in_top_level->dn
+                       = talloc_steal(index_in_top_level,
+                                      index_in_subtransaction->dn);
+               index_in_top_level->count = index_in_subtransaction->count;
+               return 0;
+       }
+
+       index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
+       if (index_in_top_level == NULL) {
+               ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+       index_in_top_level->dn
+               = talloc_steal(index_in_top_level,
+                              index_in_subtransaction->dn);
+       index_in_top_level->count = index_in_subtransaction->count;
+
+       rec.dptr = (uint8_t *)&index_in_top_level;
+       rec.dsize = sizeof(void *);
+
+
+       /*
+        * This is not a store into the main DB, but into an in-memory
+        * TDB, so we don't need a guard on ltdb->read_only
+        */
+       ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+       if (ret != 0) {
+               ldb_kv->idxptr->error = ltdb_err_map(
+                   tdb_error(ldb_kv->idxptr->itdb));
+               return -1;
+       }
+       return 0;
+}
+
+/*
+ * Initialise the index cache for a sub transaction.
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+       ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * We use a tiny hash size for the sub-database (11).
+        *
+        * The sub-transaction is only for one record at a time, we
+        * would use a linked list but that would make the code even
+        * more complex when manipulating the index, as it would have
+        * to know if we were in a nested transaction (normal
+        * operations) or the top one (a reindex).
+        */
+       ldb_kv->nested_idx_ptr->itdb =
+               tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
+       if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+ * Clear the contents of the nested transaction index cache when the nested
+ * transaction is cancelled.
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               tdb_close(ldb_kv->nested_idx_ptr->itdb);
+               TALLOC_FREE(ldb_kv->nested_idx_ptr);
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+ * Commit a nested transaction,
+ * Copy the contents of the nested transaction index cache to the
+ * transaction index cache.
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+       int ret = 0;
+
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               return LDB_SUCCESS;
+       }
+       if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+               return LDB_SUCCESS;
+       }
+       tdb_traverse(
+           ldb_kv->nested_idx_ptr->itdb,
+           ldb_kv_sub_transaction_traverse,
+           ldb_kv->module);
+       tdb_close(ldb_kv->nested_idx_ptr->itdb);
+       ldb_kv->nested_idx_ptr->itdb = NULL;
+
+       ret = ldb_kv->nested_idx_ptr->error;
+       if (ret != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+               if (!ldb_errstring(ldb)) {
+                       ldb_set_errstring(ldb, ldb_strerror(ret));
+               }
+               ldb_asprintf_errstring(
+                       ldb,
+                       __location__": Failed to update index records in "
+                       "sub transaction commit: %s",
+                       ldb_errstring(ldb));
+       }
+       TALLOC_FREE(ldb_kv->nested_idx_ptr);
+       return ret;
+}