};
struct ldb_kv_idxptr {
+ /*
+ * In memory tdb to cache the index updates performed during a
+ * transaction. This improves the performance of operations like
+ * re-index and join
+ */
struct tdb_context *itdb;
int error;
};
return list;
}
+enum dn_list_will_be_read_only {
+ DN_LIST_MUTABLE = 0,
+ DN_LIST_WILL_BE_READ_ONLY = 1,
+};
+
/*
return the @IDX list in an index entry for a dn as a
struct dn_list
static int ldb_kv_dn_list_load(struct ldb_module *module,
struct ldb_kv_private *ldb_kv,
struct ldb_dn *dn,
- struct dn_list *list)
+ struct dn_list *list,
+ enum dn_list_will_be_read_only read_only)
{
struct ldb_message *msg;
int ret, version;
struct ldb_message_element *el;
- TDB_DATA rec;
+ TDB_DATA rec = {0};
struct dn_list *list2;
- TDB_DATA key;
+ bool from_primary_cache = false;
+ TDB_DATA key = {0};
list->dn = NULL;
list->count = 0;
+ list->strict = false;
- /* see if we have any in-memory index entries */
- if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
+ /*
+ * See if we have an in memory index cache
+ */
+ if (ldb_kv->idxptr == NULL) {
goto normal_index;
}
key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
key.dsize = strlen((char *)key.dptr);
- rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ /*
+ * Have we cached this index record?
+ * If we have a nested transaction cache try that first.
+ * then try the transaction cache.
+ * if the record is not cached it will need to be read from disk.
+ */
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
+ }
+ if (rec.dptr == NULL) {
+ from_primary_cache = true;
+ rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ }
if (rec.dptr == NULL) {
goto normal_index;
}
}
free(rec.dptr);
- *list = *list2;
+ /*
+ * If this is a read only transaction the indexes will not be
+ * changed so we don't need a copy in the event of a rollback
+ *
+ * In this case make an early return
+ */
+ if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
+ *list = *list2;
+ return LDB_SUCCESS;
+ }
+
+ /*
+ * record was read from the sub transaction cache, so we have
+ * already copied the primary cache record
+ */
+ if (!from_primary_cache) {
+ *list = *list2;
+ return LDB_SUCCESS;
+ }
+
+ /*
+ * No index sub transaction active, so no need to cache a copy
+ */
+ if (ldb_kv->nested_idx_ptr == NULL) {
+ *list = *list2;
+ return LDB_SUCCESS;
+ }
+
+ /*
+ * There is an active index sub transaction, and the record was
+ * found in the primary index transaction cache. A copy of the
+ * record needs be taken to prevent the original entry being
+ * altered, until the index sub transaction is committed.
+ */
+
+ {
+ struct ldb_val *dns = NULL;
+ size_t x = 0;
+
+ dns = talloc_array(
+ list,
+ struct ldb_val,
+ list2->count);
+ if (dns == NULL) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ for (x = 0; x < list2->count; x++) {
+ dns[x].length = list2->dn[x].length;
+ dns[x].data = talloc_memdup(
+ dns,
+ list2->dn[x].data,
+ list2->dn[x].length);
+ if (dns[x].data == NULL) {
+ TALLOC_FREE(dns);
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ }
+ list->dn = dns;
+ list->count = list2->count;
+ }
return LDB_SUCCESS;
+ /*
+ * Index record not found in the caches, read it from the
+ * database.
+ */
normal_index:
msg = ldb_msg_new(list);
if (msg == NULL) {
{
struct ldb_kv_private *ldb_kv = talloc_get_type(
ldb_module_get_private(module), struct ldb_kv_private);
- TDB_DATA rec, key;
- int ret;
- struct dn_list *list2;
+ TDB_DATA rec = {0};
+ TDB_DATA key = {0};
- if (ldb_kv->idxptr == NULL) {
- return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
- }
+ int ret = LDB_SUCCESS;
+ struct dn_list *list2 = NULL;
+ struct ldb_kv_idxptr *idxptr = NULL;
key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
if (key.dptr == NULL) {
}
key.dsize = strlen((char *)key.dptr);
- rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ /*
+ * If there is an index sub transaction active, update the
+ * sub transaction index cache. Otherwise update the
+ * primary index cache
+ */
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ idxptr = ldb_kv->nested_idx_ptr;
+ } else {
+ idxptr = ldb_kv->idxptr;
+ }
+ /*
+ * Get the cache entry for the index
+ *
+ * As the value in the cache is a pointer to a dn_list we update
+ * the dn_list directly.
+ *
+ */
+ rec = tdb_fetch(idxptr->itdb, key);
if (rec.dptr != NULL) {
list2 = ldb_kv_index_idxptr(module, rec);
if (list2 == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
free(rec.dptr);
- list2->dn = talloc_steal(list2, list->dn);
- list2->count = list->count;
+ /* Now put the updated pointer back in the cache */
+ if (list->dn == NULL) {
+ list2->dn = NULL;
+ list2->count = 0;
+ } else {
+ list2->dn = talloc_steal(list2, list->dn);
+ list2->count = list->count;
+ }
return LDB_SUCCESS;
}
- list2 = talloc(ldb_kv->idxptr, struct dn_list);
+ list2 = talloc(idxptr, struct dn_list);
if (list2 == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
/*
* This is not a store into the main DB, but into an in-memory
* TDB, so we don't need a guard on ltdb->read_only
+ *
+ * Also as we directly update the in memory dn_list for existing
+ * cache entries we must be adding a new entry to the cache.
*/
- ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+ ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
if (ret != 0) {
- return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
+ return ltdb_err_map( tdb_error(idxptr->itdb));
}
return LDB_SUCCESS;
}
if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
tdb_close(ldb_kv->idxptr->itdb);
}
- talloc_free(ldb_kv->idxptr);
- ldb_kv->idxptr = NULL;
+ TALLOC_FREE(ldb_kv->idxptr);
+ if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
+ tdb_close(ldb_kv->nested_idx_ptr->itdb);
+ }
+ TALLOC_FREE(ldb_kv->nested_idx_ptr);
return LDB_SUCCESS;
}
*/
if (!dn) return LDB_ERR_OPERATIONS_ERROR;
- ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
+ DN_LIST_WILL_BE_READ_ONLY);
talloc_free(dn);
return ret;
}
return LDB_ERR_OPERATIONS_ERROR;
}
- ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
+ DN_LIST_WILL_BE_READ_ONLY);
talloc_free(key);
if (ret != LDB_SUCCESS) {
return ret;
}
talloc_steal(list, dn_key);
- ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+ DN_LIST_MUTABLE);
if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
talloc_free(list);
return ret;
return LDB_ERR_OPERATIONS_ERROR;
}
- ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+ DN_LIST_MUTABLE);
if (ret == LDB_ERR_NO_SUCH_OBJECT) {
/* it wasn't indexed. Did we have an earlier error? If we did then
its gone now */
* re-index
*/
ldb_kv_index_transaction_cancel(module);
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ ldb_kv_index_sub_transaction_cancel(ldb_kv);
+ }
/*
* Calculate the size of the index cache that we'll need for
index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
}
+ /*
+ * Note that we don't start an index sub transaction for re-indexing
+ */
ret = ldb_kv_index_transaction_start(module, index_cache_size);
if (ret != LDB_SUCCESS) {
return ret;
}
return LDB_SUCCESS;
}
+
+/*
+ * Copy the contents of the nested transaction index cache record to the
+ * transaction index cache.
+ *
+ * During this 'commit' of the subtransaction to the main transaction
+ * (cache), care must be taken to free any existing index at the top
+ * level because otherwise we would leak memory.
+ */
+static int ldb_kv_sub_transaction_traverse(
+ struct tdb_context *tdb,
+ TDB_DATA key,
+ TDB_DATA data,
+ void *state)
+{
+ struct ldb_module *module = state;
+ struct ldb_kv_private *ldb_kv = talloc_get_type(
+ ldb_module_get_private(module), struct ldb_kv_private);
+ TDB_DATA rec = {0};
+ struct dn_list *index_in_subtransaction = NULL;
+ struct dn_list *index_in_top_level = NULL;
+ int ret = 0;
+
+ /*
+ * This unwraps the pointer in the DB into a pointer in
+ * memory, we are abusing TDB as a hash map, not a linearised
+ * database store
+ */
+ index_in_subtransaction = ldb_kv_index_idxptr(module, data);
+ if (index_in_subtransaction == NULL) {
+ ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+ return -1;
+ }
+
+ /*
+ * Do we already have an entry in the primary transaction cache
+ * If so free it's dn_list and replace it with the dn_list from
+ * the secondary cache
+ *
+ * The TDB and so the fetched rec contains NO DATA, just a
+ * pointer to data held in memory.
+ */
+ rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ if (rec.dptr != NULL) {
+ index_in_top_level = ldb_kv_index_idxptr(module, rec);
+ free(rec.dptr);
+ if (index_in_top_level == NULL) {
+ abort();
+ }
+ /*
+ * We had this key at the top level. However we made a copy
+ * at the sub-transaction level so that we could possibly
+ * roll back. We have to free the top level index memory
+ * otherwise we would leak
+ */
+ if (index_in_top_level->count > 0) {
+ TALLOC_FREE(index_in_top_level->dn);
+ }
+ index_in_top_level->dn
+ = talloc_steal(index_in_top_level,
+ index_in_subtransaction->dn);
+ index_in_top_level->count = index_in_subtransaction->count;
+ return 0;
+ }
+
+ index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
+ if (index_in_top_level == NULL) {
+ ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+ return -1;
+ }
+ index_in_top_level->dn
+ = talloc_steal(index_in_top_level,
+ index_in_subtransaction->dn);
+ index_in_top_level->count = index_in_subtransaction->count;
+
+ rec.dptr = (uint8_t *)&index_in_top_level;
+ rec.dsize = sizeof(void *);
+
+
+ /*
+ * This is not a store into the main DB, but into an in-memory
+ * TDB, so we don't need a guard on ltdb->read_only
+ */
+ ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+ if (ret != 0) {
+ ldb_kv->idxptr->error = ltdb_err_map(
+ tdb_error(ldb_kv->idxptr->itdb));
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Initialise the index cache for a sub transaction.
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+ ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
+ if (ldb_kv->nested_idx_ptr == NULL) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+
+ /*
+ * We use a tiny hash size for the sub-database (11).
+ *
+ * The sub-transaction is only for one record at a time, we
+ * would use a linked list but that would make the code even
+ * more complex when manipulating the index, as it would have
+ * to know if we were in a nested transaction (normal
+ * operations) or the top one (a reindex).
+ */
+ ldb_kv->nested_idx_ptr->itdb =
+ tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
+ if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ return LDB_SUCCESS;
+}
+
+/*
+ * Clear the contents of the nested transaction index cache when the nested
+ * transaction is cancelled.
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ tdb_close(ldb_kv->nested_idx_ptr->itdb);
+ TALLOC_FREE(ldb_kv->nested_idx_ptr);
+ }
+ return LDB_SUCCESS;
+}
+
+/*
+ * Commit a nested transaction,
+ * Copy the contents of the nested transaction index cache to the
+ * transaction index cache.
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+ int ret = 0;
+
+ if (ldb_kv->nested_idx_ptr == NULL) {
+ return LDB_SUCCESS;
+ }
+ if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+ return LDB_SUCCESS;
+ }
+ tdb_traverse(
+ ldb_kv->nested_idx_ptr->itdb,
+ ldb_kv_sub_transaction_traverse,
+ ldb_kv->module);
+ tdb_close(ldb_kv->nested_idx_ptr->itdb);
+ ldb_kv->nested_idx_ptr->itdb = NULL;
+
+ ret = ldb_kv->nested_idx_ptr->error;
+ if (ret != LDB_SUCCESS) {
+ struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+ if (!ldb_errstring(ldb)) {
+ ldb_set_errstring(ldb, ldb_strerror(ret));
+ }
+ ldb_asprintf_errstring(
+ ldb,
+ __location__": Failed to update index records in "
+ "sub transaction commit: %s",
+ ldb_errstring(ldb));
+ }
+ TALLOC_FREE(ldb_kv->nested_idx_ptr);
+ return ret;
+}