ldb: Save a copy of the index result before calling the callbacks.
authorAndrew Bartlett <abartlet@samba.org>
Mon, 28 May 2018 01:01:18 +0000 (13:01 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 30 May 2018 02:23:27 +0000 (04:23 +0200)
Otherwise Samba modules like subtree_rename can fail as they modify the
index during the callback.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=13452

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
lib/ldb/ldb_tdb/ldb_index.c
lib/ldb/tests/ldb_mod_op_test.c

index cd64e6965f6b5ec20f055dd5eaf830418685c1e9..dbe59837f6e0cd07f08c538e92761b24052b7715 100644 (file)
@@ -1717,26 +1717,61 @@ static int ltdb_index_filter(struct ltdb_private *ltdb,
                             uint32_t *match_count,
                             enum key_truncation scope_one_truncation)
 {
                             uint32_t *match_count,
                             enum key_truncation scope_one_truncation)
 {
-       struct ldb_context *ldb;
+       struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
        struct ldb_message *msg;
        struct ldb_message *filtered_msg;
        unsigned int i;
        struct ldb_message *msg;
        struct ldb_message *filtered_msg;
        unsigned int i;
+       unsigned int num_keys = 0;
        uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
        uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
+       TDB_DATA *keys = NULL;
+
+       /*
+        * We have to allocate the key list (rather than just walk the
+        * caller supplied list) as the callback could change the list
+        * (by modifying an indexed attribute hosted in the in-memory
+        * index cache!)
+        */
+       keys = talloc_array(ac, TDB_DATA, dn_list->count);
+       if (keys == NULL) {
+               return ldb_module_oom(ac->module);
+       }
 
 
-       ldb = ldb_module_get_ctx(ac->module);
+       if (ltdb->cache->GUID_index_attribute != NULL) {
+               /*
+                * We speculate that the keys will be GUID based and so
+                * pre-fill in enough space for a GUID (avoiding a pile of
+                * small allocations)
+                */
+               struct guid_tdb_key {
+                       uint8_t guid_key[LTDB_GUID_KEY_SIZE];
+               } *key_values = NULL;
+
+               key_values = talloc_array(keys,
+                                         struct guid_tdb_key,
+                                         dn_list->count);
+
+               for (i = 0; i < dn_list->count; i++) {
+                       keys[i].dptr = key_values[i].guid_key;
+                       keys[i].dsize = sizeof(key_values[i].guid_key);
+               }
+               if (key_values == NULL) {
+                       return ldb_module_oom(ac->module);
+               }
+       } else {
+               for (i = 0; i < dn_list->count; i++) {
+                       keys[i].dptr = NULL;
+                       keys[i].dsize = 0;
+               }
+       }
 
        for (i = 0; i < dn_list->count; i++) {
 
        for (i = 0; i < dn_list->count; i++) {
-               uint8_t guid_key[LTDB_GUID_KEY_SIZE];
-               TDB_DATA tdb_key = {
-                       .dptr = guid_key,
-                       .dsize = sizeof(guid_key)
-               };
                int ret;
                int ret;
-               bool matched;
 
 
-               ret = ltdb_idx_to_key(ac->module, ltdb,
-                                     ac, &dn_list->dn[i],
-                                     &tdb_key);
+               ret = ltdb_idx_to_key(ac->module,
+                                     ltdb,
+                                     keys,
+                                     &dn_list->dn[i],
+                                     &keys[num_keys]);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -1753,28 +1788,35 @@ static int ltdb_index_filter(struct ltdb_private *ltdb,
                         * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
                         */
 
                         * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
                         */
 
-                       if (memcmp(previous_guid_key, tdb_key.dptr,
+                       if (memcmp(previous_guid_key,
+                                  keys[num_keys].dptr,
                                   sizeof(previous_guid_key)) == 0) {
                                continue;
                        }
 
                                   sizeof(previous_guid_key)) == 0) {
                                continue;
                        }
 
-                       memcpy(previous_guid_key, tdb_key.dptr,
+                       memcpy(previous_guid_key,
+                              keys[num_keys].dptr,
                               sizeof(previous_guid_key));
                }
                               sizeof(previous_guid_key));
                }
+               num_keys++;
+       }
 
 
+
+       /*
+        * Now that the list is a safe copy, send the callbacks
+        */
+       for (i = 0; i < num_keys; i++) {
+               int ret;
+               bool matched;
                msg = ldb_msg_new(ac);
                if (!msg) {
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
                msg = ldb_msg_new(ac);
                if (!msg) {
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-
                ret = ltdb_search_key(ac->module, ltdb,
                ret = ltdb_search_key(ac->module, ltdb,
-                                     tdb_key, msg,
+                                     keys[i], msg,
                                      LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
                                      LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
                                      LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
                                      LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
-               if (tdb_key.dptr != guid_key) {
-                       TALLOC_FREE(tdb_key.dptr);
-               }
                if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* the record has disappeared? yes, this can happen */
                        talloc_free(msg);
                if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* the record has disappeared? yes, this can happen */
                        talloc_free(msg);
@@ -1834,6 +1876,7 @@ static int ltdb_index_filter(struct ltdb_private *ltdb,
                (*match_count)++;
        }
 
                (*match_count)++;
        }
 
+       TALLOC_FREE(keys);
        return LDB_SUCCESS;
 }
 
        return LDB_SUCCESS;
 }
 
index 7b6f19c3a18d16a7c88228b22dcd0ff4958a9a8b..01667af38653242dc30019da36698827d716c695 100644 (file)
@@ -2487,6 +2487,269 @@ static void test_ldb_modify_before_ldb_wait(void **state)
        assert_int_equal(res2->count, 1);
 }
 
        assert_int_equal(res2->count, 1);
 }
 
+/*
+ * This test is also complex.
+ * The purpose is to test if a modify can occur during an ldb_search()
+ * This would be a failure if if in process
+ * (1) and (2):
+ *  - (1) ltdb_search() starts and calls back for one entry
+ *  - (2) one of the entries to be matched is modified
+ *  - (1) the indexed search tries to return the modified entry, but
+ *        it is no longer found, either:
+ *          - despite it still matching (dn changed)
+ *          - it no longer matching (attrs changed)
+ *
+ * We also try un-indexed to show that the behaviour differs on this
+ * point, which it should not (an index should only impact search
+ * speed).
+ */
+
+/*
+ * This purpose of this callback is to trigger a write in the callback
+ * so as to change in in-memory index code while looping over the
+ * index result.
+ */
+
+static int test_ldb_callback_modify_during_search_callback1(struct ldb_request *req,
+                                                  struct ldb_reply *ares)
+{
+       int ret;
+       struct modify_during_search_test_ctx *ctx = req->context;
+       struct ldb_dn *dn = NULL, *new_dn = NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctx->test_ctx);
+       struct ldb_message *msg = NULL;
+
+       assert_non_null(tmp_ctx);
+
+       switch (ares->type) {
+       case LDB_REPLY_ENTRY:
+       {
+               const struct ldb_val *cn_val
+                       = ldb_dn_get_component_val(ares->message->dn, 0);
+               const char *cn = (char *)cn_val->data;
+               ctx->res_count++;
+               if (strcmp(cn, "test_search_cn") == 0) {
+                       ctx->got_cn = true;
+               } else if (strcmp(cn, "test_search_2_cn") == 0) {
+                       ctx->got_2_cn = true;
+               }
+               if (ctx->res_count == 2) {
+                       return LDB_SUCCESS;
+               }
+               break;
+       }
+       case LDB_REPLY_REFERRAL:
+               return LDB_SUCCESS;
+
+       case LDB_REPLY_DONE:
+               return ldb_request_done(req, LDB_SUCCESS);
+       }
+
+       if (ctx->rename) {
+               if (ctx->got_2_cn) {
+                       /* Modify this one */
+                       dn = ldb_dn_new_fmt(tmp_ctx,
+                                           ctx->test_ctx->ldb,
+                                           "cn=test_search_2_cn,%s",
+                                           ldb_dn_get_linearized(ctx->basedn));
+               } else {
+                       dn = ldb_dn_new_fmt(tmp_ctx,
+                                           ctx->test_ctx->ldb,
+                                           "cn=test_search_cn,%s",
+                                           ldb_dn_get_linearized(ctx->basedn));
+               }
+               assert_non_null(dn);
+
+               new_dn = ldb_dn_new_fmt(tmp_ctx,
+                                       ctx->test_ctx->ldb,
+                                       "cn=test_search_cn_renamed,"
+                                       "dc=not_search_test_entry");
+               assert_non_null(new_dn);
+
+               ret = ldb_rename(ctx->test_ctx->ldb, dn, new_dn);
+               assert_int_equal(ret, 0);
+
+       } else {
+               if (ctx->got_2_cn) {
+                       /* Delete this one */
+                       dn = ldb_dn_new_fmt(tmp_ctx,
+                                           ctx->test_ctx->ldb,
+                                           "cn=test_search_2_cn,%s",
+                                           ldb_dn_get_linearized(ctx->basedn));
+               } else {
+                       dn = ldb_dn_new_fmt(tmp_ctx,
+                                           ctx->test_ctx->ldb,
+                                           "cn=test_search_cn,%s",
+                                           ldb_dn_get_linearized(ctx->basedn));
+               }
+               assert_non_null(dn);
+
+               ret = ldb_delete(ctx->test_ctx->ldb, dn);
+               assert_int_equal(ret, 0);
+       }
+
+       /*
+        * Now fill in the position we just removed from the
+        * index to ensure we fail the test (otherwise we just read
+        * past the end of the array and find the value we wanted to
+        * skip)
+        */
+       msg = ldb_msg_new(tmp_ctx);
+       assert_non_null(msg);
+
+       /* We deliberatly use ou= not cn= here */
+       msg->dn = ldb_dn_new_fmt(msg,
+                                ctx->test_ctx->ldb,
+                                "ou=test_search_cn_extra,%s",
+                                ldb_dn_get_linearized(ctx->basedn));
+
+       ret = ldb_msg_add_string(msg,
+                                "objectUUID",
+                                "0123456789abcde3");
+
+       ret = ldb_add(ctx->test_ctx->ldb,
+                     msg);
+       assert_int_equal(ret, LDB_SUCCESS);
+
+       TALLOC_FREE(tmp_ctx);
+       return LDB_SUCCESS;
+}
+
+static void test_ldb_callback_modify_during_search(void **state, bool add_index,
+                                         bool rename)
+{
+       struct search_test_ctx *search_test_ctx = talloc_get_type_abort(*state,
+                       struct search_test_ctx);
+       struct modify_during_search_test_ctx
+               ctx =
+               { .res_count = 0,
+                 .test_ctx = search_test_ctx->ldb_test_ctx,
+                 .rename = rename
+               };
+
+       int ret;
+       struct ldb_request *req;
+
+       ret = ldb_transaction_start(search_test_ctx->ldb_test_ctx->ldb);
+       assert_int_equal(ret, 0);
+
+       if (add_index) {
+               struct ldb_message *msg;
+               struct ldb_dn *indexlist = ldb_dn_new(search_test_ctx,
+                                                     search_test_ctx->ldb_test_ctx->ldb,
+                                                     "@INDEXLIST");
+               assert_non_null(indexlist);
+
+               msg = ldb_msg_new(search_test_ctx);
+               assert_non_null(msg);
+
+               msg->dn = indexlist;
+
+               ret = ldb_msg_add_string(msg, "@IDXONE", "1");
+               assert_int_equal(ret, LDB_SUCCESS);
+               ret = ldb_msg_add_string(msg, "@IDXATTR", "cn");
+               assert_int_equal(ret, LDB_SUCCESS);
+               ret = ldb_add(search_test_ctx->ldb_test_ctx->ldb,
+                             msg);
+               if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
+                       msg->elements[0].flags = LDB_FLAG_MOD_ADD;
+                       msg->elements[1].flags = LDB_FLAG_MOD_ADD;
+                       ret = ldb_modify(search_test_ctx->ldb_test_ctx->ldb,
+                                        msg);
+               }
+               assert_int_equal(ret, LDB_SUCCESS);
+
+               /*
+                * Now bring the IDXONE index into memory by modifying
+                * it.  This exposes an issue in ldb_tdb
+                */
+               msg = ldb_msg_new(search_test_ctx);
+               assert_non_null(msg);
+
+               msg->dn = ldb_dn_new_fmt(search_test_ctx,
+                                        search_test_ctx->ldb_test_ctx->ldb,
+                                        "cn=test_search_cn_extra,%s",
+                                        search_test_ctx->base_dn);
+
+               ret = ldb_msg_add_string(msg,
+                                        "objectUUID",
+                                        "0123456789abcde2");
+
+               ret = ldb_add(search_test_ctx->ldb_test_ctx->ldb,
+                             msg);
+               assert_int_equal(ret, LDB_SUCCESS);
+
+               ret = ldb_delete(search_test_ctx->ldb_test_ctx->ldb,
+                                msg->dn);
+               assert_int_equal(ret, LDB_SUCCESS);
+       }
+
+       tevent_loop_allow_nesting(search_test_ctx->ldb_test_ctx->ev);
+
+       ctx.basedn
+               = ldb_dn_new_fmt(search_test_ctx,
+                                search_test_ctx->ldb_test_ctx->ldb,
+                                "%s",
+                                search_test_ctx->base_dn);
+       assert_non_null(ctx.basedn);
+
+
+       /*
+        * This search must be over multiple items, and should include
+        * the new name after a rename, to show that it would match
+        * both before and after that modify
+        *
+        * This needs to be a search that isn't matched by an index so
+        * that we just use the one-level index.
+        */
+       ret = ldb_build_search_req(&req,
+                                  search_test_ctx->ldb_test_ctx->ldb,
+                                  search_test_ctx,
+                                  ctx.basedn,
+                                  LDB_SCOPE_ONELEVEL,
+                                  "(cn=*)",
+                                  NULL,
+                                  NULL,
+                                  &ctx,
+                                  test_ldb_callback_modify_during_search_callback1,
+                                  NULL);
+       assert_int_equal(ret, 0);
+
+       ret = ldb_request(search_test_ctx->ldb_test_ctx->ldb, req);
+
+       if (ret == LDB_SUCCESS) {
+               ret = ldb_wait(req->handle, LDB_WAIT_ALL);
+       }
+       assert_int_equal(ret, 0);
+
+       ret = ldb_transaction_commit(search_test_ctx->ldb_test_ctx->ldb);
+       assert_int_equal(ret, 0);
+
+       assert_int_equal(ctx.res_count, 2);
+       assert_int_equal(ctx.got_cn, true);
+       assert_int_equal(ctx.got_2_cn, true);
+}
+
+static void test_ldb_callback_delete_during_indexed_search(void **state)
+{
+       test_ldb_callback_modify_during_search(state, true, false);
+}
+
+static void test_ldb_callback_delete_during_unindexed_search(void **state)
+{
+       test_ldb_callback_modify_during_search(state, false, false);
+}
+
+static void test_ldb_callback_rename_during_indexed_search(void **state)
+{
+       test_ldb_callback_modify_during_search(state, true, true);
+}
+
+static void test_ldb_callback_rename_during_unindexed_search(void **state)
+{
+       test_ldb_callback_modify_during_search(state, false, true);
+}
+
 static int ldb_case_test_setup(void **state)
 {
        int ret;
 static int ldb_case_test_setup(void **state)
 {
        int ret;
@@ -4345,6 +4608,18 @@ int main(int argc, const char **argv)
                cmocka_unit_test_setup_teardown(test_ldb_rename_during_indexed_search,
                                                ldb_search_test_setup,
                                                ldb_search_test_teardown),
                cmocka_unit_test_setup_teardown(test_ldb_rename_during_indexed_search,
                                                ldb_search_test_setup,
                                                ldb_search_test_teardown),
+               cmocka_unit_test_setup_teardown(test_ldb_callback_rename_during_unindexed_search,
+                                               ldb_search_test_setup,
+                                               ldb_search_test_teardown),
+               cmocka_unit_test_setup_teardown(test_ldb_callback_rename_during_indexed_search,
+                                               ldb_search_test_setup,
+                                               ldb_search_test_teardown),
+               cmocka_unit_test_setup_teardown(test_ldb_callback_delete_during_unindexed_search,
+                                               ldb_search_test_setup,
+                                               ldb_search_test_teardown),
+               cmocka_unit_test_setup_teardown(test_ldb_callback_delete_during_indexed_search,
+                                               ldb_search_test_setup,
+                                               ldb_search_test_teardown),
                cmocka_unit_test_setup_teardown(test_ldb_modify_during_whole_search,
                                                ldb_search_test_setup,
                                                ldb_search_test_teardown),
                cmocka_unit_test_setup_teardown(test_ldb_modify_during_whole_search,
                                                ldb_search_test_setup,
                                                ldb_search_test_teardown),