ldb: relatively efficient functions for finding duplicate values
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
index 3c181509c37c7c4f0a4bc6054fa77805136eeb9d..1822ea01a08380e8f43899c4aee335fdf265bdfb 100644 (file)
@@ -50,6 +50,7 @@
  */
 
 #include "ldb_tdb.h"
+#include "ldb_private.h"
 #include <tdb.h>
 
 /*
@@ -228,7 +229,13 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 
        if (ldb_dn_is_special(dn) &&
            (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
-            ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
+            ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
+       {
+               if (ltdb->warn_reindex) {
+                       ldb_debug(ldb_module_get_ctx(module),
+                               LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
+                               tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
+               }
                ret = ltdb_reindex(module);
        }
 
@@ -257,6 +264,7 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        TDB_DATA tdb_key, tdb_data;
+       struct ldb_val ldb_data;
        int ret = LDB_SUCCESS;
 
        tdb_key = ltdb_key(module, msg->dn);
@@ -264,12 +272,16 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
                return LDB_ERR_OTHER;
        }
 
-       ret = ltdb_pack_data(module, msg, &tdb_data);
+       ret = ldb_pack_data(ldb_module_get_ctx(module),
+                           msg, &ldb_data);
        if (ret == -1) {
                talloc_free(tdb_key.dptr);
                return LDB_ERR_OTHER;
        }
 
+       tdb_data.dptr = ldb_data.data;
+       tdb_data.dsize = ldb_data.length;
+
        ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
        if (ret != 0) {
                ret = ltdb_err_map(tdb_error(ltdb->tdb));
@@ -278,7 +290,7 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 
 done:
        talloc_free(tdb_key.dptr);
-       talloc_free(tdb_data.dptr);
+       talloc_free(ldb_data.data);
 
        return ret;
 }
@@ -318,7 +330,7 @@ static int ltdb_add_internal(struct ldb_module *module,
 {
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        int ret = LDB_SUCCESS;
-       unsigned int i, j;
+       unsigned int i;
 
        for (i=0;i<msg->num_elements;i++) {
                struct ldb_message_element *el = &msg->elements[i];
@@ -343,12 +355,23 @@ static int ltdb_add_internal(struct ldb_module *module,
                        continue;
                }
 
-               /* TODO: This is O(n^2) - replace with more efficient check */
-               for (j=0; j<el->num_values; j++) {
-                       if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                               ldb_asprintf_errstring(ldb,
-                                                      "attribute '%s': value #%u on '%s' provided more than once",
-                                                      el->name, j, ldb_dn_get_linearized(msg->dn));
+               if (check_single_value) {
+                       struct ldb_val *duplicate = NULL;
+
+                       ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
+                                                        el, &duplicate, 0);
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
+                       }
+                       if (duplicate != NULL) {
+                               ldb_asprintf_errstring(
+                                       ldb,
+                                       "attribute '%s': value '%.*s' on '%s' "
+                                       "provided more than once in ADD object",
+                                       el->name,
+                                       (int)duplicate->length,
+                                       duplicate->data,
+                                       ldb_dn_get_linearized(msg->dn));
                                return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                        }
                }
@@ -437,7 +460,7 @@ static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
 
        /* in case any attribute of the message was indexed, we need
           to fetch the old record */
