fixed a speellling erra
[gd/samba/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
index 8d31a3a81f9b7ad3b91289e45b2f5e14b5194c82..269305a4680c90c94ad4dc625486ad3b5bcaffcc 100644 (file)
@@ -10,7 +10,7 @@
    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
+   version 3 of the License, or (at your option) any later version.
 
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -18,8 +18,7 @@
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */
 
 /*
  *  Author: Andrew Tridgell
  */
 
-#include "includes.h"
-#include "ldb/include/ldb.h"
-#include "ldb/include/ldb_private.h"
-#include "ldb/ldb_tdb/ldb_tdb.h"
-#include "ldb/include/ldb_parse.h"
+#include "ldb_includes.h"
 
-struct dn_list {
-       unsigned int count;
-       char **dn;
-};
+#include "ldb_tdb.h"
 
 /*
-  free a struct dn_list
+  find an element in a list, using the given comparison function and
+  assuming that the list is already sorted using comp_fn
+
+  return -1 if not found, or the index of the first occurance of needle if found
 */
-static void dn_list_free(struct ldb_context *ldb, struct dn_list *list)
+static int ldb_list_find(const void *needle, 
+                        const void *base, size_t nmemb, size_t size, 
+                        comparison_fn_t comp_fn)
 {
-       unsigned int i;
-       for (i=0;i<list->count;i++) {
-               ldb_free(ldb, list->dn[i]);
+       const char *base_p = (const char *)base;
+       size_t min_i, max_i, test_i;
+
+       if (nmemb == 0) {
+               return -1;
+       }
+
+       min_i = 0;
+       max_i = nmemb-1;
+
+       while (min_i < max_i) {
+               int r;
+
+               test_i = (min_i + max_i) / 2;
+               /* the following cast looks strange, but is
+                correct. The key to understanding it is that base_p
+                is a pointer to an array of pointers, so we have to
+                dereference it after casting to void **. The strange
+                const in the middle gives us the right type of pointer
+                after the dereference  (tridge) */
+               r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
+               if (r == 0) {
+                       /* scan back for first element */
+                       while (test_i > 0 &&
+                              comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
+                               test_i--;
+                       }
+                       return test_i;
+               }
+               if (r < 0) {
+                       if (test_i == 0) {
+                               return -1;
+                       }
+                       max_i = test_i - 1;
+               }
+               if (r > 0) {
+                       min_i = test_i + 1;
+               }
+       }
+
+       if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
+               return min_i;
        }
-       ldb_free(ldb, list->dn);
+
+       return -1;
 }
 
+struct dn_list {
+       unsigned int count;
+       char **dn;
+};
+
 /*
   return the dn key to be used for an index
   caller frees
 */
-static char *ldb_dn_key(struct ldb_context *ldb,
-                       const char *attr, const struct ldb_val *value)
+static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
+                                    const char *attr, const struct ldb_val *value)
 {
-       char *ret = NULL;
+       struct ldb_dn *ret;
+       struct ldb_val v;
+       const struct ldb_schema_attribute *a;
+       char *attr_folded;
+       int r;
+
+       attr_folded = ldb_attr_casefold(ldb, attr);
+       if (!attr_folded) {
+               return NULL;
+       }
 
-       if (ldb_should_b64_encode(value)) {
-               char *vstr = ldb_base64_encode(ldb, value->data, value->length);
+       a = ldb_schema_attribute_by_name(ldb, attr);
+       r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
+       if (r != LDB_SUCCESS) {
+               const char *errstr = ldb_errstring(ldb);
+               /* canonicalisation can be refused. For example, 
+                  a attribute that takes wildcards will refuse to canonicalise
+                  if the value contains a wildcard */
+               ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
+                                      attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
+               talloc_free(attr_folded);
+               return NULL;
+       }
+       if (ldb_should_b64_encode(&v)) {
+               char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
                if (!vstr) return NULL;
-               ldb_asprintf(ldb, &ret, "%s:%s::%s", LTDB_INDEX, attr, vstr);
-               ldb_free(ldb, vstr);
-               return ret;
+               ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
+               talloc_free(vstr);
+       } else {
+               ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
+       }
+
+       if (v.data != value->data) {
+               talloc_free(v.data);
        }
+       talloc_free(attr_folded);
 
-       ldb_asprintf(ldb, &ret, "%s:%s:%.*s", LTDB_INDEX, attr, value->length, (char *)value->data);
        return ret;
 }
 
@@ -85,8 +153,14 @@ static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
        unsigned int i, j;
        for (i=0;i<msg->num_elements;i++) {
                if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
-                       const struct ldb_message_element *el = 
-                               &msg->elements[i];
+                       const struct ldb_message_element *el = &msg->elements[i];
+
+                       if (attr == NULL) {
+                               /* in this case we are just looking to see if key is present,
+                                  we are not spearching for a specific index */
+                               return 0;
+                       }
+
                        for (j=0;j<el->num_values;j++) {
                                if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
                                        if (v_idx) {
@@ -110,140 +184,100 @@ static int list_cmp(const char **s1, const char **s2)
   return a list of dn's that might match a simple indexed search or
  */
 static int ltdb_index_dn_simple(struct ldb_module *module, 
-                               struct ldb_parse_tree *tree,
+                               const struct ldb_parse_tree *tree,
                                const struct ldb_message *index_list,
                                struct dn_list *list)
 {
        struct ldb_context *ldb = module->ldb;
-       char *dn = NULL;
+       struct ldb_dn *dn;
        int ret;
        unsigned int i, j;
-       struct ldb_message msg;
+       struct ldb_message *msg;
 
        list->count = 0;
        list->dn = NULL;
 
-       /*
-         if the value is a wildcard then we can't do a match via indexing
-       */
-       if (ltdb_has_wildcard(module, tree->u.simple.attr, &tree->u.simple.value)) {
-               return -1;
-       }
-
        /* if the attribute isn't in the list of indexed attributes then
           this node needs a full search */
-       if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL, LTDB_IDXATTR) == -1) {
-               return -1;
+       if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* the attribute is indexed. Pull the list of DNs that match the 
           search criterion */
-       dn = ldb_dn_key(ldb, tree->u.simple.attr, &tree->u.simple.value);
-       if (!dn) return -1;
+       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
+       if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
-       ret = ltdb_search_dn1(module, dn, &msg);
-       ldb_free(ldb, dn);
-       if (ret == 0 || ret == -1) {
+       msg = talloc(list, struct ldb_message);
+       if (msg == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_search_dn1(module, dn, msg);
+       talloc_free(dn);
+       if (ret != LDB_SUCCESS) {
                return ret;
        }
 
-       for (i=0;i<msg.num_elements;i++) {
+       for (i=0;i<msg->num_elements;i++) {
                struct ldb_message_element *el;
 
-               if (strcmp(msg.elements[i].name, LTDB_IDX) != 0) {
+               if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
                        continue;
                }
 
-               el = &msg.elements[i];
+               el = &msg->elements[i];
 
-               list->dn = ldb_malloc_array_p(ldb, char *, el->num_values);
+               list->dn = talloc_array(list, char *, el->num_values);
                if (!list->dn) {
-                       break;          
+                       talloc_free(msg);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
                for (j=0;j<el->num_values;j++) {
                        list->dn[list->count] = 
-                               ldb_strdup(ldb, (char *)el->values[j].data);
+                               talloc_strdup(list->dn, (char *)el->values[j].data);
                        if (!list->dn[list->count]) {
-                               dn_list_free(ldb, list);
-                               ltdb_search_dn1_free(module, &msg);
-                               return -1;
+                               talloc_free(msg);
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
                        list->count++;
                }
        }
 
-       ltdb_search_dn1_free(module, &msg);
+       talloc_free(msg);
 
-       qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
+       if (list->count > 1) {
+               qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
+       }
 
-       return 1;
+       return LDB_SUCCESS;
 }
 
 
 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
 
-/*
-  return a list of dn's that might match a simple indexed search on
-  the special objectclass attribute
- */
-static int ltdb_index_dn_objectclass(struct ldb_module *module, 
-                                    struct ldb_parse_tree *tree,
-                                    const struct ldb_message *index_list,
-                                    struct dn_list *list)
-{
-       struct ldb_context *ldb = module->ldb;
-       struct ltdb_private *ltdb = module->private_data;
-       unsigned int i;
-       int ret;
-       const char *target = tree->u.simple.value.data;
-
-       list->count = 0;
-       list->dn = NULL;
-
-       ret = ltdb_index_dn_simple(module, tree, index_list, list);
-
-       for (i=0;i<ltdb->cache.subclasses.num_elements;i++) {
-               struct ldb_message_element *el = &ltdb->cache.subclasses.elements[i];
-               if (ldb_attr_cmp(el->name, target) == 0) {
-                       unsigned int j;
-                       for (j=0;j<el->num_values;j++) {
-                               struct ldb_parse_tree tree2;
-                               struct dn_list list2;
-                               tree2.operation = LDB_OP_SIMPLE;
-                               tree2.u.simple.attr = ldb_strdup(ldb, LTDB_OBJECTCLASS);
-                               if (!tree2.u.simple.attr) {
-                                       return -1;
-                               }
-                               tree2.u.simple.value = el->values[j];
-                               if (ltdb_index_dn_objectclass(module, &tree2, 
-                                                             index_list, &list2) == 1) {
-                                       if (list->count == 0) {
-                                               *list = list2;
-                                               ret = 1;
-                                       } else {
-                                               list_union(ldb, list, &list2);
-                                               dn_list_free(ldb, &list2);
-                                       }
-                               }
-                               ldb_free(ldb, tree2.u.simple.attr);
-                       }
-               }
-       }
-
-       return ret;
-}
-
 /*
   return a list of dn's that might match a leaf indexed search
  */
 static int ltdb_index_dn_leaf(struct ldb_module *module, 
-                             struct ldb_parse_tree *tree,
+                             const struct ldb_parse_tree *tree,
                              const struct ldb_message *index_list,
                              struct dn_list *list)
 {
-       if (ldb_attr_cmp(tree->u.simple.attr, LTDB_OBJECTCLASS) == 0) {
-               return ltdb_index_dn_objectclass(module, tree, index_list, list);
+       if (ldb_attr_dn(tree->u.equality.attr) == 0) {
+               list->dn = talloc_array(list, char *, 1);
+               if (list->dn == NULL) {
+                       ldb_oom(module->ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
+               if (list->dn[0] == NULL) {
+                       ldb_oom(module->ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               list->count = 1;
+               return LDB_SUCCESS;
        }
        return ltdb_index_dn_simple(module, tree, index_list, list);
 }
@@ -257,37 +291,42 @@ static int ltdb_index_dn_leaf(struct ldb_module *module,
 static int list_intersect(struct ldb_context *ldb,
                          struct dn_list *list, const struct dn_list *list2)
 {
-       struct dn_list list3;
+       struct dn_list *list3;
        unsigned int i;
 
        if (list->count == 0 || list2->count == 0) {
                /* 0 & X == 0 */
-               dn_list_free(ldb, list);
-               return 0;
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       list3.dn = ldb_malloc_array_p(ldb, char *, list->count);
-       if (!list3.dn) {
-               dn_list_free(ldb, list);
-               return -1;
+       list3 = talloc(ldb, struct dn_list);
+       if (list3 == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
-       list3.count = 0;
+
+       list3->dn = talloc_array(list3, char *, list->count);
+       if (!list3->dn) {
+               talloc_free(list3);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       list3->count = 0;
 
        for (i=0;i<list->count;i++) {
                if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
                              sizeof(char *), (comparison_fn_t)strcmp) != -1) {
-                       list3.dn[list3.count] = list->dn[i];
-                       list3.count++;
+                       list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
+                       list3->count++;
                } else {
-                       ldb_free(ldb, list->dn[i]);
+                       talloc_free(list->dn[i]);
                }               
        }
 
-       ldb_free(ldb, list->dn);
-       list->dn = list3.dn;
-       list->count = list3.count;
+       talloc_free(list->dn);
+       list->dn = talloc_move(list, &list3->dn);
+       list->count = list3->count;
+       talloc_free(list3);
 
-       return 0;
+       return LDB_ERR_NO_SUCH_OBJECT;
 }
 
 
@@ -305,24 +344,21 @@ static int list_union(struct ldb_context *ldb,
 
        if (list->count == 0 && list2->count == 0) {
                /* 0 | 0 == 0 */
-               dn_list_free(ldb, list);
-               return 0;
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       d = ldb_realloc_p(ldb, list->dn, char *, list->count + list2->count);
+       d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
        if (!d) {
-               dn_list_free(ldb, list);
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        list->dn = d;
 
        for (i=0;i<list2->count;i++) {
                if (ldb_list_find(list2->dn[i], list->dn, count, 
                              sizeof(char *), (comparison_fn_t)strcmp) == -1) {
-                       list->dn[list->count] = ldb_strdup(ldb, list2->dn[i]);
+                       list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
                        if (!list->dn[list->count]) {
-                               dn_list_free(ldb, list);
-                               return -1;
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
                        list->count++;
                }               
@@ -332,11 +368,11 @@ static int list_union(struct ldb_context *ldb,
                qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
        }
 
-       return 0;
+       return LDB_ERR_NO_SUCH_OBJECT;
 }
 
 static int ltdb_index_dn(struct ldb_module *module, 
-                        struct ldb_parse_tree *tree,
+                        const struct ldb_parse_tree *tree,
                         const struct ldb_message *index_list,
                         struct dn_list *list);
 
@@ -345,7 +381,7 @@ static int ltdb_index_dn(struct ldb_module *module,
   OR two index results
  */
 static int ltdb_index_dn_or(struct ldb_module *module, 
-                           struct ldb_parse_tree *tree,
+                           const struct ldb_parse_tree *tree,
                            const struct ldb_message *index_list,
                            struct dn_list *list)
 {
@@ -353,44 +389,53 @@ static int ltdb_index_dn_or(struct ldb_module *module,
        unsigned int i;
        int ret;
        
-       ret = -1;
+       ret = LDB_ERR_OPERATIONS_ERROR;
        list->dn = NULL;
        list->count = 0;
 
        for (i=0;i<tree->u.list.num_elements;i++) {
-               struct dn_list list2;
+               struct dn_list *list2;
                int v;
-               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, &list2);
 
-               if (v == 0) {
+               list2 = talloc(module, struct dn_list);
+               if (list2 == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
+
+               if (v == LDB_ERR_NO_SUCH_OBJECT) {
                        /* 0 || X == X */
-                       if (ret == -1) {
-                               ret = 0;
+                       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                               ret = v;
                        }
+                       talloc_free(list2);
                        continue;
                }
 
-               if (v == -1) {
+               if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
                        /* 1 || X == 1 */
-                       dn_list_free(ldb, list);
-                       return -1;
+                       talloc_free(list->dn);
+                       talloc_free(list2);
+                       return v;
                }
 
-               if (ret == -1) {
-                       ret = 1;
-                       *list = list2;
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       ret = LDB_SUCCESS;
+                       list->dn = talloc_move(list, &list2->dn);
+                       list->count = list2->count;
                } else {
-                       if (list_union(ldb, list, &list2) == -1) {
-                               dn_list_free(ldb, &list2);
-                               return -1;
+                       if (list_union(ldb, list, list2) == -1) {
+                               talloc_free(list2);
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
-                       dn_list_free(ldb, &list2);
+                       ret = LDB_SUCCESS;
                }
+               talloc_free(list2);
        }
 
        if (list->count == 0) {
-               dn_list_free(ldb, list);
-               return 0;
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
        return ret;
@@ -401,7 +446,7 @@ static int ltdb_index_dn_or(struct ldb_module *module,
   NOT an index results
  */
 static int ltdb_index_dn_not(struct ldb_module *module, 
-                            struct ldb_parse_tree *tree,
+                            const struct ldb_parse_tree *tree,
                             const struct ldb_message *index_list,
                             struct dn_list *list)
 {
@@ -413,14 +458,14 @@ static int ltdb_index_dn_not(struct ldb_module *module,
           instead, we just give up, and rely on a full index scan
           (unless an outer & manages to reduce the list)
        */
-       return -1;
+       return LDB_ERR_OPERATIONS_ERROR;
 }
 
 /*
   AND two index results
  */
 static int ltdb_index_dn_and(struct ldb_module *module, 
-                            struct ldb_parse_tree *tree,
+                            const struct ldb_parse_tree *tree,
                             const struct ldb_message *index_list,
                             struct dn_list *list)
 {
@@ -428,61 +473,165 @@ static int ltdb_index_dn_and(struct ldb_module *module,
        unsigned int i;
        int ret;
        
-       ret = -1;
+       ret = LDB_ERR_OPERATIONS_ERROR;
        list->dn = NULL;
        list->count = 0;
 
        for (i=0;i<tree->u.list.num_elements;i++) {
-               struct dn_list list2;
+               struct dn_list *list2;
                int v;
-               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, &list2);
 
-               if (v == 0) {
+               list2 = talloc(module, struct dn_list);
+               if (list2 == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
+
+               if (v == LDB_ERR_NO_SUCH_OBJECT) {
                        /* 0 && X == 0 */
-                       dn_list_free(ldb, list);
-                       return 0;
+                       talloc_free(list->dn);
+                       talloc_free(list2);
+                       return LDB_ERR_NO_SUCH_OBJECT;
                }
 
-               if (v == -1) {
+               if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
+                       talloc_free(list2);
                        continue;
                }
 
-               if (ret == -1) {
-                       ret = 1;
-                       *list = list2;
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       ret = LDB_SUCCESS;
+                       talloc_free(list->dn);
+                       list->dn = talloc_move(list, &list2->dn);
+                       list->count = list2->count;
                } else {
-                       if (list_intersect(ldb, list, &list2) == -1) {
-                               dn_list_free(ldb, &list2);
-                               return -1;
+                       if (list_intersect(ldb, list, list2) == -1) {
+                               talloc_free(list2);
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
-                       dn_list_free(ldb, &list2);
                }
 
+               talloc_free(list2);
+
                if (list->count == 0) {
-                       if (list->dn) ldb_free(ldb, list->dn);
-                       return 0;
+                       talloc_free(list->dn);
+                       return LDB_ERR_NO_SUCH_OBJECT;
                }
        }
 
        return ret;
 }
 
+/*
+  AND index results and ONE level special index
+ */
+static int ltdb_index_dn_one(struct ldb_module *module, 
+                            struct ldb_dn *parent_dn,
+                            struct dn_list *list)
+{
+       struct ldb_context *ldb = module->ldb;
+       struct dn_list *list2;
+       struct ldb_message *msg;
+       struct ldb_dn *key;
+       struct ldb_val val;
+       unsigned int i, j;
+       int ret;
+       
+       list2 = talloc_zero(module, struct dn_list);
+       if (list2 == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* the attribute is indexed. Pull the list of DNs that match the 
+          search criterion */
+       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
+       val.length = strlen((char *)val.data);
+       key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
+       if (!key) {
+               talloc_free(list2);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       msg = talloc(list2, struct ldb_message);
+       if (msg == NULL) {
+               talloc_free(list2);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_search_dn1(module, key, msg);
+       talloc_free(key);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       for (i = 0; i < msg->num_elements; i++) {
+               struct ldb_message_element *el;
+
+               if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
+                       continue;
+               }
+
+               el = &msg->elements[i];
+
+               list2->dn = talloc_array(list2, char *, el->num_values);
+               if (!list2->dn) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               for (j = 0; j < el->num_values; j++) {
+                       list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
+                       if (!list2->dn[list2->count]) {
+                               talloc_free(list2);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+                       list2->count++;
+               }
+       }
+
+       if (list2->count == 0) {
+               talloc_free(list2);
+               return LDB_ERR_NO_SUCH_OBJECT;
+       }
+
+       if (list2->count > 1) {
+               qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
+       }
+
+       if (list->count > 0) {
+               if (list_intersect(ldb, list, list2) == -1) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               if (list->count == 0) {
+                       talloc_free(list->dn);
+                       talloc_free(list2);
+                       return LDB_ERR_NO_SUCH_OBJECT;
+               }
+       } else {
+               list->dn = talloc_move(list, &list2->dn);
+               list->count = list2->count;
+       }
+
+       talloc_free(list2);
+
+       return LDB_SUCCESS;
+}
+
 /*
   return a list of dn's that might match a indexed search or
-  -1 if an error. return 0 for no matches, or 1 for matches
+  an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
  */
 static int ltdb_index_dn(struct ldb_module *module, 
-                        struct ldb_parse_tree *tree,
+                        const struct ldb_parse_tree *tree,
                         const struct ldb_message *index_list,
                         struct dn_list *list)
 {
-       int ret = -1;
+       int ret = LDB_ERR_OPERATIONS_ERROR;
 
        switch (tree->operation) {
-       case LDB_OP_SIMPLE:
-               ret = ltdb_index_dn_leaf(module, tree, index_list, list);
-               break;
-
        case LDB_OP_AND:
                ret = ltdb_index_dn_and(module, tree, index_list, list);
                break;
@@ -494,6 +643,20 @@ static int ltdb_index_dn(struct ldb_module *module,
        case LDB_OP_NOT:
                ret = ltdb_index_dn_not(module, tree, index_list, list);
                break;
+
+       case LDB_OP_EQUALITY:
+               ret = ltdb_index_dn_leaf(module, tree, index_list, list);
+               break;
+
+       case LDB_OP_SUBSTRING:
+       case LDB_OP_GREATER:
+       case LDB_OP_LESS:
+       case LDB_OP_PRESENT:
+       case LDB_OP_APPROX:
+       case LDB_OP_EXTENDED:
+               /* we can't index with fancy bitops yet */
+               ret = LDB_ERR_OPERATIONS_ERROR;
+               break;
        }
 
        return ret;
@@ -503,39 +666,79 @@ static int ltdb_index_dn(struct ldb_module *module,
   filter a candidate dn_list from an indexed search into a set of results
   extracting just the given attributes
 */
-static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
-                           const char *base,
-                           enum ldb_scope scope,
-                           const struct dn_list *dn_list, 
-                           const char * const attrs[], struct ldb_message ***res)
+static int ltdb_index_filter(const struct dn_list *dn_list, 
+                            struct ldb_handle *handle)
 {
+       struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
+       struct ldb_reply *ares = NULL;
        unsigned int i;
-       int count = 0;
 
-       for (i=0;i<dn_list->count;i++) {
-               struct ldb_message msg;
+       for (i = 0; i < dn_list->count; i++) {
+               struct ldb_dn *dn;
                int ret;
-               ret = ltdb_search_dn1(module, dn_list->dn[i], &msg);
-               if (ret == 0) {
+
+               ares = talloc_zero(ac, struct ldb_reply);
+               if (!ares) {
+                       handle->status = LDB_ERR_OPERATIONS_ERROR;
+                       handle->state = LDB_ASYNC_DONE;
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               ares->message = ldb_msg_new(ares);
+               if (!ares->message) {
+                       handle->status = LDB_ERR_OPERATIONS_ERROR;
+                       handle->state = LDB_ASYNC_DONE;
+                       talloc_free(ares);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+
+               dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
+               if (dn == NULL) {
+                       talloc_free(ares);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               ret = ltdb_search_dn1(ac->module, dn, ares->message);
+               talloc_free(dn);
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* the record has disappeared? yes, this can happen */
+                       talloc_free(ares);
                        continue;
                }
 
-               if (ret == -1) {
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                        /* an internal error */
-                       return -1;
+                       talloc_free(ares);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               if (ltdb_message_match(module, &msg, tree, base, scope) == 1) {
-                       ret = ltdb_add_attr_results(module, &msg, attrs, &count, res);
+               if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
+                       talloc_free(ares);
+                       continue;
                }
-               ltdb_search_dn1_free(module, &msg);
-               if (ret != 0) {
-                       return -1;
+
+               /* filter the attributes that the user wants */
+               ret = ltdb_filter_attrs(ares->message, ac->attrs);
+
+               if (ret == -1) {
+                       handle->status = LDB_ERR_OPERATIONS_ERROR;
+                       handle->state = LDB_ASYNC_DONE;
+                       talloc_free(ares);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               ares->type = LDB_REPLY_ENTRY;
+               handle->state = LDB_ASYNC_PENDING;
+               handle->status = ac->callback(ac->module->ldb, ac->context, ares);
+
+               if (handle->status != LDB_SUCCESS) {
+                       handle->state = LDB_ASYNC_DONE;
+                       return handle->status;
                }
        }
 
-       return count;
+       return LDB_SUCCESS;
 }
 
 /*
@@ -543,32 +746,77 @@ static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tr
   returns -1 if an indexed search is not possible, in which
   case the caller should call ltdb_search_full() 
 */
-int ltdb_search_indexed(struct ldb_module *module, 
-                       const char *base,
-                       enum ldb_scope scope,
-                       struct ldb_parse_tree *tree,
-                       const char * const attrs[], struct ldb_message ***res)
+int ltdb_search_indexed(struct ldb_handle *handle)
 {
-       struct ldb_context *ldb = module->ldb;
-       struct ltdb_private *ltdb = module->private_data;
-       struct dn_list dn_list;
-       int ret;
+       struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
+       struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
+       struct dn_list *dn_list;
+       int ret, idxattr, idxone;
+
+       idxattr = idxone = 0;
+       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
+       if (ret == 0 ) {
+               idxattr = 1;
+       }
 
-       if (ltdb->cache.indexlist.num_elements == 0) {
-               /* no index list? must do full search */
-               return -1;
+       /* We do one level indexing only if requested */
+       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
+       if (ret == 0 ) {
+               idxone = 1;
+       }
+
+       if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
+           (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
+               /* no indexes? must do full search */
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = LDB_ERR_OPERATIONS_ERROR;
+
+       dn_list = talloc_zero(handle, struct dn_list);
+       if (dn_list == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ac->scope == LDB_SCOPE_BASE) {
+               /* with BASE searches only one DN can match */
+               dn_list->dn = talloc_array(dn_list, char *, 1);
+               if (dn_list->dn == NULL) {
+                       ldb_oom(ac->module->ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
+               if (dn_list->dn[0] == NULL) {
+                       ldb_oom(ac->module->ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               dn_list->count = 1;
+               ret = LDB_SUCCESS;
+       }
+
+       if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
+               ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
+
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       talloc_free(dn_list);
+                       return ret;
+               }
        }
 
-       ret = ltdb_index_dn(module, tree, &ltdb->cache.indexlist, &dn_list);
+       if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
+               ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
+       }
 
-       if (ret == 1) {
+       if (ret == LDB_SUCCESS) {
                /* we've got a candidate list - now filter by the full tree
                   and extract the needed attributes */
-               ret = ldb_index_filter(module, tree, base, scope, &dn_list, 
-                                      attrs, res);
-               dn_list_free(ldb, &dn_list);
+               ret = ltdb_index_filter(dn_list, handle);
+               handle->status = ret;
+               handle->state = LDB_ASYNC_DONE;
        }
 
+       talloc_free(dn_list);
+
        return ret;
 }
 
@@ -577,34 +825,33 @@ int ltdb_search_indexed(struct ldb_module *module,
 */
 static int ltdb_index_add1_new(struct ldb_context *ldb, 
                               struct ldb_message *msg,
-                              struct ldb_message_element *el,
-                              char *dn)
+                              const char *dn)
 {
-       struct ldb_message_element *el2;
+       struct ldb_message_element *el;
 
        /* add another entry */
-       el2 = ldb_realloc_p(ldb, msg->elements, 
-                           struct ldb_message_element, msg->num_elements+1);
-       if (!el2) {
-               return -1;
+       el = talloc_realloc(msg, msg->elements, 
+                              struct ldb_message_element, msg->num_elements+1);
+       if (!el) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       msg->elements = el2;
-       msg->elements[msg->num_elements].name = ldb_strdup(ldb, LTDB_IDX);
+       msg->elements = el;
+       msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
        if (!msg->elements[msg->num_elements].name) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        msg->elements[msg->num_elements].num_values = 0;
-       msg->elements[msg->num_elements].values = ldb_malloc_p(ldb, struct ldb_val);
+       msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
        if (!msg->elements[msg->num_elements].values) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        msg->elements[msg->num_elements].values[0].length = strlen(dn);
-       msg->elements[msg->num_elements].values[0].data = dn;
+       msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
        msg->elements[msg->num_elements].num_values = 1;
        msg->num_elements++;
 
-       return 0;
+       return LDB_SUCCESS;
 }
 
 
@@ -614,132 +861,141 @@ static int ltdb_index_add1_new(struct ldb_context *ldb,
 */
 static int ltdb_index_add1_add(struct ldb_context *ldb, 
                               struct ldb_message *msg,
-                              struct ldb_message_element *el,
                               int idx,
-                              char *dn)
+                              const char *dn)
 {
        struct ldb_val *v2;
        unsigned int i;
 
        /* for multi-valued attributes we can end up with repeats */
        for (i=0;i<msg->elements[idx].num_values;i++) {
-               if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
-                       return 0;
+               if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
+                       return LDB_SUCCESS;
                }
        }
 
-       v2 = ldb_realloc_p(ldb, msg->elements[idx].values,
-                          struct ldb_val, 
-                          msg->elements[idx].num_values+1);
+       v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
+                             struct ldb_val, 
+                             msg->elements[idx].num_values+1);
        if (!v2) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
        msg->elements[idx].values = v2;
 
        msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
-       msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
+       msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
        msg->elements[idx].num_values++;
 
-       return 0;
+       return LDB_SUCCESS;
 }
 
 /*
   add an index entry for one message element
 */
-static int ltdb_index_add1(struct ldb_module *module, char *dn, 
+static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
                           struct ldb_message_element *el, int v_idx)
 {
        struct ldb_context *ldb = module->ldb;
-       struct ldb_message msg;
-       char *dn_key;
-       int ret, added=0, added_dn=0;
+       struct ldb_message *msg;
+       struct ldb_dn *dn_key;
+       int ret;
        unsigned int i;
 
-       dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
-       if (!dn_key) {
-               return -1;
+       msg = talloc(module, struct ldb_message);
+       if (msg == NULL) {
+               errno = ENOMEM;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_search_dn1(module, dn_key, &msg);
-       if (ret == -1) {
-               ldb_free(ldb, dn_key);
-               return -1;
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
+       if (!dn_key) {
+               talloc_free(msg);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
+       talloc_steal(msg, dn_key);
 
-       if (ret == 0) {
-               added_dn = 1;
-               msg.dn = ldb_strdup(ldb, dn_key);
-               if (!msg.dn) {
-                       ldb_free(ldb, dn_key);
-                       errno = ENOMEM;
-                       return -1;
-               }
-               msg.num_elements = 0;
-               msg.elements = NULL;
-               msg.private_data = NULL;
+       ret = ltdb_search_dn1(module, dn_key, msg);
+       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+               talloc_free(msg);
+               return ret;
        }
 
-       ldb_free(ldb, dn_key);
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+               msg->dn = dn_key;
+               msg->num_elements = 0;
+               msg->elements = NULL;
+       }
 
-       for (i=0;i<msg.num_elements;i++) {
-               if (strcmp(LTDB_IDX, msg.elements[i].name) == 0) {
+       for (i=0;i<msg->num_elements;i++) {
+               if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
                        break;
                }
        }
 
-       if (i == msg.num_elements) {
-               added = 1;
-               ret = ltdb_index_add1_new(ldb, &msg, el, dn);
+       if (i == msg->num_elements) {
+               ret = ltdb_index_add1_new(ldb, msg, dn);
        } else {
-               ret = ltdb_index_add1_add(ldb, &msg, el, i, dn);
+               ret = ltdb_index_add1_add(ldb, msg, i, dn);
        }
 
-       if (ret == 0) {
-               ret = ltdb_store(module, &msg, TDB_REPLACE);
+       if (ret == LDB_SUCCESS) {
+               ret = ltdb_store(module, msg, TDB_REPLACE);
        }
 
-       if (added) {
-               ldb_free(ldb, msg.elements[i].name);
-       }
-       if (added_dn) {
-               ldb_free(ldb, msg.dn);
-       }
-
-       ltdb_search_dn1_free(module, &msg);
+       talloc_free(msg);
 
        return ret;
 }
 
-/*
-  add the index entries for a new record
-  return -1 on failure
-*/
-int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
+static int ltdb_index_add0(struct ldb_module *module, const char *dn,
+                          struct ldb_message_element *elements, int num_el)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
        int ret;
        unsigned int i, j;
 
-       if (ltdb->cache.indexlist.num_elements == 0) {
+       if (dn[0] == '@') {
+               return LDB_SUCCESS;
+       }
+
+       if (ltdb->cache->indexlist->num_elements == 0) {
                /* no indexed fields */
-               return 0;
+               return LDB_SUCCESS;
        }
 
-       for (i=0;i<msg->num_elements;i++) {
-               ret = ldb_msg_find_idx(&ltdb->cache.indexlist, msg->elements[i].name, 
+       for (i = 0; i < num_el; i++) {
+               ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
                                       NULL, LTDB_IDXATTR);
                if (ret == -1) {
                        continue;
                }
-               for (j=0;j<msg->elements[i].num_values;j++) {
-                       ret = ltdb_index_add1(module, msg->dn, &msg->elements[i], j);
-                       if (ret == -1) {
-                               return -1;
+               for (j = 0; j < elements[i].num_values; j++) {
+                       ret = ltdb_index_add1(module, dn, &elements[i], j);
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
                        }
                }
        }
 
-       return 0;
+       return LDB_SUCCESS;
+}
+
+/*
+  add the index entries for a new record
+*/
+int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
+{
+       const char *dn;
+       int ret;
+
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
+
+       return ret;
 }
 
 
@@ -750,54 +1006,64 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                         struct ldb_message_element *el, int v_idx)
 {
        struct ldb_context *ldb = module->ldb;
-       struct ldb_message msg;
-       char *dn_key;
+       struct ldb_message *msg;
+       struct ldb_dn *dn_key;
        int ret, i;
        unsigned int j;
 
-       dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
+       if (dn[0] == '@') {
+               return LDB_SUCCESS;
+       }
+
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
        if (!dn_key) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_search_dn1(module, dn_key, &msg);
-       if (ret == -1) {
-               ldb_free(ldb, dn_key);
-               return -1;
+       msg = talloc(dn_key, struct ldb_message);
+       if (msg == NULL) {
+               talloc_free(dn_key);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (ret == 0) {
+       ret = ltdb_search_dn1(module, dn_key, msg);
+       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+               talloc_free(dn_key);
+               return ret;
+       }
+
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                /* it wasn't indexed. Did we have an earlier error? If we did then
                   its gone now */
-               ldb_free(ldb, dn_key);
-               return 0;
+               talloc_free(dn_key);
+               return LDB_SUCCESS;
        }
 
-       i = ldb_msg_find_idx(&msg, dn, &j, LTDB_IDX);
+       i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
        if (i == -1) {
-               ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn %s not found in %s\n", dn, dn_key);
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                               "ERROR: dn %s not found in %s\n", dn,
+                               ldb_dn_get_linearized(dn_key));
                /* it ain't there. hmmm */
-               ltdb_search_dn1_free(module, &msg);
-               ldb_free(ldb, dn_key);
-               return 0;
+               talloc_free(dn_key);
+               return LDB_SUCCESS;
        }
 
-       if (j != msg.elements[i].num_values - 1) {
-               memmove(&msg.elements[i].values[j], 
-                       &msg.elements[i].values[j+1], 
-                       (msg.elements[i].num_values-(j+1)) * 
-                       sizeof(msg.elements[i].values[0]));
+       if (j != msg->elements[i].num_values - 1) {
+               memmove(&msg->elements[i].values[j], 
+                       &msg->elements[i].values[j+1], 
+                       (msg->elements[i].num_values-(j+1)) * 
+                       sizeof(msg->elements[i].values[0]));
        }
-       msg.elements[i].num_values--;
+       msg->elements[i].num_values--;
 
-       if (msg.elements[i].num_values == 0) {
+       if (msg->elements[i].num_values == 0) {
                ret = ltdb_delete_noindex(module, dn_key);
        } else {
-               ret = ltdb_store(module, &msg, TDB_REPLACE);
+               ret = ltdb_store(module, msg, TDB_REPLACE);
        }
 
-       ltdb_search_dn1_free(module, &msg);
-       ldb_free(ldb, dn_key);
+       talloc_free(dn_key);
 
        return ret;
 }
@@ -808,31 +1074,92 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
 */
 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
        int ret;
+       const char *dn;
        unsigned int i, j;
 
        /* find the list of indexed fields */   
-       if (ltdb->cache.indexlist.num_elements == 0) {
+       if (ltdb->cache->indexlist->num_elements == 0) {
                /* no indexed fields */
-               return 0;
+               return LDB_SUCCESS;
        }
 
-       for (i=0;i<msg->num_elements;i++) {
-               ret = ldb_msg_find_idx(&ltdb->cache.indexlist, msg->elements[i].name, 
+       if (ldb_dn_is_special(msg->dn)) {
+               return LDB_SUCCESS;
+       }
+
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       for (i = 0; i < msg->num_elements; i++) {
+               ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
                                       NULL, LTDB_IDXATTR);
                if (ret == -1) {
                        continue;
                }
-               for (j=0;j<msg->elements[i].num_values;j++) {
-                       ret = ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
-                       if (ret == -1) {
-                               return -1;
+               for (j = 0; j < msg->elements[i].num_values; j++) {
+                       ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
                        }
                }
        }
 
-       return 0;
+       return LDB_SUCCESS;
+}
+
+/* 
+  handle special index for one level searches
+*/
+int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
+{
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
+       struct ldb_message_element el;
+       struct ldb_val val;
+       struct ldb_dn *pdn;
+       const char *dn;
+       int ret;
+
+       /* We index for ONE Level only if requested */
+       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
+       if (ret != 0) {
+               return LDB_SUCCESS;
+       }
+
+       pdn = ldb_dn_get_parent(module, msg->dn);
+       if (pdn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
+               talloc_free(pdn);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
+       if (val.data == NULL) {
+               talloc_free(pdn);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       val.length = strlen((char *)val.data);
+       el.name = LTDB_IDXONE;
+       el.values = &val;
+       el.num_values = 1;
+
+       if (add) {
+               ret = ltdb_index_add1(module, dn, &el, 0);
+       } else { /* delete */
+               ret = ltdb_index_del_value(module, dn, &el, 0);
+       }
+
+       talloc_free(pdn);
+
+       return ret;
 }
 
 
@@ -842,7 +1169,7 @@ int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
        const char *dn = "DN=" LTDB_INDEX ":";
-       if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
+       if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
                return tdb_delete(tdb, key);
        }
        return 0;
@@ -853,29 +1180,64 @@ static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, vo
 */
 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
-       struct ldb_module *module = state;
-       struct ldb_message msg;
+       struct ldb_module *module = (struct ldb_module *)state;
+       struct ldb_message *msg;
+       const char *dn = NULL;
        int ret;
+       TDB_DATA key2;
 
-       if (strncmp(key.dptr, "DN=@", 4) == 0 ||
-           strncmp(key.dptr, "DN=", 3) != 0) {
+       if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
+           strncmp((char *)key.dptr, "DN=", 3) != 0) {
                return 0;
        }
 
-       ret = ltdb_unpack_data(module, &data, &msg);
+       msg = talloc(module, struct ldb_message);
+       if (msg == NULL) {
+               return -1;
+       }
+
+       ret = ltdb_unpack_data(module, &data, msg);
        if (ret != 0) {
+               talloc_free(msg);
                return -1;
        }
 
-       if (!msg.dn) {
-               msg.dn = key.dptr+3;
+       /* check if the DN key has changed, perhaps due to the 
+          case insensitivity of an element changing */
+       key2 = ltdb_key(module, msg->dn);
+       if (key2.dptr == NULL) {
+               /* probably a corrupt record ... darn */
+               ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
+                                                       ldb_dn_get_linearized(msg->dn));
+               talloc_free(msg);
+               return 0;
+       }
+       if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
+               tdb_delete(tdb, key);
+               tdb_store(tdb, key2, data, 0);
        }
+       talloc_free(key2.dptr);
 
-       ret = ltdb_index_add(module, &msg);
+       if (msg->dn == NULL) {
+               dn = (char *)key.dptr + 3;
+       } else {
+               dn = ldb_dn_get_linearized(msg->dn);
+       }
 
-       ltdb_unpack_data_free(module, &msg);
+       ret = ltdb_index_one(module, msg, 1);
+       if (ret == LDB_SUCCESS) {
+               ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
+       } else {
+               ldb_debug(module->ldb, LDB_DEBUG_ERROR,
+                       "Adding special ONE LEVEL index failed (%s)!\n",
+                       ldb_dn_get_linearized(msg->dn));
+       }
 
-       return ret;
+       talloc_free(msg);
+
+       if (ret != LDB_SUCCESS) return -1;
+
+       return 0;
 }
 
 /*
@@ -883,28 +1245,29 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 */
 int ltdb_reindex(struct ldb_module *module)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
        int ret;
 
-       ltdb_cache_free(module);
-
-       if (ltdb_cache_load(module) != 0) {
-               return -1;
+       if (ltdb_cache_reload(module) != 0) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* first traverse the database deleting any @INDEX records */
        ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
        if (ret == -1) {
-               errno = EIO;
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* if we don't have indexes we have nothing todo */
+       if (ltdb->cache->indexlist->num_elements == 0) {
+               return LDB_SUCCESS;
        }
 
        /* now traverse adding any indexes for normal LDB records */
        ret = tdb_traverse(ltdb->tdb, re_index, module);
        if (ret == -1) {
-               errno = EIO;
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       return 0;
+       return LDB_SUCCESS;
 }