s4-ldb: use TYPESAFE_QSORT() in the rest of the ldb code
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
index 709a2e1626df067e7b154c31e1d0e60ea0fdd554..bb4617db0907ee1c02a74acf591cad58922c4f07 100644 (file)
@@ -40,7 +40,6 @@ struct dn_list {
 
 struct ltdb_idxptr {
        struct tdb_context *itdb;
-       bool repack;
        int error;
 };
 
@@ -61,12 +60,13 @@ int ltdb_index_transaction_start(struct ldb_module *module)
  * differences in string termination */
 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
 {
-       int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
-       if (ret != 0) return ret;
-       if (v2->length > v1->length && v2->data[v1->length] != 0) {
+       if (v1->length > v2->length && v1->data[v2->length] != 0) {
+               return -1;
+       }
+       if (v1->length < v2->length && v2->data[v1->length] != 0) {
                return 1;
        }
-       return 0;
+       return strncmp((char *)v1->data, (char *)v2->data, v1->length);
 }
 
 
@@ -95,6 +95,31 @@ static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
        return ltdb_dn_list_find_val(list, &v);
 }
 
+static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
+{
+       struct dn_list *list;
+       if (rec.dsize != sizeof(void *)) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Bad data size for idxptr %u", (unsigned)rec.dsize);
+               return NULL;
+       }
+       memcpy(&list, rec.dptr, sizeof(void *));
+       list = talloc_get_type(list, struct dn_list);
+       if (list == NULL) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Bad type '%s' for idxptr", 
+                                      talloc_get_name(list));
+               return NULL;
+       }
+       if (check_parent && list->dn && talloc_parent(list->dn) != list) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Bad parent '%s' for idxptr", 
+                                      talloc_get_name(talloc_parent(list->dn)));
+               return NULL;
+       }
+       return list;
+}
+
 /*
   return the @IDX list in an index entry for a dn as a 
   struct dn_list
@@ -128,13 +153,11 @@ static int ltdb_dn_list_load(struct ldb_module *module,
        }
 
        /* we've found an in-memory index entry */
