repack the ldb after re-indexing
[kai/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
index fb29a9ddbfc5a04ff3597d9d464ffb8ffbcaa7c4..eedbda41706c656bd678dd9d6fe9be5d4e168a45 100644 (file)
@@ -10,7 +10,7 @@
    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
+   version 3 of the License, or (at your option) any later version.
 
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -18,8 +18,7 @@
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */
 
 /*
  *  Author: Andrew Tridgell
  */
 
-#include "includes.h"
-#include "ldb/include/includes.h"
+#include "ldb_includes.h"
 
-#include "ldb/ldb_tdb/ldb_tdb.h"
+#include "ldb_tdb.h"
 
 /*
   find an element in a list, using the given comparison function and
@@ -47,7 +45,7 @@ static int ldb_list_find(const void *needle,
                         const void *base, size_t nmemb, size_t size, 
                         comparison_fn_t comp_fn)
 {
-       const char *base_p = base;
+       const char *base_p = (const char *)base;
        size_t min_i, max_i, test_i;
 
        if (nmemb == 0) {
@@ -61,6 +59,12 @@ static int ldb_list_find(const void *needle,
                int r;
 
                test_i = (min_i + max_i) / 2;
+               /* the following cast looks strange, but is
+                correct. The key to understanding it is that base_p
+                is a pointer to an array of pointers, so we have to
+                dereference it after casting to void **. The strange
+                const in the middle gives us the right type of pointer
+                after the dereference  (tridge) */
                r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
                if (r == 0) {
                        /* scan back for first element */
@@ -97,52 +101,46 @@ struct dn_list {
   return the dn key to be used for an index
   caller frees
 */
-static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
-                       const char *attr, const struct ldb_val *value)
+static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
+                                    const char *attr, const struct ldb_val *value)
 {
        struct ldb_dn *ret;
-       char *dn;
        struct ldb_val v;
-       const struct ldb_attrib_handler *h;
+       const struct ldb_schema_attribute *a;
        char *attr_folded;
+       int r;
 
-       attr_folded = ldb_casefold(ldb, ldb, attr);
+       attr_folded = ldb_attr_casefold(ldb, attr);
        if (!attr_folded) {
                return NULL;
        }
 
-       h = ldb_attrib_handler(ldb, attr);
-       if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
+       a = ldb_schema_attribute_by_name(ldb, attr);
+       r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
+       if (r != LDB_SUCCESS) {
+               const char *errstr = ldb_errstring(ldb);
                /* canonicalisation can be refused. For example, 
                   a attribute that takes wildcards will refuse to canonicalise
                   if the value contains a wildcard */
+               ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
+                                      attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
                talloc_free(attr_folded);
                return NULL;
        }
        if (ldb_should_b64_encode(&v)) {
                char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
                if (!vstr) return NULL;
-               dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
+               ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
                talloc_free(vstr);
-               if (v.data != value->data) {
-                       talloc_free(v.data);
-               }
-               talloc_free(attr_folded);
-               if (dn == NULL) return NULL;
-               goto done;
+       } else {
+               ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
        }
 
-       dn = talloc_asprintf(ldb, "%s:%s:%.*s", 
-                             LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
-
        if (v.data != value->data) {
                talloc_free(v.data);
        }
        talloc_free(attr_folded);
 
-done:
-       ret = ldb_dn_explode(ldb, dn);
-       talloc_free(dn);
        return ret;
 }
 
@@ -155,8 +153,14 @@ static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
        unsigned int i, j;
        for (i=0;i<msg->num_elements;i++) {
                if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
-                       const struct ldb_message_element *el = 
-                               &msg->elements[i];
+                       const struct ldb_message_element *el = &msg->elements[i];
+
+                       if (attr == NULL) {
+                               /* in this case we are just looking to see if key is present,
+                                  we are not spearching for a specific index */
+                               return 0;
+                       }
+
                        for (j=0;j<el->num_values;j++) {
                                if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
                                        if (v_idx) {
@@ -180,7 +184,7 @@ static int list_cmp(const char **s1, const char **s2)
   return a list of dn's that might match a simple indexed search or
  */
 static int ltdb_index_dn_simple(struct ldb_module *module, 
-                               struct ldb_parse_tree *tree,
+                               const struct ldb_parse_tree *tree,
                                const struct ldb_message *index_list,
                                struct dn_list *list)
 {
@@ -196,22 +200,22 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
        /* if the attribute isn't in the list of indexed attributes then
           this node needs a full search */
        if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* the attribute is indexed. Pull the list of DNs that match the 
           search criterion */
-       dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
-       if (!dn) return -1;
+       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
+       if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
        msg = talloc(list, struct ldb_message);
        if (msg == NULL) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        ret = ltdb_search_dn1(module, dn, msg);
        talloc_free(dn);
-       if (ret == 0 || ret == -1) {
+       if (ret != LDB_SUCCESS) {
                return ret;
        }
 
@@ -226,14 +230,16 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
 
                list->dn = talloc_array(list, char *, el->num_values);
                if (!list->dn) {
-                       break;          
+                       talloc_free(msg);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
                for (j=0;j<el->num_values;j++) {
                        list->dn[list->count] = 
                                talloc_strdup(list->dn, (char *)el->values[j].data);
                        if (!list->dn[list->count]) {
-                               return -1;
+                               talloc_free(msg);
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
                        list->count++;
                }
@@ -241,99 +247,37 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
 
        talloc_free(msg);
 
-       qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
+       if (list->count > 1) {
+               qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
+       }
 
-       return 1;
+       return LDB_SUCCESS;
 }
 
 
 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
 
-/*
-  return a list of dn's that might match a simple indexed search on
-  the special objectclass attribute
- */
-static int ltdb_index_dn_objectclass(struct ldb_module *module, 
-                                    struct ldb_parse_tree *tree,
-                                    const struct ldb_message *index_list,
-                                    struct dn_list *list)
-{
-       struct ldb_context *ldb = module->ldb;
-       unsigned int i;
-       int ret;
-       const char *target = (const char *)tree->u.equality.value.data;
-       const char **subclasses;
-
-       list->count = 0;
-       list->dn = NULL;
-
-       ret = ltdb_index_dn_simple(module, tree, index_list, list);
-
-       subclasses = ldb_subclass_list(module->ldb, target);
-
-       if (subclasses == NULL) {
-               return ret;
-       }
-
-       for (i=0;subclasses[i];i++) {
-               struct ldb_parse_tree tree2;
-               struct dn_list *list2;
-               tree2.operation = LDB_OP_EQUALITY;
-               tree2.u.equality.attr = LTDB_OBJECTCLASS;
-               if (!tree2.u.equality.attr) {
-                       return -1;
-               }
-               tree2.u.equality.value.data = 
-                       (uint8_t *)talloc_strdup(list, subclasses[i]);
-               if (tree2.u.equality.value.data == NULL) {
-                       return -1;                      
-               }
-               tree2.u.equality.value.length = strlen(subclasses[i]);
-               list2 = talloc(list, struct dn_list);
-               if (list2 == NULL) {
-                       talloc_free(tree2.u.equality.value.data);
-                       return -1;
-               }
-               if (ltdb_index_dn_objectclass(module, &tree2, 
-                                             index_list, list2) == 1) {
-                       if (list->count == 0) {
-                               *list = *list2;
-                               ret = 1;
-                       } else {
-                               list_union(ldb, list, list2);
-                               talloc_free(list2);
-                       }
-               }
-               talloc_free(tree2.u.equality.value.data);
-       }
-
-       return ret;
-}
-
 /*
   return a list of dn's that might match a leaf indexed search
  */
 static int ltdb_index_dn_leaf(struct ldb_module *module, 
-                             struct ldb_parse_tree *tree,
+                             const struct ldb_parse_tree *tree,
                              const struct ldb_message *index_list,
                              struct dn_list *list)
 {
-       if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
-               return ltdb_index_dn_objectclass(module, tree, index_list, list);
-       }
        if (ldb_attr_dn(tree->u.equality.attr) == 0) {
                list->dn = talloc_array(list, char *, 1);
                if (list->dn == NULL) {
                        ldb_oom(module->ldb);
-                       return -1;
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-               list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
+               list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
                if (list->dn[0] == NULL) {
                        ldb_oom(module->ldb);
-                       return -1;
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
                list->count = 1;
-               return 1;
+               return LDB_SUCCESS;
        }
        return ltdb_index_dn_simple(module, tree, index_list, list);
 }
@@ -352,25 +296,25 @@ static int list_intersect(struct ldb_context *ldb,
 
        if (list->count == 0 || list2->count == 0) {
                /* 0 & X == 0 */
-               return 0;
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
        list3 = talloc(ldb, struct dn_list);
        if (list3 == NULL) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        list3->dn = talloc_array(list3, char *, list->count);
        if (!list3->dn) {
                talloc_free(list3);
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        list3->count = 0;
 
        for (i=0;i<list->count;i++) {
                if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
                              sizeof(char *), (comparison_fn_t)strcmp) != -1) {
-                       list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
+                       list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
                        list3->count++;
                } else {
                        talloc_free(list->dn[i]);
@@ -378,11 +322,11 @@ static int list_intersect(struct ldb_context *ldb,
        }
 
        talloc_free(list->dn);
-       list->dn = talloc_steal(list, list3->dn);
+       list->dn = talloc_move(list, &list3->dn);
        list->count = list3->count;
        talloc_free(list3);
 
-       return 0;
+       return LDB_ERR_NO_SUCH_OBJECT;
 }
 
 
@@ -400,12 +344,12 @@ static int list_union(struct ldb_context *ldb,
 
        if (list->count == 0 && list2->count == 0) {
                /* 0 | 0 == 0 */
-               return 0;
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
        d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
        if (!d) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        list->dn = d;
 
@@ -414,7 +358,7 @@ static int list_union(struct ldb_context *ldb,
                              sizeof(char *), (comparison_fn_t)strcmp) == -1) {
                        list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
                        if (!list->dn[list->count]) {
-                               return -1;
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
                        list->count++;
                }               
@@ -424,11 +368,11 @@ static int list_union(struct ldb_context *ldb,
                qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
        }
 
-       return 0;
+       return LDB_ERR_NO_SUCH_OBJECT;
 }
 
 static int ltdb_index_dn(struct ldb_module *module, 
-                        struct ldb_parse_tree *tree,
+                        const struct ldb_parse_tree *tree,
                         const struct ldb_message *index_list,
                         struct dn_list *list);
 
@@ -437,7 +381,7 @@ static int ltdb_index_dn(struct ldb_module *module,
   OR two index results
  */
 static int ltdb_index_dn_or(struct ldb_module *module, 
-                           struct ldb_parse_tree *tree,
+                           const struct ldb_parse_tree *tree,
                            const struct ldb_message *index_list,
                            struct dn_list *list)
 {
@@ -445,7 +389,7 @@ static int ltdb_index_dn_or(struct ldb_module *module,
        unsigned int i;
        int ret;
        
-       ret = -1;
+       ret = LDB_ERR_OPERATIONS_ERROR;
        list->dn = NULL;
        list->count = 0;
 
@@ -455,43 +399,43 @@ static int ltdb_index_dn_or(struct ldb_module *module,
 
                list2 = talloc(module, struct dn_list);
                if (list2 == NULL) {
-                       return -1;
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
                v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
 
-               if (v == 0) {
+               if (v == LDB_ERR_NO_SUCH_OBJECT) {
                        /* 0 || X == X */
-                       if (ret == -1) {
-                               ret = 0;
+                       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                               ret = v;
                        }
                        talloc_free(list2);
                        continue;
                }
 
-               if (v == -1) {
+               if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
                        /* 1 || X == 1 */
                        talloc_free(list->dn);
                        talloc_free(list2);
-                       return -1;
+                       return v;
                }
 
-               if (ret == -1) {
-                       ret = 1;
-                       list->dn = talloc_steal(list, list2->dn);
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       ret = LDB_SUCCESS;
+                       list->dn = talloc_move(list, &list2->dn);
                        list->count = list2->count;
                } else {
                        if (list_union(ldb, list, list2) == -1) {
                                talloc_free(list2);
-                               return -1;
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
-                       ret = 1;
+                       ret = LDB_SUCCESS;
                }
                talloc_free(list2);
        }
 
        if (list->count == 0) {
-               return 0;
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
        return ret;
@@ -502,7 +446,7 @@ static int ltdb_index_dn_or(struct ldb_module *module,
   NOT an index results
  */
 static int ltdb_index_dn_not(struct ldb_module *module, 
-                            struct ldb_parse_tree *tree,
+                            const struct ldb_parse_tree *tree,
                             const struct ldb_message *index_list,
                             struct dn_list *list)
 {
@@ -514,14 +458,14 @@ static int ltdb_index_dn_not(struct ldb_module *module,
           instead, we just give up, and rely on a full index scan
           (unless an outer & manages to reduce the list)
        */
-       return -1;
+       return LDB_ERR_OPERATIONS_ERROR;
 }
 
 /*
   AND two index results
  */
 static int ltdb_index_dn_and(struct ldb_module *module, 
-                            struct ldb_parse_tree *tree,
+                            const struct ldb_parse_tree *tree,
                             const struct ldb_message *index_list,
                             struct dn_list *list)
 {
@@ -529,7 +473,7 @@ static int ltdb_index_dn_and(struct ldb_module *module,
        unsigned int i;
        int ret;
        
-       ret = -1;
+       ret = LDB_ERR_OPERATIONS_ERROR;
        list->dn = NULL;
        list->count = 0;
 
@@ -539,32 +483,32 @@ static int ltdb_index_dn_and(struct ldb_module *module,
 
                list2 = talloc(module, struct dn_list);
                if (list2 == NULL) {
-                       return -1;
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
                v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
 
-               if (v == 0) {
+               if (v == LDB_ERR_NO_SUCH_OBJECT) {
                        /* 0 && X == 0 */
                        talloc_free(list->dn);
                        talloc_free(list2);
-                       return 0;
+                       return LDB_ERR_NO_SUCH_OBJECT;
                }
 
-               if (v == -1) {
+               if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
                        talloc_free(list2);
                        continue;
                }
 
-               if (ret == -1) {
-                       ret = 1;
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       ret = LDB_SUCCESS;
                        talloc_free(list->dn);
-                       list->dn = talloc_steal(list, list2->dn);
+                       list->dn = talloc_move(list, &list2->dn);
                        list->count = list2->count;
                } else {
                        if (list_intersect(ldb, list, list2) == -1) {
                                talloc_free(list2);
-                               return -1;
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
                }
 
@@ -572,23 +516,120 @@ static int ltdb_index_dn_and(struct ldb_module *module,
 
                if (list->count == 0) {
                        talloc_free(list->dn);
-                       return 0;
+                       return LDB_ERR_NO_SUCH_OBJECT;
                }
        }
 
        return ret;
 }
 
+/*
+  AND index results and ONE level special index
+ */
+static int ltdb_index_dn_one(struct ldb_module *module, 
+                            struct ldb_dn *parent_dn,
+                            struct dn_list *list)
+{
+       struct ldb_context *ldb = module->ldb;
+       struct dn_list *list2;
+       struct ldb_message *msg;
+       struct ldb_dn *key;
+       struct ldb_val val;
+       unsigned int i, j;
+       int ret;
+       
+       list2 = talloc_zero(module, struct dn_list);
+       if (list2 == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* the attribute is indexed. Pull the list of DNs that match the 
+          search criterion */
+       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
+       val.length = strlen((char *)val.data);
+       key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
+       if (!key) {
+               talloc_free(list2);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       msg = talloc(list2, struct ldb_message);
+       if (msg == NULL) {
+               talloc_free(list2);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_search_dn1(module, key, msg);
+       talloc_free(key);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       for (i = 0; i < msg->num_elements; i++) {
+               struct ldb_message_element *el;
+
+               if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
+                       continue;
+               }
+
+               el = &msg->elements[i];
+
+               list2->dn = talloc_array(list2, char *, el->num_values);
+               if (!list2->dn) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               for (j = 0; j < el->num_values; j++) {
+                       list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
+                       if (!list2->dn[list2->count]) {
+                               talloc_free(list2);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+                       list2->count++;
+               }
+       }
+
+       if (list2->count == 0) {
+               talloc_free(list2);
+               return LDB_ERR_NO_SUCH_OBJECT;
+       }
+
+       if (list2->count > 1) {
+               qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
+       }
+
+       if (list->count > 0) {
+               if (list_intersect(ldb, list, list2) == -1) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               if (list->count == 0) {
+                       talloc_free(list->dn);
+                       talloc_free(list2);
+                       return LDB_ERR_NO_SUCH_OBJECT;
+               }
+       } else {
+               list->dn = talloc_move(list, &list2->dn);
+               list->count = list2->count;
+       }
+
+       talloc_free(list2);
+
+       return LDB_SUCCESS;
+}
+
 /*
   return a list of dn's that might match a indexed search or
-  -1 if an error. return 0 for no matches, or 1 for matches
+  an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
  */
 static int ltdb_index_dn(struct ldb_module *module, 
-                        struct ldb_parse_tree *tree,
+                        const struct ldb_parse_tree *tree,
                         const struct ldb_message *index_list,
                         struct dn_list *list)
 {
-       int ret = -1;
+       int ret = LDB_ERR_OPERATIONS_ERROR;
 
        switch (tree->operation) {
        case LDB_OP_AND:
@@ -614,7 +655,7 @@ static int ltdb_index_dn(struct ldb_module *module,
        case LDB_OP_APPROX:
        case LDB_OP_EXTENDED:
                /* we can't index with fancy bitops yet */
-               ret = -1;
+               ret = LDB_ERR_OPERATIONS_ERROR;
                break;
        }
 
@@ -625,52 +666,59 @@ static int ltdb_index_dn(struct ldb_module *module,
   filter a candidate dn_list from an indexed search into a set of results
   extracting just the given attributes
 */
-static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
-                           const struct ldb_dn *base,
-                           enum ldb_scope scope,
-                           const struct dn_list *dn_list, 
-                           const char * const attrs[], struct ldb_result *res)
+static int ltdb_index_filter(const struct dn_list *dn_list, 
+                            struct ltdb_context *ac)
 {
+       struct ldb_message *msg;
        unsigned int i;
 
        for (i = 0; i < dn_list->count; i++) {
-               struct ldb_message *msg;
                struct ldb_dn *dn;
                int ret;
 
-               msg = talloc(module, struct ldb_message);
-               if (msg == NULL) {
-                       return LDB_ERR_OTHER;
+               msg = ldb_msg_new(ac);
+               if (!msg) {
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               dn = ldb_dn_explode(msg, dn_list->dn[i]);
+               dn = ldb_dn_new(msg, ac->module->ldb, dn_list->dn[i]);
                if (dn == NULL) {
                        talloc_free(msg);
-                       return LDB_ERR_OTHER;
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = ltdb_search_dn1(module, dn, msg);
+               ret = ltdb_search_dn1(ac->module, dn, msg);
                talloc_free(dn);
-               if (ret == 0) {
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* the record has disappeared? yes, this can happen */
                        talloc_free(msg);
                        continue;
                }
 
-               if (ret == -1) {
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                        /* an internal error */
                        talloc_free(msg);
-                       return LDB_ERR_OTHER;
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = 0;
-               if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
-                       ret = ltdb_add_attr_results(module, res, msg, 
-                                                   attrs, &(res->count), &(res->msgs));
+               if (!ldb_match_msg(ac->module->ldb, msg,
+                                  ac->tree, ac->base, ac->scope)) {
+                       talloc_free(msg);
+                       continue;
                }
-               talloc_free(msg);
-               if (ret != 0) {
-                       return LDB_ERR_OTHER;
+
+               /* filter the attributes that the user wants */
+               ret = ltdb_filter_attrs(msg, ac->attrs);
+
+               if (ret == -1) {
+                       talloc_free(msg);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               ret = ldb_module_send_entry(ac->req, msg);
+               if (ret != LDB_SUCCESS) {
+                       ac->callback_failed = true;
+                       return ret;
                }
        }
 
@@ -682,58 +730,70 @@ static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tr
   returns -1 if an indexed search is not possible, in which
   case the caller should call ltdb_search_full() 
 */
-int ltdb_search_indexed(struct ldb_module *module, 
-                       const struct ldb_dn *base,
-                       enum ldb_scope scope,
-                       struct ldb_parse_tree *tree,
-                       const char * const attrs[], struct ldb_result **res)
+int ltdb_search_indexed(struct ltdb_context *ac)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
        struct dn_list *dn_list;
-       int ret;
+       int ret, idxattr, idxone;
 
-       if (ltdb->cache->indexlist->num_elements == 0 && 
-           scope != LDB_SCOPE_BASE) {
-               /* no index list? must do full search */
-               return -1;
+       idxattr = idxone = 0;
+       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
+       if (ret == 0 ) {
+               idxattr = 1;
        }
 
-       dn_list = talloc(module, struct dn_list);
-       if (dn_list == NULL) {
-               return -1;
+       /* We do one level indexing only if requested */
+       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
+       if (ret == 0 ) {
+               idxone = 1;
        }
 
-       *res = talloc(module, struct ldb_result);
-       if (*res == NULL) {
-               return LDB_ERR_OTHER;
+       if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
+           (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
+               /* no indexes? must do full search */
+               return LDB_ERR_OPERATIONS_ERROR;
        }
-       (*res)->count = 0;
-       (*res)->msgs = NULL;
-       (*res)->controls = NULL;
 
-       if (scope == LDB_SCOPE_BASE) {
+       ret = LDB_ERR_OPERATIONS_ERROR;
+
+       dn_list = talloc_zero(ac, struct dn_list);
+       if (dn_list == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ac->scope == LDB_SCOPE_BASE) {
                /* with BASE searches only one DN can match */
                dn_list->dn = talloc_array(dn_list, char *, 1);
                if (dn_list->dn == NULL) {
-                       ldb_oom(module->ldb);
-                       return -1;
+                       ldb_oom(ac->module->ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-               dn_list->dn[0] = ldb_dn_linearize(dn_list, base);
+               dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
                if (dn_list->dn[0] == NULL) {
-                       ldb_oom(module->ldb);
-                       return -1;
+                       ldb_oom(ac->module->ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
                dn_list->count = 1;
-               ret = 1;
-       } else {
-               ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
+               ret = LDB_SUCCESS;
+       }
+
+       if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
+               ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
+
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       talloc_free(dn_list);
+                       return ret;
+               }
+       }
+
+       if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
+               ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
        }
 
-       if (ret == 1) {
+       if (ret == LDB_SUCCESS) {
                /* we've got a candidate list - now filter by the full tree
                   and extract the needed attributes */
-               ret = ldb_index_filter(module, tree, base, scope, dn_list, 
-                                      attrs, *res);
+               ret = ltdb_index_filter(dn_list, ac);
        }
 
        talloc_free(dn_list);
@@ -746,34 +806,33 @@ int ltdb_search_indexed(struct ldb_module *module,
 */
 static int ltdb_index_add1_new(struct ldb_context *ldb, 
                               struct ldb_message *msg,
-                              struct ldb_message_element *el,
-                              char *dn)
+                              const char *dn)
 {
-       struct ldb_message_element *el2;
+       struct ldb_message_element *el;
 
        /* add another entry */
-       el2 = talloc_realloc(msg, msg->elements, 
+       el = talloc_realloc(msg, msg->elements, 
                               struct ldb_message_element, msg->num_elements+1);
-       if (!el2) {
-               return -1;
+       if (!el) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       msg->elements = el2;
+       msg->elements = el;
        msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
        if (!msg->elements[msg->num_elements].name) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        msg->elements[msg->num_elements].num_values = 0;
        msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
        if (!msg->elements[msg->num_elements].values) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        msg->elements[msg->num_elements].values[0].length = strlen(dn);
-       msg->elements[msg->num_elements].values[0].data = (uint8_t *)dn;
+       msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
        msg->elements[msg->num_elements].num_values = 1;
        msg->num_elements++;
 
-       return 0;
+       return LDB_SUCCESS;
 }
 
 
@@ -783,9 +842,8 @@ static int ltdb_index_add1_new(struct ldb_context *ldb,
 */
 static int ltdb_index_add1_add(struct ldb_context *ldb, 
                               struct ldb_message *msg,
-                              struct ldb_message_element *el,
                               int idx,
-                              char *dn)
+                              const char *dn)
 {
        struct ldb_val *v2;
        unsigned int i;
@@ -793,7 +851,7 @@ static int ltdb_index_add1_add(struct ldb_context *ldb,
        /* for multi-valued attributes we can end up with repeats */
        for (i=0;i<msg->elements[idx].num_values;i++) {
                if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
-                       return 0;
+                       return LDB_SUCCESS;
                }
        }
 
@@ -801,21 +859,21 @@ static int ltdb_index_add1_add(struct ldb_context *ldb,
                              struct ldb_val, 
                              msg->elements[idx].num_values+1);
        if (!v2) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        msg->elements[idx].values = v2;
 
        msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
-       msg->elements[idx].values[msg->elements[idx].num_values].data = (uint8_t *)dn;
+       msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
        msg->elements[idx].num_values++;
 
-       return 0;
+       return LDB_SUCCESS;
 }
 
 /*
   add an index entry for one message element
 */
-static int ltdb_index_add1(struct ldb_module *module, char *dn, 
+static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
                           struct ldb_message_element *el, int v_idx)
 {
        struct ldb_context *ldb = module->ldb;
@@ -827,24 +885,23 @@ static int ltdb_index_add1(struct ldb_module *module, char *dn,
        msg = talloc(module, struct ldb_message);
        if (msg == NULL) {
                errno = ENOMEM;
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
        if (!dn_key) {
                talloc_free(msg);
-               errno = ENOMEM;
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        talloc_steal(msg, dn_key);
 
        ret = ltdb_search_dn1(module, dn_key, msg);
-       if (ret == -1) {
+       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                talloc_free(msg);
-               return -1;
+               return ret;
        }
 
-       if (ret == 0) {
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                msg->dn = dn_key;
                msg->num_elements = 0;
                msg->elements = NULL;
@@ -857,12 +914,12 @@ static int ltdb_index_add1(struct ldb_module *module, char *dn,
        }
 
        if (i == msg->num_elements) {
-               ret = ltdb_index_add1_new(ldb, msg, el, dn);
+               ret = ltdb_index_add1_new(ldb, msg, dn);
        } else {
-               ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
+               ret = ltdb_index_add1_add(ldb, msg, i, dn);
        }
 
-       if (ret == 0) {
+       if (ret == LDB_SUCCESS) {
                ret = ltdb_store(module, msg, TDB_REPLACE);
        }
 
@@ -871,20 +928,20 @@ static int ltdb_index_add1(struct ldb_module *module, char *dn,
        return ret;
 }
 
-static int ltdb_index_add0(struct ldb_module *module, char *dn,
+static int ltdb_index_add0(struct ldb_module *module, const char *dn,
                           struct ldb_message_element *elements, int num_el)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
        int ret;
        unsigned int i, j;
 
        if (dn[0] == '@') {
-               return 0;
+               return LDB_SUCCESS;
        }
 
        if (ltdb->cache->indexlist->num_elements == 0) {
                /* no indexed fields */
-               return 0;
+               return LDB_SUCCESS;
        }
 
        for (i = 0; i < num_el; i++) {
@@ -895,35 +952,30 @@ static int ltdb_index_add0(struct ldb_module *module, char *dn,
                }
                for (j = 0; j < elements[i].num_values; j++) {
                        ret = ltdb_index_add1(module, dn, &elements[i], j);
-                       if (ret == -1) {
-                               talloc_free(dn);
-                               return -1;
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
                        }
                }
        }
 
-       return 0;
+       return LDB_SUCCESS;
 }
 
 /*
   add the index entries for a new record
-  return -1 on failure
 */
 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
 {
-       struct ltdb_private *ltdb = module->private_data;
-       char *dn;
+       const char *dn;
        int ret;
 
-       dn = ldb_dn_linearize(ltdb, msg->dn);
+       dn = ldb_dn_get_linearized(msg->dn);
        if (dn == NULL) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
 
-       talloc_free(dn);
-
        return ret;
 }
 
@@ -941,41 +993,41 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
        unsigned int j;
 
        if (dn[0] == '@') {
-               return 0;
+               return LDB_SUCCESS;
        }
 
-       dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
        if (!dn_key) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        msg = talloc(dn_key, struct ldb_message);
        if (msg == NULL) {
                talloc_free(dn_key);
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        ret = ltdb_search_dn1(module, dn_key, msg);
-       if (ret == -1) {
+       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                talloc_free(dn_key);
-               return -1;
+               return ret;
        }
 
-       if (ret == 0) {
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                /* it wasn't indexed. Did we have an earlier error? If we did then
                   its gone now */
                talloc_free(dn_key);
-               return 0;
+               return LDB_SUCCESS;
        }
 
        i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
        if (i == -1) {
                ldb_debug(ldb, LDB_DEBUG_ERROR,
                                "ERROR: dn %s not found in %s\n", dn,
-                               ldb_dn_linearize(dn_key, dn_key));
+                               ldb_dn_get_linearized(dn_key));
                /* it ain't there. hmmm */
                talloc_free(dn_key);
-               return 0;
+               return LDB_SUCCESS;
        }
 
        if (j != msg->elements[i].num_values - 1) {
@@ -1003,24 +1055,24 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
 */
 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
        int ret;
-       char *dn;
+       const char *dn;
        unsigned int i, j;
 
-       if (ldb_dn_is_special(msg->dn)) {
-               return 0;
+       /* find the list of indexed fields */   
+       if (ltdb->cache->indexlist->num_elements == 0) {
+               /* no indexed fields */
+               return LDB_SUCCESS;
        }
 
-       dn = ldb_dn_linearize(ltdb, msg->dn);
-       if (dn == NULL) {
-               return -1;
+       if (ldb_dn_is_special(msg->dn)) {
+               return LDB_SUCCESS;
        }
 
-       /* find the list of indexed fields */   
-       if (ltdb->cache->indexlist->num_elements == 0) {
-               /* no indexed fields */
-               return 0;
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        for (i = 0; i < msg->num_elements; i++) {
@@ -1031,15 +1083,64 @@ int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
                }
                for (j = 0; j < msg->elements[i].num_values; j++) {
                        ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
-                       if (ret == -1) {
-                               talloc_free(dn);
-                               return -1;
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
                        }
                }
        }
 
-       talloc_free(dn);
-       return 0;
+       return LDB_SUCCESS;
+}
+
+/* 
+  handle special index for one level searches
+*/
+int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
+{
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
+       struct ldb_message_element el;
+       struct ldb_val val;
+       struct ldb_dn *pdn;
+       const char *dn;
+       int ret;
+
+       /* We index for ONE Level only if requested */
+       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
+       if (ret != 0) {
+               return LDB_SUCCESS;
+       }
+
+       pdn = ldb_dn_get_parent(module, msg->dn);
+       if (pdn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
+               talloc_free(pdn);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
+       if (val.data == NULL) {
+               talloc_free(pdn);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       val.length = strlen((char *)val.data);
+       el.name = LTDB_IDXONE;
+       el.values = &val;
+       el.num_values = 1;
+
+       if (add) {
+               ret = ltdb_index_add1(module, dn, &el, 0);
+       } else { /* delete */
+               ret = ltdb_index_del_value(module, dn, &el, 0);
+       }
+
+       talloc_free(pdn);
+
+       return ret;
 }
 
 
@@ -1060,9 +1161,9 @@ static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, vo
 */
 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
-       struct ldb_module *module = state;
+       struct ldb_module *module = (struct ldb_module *)state;
        struct ldb_message *msg;
-       char *dn = NULL;
+       const char *dn = NULL;
        int ret;
        TDB_DATA key2;
 
@@ -1088,7 +1189,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        if (key2.dptr == NULL) {
                /* probably a corrupt record ... darn */
                ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
-                                                       ldb_dn_linearize(msg, msg->dn));
+                                                       ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return 0;
        }
@@ -1101,14 +1202,23 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        if (msg->dn == NULL) {
                dn = (char *)key.dptr + 3;
        } else {
-               dn = ldb_dn_linearize(msg->dn, msg->dn);
+               dn = ldb_dn_get_linearized(msg->dn);
        }
 
-       ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
+       ret = ltdb_index_one(module, msg, 1);
+       if (ret == LDB_SUCCESS) {
+               ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
+       } else {
+               ldb_debug(module->ldb, LDB_DEBUG_ERROR,
+                       "Adding special ONE LEVEL index failed (%s)!\n",
+                       ldb_dn_get_linearized(msg->dn));
+       }
 
        talloc_free(msg);
 
-       return ret;
+       if (ret != LDB_SUCCESS) return -1;
+
+       return 0;
 }
 
 /*
@@ -1116,24 +1226,33 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 */
 int ltdb_reindex(struct ldb_module *module)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
        int ret;
 
        if (ltdb_cache_reload(module) != 0) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* first traverse the database deleting any @INDEX records */
        ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
        if (ret == -1) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* if we don't have indexes we have nothing todo */
+       if (ltdb->cache->indexlist->num_elements == 0) {
+               return LDB_SUCCESS;
        }
 
        /* now traverse adding any indexes for normal LDB records */
        ret = tdb_traverse(ltdb->tdb, re_index, module);
        if (ret == -1) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       return 0;
+       if (tdb_repack(ltdb->tdb) != 0) {
+               return LDB_ERR_OPERATIONS_ERROR;                
+       }
+
+       return LDB_SUCCESS;
 }