ldb: Avoid cost of talloc_free() for unmatched messages
authorAndrew Bartlett <abartlet@samba.org>
Thu, 25 Aug 2016 21:58:38 +0000 (09:58 +1200)
committerDouglas Bagnall <dbagnall@samba.org>
Wed, 31 Aug 2016 05:09:27 +0000 (07:09 +0200)
Instead, we pay the cost of allocating a copy of the whole message once
and we pay the cost of allocating a "struct ldb_val" that will not be used
for each element in that message.

This differes from the approach of ldb_unpack_data_only_attr_list()
in that we need not allocate each value for a message that we do not
return, so is more efficient for large multi-valued attributes and
un-indexed or poorly indexed searches

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
lib/ldb/ldb_tdb/ldb_index.c
lib/ldb/ldb_tdb/ldb_search.c
lib/ldb/ldb_tdb/ldb_tdb.h

index 392b4a71d6c20ac8f446f7c5e91ec51f4224e6c2..6c238562970a6cbdf3ecb01a8e5e7dac694795ca 100644 (file)
@@ -931,6 +931,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
 {
        struct ldb_context *ldb;
        struct ldb_message *msg;
+       struct ldb_message *filtered_msg;
        unsigned int i;
 
        ldb = ldb_module_get_ctx(ac->module);
@@ -951,7 +952,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = ltdb_search_dn1(ac->module, dn, msg, 0);
+               ret = ltdb_search_dn1(ac->module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
                talloc_free(dn);
                if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* the record has disappeared? yes, this can happen */
@@ -977,14 +978,15 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
                }
 
                /* filter the attributes that the user wants */
-               ret = ltdb_filter_attrs(msg, ac->attrs);
+               ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
+
+               talloc_free(msg);
 
                if (ret == -1) {
-                       talloc_free(msg);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = ldb_module_send_entry(ac->req, msg, NULL);
+               ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
                if (ret != LDB_SUCCESS) {
                        /* Regardless of success or failure, the msg
                         * is the callbacks responsiblity, and should
index 133e5d43f6239dd99468a8cac0b098ecf686ff3d..8dc93d68acea4489dbd6d7b41cf640e646cd4a28 100644 (file)
@@ -385,81 +385,144 @@ int ltdb_add_attr_results(struct ldb_module *module,
 
 /*
   filter the specified list of attributes from a message
-  removing not requested attrs.
+  removing not requested attrs from the new message constructed.
+
+  The reason this makes a new message is that the old one may not be
+  individually allocated, which is what our callers expect.
+
  */
-int ltdb_filter_attrs(struct ldb_message *msg, const char * const *attrs)
+int ltdb_filter_attrs(TALLOC_CTX *mem_ctx,
+                     const struct ldb_message *msg, const char * const *attrs,
+                     struct ldb_message **filtered_msg)
 {
        unsigned int i;
-       int keep_all = 0;
-       struct ldb_message_element *el2;
+       bool keep_all = false;
+       bool add_dn = false;
        uint32_t num_elements;
+       uint32_t elements_size;
+       struct ldb_message *msg2;
+
+       msg2 = ldb_msg_new(mem_ctx);
+       if (msg2 == NULL) {
+               goto failed;
+       }
+
+       msg2->dn = ldb_dn_copy(msg2, msg->dn);
+       if (msg2->dn == NULL) {
+               goto failed;
+       }
 
        if (attrs) {
                /* check for special attrs */
                for (i = 0; attrs[i]; i++) {
-                       if (strcmp(attrs[i], "*") == 0) {
-                               keep_all = 1;
+                       int cmp = strcmp(attrs[i], "*");
+                       if (cmp == 0) {
+                               keep_all = true;
                                break;
                        }
-
-                       if (ldb_attr_cmp(attrs[i], "distinguishedName") == 0) {
-                               if (msg_add_distinguished_name(msg) != 0) {
-                                       return -1;
-                               }
+                       cmp = ldb_attr_cmp(attrs[i], "distinguishedName");
+                       if (cmp == 0) {
+                               add_dn = true;
                        }
                }
        } else {
-               keep_all = 1;
+               keep_all = true;
        }
-       
+
        if (keep_all) {
-               if (msg_add_distinguished_name(msg) != 0) {
+               add_dn = true;
+               elements_size = msg->num_elements + 1;
+
+       /* Shortcuts for the simple cases */
+       } else if (add_dn && i == 1) {
+               if (msg_add_distinguished_name(msg2) != 0) {
                        return -1;
                }
+               *filtered_msg = msg2;
+               return 0;
+       } else if (i == 0) {
+               *filtered_msg = msg2;
                return 0;
-       }
 
-       el2 = talloc_array(msg, struct ldb_message_element, msg->num_elements);
-       if (el2 == NULL) {
-               return -1;
+       /* Otherwise we are copying at most as many element as we have attributes */
+       } else {
+               elements_size = i;
        }
+
+       msg2->elements = talloc_array(msg2, struct ldb_message_element,
+                                     elements_size);
+       if (msg2->elements == NULL) goto failed;
+
        num_elements = 0;
 
        for (i = 0; i < msg->num_elements; i++) {
+               struct ldb_message_element *el = &msg->elements[i];
+               struct ldb_message_element *el2 = &msg2->elements[num_elements];
                unsigned int j;
-               int found = 0;
-               
-               for (j = 0; attrs[j]; j++) {
-                       if (ldb_attr_cmp(msg->elements[i].name, attrs[j]) == 0) {
-                               found = 1;
-                               break;
+
+               if (keep_all == false) {
+                       bool found = false;
+                       for (j = 0; attrs[j]; j++) {
+                               int cmp = ldb_attr_cmp(el->name, attrs[j]);
+                               if (cmp == 0) {
+                                       found = true;
+                                       break;
+                               }
+                       }
+                       if (found == false) {
+                               continue;
+                       }
+               }
+               *el2 = *el;
+               el2->name = talloc_strdup(msg2->elements, el->name);
+               if (el2->name == NULL) {
+                       goto failed;
+               }
+               el2->values = talloc_array(msg2->elements, struct ldb_val, el->num_values);
+               if (el2->values == NULL) {
+                       goto failed;
+               }
+               for (j=0;j<el->num_values;j++) {
+                       el2->values[j] = ldb_val_dup(el2->values, &el->values[j]);
+                       if (el2->values[j].data == NULL && el->values[j].length != 0) {
+                               goto failed;
                        }
                }
+               num_elements++;
 
-               if (found) {
-                       el2[num_elements] = msg->elements[i];
-                       talloc_steal(el2, el2[num_elements].name);
-                       talloc_steal(el2, el2[num_elements].values);
-                       num_elements++;
+               /* Pidginhole principle: we can't have more elements
+                * than the number of attributes if they are unique in
+                * the DB */
+               if (num_elements > elements_size) {
+                       goto failed;
                }
        }
 
-       talloc_free(msg->elements);
+       msg2->num_elements = num_elements;
 
-       if (num_elements > 0) {
-               msg->elements = talloc_realloc(msg, el2, struct ldb_message_element,
-                                              num_elements);
-       } else {
-               msg->elements = talloc_array(msg, struct ldb_message_element, 0);
-               talloc_free(el2);
+       if (add_dn) {
+               if (msg_add_distinguished_name(msg2) != 0) {
+                       return -1;
+               }
        }
-       if (msg->elements == NULL) {
-               return -1;
+
+       if (msg2->num_elements > 0) {
+               msg2->elements = talloc_realloc(msg2, msg2->elements,
+                                               struct ldb_message_element,
+                                               msg2->num_elements);
+               if (msg2->elements == NULL) {
+                       return -1;
+               }
+       } else {
+               talloc_free(msg2->elements);
+               msg2->elements = NULL;
        }
 
-       msg->num_elements = num_elements;
+       *filtered_msg = msg2;
 
        return 0;
+failed:
+       return -1;
 }
 
 /*
@@ -469,13 +532,14 @@ static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, voi
 {
        struct ldb_context *ldb;
        struct ltdb_context *ac;
-       struct ldb_message *msg;
+       struct ldb_message *msg, *filtered_msg;
        const struct ldb_val val = {
                .data = data.dptr,
                .length = data.dsize,
        };
        int ret;
        bool matched;
+       unsigned int nb_elements_in_db;
 
        ac = talloc_get_type(state, struct ltdb_context);
        ldb = ldb_module_get_ctx(ac->module);
@@ -492,7 +556,11 @@ static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, voi
        }
 
        /* unpack the record */
-       ret = ldb_unpack_data(ldb, &val, msg);
+       ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
+                                                  msg,
+                                                  NULL, 0,
+                                                  LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
+                                                  &nb_elements_in_db);
        if (ret == -1) {
                talloc_free(msg);
                ac->error = LDB_ERR_OPERATIONS_ERROR;
@@ -523,15 +591,15 @@ static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, voi
        }
 
        /* filter the attributes that the user wants */
-       ret = ltdb_filter_attrs(msg, ac->attrs);
+       ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
+       talloc_free(msg);
 
        if (ret == -1) {
-               talloc_free(msg);
                ac->error = LDB_ERR_OPERATIONS_ERROR;
                return -1;
        }
 
-       ret = ldb_module_send_entry(ac->req, msg, NULL);
+       ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
        if (ret != LDB_SUCCESS) {
                ac->request_terminated = true;
                /* the callback failed, abort the operation */
index 7390a047b20f61e1feea36222b1ac7bc8519f6ec..b5b78a99903b58bdaddee87443a68b933e3ef61f 100644 (file)
@@ -109,7 +109,9 @@ int ltdb_add_attr_results(struct ldb_module *module,
                          const char * const attrs[], 
                          unsigned int *count, 
                          struct ldb_message ***res);
-int ltdb_filter_attrs(struct ldb_message *msg, const char * const *attrs);
+int ltdb_filter_attrs(TALLOC_CTX *mem_ctx,
+                     const struct ldb_message *msg, const char * const *attrs,
+                     struct ldb_message **filtered_msg);
 int ltdb_search(struct ltdb_context *ctx);
 
 /* The following definitions come from lib/ldb/ldb_tdb/ldb_tdb.c  */