s4-ldb: don't try to index non-indexed attributes
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
index 045be985a4cd1c6e314ca25b06c0c768cca09726..a28e97256fcdc2979f22a3d94da10b4770f7791b 100644 (file)
@@ -1,16 +1,16 @@
-/* 
+/*
    ldb database library
 
-   Copyright (C) Andrew Tridgell  2004
+   Copyright (C) Andrew Tridgell  2004-2009
 
      ** NOTE! The following LGPL license applies to the ldb
      ** library. This does NOT imply that all of Samba is released
      ** under the LGPL
-   
+
    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
+   version 3 of the License, or (at your option) any later version.
 
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -18,8 +18,7 @@
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */
 
 /*
  *  Author: Andrew Tridgell
  */
 
-#include "includes.h"
-#include "ldb/include/includes.h"
+#include "ldb_tdb.h"
 
-#include "ldb/ldb_tdb/ldb_tdb.h"
+struct dn_list {
+       unsigned int count;
+       struct ldb_val *dn;
+};
 
-/*
-  find an element in a list, using the given comparison function and
-  assuming that the list is already sorted using comp_fn
+struct ltdb_idxptr {
+       struct tdb_context *itdb;
+       bool repack;
+       int error;
+};
 
-  return -1 if not found, or the index of the first occurance of needle if found
+/* we put a @IDXVERSION attribute on index entries. This
+   allows us to tell if it was written by an older version
 */
-static int ldb_list_find(const void *needle, 
-                        const void *base, size_t nmemb, size_t size, 
-                        comparison_fn_t comp_fn)
+#define LTDB_INDEXING_VERSION 2
+
+/* enable the idxptr mode when transactions start */
+int ltdb_index_transaction_start(struct ldb_module *module)
 {
-       const char *base_p = base;
-       size_t min_i, max_i, test_i;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
+       return LDB_SUCCESS;
+}
 
