};
struct ldb_kv_idxptr {
+ /*
+ * In memory tdb to cache the index updates performed during a
+ * transaction. This improves the performance of operations like
+ * re-index and join
+ */
struct tdb_context *itdb;
int error;
};
alignment
*/
static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
- TDB_DATA rec,
- bool check_parent)
+ TDB_DATA rec)
{
struct dn_list *list;
if (rec.dsize != sizeof(void *)) {
talloc_get_name(list));
return NULL;
}
- if (check_parent && list->dn && talloc_parent(list->dn) != list) {
- ldb_asprintf_errstring(ldb_module_get_ctx(module),
- "Bad parent '%s' for idxptr",
- talloc_get_name(talloc_parent(list->dn)));
- return NULL;
- }
return list;
}
+enum dn_list_will_be_read_only {
+ DN_LIST_MUTABLE = 0,
+ DN_LIST_WILL_BE_READ_ONLY = 1,
+};
+
/*
return the @IDX list in an index entry for a dn as a
struct dn_list
static int ldb_kv_dn_list_load(struct ldb_module *module,
struct ldb_kv_private *ldb_kv,
struct ldb_dn *dn,
- struct dn_list *list)
+ struct dn_list *list,
+ enum dn_list_will_be_read_only read_only)
{
struct ldb_message *msg;
int ret, version;
struct ldb_message_element *el;
- TDB_DATA rec;
+ TDB_DATA rec = {0};
struct dn_list *list2;
- TDB_DATA key;
+ bool from_primary_cache = false;
+ TDB_DATA key = {0};
list->dn = NULL;
list->count = 0;
+ list->strict = false;
- /* see if we have any in-memory index entries */
- if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
+ /*
+ * See if we have an in memory index cache
+ */
+ if (ldb_kv->idxptr == NULL) {
goto normal_index;
}
key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
key.dsize = strlen((char *)key.dptr);
- rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ /*
+ * Have we cached this index record?
+ * If we have a nested transaction cache try that first.
+ * then try the transaction cache.
+ * if the record is not cached it will need to be read from disk.
+ */
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
+ }
+ if (rec.dptr == NULL) {
+ from_primary_cache = true;
+ rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ }
if (rec.dptr == NULL) {
goto normal_index;
}
/* we've found an in-memory index entry */
- list2 = ldb_kv_index_idxptr(module, rec, true);
+ list2 = ldb_kv_index_idxptr(module, rec);
if (list2 == NULL) {
free(rec.dptr);
return LDB_ERR_OPERATIONS_ERROR;
}
free(rec.dptr);
- *list = *list2;
+ /*
+ * If this is a read only transaction the indexes will not be
+ * changed so we don't need a copy in the event of a rollback
+ *
+ * In this case make an early return
+ */
+ if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
+ *list = *list2;
+ return LDB_SUCCESS;
+ }
+
+ /*
+ * record was read from the sub transaction cache, so we have
+ * already copied the primary cache record
+ */
+ if (!from_primary_cache) {
+ *list = *list2;
+ return LDB_SUCCESS;
+ }
+
+ /*
+ * No index sub transaction active, so no need to cache a copy
+ */
+ if (ldb_kv->nested_idx_ptr == NULL) {
+ *list = *list2;
+ return LDB_SUCCESS;
+ }
+
+ /*
+ * There is an active index sub transaction, and the record was
+ * found in the primary index transaction cache. A copy of the
+ * record needs be taken to prevent the original entry being
+ * altered, until the index sub transaction is committed.
+ */
+
+ {
+ struct ldb_val *dns = NULL;
+ size_t x = 0;
+
+ dns = talloc_array(
+ list,
+ struct ldb_val,
+ list2->count);
+ if (dns == NULL) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ for (x = 0; x < list2->count; x++) {
+ dns[x].length = list2->dn[x].length;
+ dns[x].data = talloc_memdup(
+ dns,
+ list2->dn[x].data,
+ list2->dn[x].length);
+ if (dns[x].data == NULL) {
+ TALLOC_FREE(dns);
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ }
+ list->dn = dns;
+ list->count = list2->count;
+ }
return LDB_SUCCESS;
+ /*
+ * Index record not found in the caches, read it from the
+ * database.
+ */
normal_index:
msg = ldb_msg_new(list);
if (msg == NULL) {
if (ret == LDB_ERR_NO_SUCH_OBJECT) {
ret = LDB_SUCCESS;
}
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ret;
}
ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
LDB_KV_INDEXING_VERSION);
if (ret != LDB_SUCCESS) {
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ldb_module_oom(module);
}
} else {
ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
LDB_KV_GUID_INDEXING_VERSION);
if (ret != LDB_SUCCESS) {
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ldb_module_oom(module);
}
}
ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
if (ret != LDB_SUCCESS) {
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ldb_module_oom(module);
}
el->values = talloc_array(msg,
struct ldb_val, 1);
if (el->values == NULL) {
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ldb_module_oom(module);
}
list->count,
LDB_KV_GUID_SIZE);
if (v.data == NULL) {
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ldb_module_oom(module);
}
for (i = 0; i < list->count; i++) {
if (list->dn[i].length !=
LDB_KV_GUID_SIZE) {
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ldb_module_operr(module);
}
memcpy(&v.data[LDB_KV_GUID_SIZE*i],
}
ret = ldb_kv_store(module, msg, TDB_REPLACE);
- talloc_free(msg);
+ TALLOC_FREE(msg);
return ret;
}
{
struct ldb_kv_private *ldb_kv = talloc_get_type(
ldb_module_get_private(module), struct ldb_kv_private);
- TDB_DATA rec, key;
- int ret;
- struct dn_list *list2;
+ TDB_DATA rec = {0};
+ TDB_DATA key = {0};
- if (ldb_kv->idxptr == NULL) {
- return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
- }
+ int ret = LDB_SUCCESS;
+ struct dn_list *list2 = NULL;
+ struct ldb_kv_idxptr *idxptr = NULL;
key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
if (key.dptr == NULL) {
}
key.dsize = strlen((char *)key.dptr);
- rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ /*
+ * If there is an index sub transaction active, update the
+ * sub transaction index cache. Otherwise update the
+ * primary index cache
+ */
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ idxptr = ldb_kv->nested_idx_ptr;
+ } else {
+ idxptr = ldb_kv->idxptr;
+ }
+ /*
+ * Get the cache entry for the index
+ *
+ * As the value in the cache is a pointer to a dn_list we update
+ * the dn_list directly.
+ *
+ */
+ rec = tdb_fetch(idxptr->itdb, key);
if (rec.dptr != NULL) {
- list2 = ldb_kv_index_idxptr(module, rec, false);
+ list2 = ldb_kv_index_idxptr(module, rec);
if (list2 == NULL) {
free(rec.dptr);
return LDB_ERR_OPERATIONS_ERROR;
}
free(rec.dptr);
- list2->dn = talloc_steal(list2, list->dn);
- list2->count = list->count;
+ /* Now put the updated pointer back in the cache */
+ if (list->dn == NULL) {
+ list2->dn = NULL;
+ list2->count = 0;
+ } else {
+ list2->dn = talloc_steal(list2, list->dn);
+ list2->count = list->count;
+ }
return LDB_SUCCESS;
}
- list2 = talloc(ldb_kv->idxptr, struct dn_list);
+ list2 = talloc(idxptr, struct dn_list);
if (list2 == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
/*
* This is not a store into the main DB, but into an in-memory
* TDB, so we don't need a guard on ltdb->read_only
+ *
+ * Also as we directly update the in memory dn_list for existing
+ * cache entries we must be adding a new entry to the cache.
*/
- ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+ ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
if (ret != 0) {
- return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
+ return ltdb_err_map( tdb_error(idxptr->itdb));
}
return LDB_SUCCESS;
}
struct ldb_val v;
struct dn_list *list;
- list = ldb_kv_index_idxptr(module, data, true);
+ list = ldb_kv_index_idxptr(module, data);
if (list == NULL) {
ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
return -1;
if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
tdb_close(ldb_kv->idxptr->itdb);
}
- talloc_free(ldb_kv->idxptr);
- ldb_kv->idxptr = NULL;
+ TALLOC_FREE(ldb_kv->idxptr);
+ if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
+ tdb_close(ldb_kv->nested_idx_ptr->itdb);
+ }
+ TALLOC_FREE(ldb_kv->nested_idx_ptr);
return LDB_SUCCESS;
}
*/
if (!dn) return LDB_ERR_OPERATIONS_ERROR;
- ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
+ DN_LIST_WILL_BE_READ_ONLY);
talloc_free(dn);
return ret;
}
/*
NOT an index results
*/
-static int ldb_kv_index_dn_not(struct ldb_module *module,
- struct ldb_kv_private *ldb_kv,
- const struct ldb_parse_tree *tree,
- struct dn_list *list)
+static int ldb_kv_index_dn_not(_UNUSED_ struct ldb_module *module,
+ _UNUSED_ struct ldb_kv_private *ldb_kv,
+ _UNUSED_ const struct ldb_parse_tree *tree,
+ _UNUSED_ struct dn_list *list)
{
/* the only way to do an indexed not would be if we could
negate the not via another not or if we knew the total
struct dn_list *dn_list;
};
-static int traverse_range_index(struct ldb_kv_private *ldb_kv,
- struct ldb_val key,
+static int traverse_range_index(_UNUSED_ struct ldb_kv_private *ldb_kv,
+ _UNUSED_ struct ldb_val key,
struct ldb_val data,
void *state)
{
return LDB_ERR_OPERATIONS_ERROR;
}
- ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
+ DN_LIST_WILL_BE_READ_ONLY);
talloc_free(key);
if (ret != LDB_SUCCESS) {
return ret;
}
talloc_steal(list, dn_key);
- ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+ DN_LIST_MUTABLE);
if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
talloc_free(list);
return ret;
*
* So need to pull the DN's to check if it's really a duplicate
*/
- int i;
+ unsigned int i;
for (i=0; i < list->count; i++) {
uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
struct ldb_val key = {
return LDB_ERR_OPERATIONS_ERROR;
}
- ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+ ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+ DN_LIST_MUTABLE);
if (ret == LDB_ERR_NO_SUCH_OBJECT) {
/* it wasn't indexed. Did we have an earlier error? If we did then
its gone now */
*/
static int delete_index(struct ldb_kv_private *ldb_kv,
struct ldb_val key,
- struct ldb_val data,
+ _UNUSED_ struct ldb_val data,
void *state)
{
struct ldb_module *module = state;
}
static int re_pack(struct ldb_kv_private *ldb_kv,
- struct ldb_val key,
+ _UNUSED_ struct ldb_val key,
struct ldb_val val,
void *state)
{
* want to spam the log.
*/
if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
- ldb_debug(ldb, LDB_DEBUG_WARNING,
+ ldb_debug(ldb, LDB_DEBUG_ALWAYS_LOG,
"Repacking database with format %#010x",
ldb_kv->pack_format_version);
ctx->normal_record_seen = true;
struct ldb_kv_repack_context ctx;
int ret;
+ ctx.old_version = ldb_kv->pack_format_version;
ctx.count = 0;
ctx.error = LDB_SUCCESS;
ctx.normal_record_seen = false;
+ ldb_kv->pack_format_version = ldb_kv->target_pack_format_version;
+
/* Iterate all database records and repack them in the new format */
ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
if (ret < 0) {
* re-index
*/
ldb_kv_index_transaction_cancel(module);
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ ldb_kv_index_sub_transaction_cancel(ldb_kv);
+ }
/*
- * Calculate the size of the index cache that we'll need for
- * the re-index
+ * Calculate the size of the index cache needed for
+ * the re-index. If specified always use the
+ * ldb_kv->index_transaction_cache_size otherwise use the maximum
+ * of the size estimate or the DEFAULT_INDEX_CACHE_SIZE
*/
- index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
- if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
- index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
+ if (ldb_kv->index_transaction_cache_size > 0) {
+ index_cache_size = ldb_kv->index_transaction_cache_size;
+ } else {
+ index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
+ if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
+ index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
+ }
}
+ /*
+ * Note that we don't start an index sub transaction for re-indexing
+ */
ret = ldb_kv_index_transaction_start(module, index_cache_size);
if (ret != LDB_SUCCESS) {
return ret;
}
return LDB_SUCCESS;
}
+
+/*
+ * Copy the contents of the nested transaction index cache record to the
+ * transaction index cache.
+ *
+ * During this 'commit' of the subtransaction to the main transaction
+ * (cache), care must be taken to free any existing index at the top
+ * level because otherwise we would leak memory.
+ */
+static int ldb_kv_sub_transaction_traverse(
+ struct tdb_context *tdb,
+ TDB_DATA key,
+ TDB_DATA data,
+ void *state)
+{
+ struct ldb_module *module = state;
+ struct ldb_kv_private *ldb_kv = talloc_get_type(
+ ldb_module_get_private(module), struct ldb_kv_private);
+ TDB_DATA rec = {0};
+ struct dn_list *index_in_subtransaction = NULL;
+ struct dn_list *index_in_top_level = NULL;
+ int ret = 0;
+
+ /*
+ * This unwraps the pointer in the DB into a pointer in
+ * memory, we are abusing TDB as a hash map, not a linearised
+ * database store
+ */
+ index_in_subtransaction = ldb_kv_index_idxptr(module, data);
+ if (index_in_subtransaction == NULL) {
+ ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+ return -1;
+ }
+
+ /*
+ * Do we already have an entry in the primary transaction cache
+ * If so free it's dn_list and replace it with the dn_list from
+ * the secondary cache
+ *
+ * The TDB and so the fetched rec contains NO DATA, just a
+ * pointer to data held in memory.
+ */
+ rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+ if (rec.dptr != NULL) {
+ index_in_top_level = ldb_kv_index_idxptr(module, rec);
+ free(rec.dptr);
+ if (index_in_top_level == NULL) {
+ abort();
+ }
+ /*
+ * We had this key at the top level. However we made a copy
+ * at the sub-transaction level so that we could possibly
+ * roll back. We have to free the top level index memory
+ * otherwise we would leak
+ */
+ if (index_in_top_level->count > 0) {
+ TALLOC_FREE(index_in_top_level->dn);
+ }
+ index_in_top_level->dn
+ = talloc_steal(index_in_top_level,
+ index_in_subtransaction->dn);
+ index_in_top_level->count = index_in_subtransaction->count;
+ return 0;
+ }
+
+ index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
+ if (index_in_top_level == NULL) {
+ ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+ return -1;
+ }
+ index_in_top_level->dn
+ = talloc_steal(index_in_top_level,
+ index_in_subtransaction->dn);
+ index_in_top_level->count = index_in_subtransaction->count;
+
+ rec.dptr = (uint8_t *)&index_in_top_level;
+ rec.dsize = sizeof(void *);
+
+
+ /*
+ * This is not a store into the main DB, but into an in-memory
+ * TDB, so we don't need a guard on ltdb->read_only
+ */
+ ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+ if (ret != 0) {
+ ldb_kv->idxptr->error = ltdb_err_map(
+ tdb_error(ldb_kv->idxptr->itdb));
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Initialise the index cache for a sub transaction.
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+ ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
+ if (ldb_kv->nested_idx_ptr == NULL) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+
+ /*
+ * We use a tiny hash size for the sub-database (11).
+ *
+ * The sub-transaction is only for one record at a time, we
+ * would use a linked list but that would make the code even
+ * more complex when manipulating the index, as it would have
+ * to know if we were in a nested transaction (normal
+ * operations) or the top one (a reindex).
+ */
+ ldb_kv->nested_idx_ptr->itdb =
+ tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
+ if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ return LDB_SUCCESS;
+}
+
+/*
+ * Clear the contents of the nested transaction index cache when the nested
+ * transaction is cancelled.
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+ if (ldb_kv->nested_idx_ptr != NULL) {
+ tdb_close(ldb_kv->nested_idx_ptr->itdb);
+ TALLOC_FREE(ldb_kv->nested_idx_ptr);
+ }
+ return LDB_SUCCESS;
+}
+
+/*
+ * Commit a nested transaction,
+ * Copy the contents of the nested transaction index cache to the
+ * transaction index cache.
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+ int ret = 0;
+
+ if (ldb_kv->nested_idx_ptr == NULL) {
+ return LDB_SUCCESS;
+ }
+ if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+ return LDB_SUCCESS;
+ }
+ tdb_traverse(
+ ldb_kv->nested_idx_ptr->itdb,
+ ldb_kv_sub_transaction_traverse,
+ ldb_kv->module);
+ tdb_close(ldb_kv->nested_idx_ptr->itdb);
+ ldb_kv->nested_idx_ptr->itdb = NULL;
+
+ ret = ldb_kv->nested_idx_ptr->error;
+ if (ret != LDB_SUCCESS) {
+ struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+ if (!ldb_errstring(ldb)) {
+ ldb_set_errstring(ldb, ldb_strerror(ret));
+ }
+ ldb_asprintf_errstring(
+ ldb,
+ __location__": Failed to update index records in "
+ "sub transaction commit: %s",
+ ldb_errstring(ldb));
+ }
+ TALLOC_FREE(ldb_kv->nested_idx_ptr);
+ return ret;
+}