s4-ldb: cope better with corruption of tdb records
authorAndrew Tridgell <tridge@samba.org>
Tue, 15 Sep 2009 17:00:24 +0000 (10:00 -0700)
committerAndrew Tridgell <tridge@samba.org>
Wed, 16 Sep 2009 01:45:42 +0000 (18:45 -0700)
When doing an indexed search if we hit a corrupt record we abandoned
the indexed search and did a full search. The problem was that we
might have sent some records to the caller already, which means the
caller ended up with duplicate records. Fix this by returning a search
error if indexing returns an error and we have given any records to
the caller.

source4/lib/ldb/ldb_tdb/ldb_index.c
source4/lib/ldb/ldb_tdb/ldb_pack.c
source4/lib/ldb/ldb_tdb/ldb_search.c
source4/lib/ldb/ldb_tdb/ldb_tdb.h

index 85fbfa0458fce8093bd7ec29bbb742520d38ca43..b959471d163584598243d92b4d53d07e5c858fa9 100644 (file)
@@ -1037,7 +1037,8 @@ static int ltdb_index_dn(struct ldb_module *module,
   extracting just the given attributes
 */
 static int ltdb_index_filter(const struct dn_list *dn_list,
-                            struct ltdb_context *ac)
+                            struct ltdb_context *ac, 
+                            uint32_t *match_count)
 {
        struct ldb_context *ldb;
        struct ldb_message *msg;
@@ -1093,6 +1094,8 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
                        ac->request_terminated = true;
                        return ret;
                }
+
+               (*match_count)++;
        }
 
        return LDB_SUCCESS;
@@ -1103,7 +1106,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
   returns -1 if an indexed search is not possible, in which
   case the caller should call ltdb_search_full()
 */
-int ltdb_search_indexed(struct ltdb_context *ac)
+int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
 {
        struct ldb_context *ldb;
        void *data = ldb_module_get_private(ac->module);
@@ -1166,7 +1169,7 @@ int ltdb_search_indexed(struct ltdb_context *ac)
        if (ret == LDB_SUCCESS) {
                /* we've got a candidate list - now filter by the full tree
                   and extract the needed attributes */
-               ret = ltdb_index_filter(dn_list, ac);
+               ret = ltdb_index_filter(dn_list, ac, match_count);
        }
 
        talloc_free(dn_list);
@@ -1578,6 +1581,8 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 
        ret = ltdb_unpack_data(module, &data, msg);
        if (ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
+                         ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return -1;
        }
index 5640e7053c8f493ca9e1157d4b51d38aa50d659b..e7aeb47e72d8bf4d627092004786a8848d44ff26 100644 (file)
@@ -236,6 +236,10 @@ int ltdb_unpack_data(struct ldb_module *module,
                        errno = EIO;
                        goto failed;
                }
+               if (len == 0) {
+                       errno = EIO;
+                       goto failed;
+               }
                message->elements[i].flags = 0;
                message->elements[i].name = talloc_strndup(message->elements, (char *)p, len);
                if (message->elements[i].name == NULL) {
index b307c5fb2f349d40cde4bd048934a2e3a8784686..a6647ccd506bc34a5953470118fd4efa438ccb0b 100644 (file)
@@ -265,6 +265,9 @@ int ltdb_search_dn1(struct ldb_module *module, struct ldb_dn *dn, struct ldb_mes
        ret = ltdb_unpack_data(module, &tdb_data, msg);
        free(tdb_data.dptr);
        if (ret == -1) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
+                         ldb_dn_get_linearized(msg->dn));
                return LDB_ERR_OPERATIONS_ERROR;                
        }
 
@@ -535,7 +538,9 @@ int ltdb_search(struct ltdb_context *ctx)
        ctx->attrs = req->op.search.attrs;
 
        if (ret == LDB_SUCCESS) {
-               ret = ltdb_search_indexed(ctx);
+               uint32_t match_count = 0;
+
+               ret = ltdb_search_indexed(ctx, &match_count);
                if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* Not in the index, therefore OK! */
                        ret = LDB_SUCCESS;
@@ -553,6 +558,17 @@ int ltdb_search(struct ltdb_context *ctx)
                        printf("FULL SEARCH: %s\n", expression);
                        talloc_free(expression);
 #endif
+                       if (match_count != 0) {
+                               /* the indexing code gave an error
+                                * after having returned at least one
+                                * entry. This means the indexes are
+                                * corrupt or a database record is
+                                * corrupt. We cannot continue with a
+                                * full search or we may return
+                                * duplicate entries
+                                */
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
                        ret = ltdb_search_full(ctx);
                        if (ret != LDB_SUCCESS) {
                                ldb_set_errstring(ldb, "Indexed and full searches both failed!\n");
index 75034dcf4b333ad76360d64a482689b5f86c311a..c8c1dad5de6f4ecd4cc8cc7582b0ac09a900e763 100644 (file)
@@ -82,7 +82,7 @@ int ltdb_check_at_attributes_values(const struct ldb_val *value);
 
 struct ldb_parse_tree;
 
-int ltdb_search_indexed(struct ltdb_context *ctx);
+int ltdb_search_indexed(struct ltdb_context *ctx, uint32_t *);
 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg);
 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg);
 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add);