-       if (nmemb == 0) {
-               return -1;
+/* compare two DN entries in a dn_list. Take account of possible
+ * differences in string termination */
+static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
+{
+       int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
+       if (ret != 0) return ret;
+       if (v2->length > v1->length && v2->data[v1->length] != 0) {
+               return 1;
        }
+       return 0;
+}
 
-       min_i = 0;
-       max_i = nmemb-1;
-
-       while (min_i < max_i) {
-               int r;
-
-               test_i = (min_i + max_i) / 2;
-               /* the following cast looks strange, but is
-                correct. The key to understanding it is that base_p
-                is a pointer to an array of pointers, so we have to
-                dereference it after casting to void **. The strange
-                const in the middle gives us the right type of pointer
-                after the dereference  (tridge) */
-               r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
-               if (r == 0) {
-                       /* scan back for first element */
-                       while (test_i > 0 &&
-                              comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
-                               test_i--;
-                       }
-                       return test_i;
+
+/*
+  find a entry in a dn_list, using a ldb_val. Uses a case sensitive
+  comparison with the dn returns -1 if not found
+ */
+static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
+{
+       int i;
+       for (i=0; i<list->count; i++) {
+               if (dn_list_cmp(&list->dn[i], v) == 0) return i;
+       }
+       return -1;
+}
+
+/*
+  find a entry in a dn_list. Uses a case sensitive comparison with the dn
+  returns -1 if not found
+ */
+static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
+{
+       struct ldb_val v;
+       v.data = discard_const_p(unsigned char, dn);
+       v.length = strlen(dn);
+       return ltdb_dn_list_find_val(list, &v);
+}
+
+static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
+{
+       struct dn_list *list;
+       if (rec.dsize != sizeof(void *)) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Bad data size for idxptr %u", (unsigned)rec.dsize);
+               return NULL;
+       }
+       
+       list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
+       if (list == NULL) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Bad type '%s' for idxptr", 
+                                      talloc_get_name(*(struct dn_list **)rec.dptr));
+               return NULL;
+       }
+       if (check_parent && list->dn && talloc_parent(list->dn) != list) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Bad parent '%s' for idxptr", 
+                                      talloc_get_name(talloc_parent(list->dn)));
+               return NULL;
+       }
+       return list;
+}
+
+/*
+  return the @IDX list in an index entry for a dn as a 
+  struct dn_list
+ */
+static int ltdb_dn_list_load(struct ldb_module *module,
+                            struct ldb_dn *dn, struct dn_list *list)
+{
+       struct ldb_message *msg;
+       int ret;
+       struct ldb_message_element *el;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       TDB_DATA rec;
+       struct dn_list *list2;
+       TDB_DATA key;
+
+       list->dn = NULL;
+       list->count = 0;
+
+       /* see if we have any in-memory index entries */
+       if (ltdb->idxptr == NULL ||
+           ltdb->idxptr->itdb == NULL) {
+               goto normal_index;
+       }
+
+       key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
+       key.dsize = strlen((char *)key.dptr);
+
+       rec = tdb_fetch(ltdb->idxptr->itdb, key);
+       if (rec.dptr == NULL) {
+               goto normal_index;
+       }
+
+       /* we've found an in-memory index entry */
+       list2 = ltdb_index_idxptr(module, rec, true);
+       if (list2 == NULL) {
+               free(rec.dptr);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       free(rec.dptr);
+
+       *list = *list2;
+       return LDB_SUCCESS;
+
+normal_index:
+       msg = ldb_msg_new(list);
+       if (msg == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_search_dn1(module, dn, msg);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /* TODO: check indexing version number */
+
+       el = ldb_msg_find_element(msg, LTDB_IDX);
+       if (!el) {
+               talloc_free(msg);
+               return LDB_SUCCESS;
+       }
+
+       /* we avoid copying the strings by stealing the list */
+       list->dn = talloc_steal(list, el->values);
+       list->count = el->num_values;
+
+       return LDB_SUCCESS;
+}
+
+
+/*
+  save a dn_list into a full @IDX style record
+ */
+static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
+                                  struct dn_list *list)
+{
+       struct ldb_message *msg;
+       int ret;
+
+       if (list->count == 0) {
+               ret = ltdb_delete_noindex(module, dn);
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       return LDB_SUCCESS;
                }
-               if (r < 0) {
-                       if (test_i == 0) {
-                               return -1;
-                       }
-                       max_i = test_i - 1;
+               return ret;
+       }
+
+       msg = ldb_msg_new(module);
+       if (!msg) {
+               ldb_module_oom(module);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
+       if (ret != LDB_SUCCESS) {
+               ldb_module_oom(module);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       msg->dn = dn;
+       if (list->count > 0) {
+               struct ldb_message_element *el;
+
+               ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
+               if (ret != LDB_SUCCESS) {
+                       ldb_module_oom(module);
+                       talloc_free(msg);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-               if (r > 0) {
-                       min_i = test_i + 1;
+               el->values = list->dn;
+               el->num_values = list->count;
+       }
+
+       ret = ltdb_store(module, msg, TDB_REPLACE);
+       talloc_free(msg);
+       return ret;
+}
+
+/*
+  save a dn_list into the database, in either @IDX or internal format
+ */
+static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
+                             struct dn_list *list)
+{
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       TDB_DATA rec, key;
+       int ret;
+       struct dn_list *list2;
+
+       if (ltdb->idxptr == NULL) {
+               return ltdb_dn_list_store_full(module, dn, list);
+       }
+
+       if (ltdb->idxptr->itdb == NULL) {
+               ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
+               if (ltdb->idxptr->itdb == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
        }
 
-       if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
-               return min_i;
+       key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
+       key.dsize = strlen((char *)key.dptr);
+
+       rec = tdb_fetch(ltdb->idxptr->itdb, key);
+       if (rec.dptr != NULL) {
+               list2 = ltdb_index_idxptr(module, rec, false);
+               if (list2 == NULL) {
+                       free(rec.dptr);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               free(rec.dptr);
+               list2->dn = talloc_steal(list2, list->dn);
+               list2->count = list->count;
+               return LDB_SUCCESS;
        }
 
-       return -1;
+       list2 = talloc(ltdb->idxptr, struct dn_list);
+       if (list2 == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       list2->dn = talloc_steal(list2, list->dn);
+       list2->count = list->count;
+
+       rec.dptr = (uint8_t *)&list2;
+       rec.dsize = sizeof(void *);
+
+       ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
+       if (ret == -1) {
+               return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+  traverse function for storing the in-memory index entries on disk
+ */
+static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+{
+       struct ldb_module *module = state;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       struct ldb_dn *dn;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct ldb_val v;
+       struct dn_list *list;
+
+       list = ltdb_index_idxptr(module, data, true);
+       if (list == NULL) {
+               ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+
+       v.data = key.dptr;
+       v.length = key.dsize;
+
+       dn = ldb_dn_from_ldb_val(module, ldb, &v);
+       if (dn == NULL) {
+               ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+
+       ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
+       talloc_free(dn);
+       if (ltdb->idxptr->error != 0) {
+               return -1;
+       }
+       return 0;
+}
+
+/* cleanup the idxptr mode when transaction commits */
+int ltdb_index_transaction_commit(struct ldb_module *module)
+{
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       int ret;
+
+       if (ltdb->idxptr->itdb) {
+               tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
+               tdb_close(ltdb->idxptr->itdb);
+       }
+
+       ret = ltdb->idxptr->error;
+
+       if (ret != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
+       }
+
+       talloc_free(ltdb->idxptr);
+       ltdb->idxptr = NULL;
+       return ret;
+}
+
+/* cleanup the idxptr mode when transaction cancels */
+int ltdb_index_transaction_cancel(struct ldb_module *module)
+{
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       if (ltdb->idxptr && ltdb->idxptr->itdb) {
+               tdb_close(ltdb->idxptr->itdb);
+       }
+       talloc_free(ltdb->idxptr);
+       ltdb->idxptr = NULL;
+       return LDB_SUCCESS;
 }
 
-struct dn_list {
-       unsigned int count;
-       char **dn;
-};
 
 /*
   return the dn key to be used for an index
-  caller frees
+  the caller is responsible for freeing
 */
 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
-                                    const char *attr, const struct ldb_val *value)
+                                    const char *attr, const struct ldb_val *value,
+                                    const struct ldb_schema_attribute **ap)
 {
        struct ldb_dn *ret;
        struct ldb_val v;
-       const struct ldb_attrib_handler *h;
+       const struct ldb_schema_attribute *a;
        char *attr_folded;
+       int r;
 
        attr_folded = ldb_attr_casefold(ldb, attr);
        if (!attr_folded) {
                return NULL;
        }
 
-       h = ldb_attrib_handler(ldb, attr);
-       if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
+       a = ldb_schema_attribute_by_name(ldb, attr);
+       if (ap) {
+               *ap = a;
+       }
+       r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
+       if (r != LDB_SUCCESS) {
+               const char *errstr = ldb_errstring(ldb);
                /* canonicalisation can be refused. For example, 
                   a attribute that takes wildcards will refuse to canonicalise
                   if the value contains a wildcard */
+               ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
+                                      attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
                talloc_free(attr_folded);
                return NULL;
        }
-       if (ldb_should_b64_encode(&v)) {
+       if (ldb_should_b64_encode(ldb, &v)) {
                char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
                if (!vstr) return NULL;
                ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
@@ -144,195 +418,89 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
 /*
   see if a attribute value is in the list of indexed attributes
 */
-static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
-                           unsigned int *v_idx, const char *key)
+static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
 {
-       unsigned int i, j;
-       for (i=0;i<msg->num_elements;i++) {
-               if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
-                       const struct ldb_message_element *el = 
-                               &msg->elements[i];
-                       for (j=0;j<el->num_values;j++) {
-                               if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
-                                       if (v_idx) {
-                                               *v_idx = j;
-                                       }
-                                       return i;
-                               }
-                       }
+       unsigned int i;
+       struct ldb_message_element *el;
+
+       el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
+       if (el == NULL) {
+               return false;
+       }
+       for (i=0; i<el->num_values; i++) {
+               if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
+                       return true;
                }
        }
-       return -1;
+       return false;
 }
 
-/* used in sorting dn lists */
-static int list_cmp(const char **s1, const char **s2)
-{
-       return strcmp(*s1, *s2);
-}
+/*
+  in the following logic functions, the return value is treated as
+  follows:
+
+     LDB_SUCCESS: we found some matching index values
+
+     LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
+
+     LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
+                               we'll need a full search
+ */
 
 /*
-  return a list of dn's that might match a simple indexed search or
+  return a list of dn's that might match a simple indexed search (an
+  equality search only)
  */
-static int ltdb_index_dn_simple(struct ldb_module *module, 
+static int ltdb_index_dn_simple(struct ldb_module *module,
                                const struct ldb_parse_tree *tree,
                                const struct ldb_message *index_list,
                                struct dn_list *list)
 {
-       struct ldb_context *ldb = module->ldb;
+       struct ldb_context *ldb;
        struct ldb_dn *dn;
        int ret;
-       unsigned int i, j;
-       struct ldb_message *msg;
+
+       ldb = ldb_module_get_ctx(module);
 
        list->count = 0;
        list->dn = NULL;
 
        /* if the attribute isn't in the list of indexed attributes then
           this node needs a full search */
-       if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
-               return -1;
+       if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* the attribute is indexed. Pull the list of DNs that match the 
           search criterion */
-       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
-       if (!dn) return -1;
-
-       msg = talloc(list, struct ldb_message);
-       if (msg == NULL) {
-               return -1;
-       }
+       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
+       if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
-       ret = ltdb_search_dn1(module, dn, msg);
+       ret = ltdb_dn_list_load(module, dn, list);
        talloc_free(dn);
-       if (ret == 0 || ret == -1) {
-               return ret;
-       }
-
-       for (i=0;i<msg->num_elements;i++) {
-               struct ldb_message_element *el;
-
-               if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
-                       continue;
-               }
-
-               el = &msg->elements[i];
-
-               list->dn = talloc_array(list, char *, el->num_values);
-               if (!list->dn) {
-                       talloc_free(msg);
-                       return -1;
-               }
-
-               for (j=0;j<el->num_values;j++) {
-                       list->dn[list->count] = 
-                               talloc_strdup(list->dn, (char *)el->values[j].data);
-                       if (!list->dn[list->count]) {
-                               talloc_free(msg);
-                               return -1;
-                       }
-                       list->count++;
-               }
-       }
-
-       talloc_free(msg);
-
-       if (list->count > 1) {
-               qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
-       }
-
-       return 1;
+       return ret;
 }
 
 
-static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
-
-/*
-  return a list of dn's that might match a simple indexed search on
-  the special objectclass attribute
- */
-static int ltdb_index_dn_objectclass(struct ldb_module *module, 
-                                    const struct ldb_parse_tree *tree,
-                                    const struct ldb_message *index_list,
-                                    struct dn_list *list)
-{
-       struct ldb_context *ldb = module->ldb;
-       unsigned int i;
-       int ret;
-       const char *target = (const char *)tree->u.equality.value.data;
-       const char **subclasses;
-
-       list->count = 0;
-       list->dn = NULL;
-
-       ret = ltdb_index_dn_simple(module, tree, index_list, list);
-
-       subclasses = ldb_subclass_list(module->ldb, target);
-
-       if (subclasses == NULL) {
-               return ret;
-       }
-
-       for (i=0;subclasses[i];i++) {
-               struct ldb_parse_tree tree2;
-               struct dn_list *list2;
-               tree2.operation = LDB_OP_EQUALITY;
-               tree2.u.equality.attr = LTDB_OBJECTCLASS;
-               if (!tree2.u.equality.attr) {
-                       return -1;
-               }
-               tree2.u.equality.value.data = 
-                       (uint8_t *)talloc_strdup(list, subclasses[i]);
-               if (tree2.u.equality.value.data == NULL) {
-                       return -1;                      
-               }
-               tree2.u.equality.value.length = strlen(subclasses[i]);
-               list2 = talloc(list, struct dn_list);
-               if (list2 == NULL) {
-                       talloc_free(tree2.u.equality.value.data);
-                       return -1;
-               }
-               if (ltdb_index_dn_objectclass(module, &tree2, 
-                                             index_list, list2) == 1) {
-                       if (list->count == 0) {
-                               *list = *list2;
-                               ret = 1;
-                       } else {
-                               list_union(ldb, list, list2);
-                               talloc_free(list2);
-                       }
-               }
-               talloc_free(tree2.u.equality.value.data);
-       }
-
-       return ret;
-}
+static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
 
 /*
   return a list of dn's that might match a leaf indexed search
  */
-static int ltdb_index_dn_leaf(struct ldb_module *module, 
+static int ltdb_index_dn_leaf(struct ldb_module *module,
                              const struct ldb_parse_tree *tree,
                              const struct ldb_message *index_list,
                              struct dn_list *list)
 {
-       if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
-               return ltdb_index_dn_objectclass(module, tree, index_list, list);
-       }
        if (ldb_attr_dn(tree->u.equality.attr) == 0) {
-               list->dn = talloc_array(list, char *, 1);
+               list->dn = talloc_array(list, struct ldb_val, 1);
                if (list->dn == NULL) {
-                       ldb_oom(module->ldb);
-                       return -1;
-               }
-               list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
-               if (list->dn[0] == NULL) {
-                       ldb_oom(module->ldb);
-                       return -1;
+                       ldb_module_oom(module);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
+               list->dn[0] = tree->u.equality.value;
                list->count = 1;
-               return 1;
+               return LDB_SUCCESS;
        }
        return ltdb_index_dn_simple(module, tree, index_list, list);
 }
@@ -341,166 +509,174 @@ static int ltdb_index_dn_leaf(struct ldb_module *module,
 /*
   list intersection
   list = list & list2
-  relies on the lists being sorted
 */
-static int list_intersect(struct ldb_context *ldb,
-                         struct dn_list *list, const struct dn_list *list2)
+static bool list_intersect(struct ldb_context *ldb,
+                          struct dn_list *list, const struct dn_list *list2)
 {
        struct dn_list *list3;
        unsigned int i;
 
-       if (list->count == 0 || list2->count == 0) {
+       if (list->count == 0) {
                /* 0 & X == 0 */
-               return 0;
-       }
-
-       list3 = talloc(ldb, struct dn_list);
+               return true;
+       }
+       if (list2->count == 0) {
+               /* X & 0 == 0 */
+               list->count = 0;
+               list->dn = NULL;
+               return true;
+       }
+
+       /* the indexing code is allowed to return a longer list than
+          what really matches, as all results are filtered by the
+          full expression at the end - this shortcut avoids a lot of
+          work in some cases */
+       if (list->count < 2 && list2->count > 10) {
+               return true;
+       }
+       if (list2->count < 2 && list->count > 10) {
+               list->count = list2->count;
+               list->dn = list2->dn;
+               /* note that list2 may not be the parent of list2->dn,
+                  as list2->dn may be owned by ltdb->idxptr. In that
+                  case we expect this reparent call to fail, which is
+                  OK */
+               talloc_reparent(list2, list, list2->dn);
+               return true;
+       }
+
+       list3 = talloc_zero(list, struct dn_list);
        if (list3 == NULL) {
-               return -1;
+               return false;
        }
 
-       list3->dn = talloc_array(list3, char *, list->count);
+       list3->dn = talloc_array(list3, struct ldb_val, list->count);
        if (!list3->dn) {
                talloc_free(list3);
-               return -1;
+               return false;
        }
        list3->count = 0;
 
        for (i=0;i<list->count;i++) {
-               if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
-                             sizeof(char *), (comparison_fn_t)strcmp) != -1) {
-                       list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
+               if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
+                       list3->dn[list3->count] = list->dn[i];
                        list3->count++;
-               } else {
-                       talloc_free(list->dn[i]);
-               }               
+               }
        }
 
-       talloc_free(list->dn);
-       list->dn = talloc_move(list, &list3->dn);
+       list->dn = talloc_steal(list, list3->dn);
        list->count = list3->count;
        talloc_free(list3);
 
-       return 0;
+       return true;
 }
 
 
 /*
   list union
   list = list | list2
-  relies on the lists being sorted
 */
-static int list_union(struct ldb_context *ldb, 
-                     struct dn_list *list, const struct dn_list *list2)
+static bool list_union(struct ldb_context *ldb,
+                      struct dn_list *list, const struct dn_list *list2)
 {
-       unsigned int i;
-       char **d;
-       unsigned int count = list->count;
+       struct ldb_val *dn3;
 
-       if (list->count == 0 && list2->count == 0) {
-               /* 0 | 0 == 0 */
-               return 0;
+       if (list2->count == 0) {
+               /* X | 0 == X */
+               return true;
        }
 
-       d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
-       if (!d) {
-               return -1;
+       if (list->count == 0) {
+               /* 0 | X == X */
+               list->count = list2->count;
+               list->dn = list2->dn;
+               /* note that list2 may not be the parent of list2->dn,
+                  as list2->dn may be owned by ltdb->idxptr. In that
+                  case we expect this reparent call to fail, which is
+                  OK */
+               talloc_reparent(list2, list, list2->dn);
+               return true;
        }
-       list->dn = d;
 
-       for (i=0;i<list2->count;i++) {
-               if (ldb_list_find(list2->dn[i], list->dn, count, 
-                             sizeof(char *), (comparison_fn_t)strcmp) == -1) {
-                       list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
-                       if (!list->dn[list->count]) {
-                               return -1;
-                       }
-                       list->count++;
-               }               
+       dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
+       if (!dn3) {
+               ldb_oom(ldb);
+               return false;
        }
 
-       if (list->count != count) {
-               qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
-       }
+       /* we allow for duplicates here, and get rid of them later */
+       memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
+       memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
 
-       return 0;
+       list->dn = dn3;
+       list->count += list2->count;
+
+       return true;
 }
 
-static int ltdb_index_dn(struct ldb_module *module, 
+static int ltdb_index_dn(struct ldb_module *module,
                         const struct ldb_parse_tree *tree,
                         const struct ldb_message *index_list,
                         struct dn_list *list);
 
 
 /*
-  OR two index results
+  process an OR list (a union)
  */
-static int ltdb_index_dn_or(struct ldb_module *module, 
+static int ltdb_index_dn_or(struct ldb_module *module,
                            const struct ldb_parse_tree *tree,
                            const struct ldb_message *index_list,
                            struct dn_list *list)
 {
-       struct ldb_context *ldb = module->ldb;
+       struct ldb_context *ldb;
        unsigned int i;
-       int ret;
-       
-       ret = -1;
+
+       ldb = ldb_module_get_ctx(module);
+
        list->dn = NULL;
        list->count = 0;
 
-       for (i=0;i<tree->u.list.num_elements;i++) {
+       for (i=0; i<tree->u.list.num_elements; i++) {
                struct dn_list *list2;
-               int v;
+               int ret;
 
-               list2 = talloc(module, struct dn_list);
+               list2 = talloc_zero(list, struct dn_list);
                if (list2 == NULL) {
-                       return -1;
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
+               ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
 
-               if (v == 0) {
-                       /* 0 || X == X */
-                       if (ret == -1) {
-                               ret = 0;
-                       }
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       /* X || 0 == X */
                        talloc_free(list2);
                        continue;
                }
 
-               if (v == -1) {
-                       /* 1 || X == 1 */
-                       talloc_free(list->dn);
+               if (ret != LDB_SUCCESS) {
+                       /* X || * == * */
                        talloc_free(list2);
-                       return -1;
+                       return ret;
                }
 
-               if (ret == -1) {
-                       ret = 1;
-                       list->dn = talloc_move(list, &list2->dn);
-                       list->count = list2->count;
-               } else {
-                       if (list_union(ldb, list, list2) == -1) {
-                               talloc_free(list2);
-                               return -1;
-                       }
-                       ret = 1;
+               if (!list_union(ldb, list, list2)) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-               talloc_free(list2);
        }
 
        if (list->count == 0) {
-               return 0;
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       return ret;
+       return LDB_SUCCESS;
 }
 
 
 /*
   NOT an index results
  */
-static int ltdb_index_dn_not(struct ldb_module *module, 
+static int ltdb_index_dn_not(struct ldb_module *module,
                             const struct ldb_parse_tree *tree,
                             const struct ldb_message *index_list,
                             struct dn_list *list)
@@ -508,86 +684,174 @@ static int ltdb_index_dn_not(struct ldb_module *module,
        /* the only way to do an indexed not would be if we could
           negate the not via another not or if we knew the total
           number of database elements so we could know that the
-          existing expression covered the whole database. 
-          
+          existing expression covered the whole database.
+
           instead, we just give up, and rely on a full index scan
           (unless an outer & manages to reduce the list)
        */
-       return -1;
+       return LDB_ERR_OPERATIONS_ERROR;
+}
+
+
+static bool ltdb_index_unique(struct ldb_context *ldb,
+                             const char *attr)
+{
+       const struct ldb_schema_attribute *a;
+       a = ldb_schema_attribute_by_name(ldb, attr);
+       if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
+               return true;
+       }
+       return false;
 }
 
 /*
-  AND two index results
+  process an AND expression (intersection)
  */
-static int ltdb_index_dn_and(struct ldb_module *module, 
+static int ltdb_index_dn_and(struct ldb_module *module,
                             const struct ldb_parse_tree *tree,
                             const struct ldb_message *index_list,
                             struct dn_list *list)
 {
-       struct ldb_context *ldb = module->ldb;
+       struct ldb_context *ldb;
        unsigned int i;
-       int ret;
-       
-       ret = -1;
+       bool found;
+
+       ldb = ldb_module_get_ctx(module);
+
        list->dn = NULL;
        list->count = 0;
 
-       for (i=0;i<tree->u.list.num_elements;i++) {
+       /* in the first pass we only look for unique simple
+          equality tests, in the hope of avoiding having to look
+          at any others */
+       for (i=0; i<tree->u.list.num_elements; i++) {
+               const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
+               int ret;
+
+               if (subtree->operation != LDB_OP_EQUALITY ||
+                   !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
+                       continue;
+               }
+               
+               ret = ltdb_index_dn(module, subtree, index_list, list);
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       /* 0 && X == 0 */
+                       return LDB_ERR_NO_SUCH_OBJECT;
+               }
+               if (ret == LDB_SUCCESS) {
+                       /* a unique index match means we can
+                        * stop. Note that we don't care if we return
+                        * a few too many objects, due to later
+                        * filtering */
+                       return LDB_SUCCESS;
+               }
+       }       
+
+       /* now do a full intersection */
+       found = false;
+
+       for (i=0; i<tree->u.list.num_elements; i++) {
+               const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
                struct dn_list *list2;
-               int v;
+               int ret;
 
-               list2 = talloc(module, struct dn_list);
+               list2 = talloc_zero(list, struct dn_list);
                if (list2 == NULL) {
-                       return -1;
+                       ldb_module_oom(module);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
+                       
+               ret = ltdb_index_dn(module, subtree, index_list, list2);
 
-               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
-
-               if (v == 0) {
-                       /* 0 && X == 0 */
-                       talloc_free(list->dn);
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       /* X && 0 == 0 */
+                       list->dn = NULL;
+                       list->count = 0;
                        talloc_free(list2);
-                       return 0;
+                       return LDB_ERR_NO_SUCH_OBJECT;
                }
-
-               if (v == -1) {
+               
+               if (ret != LDB_SUCCESS) {
+                       /* this didn't adding anything */
                        talloc_free(list2);
                        continue;
                }
 
-               if (ret == -1) {
-                       ret = 1;
-                       talloc_free(list->dn);
-                       list->dn = talloc_move(list, &list2->dn);
+               if (!found) {
+                       talloc_reparent(list2, list, list->dn);
+                       list->dn = list2->dn;
                        list->count = list2->count;
-               } else {
-                       if (list_intersect(ldb, list, list2) == -1) {
-                               talloc_free(list2);
-                               return -1;
-                       }
+                       found = true;
+               } else if (!list_intersect(ldb, list, list2)) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-
-               talloc_free(list2);
-
+                       
                if (list->count == 0) {
-                       talloc_free(list->dn);
-                       return 0;
+                       list->dn = NULL;
+                       return LDB_ERR_NO_SUCH_OBJECT;
                }
+                       
+               if (list->count < 2) {
+                       /* it isn't worth loading the next part of the tree */
+                       return LDB_SUCCESS;
+               }
+       }       
+
+       if (!found) {
+               /* none of the attributes were indexed */
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       return ret;
+       return LDB_SUCCESS;
+}
+       
+/*
+  return a list of matching objects using a one-level index
+ */
+static int ltdb_index_dn_one(struct ldb_module *module,
+                            struct ldb_dn *parent_dn,
+                            struct dn_list *list)
+{
+       struct ldb_context *ldb;
+       struct ldb_dn *key;
+       struct ldb_val val;
+       int ret;
+
+       ldb = ldb_module_get_ctx(module);
+
+       /* work out the index key from the parent DN */
+       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
+       val.length = strlen((char *)val.data);
+       key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
+       if (!key) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_dn_list_load(module, key, list);
+       talloc_free(key);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       if (list->count == 0) {
+               return LDB_ERR_NO_SUCH_OBJECT;
+       }
+
+       return LDB_SUCCESS;
 }
 
 /*
   return a list of dn's that might match a indexed search or
-  -1 if an error. return 0 for no matches, or 1 for matches
+  an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
  */
-static int ltdb_index_dn(struct ldb_module *module, 
+static int ltdb_index_dn(struct ldb_module *module,
                         const struct ldb_parse_tree *tree,
                         const struct ldb_message *index_list,
                         struct dn_list *list)
 {
-       int ret = -1;
+       int ret = LDB_ERR_OPERATIONS_ERROR;
 
        switch (tree->operation) {
        case LDB_OP_AND:
@@ -613,7 +877,7 @@ static int ltdb_index_dn(struct ldb_module *module,
        case LDB_OP_APPROX:
        case LDB_OP_EXTENDED:
                /* we can't index with fancy bitops yet */
-               ret = -1;
+               ret = LDB_ERR_OPERATIONS_ERROR;
                break;
        }
 
@@ -624,381 +888,432 @@ static int ltdb_index_dn(struct ldb_module *module,
   filter a candidate dn_list from an indexed search into a set of results
   extracting just the given attributes
 */
-static int ltdb_index_filter(const struct dn_list *dn_list, 
-                            struct ldb_handle *handle)
+static int ltdb_index_filter(const struct dn_list *dn_list,
+                            struct ltdb_context *ac, 
+                            uint32_t *match_count)
 {
-       struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
-       struct ldb_reply *ares = NULL;
+       struct ldb_context *ldb;
+       struct ldb_message *msg;
        unsigned int i;
 
+       ldb = ldb_module_get_ctx(ac->module);
+
        for (i = 0; i < dn_list->count; i++) {
                struct ldb_dn *dn;
                int ret;
 
-               ares = talloc_zero(ac, struct ldb_reply);
-               if (!ares) {
-                       handle->status = LDB_ERR_OPERATIONS_ERROR;
-                       handle->state = LDB_ASYNC_DONE;
+               msg = ldb_msg_new(ac);
+               if (!msg) {
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ares->message = ldb_msg_new(ares);
-               if (!ares->message) {
-                       handle->status = LDB_ERR_OPERATIONS_ERROR;
-                       handle->state = LDB_ASYNC_DONE;
-                       talloc_free(ares);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-
-               dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
+               dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
                if (dn == NULL) {
-                       talloc_free(ares);
+                       talloc_free(msg);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = ltdb_search_dn1(ac->module, dn, ares->message);
+               ret = ltdb_search_dn1(ac->module, dn, msg);
                talloc_free(dn);
-               if (ret == 0) {
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* the record has disappeared? yes, this can happen */
-                       talloc_free(ares);
+                       talloc_free(msg);
                        continue;
                }
 
-               if (ret == -1) {
+               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                        /* an internal error */
-                       talloc_free(ares);
+                       talloc_free(msg);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
-                       talloc_free(ares);
+               if (!ldb_match_msg(ldb, msg,
+                                  ac->tree, ac->base, ac->scope)) {
+                       talloc_free(msg);
                        continue;
                }
 
                /* filter the attributes that the user wants */
-               ret = ltdb_filter_attrs(ares->message, ac->attrs);
+               ret = ltdb_filter_attrs(msg, ac->attrs);
 
                if (ret == -1) {
-                       handle->status = LDB_ERR_OPERATIONS_ERROR;
-                       handle->state = LDB_ASYNC_DONE;
-                       talloc_free(ares);
+                       talloc_free(msg);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ares->type = LDB_REPLY_ENTRY;
-               handle->state = LDB_ASYNC_PENDING;
-               handle->status = ac->callback(ac->module->ldb, ac->context, ares);
-
-               if (handle->status != LDB_SUCCESS) {
-                       handle->state = LDB_ASYNC_DONE;
-                       return handle->status;
+               ret = ldb_module_send_entry(ac->req, msg, NULL);
+               if (ret != LDB_SUCCESS) {
+                       ac->request_terminated = true;
+                       return ret;
                }
+
+               (*match_count)++;
        }
 
        return LDB_SUCCESS;
 }
 
+/*
+  remove any duplicated entries in a indexed result
+ */
+static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
+{
+       int i, new_count;
+
+       if (list->count < 2) {
+               return;
+       }
+
+       qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
+
+       new_count = 1;
+       for (i=1; i<list->count; i++) {
+               if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
+                       if (new_count != i) {
+                               list->dn[new_count] = list->dn[i];
+                       }
+                       new_count++;
+               }
+       }
+       
+       list->count = new_count;
+}
+
 /*
   search the database with a LDAP-like expression using indexes
   returns -1 if an indexed search is not possible, in which
-  case the caller should call ltdb_search_full() 
+  case the caller should call ltdb_search_full()
 */
-int ltdb_search_indexed(struct ldb_handle *handle)
+int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
 {
-       struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
-       struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
        struct dn_list *dn_list;
        int ret;
 
-       if (ltdb->cache->indexlist->num_elements == 0 && 
+       /* see if indexing is enabled */
+       if (!ltdb->cache->attribute_indexes && 
+           !ltdb->cache->one_level_indexes &&
            ac->scope != LDB_SCOPE_BASE) {
-               /* no index list? must do full search */
-               return -1;
+               /* fallback to a full search */
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       dn_list = talloc(handle, struct dn_list);
+       dn_list = talloc_zero(ac, struct dn_list);
        if (dn_list == NULL) {
-               return -1;
+               ldb_module_oom(ac->module);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (ac->scope == LDB_SCOPE_BASE) {
-               /* with BASE searches only one DN can match */
-               dn_list->dn = talloc_array(dn_list, char *, 1);
+       switch (ac->scope) {
+       case LDB_SCOPE_BASE:
+               dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
                if (dn_list->dn == NULL) {
-                       ldb_oom(ac->module->ldb);
-                       return -1;
+                       ldb_module_oom(ac->module);
+                       talloc_free(dn_list);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-               dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
-               if (dn_list->dn[0] == NULL) {
-                       ldb_oom(ac->module->ldb);
-                       return -1;
+               dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
+               if (dn_list->dn[0].data == NULL) {
+                       ldb_module_oom(ac->module);
+                       talloc_free(dn_list);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
+               dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
                dn_list->count = 1;
-               ret = 1;
-       } else {
-               ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
-       }
+               break;          
 
-       if (ret == 1) {
-               /* we've got a candidate list - now filter by the full tree
-                  and extract the needed attributes */
-               ret = ltdb_index_filter(dn_list, handle);
-               handle->status = ret;
-               handle->state = LDB_ASYNC_DONE;
+       case LDB_SCOPE_ONELEVEL:
+               if (!ltdb->cache->one_level_indexes) {
+                       talloc_free(dn_list);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(dn_list);
+                       return ret;
+               }
+               break;
+
+       case LDB_SCOPE_SUBTREE:
+       case LDB_SCOPE_DEFAULT:
+               if (!ltdb->cache->attribute_indexes) {
+                       talloc_free(dn_list);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(dn_list);
+                       return ret;
+               }
+               ltdb_dn_list_remove_duplicates(dn_list);
+               break;
        }
 
+       ret = ltdb_index_filter(dn_list, ac, match_count);
        talloc_free(dn_list);
-
        return ret;
 }
 
 /*
-  add a index element where this is the first indexed DN for this value
+  add an index entry for one message element
 */
-static int ltdb_index_add1_new(struct ldb_context *ldb, 
-                              struct ldb_message *msg,
-                              struct ldb_message_element *el,
-                              const char *dn)
+static int ltdb_index_add1(struct ldb_module *module, const char *dn,
+                          struct ldb_message_element *el, int v_idx)
 {
-       struct ldb_message_element *el2;
+       struct ldb_context *ldb;
+       struct ldb_dn *dn_key;
+       int ret;
+       const struct ldb_schema_attribute *a;
+       struct dn_list *list;
+       unsigned alloc_len;
 
-       /* add another entry */
-       el2 = talloc_realloc(msg, msg->elements, 
-                              struct ldb_message_element, msg->num_elements+1);
-       if (!el2) {
-               return -1;
+       ldb = ldb_module_get_ctx(module);
+
+       list = talloc_zero(module, struct dn_list);
+       if (list == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       msg->elements = el2;
-       msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
-       if (!msg->elements[msg->num_elements].name) {
-               return -1;
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
+       if (!dn_key) {
+               talloc_free(list);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
-       msg->elements[msg->num_elements].num_values = 0;
-       msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
-       if (!msg->elements[msg->num_elements].values) {
-               return -1;
+       talloc_steal(list, dn_key);
+
+       ret = ltdb_dn_list_load(module, dn_key, list);
+       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+               talloc_free(list);
+               return ret;
        }
-       msg->elements[msg->num_elements].values[0].length = strlen(dn);
-       msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
-       msg->elements[msg->num_elements].num_values = 1;
-       msg->num_elements++;
 
-       return 0;
+       if (ltdb_dn_list_find_str(list, dn) != -1) {
+               talloc_free(list);
+               return LDB_SUCCESS;
+       }
+
+       if (list->count > 0 &&
+           a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
+               return LDB_ERR_ENTRY_ALREADY_EXISTS;            
+       }
+
+       /* overallocate the list a bit, to reduce the number of
+        * realloc trigered copies */    
+       alloc_len = ((list->count+1)+7) & ~7;
+       list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
+       if (list->dn == NULL) {
+               talloc_free(list);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
+       list->dn[list->count].length = strlen(dn);
+       list->count++;
+
+       ret = ltdb_dn_list_store(module, dn_key, list);
+
+       talloc_free(list);
+
+       return ret;
 }
 
+/*
+  add index entries for one elements in a message
+ */
+static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
+                            struct ldb_message_element *el)
+{
+       unsigned int i;
+       for (i = 0; i < el->num_values; i++) {
+               int ret = ltdb_index_add1(module, dn, el, i);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
 
 /*
-  add a index element where this is not the first indexed DN for this
-  value
-*/
-static int ltdb_index_add1_add(struct ldb_context *ldb, 
-                              struct ldb_message *msg,
-                              struct ldb_message_element *el,
-                              int idx,
-                              const char *dn)
+  add index entries for all elements in a message
+ */
+static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
+                             struct ldb_message_element *elements, int num_el)
 {
-       struct ldb_val *v2;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        unsigned int i;
 
-       /* for multi-valued attributes we can end up with repeats */
-       for (i=0;i<msg->elements[idx].num_values;i++) {
-               if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
-                       return 0;
-               }
+       if (dn[0] == '@') {
+               return LDB_SUCCESS;
        }
 
-       v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
-                             struct ldb_val, 
-                             msg->elements[idx].num_values+1);
-       if (!v2) {
-               return -1;
+       if (ltdb->cache->indexlist->num_elements == 0) {
+               /* no indexed fields */
+               return LDB_SUCCESS;
        }
-       msg->elements[idx].values = v2;
 
-       msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
-       msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
-       msg->elements[idx].num_values++;
+       for (i = 0; i < num_el; i++) {
+               int ret;
+               if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
+                       continue;
+               }
+               ret = ltdb_index_add_el(module, dn, &elements[i]);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
 
-       return 0;
+       return LDB_SUCCESS;
 }
 
+
 /*
-  add an index entry for one message element
+  insert a one level index for a message
 */
-static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
-                          struct ldb_message_element *el, int v_idx)
+static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
 {
-       struct ldb_context *ldb = module->ldb;
-       struct ldb_message *msg;
-       struct ldb_dn *dn_key;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       struct ldb_message_element el;
+       struct ldb_val val;
+       struct ldb_dn *pdn;
+       const char *dn;
        int ret;
-       unsigned int i;
-
-       msg = talloc(module, struct ldb_message);
-       if (msg == NULL) {
-               errno = ENOMEM;
-               return -1;
-       }
 
-       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
-       if (!dn_key) {
-               talloc_free(msg);
-               errno = ENOMEM;
-               return -1;
+       /* We index for ONE Level only if requested */
+       if (!ltdb->cache->one_level_indexes) {
+               return LDB_SUCCESS;
        }
-       talloc_steal(msg, dn_key);
 
-       ret = ltdb_search_dn1(module, dn_key, msg);
-       if (ret == -1) {
-               talloc_free(msg);
-               return -1;
+       pdn = ldb_dn_get_parent(module, msg->dn);
+       if (pdn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (ret == 0) {
-               msg->dn = dn_key;
-               msg->num_elements = 0;
-               msg->elements = NULL;
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
+               talloc_free(pdn);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       for (i=0;i<msg->num_elements;i++) {
-               if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
-                       break;
-               }
+       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
+       if (val.data == NULL) {
+               talloc_free(pdn);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (i == msg->num_elements) {
-               ret = ltdb_index_add1_new(ldb, msg, el, dn);
-       } else {
-               ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
-       }
+       val.length = strlen((char *)val.data);
+       el.name = LTDB_IDXONE;
+       el.values = &val;
+       el.num_values = 1;
 
-       if (ret == 0) {
-               ret = ltdb_store(module, msg, TDB_REPLACE);
+       if (add) {
+               ret = ltdb_index_add1(module, dn, &el, 0);
+       } else { /* delete */
+               ret = ltdb_index_del_value(module, dn, &el, 0);
        }
 
-       talloc_free(msg);
+       talloc_free(pdn);
 
        return ret;
 }
 
-static int ltdb_index_add0(struct ldb_module *module, const char *dn,
-                          struct ldb_message_element *elements, int num_el)
+/*
+  add the index entries for a new element in a record
+  The caller guarantees that these element values are not yet indexed
+*/
+int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
+                          struct ldb_message_element *el)
 {
-       struct ltdb_private *ltdb = module->private_data;
-       int ret;
-       unsigned int i, j;
-
-       if (dn[0] == '@') {
-               return 0;
-       }
-
-       if (ltdb->cache->indexlist->num_elements == 0) {
-               /* no indexed fields */
-               return 0;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       if (ldb_dn_is_special(dn)) {
+               return LDB_SUCCESS;
        }
-
-       for (i = 0; i < num_el; i++) {
-               ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
-                                      NULL, LTDB_IDXATTR);
-               if (ret == -1) {
-                       continue;
-               }
-               for (j = 0; j < elements[i].num_values; j++) {
-                       ret = ltdb_index_add1(module, dn, &elements[i], j);
-                       if (ret == -1) {
-                               return -1;
-                       }
-               }
+       if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
+               return LDB_SUCCESS;
        }
-
-       return 0;
+       return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
 }
 
 /*
   add the index entries for a new record
-  return -1 on failure
 */
-int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
+int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
 {
        const char *dn;
        int ret;
 
+       if (ldb_dn_is_special(msg->dn)) {
+               return LDB_SUCCESS;
+       }
+
        dn = ldb_dn_get_linearized(msg->dn);
        if (dn == NULL) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
+       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
 
-       return ret;
+       return ltdb_index_onelevel(module, msg, 1);
 }
 
 
 /*
   delete an index entry for one message element
 */
-int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
+int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                         struct ldb_message_element *el, int v_idx)
 {
-       struct ldb_context *ldb = module->ldb;
-       struct ldb_message *msg;
+       struct ldb_context *ldb;
        struct ldb_dn *dn_key;
        int ret, i;
-       unsigned int j;
+       struct dn_list *list;
+
+       ldb = ldb_module_get_ctx(module);
 
        if (dn[0] == '@') {
-               return 0;
+               return LDB_SUCCESS;
        }
 
-       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
        if (!dn_key) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       msg = talloc(dn_key, struct ldb_message);
-       if (msg == NULL) {
+       list = talloc_zero(dn_key, struct dn_list);
+       if (list == NULL) {
                talloc_free(dn_key);
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_search_dn1(module, dn_key, msg);
-       if (ret == -1) {
+       ret = ltdb_dn_list_load(module, dn_key, list);
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+               /* it wasn't indexed. Did we have an earlier error? If we did then
+                  its gone now */
                talloc_free(dn_key);
-               return -1;
+               return LDB_SUCCESS;
        }
 
-       if (ret == 0) {
-               /* it wasn't indexed. Did we have an earlier error? If we did then
-                  its gone now */
+       if (ret != LDB_SUCCESS) {
                talloc_free(dn_key);
-               return 0;
+               return ret;
        }
 
-       i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
+       i = ltdb_dn_list_find_str(list, dn);
        if (i == -1) {
-               ldb_debug(ldb, LDB_DEBUG_ERROR,
-                               "ERROR: dn %s not found in %s\n", dn,
-                               ldb_dn_get_linearized(dn_key));
-               /* it ain't there. hmmm */
+               /* nothing to delete */
                talloc_free(dn_key);
-               return 0;
+               return LDB_SUCCESS;             
        }
 
-       if (j != msg->elements[i].num_values - 1) {
-               memmove(&msg->elements[i].values[j], 
-                       &msg->elements[i].values[j+1], 
-                       (msg->elements[i].num_values-(j+1)) * 
-                       sizeof(msg->elements[i].values[0]));
+       if (i != list->count-1) {
+               memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
        }
-       msg->elements[i].num_values--;
+       list->count--;
+       list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
 
-       if (msg->elements[i].num_values == 0) {
-               ret = ltdb_delete_noindex(module, dn_key);
-       } else {
-               ret = ltdb_store(module, msg, TDB_REPLACE);
-       }
+       ret = ltdb_dn_list_store(module, dn_key, list);
 
        talloc_free(dn_key);
 
@@ -1006,46 +1321,75 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
 }
 
 /*
-  delete the index entries for a record
+  delete the index entries for a element
   return -1 on failure
 */
-int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
+int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
-       const char *dn;
-       unsigned int i, j;
+       unsigned int i;
 
-       /* find the list of indexed fields */   
-       if (ltdb->cache->indexlist->num_elements == 0) {
+       if (!ltdb->cache->attribute_indexes) {
                /* no indexed fields */
-               return 0;
+               return LDB_SUCCESS;
+       }
+
+       if (dn[0] == '@') {
+               return LDB_SUCCESS;
+       }
+
+       if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
+               return LDB_SUCCESS;
        }
+       for (i = 0; i < el->num_values; i++) {
+               ret = ltdb_index_del_value(module, dn, el, i);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
+
+/*
+  delete the index entries for a record
+  return -1 on failure
+*/
+int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
+{
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       int ret;
+       const char *dn;
+       unsigned int i;
 
        if (ldb_dn_is_special(msg->dn)) {
-               return 0;
+               return LDB_SUCCESS;
+       }
+
+       ret = ltdb_index_onelevel(module, msg, 0);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       if (!ltdb->cache->attribute_indexes) {
+               /* no indexed fields */
+               return LDB_SUCCESS;
        }
 
        dn = ldb_dn_get_linearized(msg->dn);
        if (dn == NULL) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        for (i = 0; i < msg->num_elements; i++) {
-               ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
-                                      NULL, LTDB_IDXATTR);
-               if (ret == -1) {
-                       continue;
-               }
-               for (j = 0; j < msg->elements[i].num_values; j++) {
-                       ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
-                       if (ret == -1) {
-                               return -1;
-                       }
+               ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
        }
 
-       return 0;
+       return LDB_SUCCESS;
 }
 
 
@@ -1066,12 +1410,15 @@ static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, vo
 */
 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
-       struct ldb_module *module = state;
+       struct ldb_context *ldb;
+       struct ldb_module *module = (struct ldb_module *)state;
        struct ldb_message *msg;
        const char *dn = NULL;
        int ret;
        TDB_DATA key2;
 
+       ldb = ldb_module_get_ctx(module);
+
        if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
            strncmp((char *)key.dptr, "DN=", 3) != 0) {
                return 0;
@@ -1084,16 +1431,18 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 
        ret = ltdb_unpack_data(module, &data, msg);
        if (ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
+                         ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return -1;
        }
 
-       /* check if the DN key has changed, perhaps due to the 
+       /* check if the DN key has changed, perhaps due to the
           case insensitivity of an element changing */
        key2 = ltdb_key(module, msg->dn);
        if (key2.dptr == NULL) {
                /* probably a corrupt record ... darn */
-               ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
                                                        ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return 0;
@@ -1110,11 +1459,22 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
                dn = ldb_dn_get_linearized(msg->dn);
        }
 
-       ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
+       ret = ltdb_index_onelevel(module, msg, 1);
+       if (ret != LDB_SUCCESS) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                       "Adding special ONE LEVEL index failed (%s)!",
+                       ldb_dn_get_linearized(msg->dn));
+               talloc_free(msg);
+               return -1;
+       }
+
+       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
 
        talloc_free(msg);
 
-       return ret;
+       if (ret != LDB_SUCCESS) return -1;
+
+       return 0;
 }
 
 /*
@@ -1122,24 +1482,33 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 */
 int ltdb_reindex(struct ldb_module *module)
 {
-       struct ltdb_private *ltdb = module->private_data;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
 
        if (ltdb_cache_reload(module) != 0) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* first traverse the database deleting any @INDEX records */
        ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
        if (ret == -1) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* if we don't have indexes we have nothing todo */
+       if (ltdb->cache->indexlist->num_elements == 0) {
+               return LDB_SUCCESS;
        }
 
        /* now traverse adding any indexes for normal LDB records */
        ret = tdb_traverse(ltdb->tdb, re_index, module);
        if (ret == -1) {
-               return -1;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       return 0;
+       if (ltdb->idxptr) {
+               ltdb->idxptr->repack = true;
+       }
+
+       return LDB_SUCCESS;
 }