-       if (rec.dsize != sizeof(void *)) {
+       list2 = ltdb_index_idxptr(module, rec, true);
+       if (list2 == NULL) {
                free(rec.dptr);
-               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
-                                      "Bad internal index size %u", (unsigned)rec.dsize);
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       list2 = *(struct dn_list **)rec.dptr;
        free(rec.dptr);
 
        *list = *list2;
@@ -176,6 +199,14 @@ static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
        struct ldb_message *msg;
        int ret;
 
+       if (list->count == 0) {
+               ret = ltdb_delete_noindex(module, dn);
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       return LDB_SUCCESS;
+               }
+               return ret;
+       }
+
        msg = ldb_msg_new(module);
        if (!msg) {
                ldb_module_oom(module);
@@ -184,6 +215,7 @@ static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
 
        ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
        if (ret != LDB_SUCCESS) {
+               talloc_free(msg);
                ldb_module_oom(module);
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -234,13 +266,11 @@ static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
 
        rec = tdb_fetch(ltdb->idxptr->itdb, key);
        if (rec.dptr != NULL) {
-               if (rec.dsize != sizeof(void *)) {
+               list2 = ltdb_index_idxptr(module, rec, false);
+               if (list2 == NULL) {
                        free(rec.dptr);
-                       ldb_asprintf_errstring(ldb_module_get_ctx(module), 
-                                              "Bad internal index size %u", (unsigned)rec.dsize);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
-               list2 = *(struct dn_list **)rec.dptr;
                free(rec.dptr);
                list2->dn = talloc_steal(list2, list->dn);
                list2->count = list->count;
@@ -258,8 +288,10 @@ static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
        rec.dsize = sizeof(void *);
 
        ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
-
-       return ret;     
+       if (ret == -1) {
+               return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
+       }
+       return LDB_SUCCESS;
 }
 
 /*
@@ -274,26 +306,28 @@ static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_
        struct ldb_val v;
        struct dn_list *list;
 
-       if (data.dsize != sizeof(void *)) {
-               ldb_asprintf_errstring(ldb, "Bad internal index size %u", (unsigned)data.dsize);
+       list = ltdb_index_idxptr(module, data, true);
+       if (list == NULL) {
                ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
                return -1;
        }
-       
-       list = *(struct dn_list **)data.dptr;
 
        v.data = key.dptr;
-       v.length = key.dsize;
+       v.length = strnlen((char *)key.dptr, key.dsize);
 
        dn = ldb_dn_from_ldb_val(module, ldb, &v);
        if (dn == NULL) {
+               ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
                ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
                return -1;
        }
 
        ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
        talloc_free(dn);
-       return ltdb->idxptr->error;
+       if (ltdb->idxptr->error != 0) {
+               return -1;
+       }
+       return 0;
 }
 
 /* cleanup the idxptr mode when transaction commits */
@@ -302,6 +336,9 @@ int ltdb_index_transaction_commit(struct ldb_module *module)
        struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
 
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       ldb_reset_err_string(ldb);
        if (ltdb->idxptr->itdb) {
                tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
                tdb_close(ltdb->idxptr->itdb);
@@ -310,8 +347,10 @@ int ltdb_index_transaction_commit(struct ldb_module *module)
        ret = ltdb->idxptr->error;
 
        if (ret != LDB_SUCCESS) {
-               struct ldb_context *ldb = ldb_module_get_ctx(module);
-               ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
+               if (!ldb_errstring(ldb)) {
+                       ldb_set_errstring(ldb, ldb_strerror(ret));
+               }
+               ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
        }
 
        talloc_free(ltdb->idxptr);
@@ -395,6 +434,8 @@ static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *at
        if (el == NULL) {
                return false;
        }
+
+       /* TODO: this is too expensive! At least use a binary search */
        for (i=0; i<el->num_values; i++) {
                if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
                        return true;
@@ -932,7 +973,7 @@ static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
                return;
        }
 
-       qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
+       TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
 
        new_count = 1;
        for (i=1; i<list->count; i++) {
@@ -1033,6 +1074,7 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
        int ret;
        const struct ldb_schema_attribute *a;
        struct dn_list *list;
+       unsigned alloc_len;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -1061,15 +1103,21 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
 
        if (list->count > 0 &&
            a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
+               talloc_free(list);
+               ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
+                                      el->name, dn);
                return LDB_ERR_ENTRY_ALREADY_EXISTS;            
        }
 
-       list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count+1);
+       /* overallocate the list a bit, to reduce the number of
+        * realloc trigered copies */    
+       alloc_len = ((list->count+1)+7) & ~7;
+       list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
        if (list->dn == NULL) {
                talloc_free(list);
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       list->dn[list->count].data = discard_const_p(unsigned char, dn);
+       list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
        list->dn[list->count].length = strlen(dn);
        list->count++;
 
@@ -1122,6 +1170,10 @@ static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
                }
                ret = ltdb_index_add_el(module, dn, &elements[i]);
                if (ret != LDB_SUCCESS) {
+                       struct ldb_context *ldb = ldb_module_get_ctx(module);
+                       ldb_asprintf_errstring(ldb,
+                                              __location__ ": Failed to re-index %s in %s - %s",
+                                              elements[i].name, dn, ldb_errstring(ldb));
                        return ret;
                }
        }
@@ -1172,7 +1224,7 @@ static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_messa
        if (add) {
                ret = ltdb_index_add1(module, dn, &el, 0);
        } else { /* delete */
-               ret = ltdb_index_del_value(module, dn, &el, 0);
+               ret = ltdb_index_del_value(module, msg->dn, &el, 0);
        }
 
        talloc_free(pdn);
@@ -1187,9 +1239,13 @@ static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_messa
 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
                           struct ldb_message_element *el)
 {
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        if (ldb_dn_is_special(dn)) {
                return LDB_SUCCESS;
        }
+       if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
+               return LDB_SUCCESS;
+       }
        return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
 }
 
@@ -1222,17 +1278,23 @@ int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
 /*
   delete an index entry for one message element
 */
-int ltdb_index_del_value(struct ldb_module *module, const char *dn,
+int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
                         struct ldb_message_element *el, int v_idx)
 {
        struct ldb_context *ldb;
        struct ldb_dn *dn_key;
+       const char *dn_str;
        int ret, i;
        struct dn_list *list;
 
        ldb = ldb_module_get_ctx(module);
 
-       if (dn[0] == '@') {
+       dn_str = ldb_dn_get_linearized(dn);
+       if (dn_str == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (dn_str[0] == '@') {
                return LDB_SUCCESS;
        }
 
@@ -1260,7 +1322,7 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                return ret;
        }
 
-       i = ltdb_dn_list_find_str(list, dn);
+       i = ltdb_dn_list_find_str(list, dn_str);
        if (i == -1) {
                /* nothing to delete */
                talloc_free(dn_key);
@@ -1284,9 +1346,11 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
   delete the index entries for a element
   return -1 on failure
 */
-int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
+int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
+                          struct ldb_message_element *el)
 {
        struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       const char *dn_str;
        int ret;
        unsigned int i;
 
@@ -1295,7 +1359,12 @@ int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb
                return LDB_SUCCESS;
        }
 
-       if (dn[0] == '@') {
+       dn_str = ldb_dn_get_linearized(dn);
+       if (dn_str == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (dn_str[0] == '@') {
                return LDB_SUCCESS;
        }
 
@@ -1320,7 +1389,6 @@ int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
 {
        struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
-       const char *dn;
        unsigned int i;
 
        if (ldb_dn_is_special(msg->dn)) {
@@ -1337,13 +1405,8 @@ int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
                return LDB_SUCCESS;
        }
 
-       dn = ldb_dn_get_linearized(msg->dn);
-       if (dn == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        for (i = 0; i < msg->num_elements; i++) {
-               ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
+               ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -1358,20 +1421,50 @@ int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
 */
 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
-       const char *dn = "DN=" LTDB_INDEX ":";
-       if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
-               return tdb_delete(tdb, key);
+       struct ldb_module *module = state;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       const char *dnstr = "DN=" LTDB_INDEX ":";
+       struct dn_list list;
+       struct ldb_dn *dn;
+       struct ldb_val v;
+       int ret;
+
+       if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
+               return 0;
+       }
+       /* we need to put a empty list in the internal tdb for this
+        * index entry */
+       list.dn = NULL;
+       list.count = 0;
+       v.data = key.dptr;
+       v.length = strnlen((char *)key.dptr, key.dsize);
+
+       dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
+       ret = ltdb_dn_list_store(module, dn, &list);
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Unable to store null index for %s\n",
+                                               ldb_dn_get_linearized(dn));
+               talloc_free(dn);
+               return -1;
        }
+       talloc_free(dn);
        return 0;
 }
 
+struct ltdb_reindex_context {
+       struct ldb_module *module;
+       int error;
+};
+
 /*
   traversal function that adds @INDEX records during a re index
 */
 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
        struct ldb_context *ldb;
-       struct ldb_module *module = (struct ldb_module *)state;
+       struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
+       struct ldb_module *module = ctx->module;
        struct ldb_message *msg;
        const char *dn = NULL;
        int ret;
@@ -1384,7 +1477,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
                return 0;
        }
 
-       msg = talloc(module, struct ldb_message);
+       msg = ldb_msg_new(module);
        if (msg == NULL) {
                return -1;
        }
@@ -1392,7 +1485,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        ret = ltdb_unpack_data(module, &data, msg);
        if (ret != 0) {
                ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
-                         ldb_dn_get_linearized(msg->dn));
+                                               ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return -1;
        }
@@ -1403,7 +1496,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        if (key2.dptr == NULL) {
                /* probably a corrupt record ... darn */
                ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
-                                                       ldb_dn_get_linearized(msg->dn));
+                                               ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return 0;
        }
@@ -1422,17 +1515,21 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        ret = ltdb_index_onelevel(module, msg, 1);
        if (ret != LDB_SUCCESS) {
                ldb_debug(ldb, LDB_DEBUG_ERROR,
-                       "Adding special ONE LEVEL index failed (%s)!",
-                       ldb_dn_get_linearized(msg->dn));
+                         "Adding special ONE LEVEL index failed (%s)!",
+                                               ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return -1;
        }
 
        ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
 
-       talloc_free(msg);
+       if (ret != LDB_SUCCESS) {
+               ctx->error = ret;
+               talloc_free(msg);
+               return -1;
+       }
 
-       if (ret != LDB_SUCCESS) return -1;
+       talloc_free(msg);
 
        return 0;
 }
@@ -1444,13 +1541,16 @@ int ltdb_reindex(struct ldb_module *module)
 {
        struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
+       struct ltdb_reindex_context ctx;
 
        if (ltdb_cache_reload(module) != 0) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /* first traverse the database deleting any @INDEX records */
-       ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
+       /* first traverse the database deleting any @INDEX records by
+        * putting NULL entries in the in-memory tdb
+        */
+       ret = tdb_traverse(ltdb->tdb, delete_index, module);
        if (ret == -1) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -1460,14 +1560,21 @@ int ltdb_reindex(struct ldb_module *module)
                return LDB_SUCCESS;
        }
 
+       ctx.module = module;
+       ctx.error = 0;
+
        /* now traverse adding any indexes for normal LDB records */
-       ret = tdb_traverse(ltdb->tdb, re_index, module);
+       ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
        if (ret == -1) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (ltdb->idxptr) {
-               ltdb->idxptr->repack = true;
+       if (ctx.error != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
+               return ctx.error;
        }
 
        return LDB_SUCCESS;