ldb_tdb: Do not trigger the unique index check during a re-index, use another pass
authorAndrew Bartlett <abartlet@samba.org>
Fri, 18 Aug 2017 05:01:07 +0000 (17:01 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 7 Sep 2017 04:56:26 +0000 (06:56 +0200)
We want to rename the objects, then scan looking for the index values.

This avoids a DB modify during the index scan traverse (the index values
are actually added to an in-memory TDB, written in prepare_commit()).

This allows us to remove the "this might already exist" case in the
index handling, we now know that the entry did not exist in the index
before we add it.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=13015

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
lib/ldb/ldb_tdb/ldb_index.c

index fbc05bd6f1a3f8191fbdb3b945b63feeff43ce17..29f59f3003f82064e30c5a6f72c3ea7a8b134ac6 100644 (file)
@@ -1147,8 +1147,7 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
  * @return                  An ldb error code
  */
 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
-                          struct ldb_message_element *el, int v_idx,
-                          bool is_new)
+                          struct ldb_message_element *el, int v_idx)
 {
        struct ldb_context *ldb;
        struct ldb_dn *dn_key;
@@ -1198,16 +1197,6 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
                return LDB_ERR_ENTRY_ALREADY_EXISTS;
        }
 
-       /* If we are doing an ADD, then this can not already be in the index,
-          as it was not already in the database, and this has already been
-          checked because the store succeeded */
-       if (! is_new) {
-               if (ltdb_dn_list_find_str(list, dn) != -1) {
-                       talloc_free(list);
-                       return LDB_SUCCESS;
-               }
-       }
-
        /* overallocate the list a bit, to reduce the number of
         * realloc trigered copies */
        alloc_len = ((list->count+1)+7) & ~7;
@@ -1231,11 +1220,11 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
   add index entries for one elements in a message
  */
 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
-                            struct ldb_message_element *el, bool is_new)
+                            struct ldb_message_element *el)
 {
        unsigned int i;
        for (i = 0; i < el->num_values; i++) {
-               int ret = ltdb_index_add1(module, dn, el, i, is_new);
+               int ret = ltdb_index_add1(module, dn, el, i);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -1249,8 +1238,7 @@ static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
  */
 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
                              struct ldb_message_element *elements,
-                             unsigned int num_el,
-                             bool is_new)
+                             unsigned int num_el)
 {
        struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        unsigned int i;
@@ -1269,7 +1257,7 @@ static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
                if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
                        continue;
                }
-               ret = ltdb_index_add_el(module, dn, &elements[i], is_new);
+               ret = ltdb_index_add_el(module, dn, &elements[i]);
                if (ret != LDB_SUCCESS) {
                        struct ldb_context *ldb = ldb_module_get_ctx(module);
                        ldb_asprintf_errstring(ldb,
@@ -1286,9 +1274,11 @@ static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
 /*
   insert a one level index for a message
 */
-static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
-{
-       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+static int ltdb_index_onelevel(struct ldb_module *module,
+                              const struct ldb_message *msg, int add)
+{      
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
+                                                   struct ltdb_private);
        struct ldb_message_element el;
        struct ldb_val val;
        struct ldb_dn *pdn;
@@ -1323,7 +1313,7 @@ static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_messa
        el.num_values = 1;
 
        if (add) {
-               ret = ltdb_index_add1(module, dn, &el, 0, add);
+               ret = ltdb_index_add1(module, dn, &el, 0);
        } else { /* delete */
                ret = ltdb_index_del_value(module, msg->dn, &el, 0);
        }
@@ -1347,7 +1337,7 @@ int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
        if (!ltdb_is_indexed(module, ltdb, el->name)) {
                return LDB_SUCCESS;
        }
-       return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el, true);
+       return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
 }
 
 /*
@@ -1367,8 +1357,7 @@ int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
-                                true);
+       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
@@ -1571,7 +1560,7 @@ struct ltdb_reindex_context {
 /*
   traversal function that adds @INDEX records during a re index
 */
-static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
        struct ldb_context *ldb;
        struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
@@ -1582,7 +1571,6 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
                .data = data.dptr,
                .length = data.dsize,
        };
-       const char *dn = NULL;
        int ret;
        TDB_DATA key2;
 
@@ -1627,6 +1615,52 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        }
        talloc_free(key2.dptr);
 
+       talloc_free(msg);
+
+       return 0;
+}
+
+/*
+  traversal function that adds @INDEX records during a re index
+*/
+static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+{
+       struct ldb_context *ldb;
+       struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
+       struct ldb_module *module = ctx->module;
+       struct ldb_message *msg;
+       const char *dn = NULL;
+       unsigned int nb_elements_in_db;
+       const struct ldb_val val = {
+               .data = data.dptr,
+               .length = data.dsize,
+       };
+       int ret;
+
+       ldb = ldb_module_get_ctx(module);
+
+       if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
+           strncmp((char *)key.dptr, "DN=", 3) != 0) {
+               return 0;
+       }
+
+       msg = ldb_msg_new(module);
+       if (msg == NULL) {
+               return -1;
+       }
+
+       ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
+                                                  msg,
+                                                  NULL, 0,
+                                                  LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
+                                                  &nb_elements_in_db);
+       if (ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
+                                               ldb_dn_get_linearized(msg->dn));
+               talloc_free(msg);
+               return -1;
+       }
+
        if (msg->dn == NULL) {
                dn = (char *)key.dptr + 3;
        } else {
@@ -1642,8 +1676,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
                return -1;
        }
 
-       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
-                                false);
+       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
 
        if (ret != LDB_SUCCESS) {
                ctx->error = ret;
@@ -1686,6 +1719,9 @@ int ltdb_reindex(struct ldb_module *module)
         */
        ret = tdb_traverse(ltdb->tdb, delete_index, module);
        if (ret < 0) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
+                                      ldb_errstring(ldb));
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -1697,11 +1733,29 @@ int ltdb_reindex(struct ldb_module *module)
        ctx.module = module;
        ctx.error = 0;
 
+       /* now traverse adding any indexes for normal LDB records */
+       ret = tdb_traverse(ltdb->tdb, re_key, &ctx);
+       if (ret < 0) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
+                                      ldb_errstring(ldb));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ctx.error != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
+               return ctx.error;
+       }
+
+       ctx.error = 0;
+
        /* now traverse adding any indexes for normal LDB records */
        ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
        if (ret < 0) {
                struct ldb_context *ldb = ldb_module_get_ctx(module);
-               ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
+               ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
+                                      ldb_errstring(ldb));
                return LDB_ERR_OPERATIONS_ERROR;
        }