-       ret = ltdb_search_dn1(module, dn, msg);
+       ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
        if (ret != LDB_SUCCESS) {
                /* not finding the old record is an error */
                goto done;
@@ -510,8 +533,7 @@ static int find_element(const struct ldb_message *msg, const char *name)
 
   returns 0 on success, -1 on failure (and sets errno)
 */
-static int ltdb_msg_add_element(struct ldb_context *ldb,
-                               struct ldb_message *msg,
+static int ltdb_msg_add_element(struct ldb_message *msg,
                                struct ldb_message_element *el)
 {
        struct ldb_message_element *e2;
@@ -555,7 +577,6 @@ static int ltdb_msg_add_element(struct ldb_context *ldb,
   delete all elements having a specified attribute name
 */
 static int msg_delete_attribute(struct ldb_module *module,
-                               struct ldb_context *ldb,
                                struct ldb_message *msg, const char *name)
 {
        unsigned int i;
@@ -622,7 +643,7 @@ static int msg_delete_element(struct ldb_module *module,
                }
                if (matched) {
                        if (el->num_values == 1) {
-                               return msg_delete_attribute(module, ldb, msg, name);
+                               return msg_delete_attribute(module, msg, name);
                        }
 
                        ret = ltdb_index_del_value(module, msg->dn, el, i);
@@ -665,8 +686,9 @@ int ltdb_modify_internal(struct ldb_module *module,
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        TDB_DATA tdb_key, tdb_data;
+       struct ldb_val ldb_data;
        struct ldb_message *msg2;
-       unsigned int i, j, k;
+       unsigned int i, j;
        int ret = LDB_SUCCESS, idx;
        struct ldb_control *control_permissive = NULL;
 
@@ -693,7 +715,10 @@ int ltdb_modify_internal(struct ldb_module *module,
                goto done;
        }
 
-       ret = ltdb_unpack_data(module, &tdb_data, msg2);
+       ldb_data.data = tdb_data.dptr;
+       ldb_data.length = tdb_data.dsize;
+
+       ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
        free(tdb_data.dptr);
        if (ret == -1) {
                ret = LDB_ERR_OTHER;
@@ -709,6 +734,10 @@ int ltdb_modify_internal(struct ldb_module *module,
                struct ldb_val *vals;
                const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
                const char *dn;
+               uint32_t options = 0;
+               if (control_permissive != NULL) {
+                       options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
+               }
 
                switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
                case LDB_FLAG_MOD_ADD:
@@ -753,7 +782,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                        /* Checks if element already exists */
                        idx = find_element(msg2, el->name);
                        if (idx == -1) {
-                               if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
+                               if (ltdb_msg_add_element(msg2, el) != 0) {
                                        ret = LDB_ERR_OTHER;
                                        goto done;
                                }
@@ -777,30 +806,43 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                                /* Check that values don't exist yet on multi-
                                   valued attributes or aren't provided twice */
-                               /* TODO: This is O(n^2) - replace with more efficient check */
-                               for (j = 0; j < el->num_values; j++) {
-                                       if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
-                                               if (control_permissive) {
-                                                       /* remove this one as if it was never added */
-                                                       el->num_values--;
-                                                       for (k = j; k < el->num_values; k++) {
-                                                               el->values[k] = el->values[k + 1];
-                                                       }
-                                                       j--; /* rewind */
-
-                                                       continue;
-                                               }
-
+                               if (!(el->flags &
+                                     LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                                       struct ldb_val *duplicate = NULL;
+                                       ret = ldb_msg_find_common_values(ldb,
+                                                                        msg2,
+                                                                        el,
+                                                                        el2,
+                                                                        options);
+
+                                       if (ret ==
+                                           LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
                                                ldb_asprintf_errstring(ldb,
-                                                                      "attribute '%s': value #%u on '%s' already exists",
-                                                                      el->name, j, ldb_dn_get_linearized(msg2->dn));
-                                               ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                                                       "attribute '%s': value "
+                                                       "#%u on '%s' already "
+                                                       "exists", el->name, j,
+                                                       ldb_dn_get_linearized(msg2->dn));
+                                               goto done;
+                                       } else if (ret != LDB_SUCCESS) {
                                                goto done;
                                        }
-                                       if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                                               ldb_asprintf_errstring(ldb,
-                                                                      "attribute '%s': value #%u on '%s' provided more than once",
-                                                                      el->name, j, ldb_dn_get_linearized(msg2->dn));
+
+                                       ret = ldb_msg_find_duplicate_val(
+                                               ldb, msg2, el, &duplicate, 0);
+                                       if (ret != LDB_SUCCESS) {
+                                               goto done;
+                                       }
+                                       if (duplicate != NULL) {
+                                               ldb_asprintf_errstring(
+                                                       ldb,
+                                                       "attribute '%s': value "
+                                                       "'%.*s' on '%s' "
+                                                       "provided more than "
+                                                       "once in ADD",
+                                                       el->name,
+                                                       (int)duplicate->length,
+                                                       duplicate->data,
+                                                       ldb_dn_get_linearized(msg->dn));
                                                ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                                                goto done;
                                        }
@@ -842,12 +884,30 @@ int ltdb_modify_internal(struct ldb_module *module,
                                goto done;
                        }
 
-                       /* TODO: This is O(n^2) - replace with more efficient check */
-                       for (j=0; j<el->num_values; j++) {
-                               if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
-                                       ldb_asprintf_errstring(ldb,
-                                                              "attribute '%s': value #%u on '%s' provided more than once",
-                                                              el->name, j, ldb_dn_get_linearized(msg2->dn));
+                       /*
+                        * We don't need to check this if we have been
+                        * pre-screened by the repl_meta_data module
+                        * in Samba, or someone else who can claim to
+                        * know what they are doing. 
+                        */
+                       if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
+                               struct ldb_val *duplicate = NULL;
+
+                               ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
+                                                                &duplicate, 0);
+                               if (ret != LDB_SUCCESS) {
+                                       goto done;
+                               }
+                               if (duplicate != NULL) {
+                                       ldb_asprintf_errstring(
+                                               ldb,
+                                               "attribute '%s': value '%.*s' "
+                                               "on '%s' provided more than "
+                                               "once in REPLACE",
+                                               el->name,
+                                               (int)duplicate->length,
+                                               duplicate->data,
+                                               ldb_dn_get_linearized(msg2->dn));
                                        ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                                        goto done;
                                }
@@ -871,7 +931,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                                }
 
                                /* Delete the attribute if it exists in the DB */
-                               if (msg_delete_attribute(module, ldb, msg2,
+                               if (msg_delete_attribute(module, msg2,
                                                         el->name) != 0) {
                                        ret = LDB_ERR_OTHER;
                                        goto done;
@@ -879,7 +939,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                        }
 
                        /* Recreate it with the new values */
-                       if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
+                       if (ltdb_msg_add_element(msg2, el) != 0) {
                                ret = LDB_ERR_OTHER;
                                goto done;
                        }
@@ -900,7 +960,7 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                        if (msg->elements[i].num_values == 0) {
                                /* Delete the whole attribute */
-                               ret = msg_delete_attribute(module, ldb, msg2,
+                               ret = msg_delete_attribute(module, msg2,
                                                           msg->elements[i].name);
                                if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
                                    control_permissive) {
@@ -923,7 +983,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                                        if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
                                            control_permissive) {
                                                ret = LDB_SUCCESS;
-                                       } else {
+                                       } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                                ldb_asprintf_errstring(ldb,
                                                                       "attribute '%s': no matching attribute value while deleting attribute on '%s'",
                                                                       msg->elements[i].name, dn);
@@ -1009,7 +1069,8 @@ static int ltdb_rename(struct ltdb_context *ctx)
        }
 
        /* we need to fetch the old record to re-add under the new name */
-       ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
+       ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
+                             LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
        if (ret != LDB_SUCCESS) {
                /* not finding the old record is an error */
                return ret;
@@ -1038,7 +1099,7 @@ static int ltdb_rename(struct ltdb_context *ctx)
                        talloc_free(tdb_key.dptr);
                        ldb_asprintf_errstring(ldb_module_get_ctx(module),
                                               "Entry %s already exists",
-                                              ldb_dn_get_linearized(msg->dn));
+                                              ldb_dn_get_linearized(req->op.rename.newdn));
                        /* finding the new record already in the DB is an error */
                        talloc_free(msg);
                        return LDB_ERR_ENTRY_ALREADY_EXISTS;
@@ -1092,6 +1153,7 @@ static int ltdb_start_trans(struct ldb_module *module)
 
 static int ltdb_prepare_commit(struct ldb_module *module)
 {
+       int ret;
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
@@ -1099,15 +1161,21 @@ static int ltdb_prepare_commit(struct ldb_module *module)
                return LDB_SUCCESS;
        }
 
-       if (ltdb_index_transaction_commit(module) != 0) {
+       ret = ltdb_index_transaction_commit(module);
+       if (ret != LDB_SUCCESS) {
                tdb_transaction_cancel(ltdb->tdb);
                ltdb->in_transaction--;
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               return ret;
        }
 
        if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
+               ret = ltdb_err_map(tdb_error(ltdb->tdb));
                ltdb->in_transaction--;
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Failure during tdb_transaction_prepare_commit(): %s -> %s",
+                                      tdb_errorstr(ltdb->tdb),
+                                      ldb_strerror(ret));
+               return ret;
        }
 
        ltdb->prepared_commit = true;
@@ -1117,11 +1185,12 @@ static int ltdb_prepare_commit(struct ldb_module *module)
 
 static int ltdb_end_trans(struct ldb_module *module)
 {
+       int ret;
        void *data = ldb_module_get_private(module);
        struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
        if (!ltdb->prepared_commit) {
-               int ret = ltdb_prepare_commit(module);
+               ret = ltdb_prepare_commit(module);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -1131,7 +1200,12 @@ static int ltdb_end_trans(struct ldb_module *module)
        ltdb->prepared_commit = false;
 
        if (tdb_transaction_commit(ltdb->tdb) != 0) {
-               return ltdb_err_map(tdb_error(ltdb->tdb));
+               ret = ltdb_err_map(tdb_error(ltdb->tdb));
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "Failure during tdb_transaction_commit(): %s -> %s",
+                                      tdb_errorstr(ltdb->tdb),
+                                      ldb_strerror(ret));
+               return ret;
        }
 
        return LDB_SUCCESS;
@@ -1208,7 +1282,7 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
                goto done;
        }
 
-       ret = ltdb_search_dn1(module, dn, msg);
+       ret = ltdb_search_dn1(module, dn, msg, 0);
        if (ret != LDB_SUCCESS) {
                goto done;
        }
@@ -1434,7 +1508,7 @@ static int ltdb_handle_request(struct ldb_module *module,
                return LDB_ERR_TIME_LIMIT_EXCEEDED;
        }
 
-       ev = ldb_get_event_context(ldb);
+       ev = ldb_handle_get_event_context(req->handle);
 
        ac = talloc_zero(ldb, struct ltdb_context);
        if (ac == NULL) {
@@ -1453,11 +1527,15 @@ static int ltdb_handle_request(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       tv.tv_sec = req->starttime + req->timeout;
-       ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
-       if (NULL == ac->timeout_event) {
-               talloc_free(ac);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (req->timeout > 0) {
+               tv.tv_sec = req->starttime + req->timeout;
+               tv.tv_usec = 0;
+               ac->timeout_event = tevent_add_timer(ev, ac, tv,
+                                                    ltdb_timeout, ac);
+               if (NULL == ac->timeout_event) {
+                       talloc_free(ac);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
        }
 
        /* set a spy so that we do not try to use the request context
@@ -1510,6 +1588,13 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
        int tdb_flags, open_flags;
        struct ltdb_private *ltdb;
 
+       /*
+        * We hold locks, so we must use a private event context
+        * on each returned handle
+        */
+
+       ldb_set_require_private_event_context(ldb);
+
        /* parse the url */
        if (strchr(url, ':')) {
                if (strncmp(url, "tdb://", 6) != 0) {
@@ -1536,6 +1621,8 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
 
        if (flags & LDB_FLG_RDONLY) {
                open_flags = O_RDONLY;
+       } else if (flags & LDB_FLG_DONT_CREATE_DB) {
+               open_flags = O_RDWR;
        } else {
                open_flags = O_CREAT | O_RDWR;
        }
@@ -1552,10 +1639,13 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                                   ldb_get_create_perms(ldb), ldb);
        if (!ltdb->tdb) {
                ldb_asprintf_errstring(ldb,
-                                      "Unable to open tdb '%s'", path);
+                                      "Unable to open tdb '%s': %s", path, strerror(errno));
                ldb_debug(ldb, LDB_DEBUG_ERROR,
-                         "Unable to open tdb '%s'", path);
+                         "Unable to open tdb '%s': %s", path, strerror(errno));
                talloc_free(ltdb);
+               if (errno == EACCES || errno == EPERM) {
+                       return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
+               }
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -1563,6 +1653,10 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                ltdb->warn_unindexed = true;
        }
 
+       if (getenv("LDB_WARN_REINDEX")) {
+               ltdb->warn_reindex = true;
+       }
+
        ltdb->sequence_number = 0;
 
        module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);