s4-ldb: when taking a list intersection, the result can be as long as the first list
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
index db0c31572e0f6a3c7a0f4a7f09871de3cd4f2856..709a2e1626df067e7b154c31e1d0e60ea0fdd554 100644 (file)
@@ -1,7 +1,7 @@
 /*
    ldb database library
 
-   Copyright (C) Andrew Tridgell  2004
+   Copyright (C) Andrew Tridgell  2004-2009
 
      ** NOTE! The following LGPL license applies to the ldb
      ** library. This does NOT imply that all of Samba is released
  */
 
 #include "ldb_tdb.h"
-#include "dlinklist.h"
 
-/*
-  the idxptr code is a bit unusual. The way it works is to replace
-  @IDX elements in records during a transaction with @IDXPTR
-  elements. The @IDXPTR elements don't contain the actual index entry
-  values, but contain a pointer to a linked list of values.
-
-  This means we are storing pointers in a database, which is normally
-  not allowed, but in this case we are storing them only for the
-  duration of a transaction, and re-writing them into the normal @IDX
-  format at the end of the transaction. That means no other processes
-  are ever exposed to the @IDXPTR values.
-
-  The advantage is that the linked list doesn't cause huge
-  fragmentation during a transaction. Without the @IDXPTR method we
-  often ended up with a ldb that was between 10x and 100x larger then
-  it needs to be due to massive fragmentation caused by re-writing
-  @INDEX records many times during indexing.
- */
-struct ldb_index_pointer {
-       struct ldb_index_pointer *next, *prev;
-       struct ldb_val value;
+struct dn_list {
+       unsigned int count;
+       struct ldb_val *dn;
 };
 
 struct ltdb_idxptr {
-       int num_dns;
-       const char **dn_list;
+       struct tdb_context *itdb;
        bool repack;
+       int error;
 };
 
-/*
-  add to the list of DNs that need to be fixed on transaction end
- */
-static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
+/* we put a @IDXVERSION attribute on index entries. This
+   allows us to tell if it was written by an older version
+*/
+#define LTDB_INDEXING_VERSION 2
+
+/* enable the idxptr mode when transactions start */
+int ltdb_index_transaction_start(struct ldb_module *module)
 {
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
-                                              const char *, ltdb->idxptr->num_dns+1);
-       if (ltdb->idxptr->dn_list == NULL) {
-               ltdb->idxptr->num_dns = 0;
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] =
-               talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
-       if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       ltdb->idxptr->num_dns++;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
        return LDB_SUCCESS;
 }
 
-/* free an idxptr record */
-static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
+/* compare two DN entries in a dn_list. Take account of possible
+ * differences in string termination */
+static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
 {
-       struct ldb_val val;
-       struct ldb_index_pointer *ptr;
-
-       if (el->num_values != 1) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       val = el->values[0];
-       if (val.length != sizeof(void *)) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
+       if (ret != 0) return ret;
+       if (v2->length > v1->length && v2->data[v1->length] != 0) {
+               return 1;
        }
+       return 0;
+}
 
-       ptr = *(struct ldb_index_pointer **)val.data;
-       if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
 
-       while (ptr) {
-               struct ldb_index_pointer *tmp = ptr;
-               DLIST_REMOVE(ptr, ptr);
-               talloc_free(tmp);
+/*
+  find a entry in a dn_list, using a ldb_val. Uses a case sensitive
+  comparison with the dn returns -1 if not found
+ */
+static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
+{
+       int i;
+       for (i=0; i<list->count; i++) {
+               if (dn_list_cmp(&list->dn[i], v) == 0) return i;
        }
-
-       return LDB_SUCCESS;
+       return -1;
 }
 
+/*
+  find a entry in a dn_list. Uses a case sensitive comparison with the dn
+  returns -1 if not found
+ */
+static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
+{
+       struct ldb_val v;
+       v.data = discard_const_p(unsigned char, dn);
+       v.length = strlen(dn);
+       return ltdb_dn_list_find_val(list, &v);
+}
 
-/* convert from the IDXPTR format to a ldb_message_element format */
-static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
+/*
+  return the @IDX list in an index entry for a dn as a 
+  struct dn_list
+ */
+static int ltdb_dn_list_load(struct ldb_module *module,
+                            struct ldb_dn *dn, struct dn_list *list)
 {
-       struct ldb_val val;
-       struct ldb_index_pointer *ptr, *tmp;
-       int i;
-       struct ldb_val *val2;
+       struct ldb_message *msg;
+       int ret;
+       struct ldb_message_element *el;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       TDB_DATA rec;
+       struct dn_list *list2;
+       TDB_DATA key;
 
-       if (el->num_values != 1) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       list->dn = NULL;
+       list->count = 0;
+
+       /* see if we have any in-memory index entries */
+       if (ltdb->idxptr == NULL ||
+           ltdb->idxptr->itdb == NULL) {
+               goto normal_index;
        }
 
-       val = el->values[0];
-       if (val.length != sizeof(void *)) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
+       key.dsize = strlen((char *)key.dptr);
+
+       rec = tdb_fetch(ltdb->idxptr->itdb, key);
+       if (rec.dptr == NULL) {
+               goto normal_index;
        }
 
-       ptr = *(struct ldb_index_pointer **)val.data;
-       if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
+       /* we've found an in-memory index entry */
+       if (rec.dsize != sizeof(void *)) {
+               free(rec.dptr);
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                      "Bad internal index size %u", (unsigned)rec.dsize);
                return LDB_ERR_OPERATIONS_ERROR;
        }
+       list2 = *(struct dn_list **)rec.dptr;
+       free(rec.dptr);
 
-       /* count the length of the list */
-       for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
-               i++;
-       }
+       *list = *list2;
+       return LDB_SUCCESS;
 
-       /* allocate the new values array */
-       val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
-       if (val2 == NULL) {
+normal_index:
+       msg = ldb_msg_new(list);
+       if (msg == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       el->values = val2;
-       el->num_values = i;
 
-       /* populate the values array */
-       for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
-               el->values[i].length = tmp->value.length;
-               /* we need to over-allocate here as there are still some places
-                  in ldb that rely on null termination. */
-               el->values[i].data = talloc_size(el->values, tmp->value.length+1);
-               if (el->values[i].data == NULL) {
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
-               el->values[i].data[tmp->value.length] = 0;
+       ret = ltdb_search_dn1(module, dn, msg);
+       if (ret != LDB_SUCCESS) {
+               return ret;
        }
 
-       /* update the name */
-       el->name = LTDB_IDX;
+       /* TODO: check indexing version number */
 
-               return LDB_SUCCESS;
+       el = ldb_msg_find_element(msg, LTDB_IDX);
+       if (!el) {
+               talloc_free(msg);
+               return LDB_SUCCESS;
+       }
+
+       /* we avoid copying the strings by stealing the list */
+       list->dn = talloc_steal(list, el->values);
+       list->count = el->num_values;
+
+       return LDB_SUCCESS;
 }
 
 
-/* convert to the IDXPTR format from a ldb_message_element format */
-static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
+/*
+  save a dn_list into a full @IDX style record
+ */
+static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
+                                  struct dn_list *list)
 {
-       struct ldb_index_pointer *ptr, *tmp;
-       int i;
-       struct ldb_val *val2;
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-
-       ptr = NULL;
+       struct ldb_message *msg;
+       int ret;
 
-       for (i=0;i<el->num_values;i++) {
-               tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
-               if (tmp == NULL) {
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               tmp->value = el->values[i];
-               tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
-               if (tmp->value.data == NULL) {
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               DLIST_ADD(ptr, tmp);
+       msg = ldb_msg_new(module);
+       if (!msg) {
+               ldb_module_oom(module);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /* allocate the new values array */
-       val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
-       if (val2 == NULL) {
+       ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
+       if (ret != LDB_SUCCESS) {
+               ldb_module_oom(module);
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       el->values = val2;
-       el->num_values = 1;
 
-       el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
-       el->values[0].length = sizeof(ptr);
-
-       /* update the name */
-       el->name = LTDB_IDXPTR;
-
-               return LDB_SUCCESS;
-}
+       msg->dn = dn;
+       if (list->count > 0) {
+               struct ldb_message_element *el;
 
+               ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
+               if (ret != LDB_SUCCESS) {
+                       ldb_module_oom(module);
+                       talloc_free(msg);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               el->values = list->dn;
+               el->num_values = list->count;
+       }
 
-/* enable the idxptr mode when transactions start */
-int ltdb_index_transaction_start(struct ldb_module *module)
-{
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
-       return LDB_SUCCESS;
+       ret = ltdb_store(module, msg, TDB_REPLACE);
+       talloc_free(msg);
+       return ret;
 }
 
 /*
-  a wrapper around ltdb_search_dn1() which translates pointer based index records
-  and maps them into normal ldb message structures
+  save a dn_list into the database, in either @IDX or internal format
  */
-static int ltdb_search_dn1_index(struct ldb_module *module,
-                               struct ldb_dn *dn, struct ldb_message *msg)
+static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
+                             struct dn_list *list)
 {
-       int ret, i;
-       ret = ltdb_search_dn1(module, dn, msg);
-       if (ret != LDB_SUCCESS) {
-               return ret;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       TDB_DATA rec, key;
+       int ret;
+       struct dn_list *list2;
+
+       if (ltdb->idxptr == NULL) {
+               return ltdb_dn_list_store_full(module, dn, list);
        }
 
-       /* if this isn't a @INDEX record then don't munge it */
-       if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (ltdb->idxptr->itdb == NULL) {
+               ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
+               if (ltdb->idxptr->itdb == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
        }
 
-       for (i=0;i<msg->num_elements;i++) {
-               struct ldb_message_element *el = &msg->elements[i];
-               if (strcmp(el->name, LTDB_IDXPTR) == 0) {
-                       ret = ltdb_convert_from_idxptr(module, el);
-                       if (ret != LDB_SUCCESS) {
-                               return ret;
-                       }
+       key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
+       key.dsize = strlen((char *)key.dptr);
+
+       rec = tdb_fetch(ltdb->idxptr->itdb, key);
+       if (rec.dptr != NULL) {
+               if (rec.dsize != sizeof(void *)) {
+                       free(rec.dptr);
+                       ldb_asprintf_errstring(ldb_module_get_ctx(module), 
+                                              "Bad internal index size %u", (unsigned)rec.dsize);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
+               list2 = *(struct dn_list **)rec.dptr;
+               free(rec.dptr);
+               list2->dn = talloc_steal(list2, list->dn);
+               list2->count = list->count;
+               return LDB_SUCCESS;
        }
 
-       return ret;
-}
+       list2 = talloc(ltdb->idxptr, struct dn_list);
+       if (list2 == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       list2->dn = talloc_steal(list2, list->dn);
+       list2->count = list->count;
+
+       rec.dptr = (uint8_t *)&list2;
+       rec.dsize = sizeof(void *);
 
+       ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
 
+       return ret;     
+}
 
 /*
-  fixup the idxptr for one DN
+  traverse function for storing the in-memory index entries on disk
  */
-static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
+static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
 {
-       struct ldb_context *ldb;
+       struct ldb_module *module = state;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        struct ldb_dn *dn;
-       struct ldb_message *msg = ldb_msg_new(module);
-       int ret;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct ldb_val v;
+       struct dn_list *list;
 
-       ldb = ldb_module_get_ctx(module);
+       if (data.dsize != sizeof(void *)) {
+               ldb_asprintf_errstring(ldb, "Bad internal index size %u", (unsigned)data.dsize);
+               ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+       
+       list = *(struct dn_list **)data.dptr;
+
+       v.data = key.dptr;
+       v.length = key.dsize;
 
-       dn = ldb_dn_new(msg, ldb, strdn);
-       if (ltdb_search_dn1_index(module, dn, msg) == LDB_SUCCESS) {
-               ret = ltdb_store(module, msg, TDB_REPLACE);
+       dn = ldb_dn_from_ldb_val(module, ldb, &v);
+       if (dn == NULL) {
+               ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
        }
-       talloc_free(msg);
-       return ret;
+
+       ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
+       talloc_free(dn);
+       return ltdb->idxptr->error;
 }
 
 /* cleanup the idxptr mode when transaction commits */
 int ltdb_index_transaction_commit(struct ldb_module *module)
 {
-       int i;
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       int ret;
 
-       /* fix all the DNs that we have modified */
-       if (ltdb->idxptr) {
-               for (i=0;i<ltdb->idxptr->num_dns;i++) {
-                       ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
-               }
+       if (ltdb->idxptr->itdb) {
+               tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
+               tdb_close(ltdb->idxptr->itdb);
+       }
 
-               if (ltdb->idxptr->repack) {
-                       tdb_repack(ltdb->tdb);
-               }
+       ret = ltdb->idxptr->error;
+
+       if (ret != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(module);
+               ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
        }
 
        talloc_free(ltdb->idxptr);
        ltdb->idxptr = NULL;
-       return LDB_SUCCESS;
+       return ret;
 }
 
 /* cleanup the idxptr mode when transaction cancels */
 int ltdb_index_transaction_cancel(struct ldb_module *module)
 {
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       if (ltdb->idxptr && ltdb->idxptr->itdb) {
+               tdb_close(ltdb->idxptr->itdb);
+       }
        talloc_free(ltdb->idxptr);
        ltdb->idxptr = NULL;
        return LDB_SUCCESS;
 }
 
 
-
-/* a wrapper around ltdb_store() for the index code which
-   stores in IDXPTR format when idxptr mode is enabled
-
-   WARNING: This modifies the msg which is passed in
-*/
-int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
-{
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       int ret;
-
-       if (ltdb->idxptr) {
-               int i;
-               struct ldb_message *msg2 = ldb_msg_new(module);
-
-               /* free any old pointer */
-               ret = ltdb_search_dn1(module, msg->dn, msg2);
-               if (ret == 0) {
-                       for (i=0;i<msg2->num_elements;i++) {
-                               struct ldb_message_element *el = &msg2->elements[i];
-                               if (strcmp(el->name, LTDB_IDXPTR) == 0) {
-                                       ret = ltdb_free_idxptr(module, el);
-                                       if (ret != LDB_SUCCESS) {
-                                               return ret;
-                                       }
-                               }
-                       }
-               }
-               talloc_free(msg2);
-
-               for (i=0;i<msg->num_elements;i++) {
-                       struct ldb_message_element *el = &msg->elements[i];
-                       if (strcmp(el->name, LTDB_IDX) == 0) {
-                               ret = ltdb_convert_to_idxptr(module, el);
-                               if (ret != LDB_SUCCESS) {
-                                       return ret;
-                               }
-                       }
-               }
-
-               if (ltdb_idxptr_add(module, msg) != 0) {
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-       }
-
-       ret = ltdb_store(module, msg, flgs);
-       return ret;
-}
-
-
-/*
-  find an element in a list, using the given comparison function and
-  assuming that the list is already sorted using comp_fn
-
-  return -1 if not found, or the index of the first occurance of needle if found
-*/
-static int ldb_list_find(const void *needle,
-                        const void *base, size_t nmemb, size_t size,
-                        comparison_fn_t comp_fn)
-{
-       const char *base_p = (const char *)base;
-       size_t min_i, max_i, test_i;
-
-       if (nmemb == 0) {
-               return -1;
-       }
-
-       min_i = 0;
-       max_i = nmemb-1;
-
-       while (min_i < max_i) {
-               int r;
-
-               test_i = (min_i + max_i) / 2;
-               /* the following cast looks strange, but is
-                correct. The key to understanding it is that base_p
-                is a pointer to an array of pointers, so we have to
-                dereference it after casting to void **. The strange
-                const in the middle gives us the right type of pointer
-                after the dereference  (tridge) */
-               r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
-               if (r == 0) {
-                       /* scan back for first element */
-                       while (test_i > 0 &&
-                              comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
-                               test_i--;
-                       }
-                       return test_i;
-               }
-               if (r < 0) {
-                       if (test_i == 0) {
-                               return -1;
-                       }
-                       max_i = test_i - 1;
-               }
-               if (r > 0) {
-                       min_i = test_i + 1;
-               }
-       }
-
-       if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
-               return min_i;
-       }
-
-       return -1;
-}
-
-struct dn_list {
-       unsigned int count;
-       char **dn;
-};
-
 /*
   return the dn key to be used for an index
-  caller frees
+  the caller is responsible for freeing
 */
 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
-                                    const char *attr, const struct ldb_val *value)
+                                    const char *attr, const struct ldb_val *value,
+                                    const struct ldb_schema_attribute **ap)
 {
        struct ldb_dn *ret;
        struct ldb_val v;
@@ -440,6 +352,9 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
        }
 
        a = ldb_schema_attribute_by_name(ldb, attr);
+       if (ap) {
+               *ap = a;
+       }
        r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
        if (r != LDB_SUCCESS) {
                const char *errstr = ldb_errstring(ldb);
@@ -451,7 +366,7 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
                talloc_free(attr_folded);
                return NULL;
        }
-       if (ldb_should_b64_encode(&v)) {
+       if (ldb_should_b64_encode(ldb, &v)) {
                char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
                if (!vstr) return NULL;
                ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
@@ -471,41 +386,38 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
 /*
   see if a attribute value is in the list of indexed attributes
 */
-static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
-                           unsigned int *v_idx, const char *key)
+static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
 {
-       unsigned int i, j;
-       for (i=0;i<msg->num_elements;i++) {
-               if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
-                       const struct ldb_message_element *el = &msg->elements[i];
-
-                       if (attr == NULL) {
-                               /* in this case we are just looking to see if key is present,
-                                  we are not spearching for a specific index */
-                               return 0;
-                       }
+       unsigned int i;
+       struct ldb_message_element *el;
 
-                       for (j=0;j<el->num_values;j++) {
-                               if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
-                                       if (v_idx) {
-                                               *v_idx = j;
-                                       }
-                                       return i;
-                               }
-                       }
+       el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
+       if (el == NULL) {
+               return false;
+       }
+       for (i=0; i<el->num_values; i++) {
+               if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
+                       return true;
                }
        }
-       return -1;
+       return false;
 }
 
-/* used in sorting dn lists */
-static int list_cmp(const char **s1, const char **s2)
-{
-       return strcmp(*s1, *s2);
-}
+/*
+  in the following logic functions, the return value is treated as
+  follows:
+
+     LDB_SUCCESS: we found some matching index values
+
+     LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
+
+     LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
+                               we'll need a full search
+ */
 
 /*
-  return a list of dn's that might match a simple indexed search or
+  return a list of dn's that might match a simple indexed search (an
+  equality search only)
  */
 static int ltdb_index_dn_simple(struct ldb_module *module,
                                const struct ldb_parse_tree *tree,
@@ -515,8 +427,6 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
        struct ldb_context *ldb;
        struct ldb_dn *dn;
        int ret;
-       unsigned int i, j;
-       struct ldb_message *msg;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -525,63 +435,22 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
 
        /* if the attribute isn't in the list of indexed attributes then
           this node needs a full search */
-       if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
+       if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* the attribute is indexed. Pull the list of DNs that match the 
           search criterion */
-       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
+       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
        if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
-       msg = talloc(list, struct ldb_message);
-       if (msg == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       ret = ltdb_search_dn1_index(module, dn, msg);
+       ret = ltdb_dn_list_load(module, dn, list);
        talloc_free(dn);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
-       for (i=0;i<msg->num_elements;i++) {
-               struct ldb_message_element *el;
-
-               if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
-                       continue;
-               }
-
-               el = &msg->elements[i];
-
-               list->dn = talloc_array(list, char *, el->num_values);
-               if (!list->dn) {
-                       talloc_free(msg);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-               for (j=0;j<el->num_values;j++) {
-                       list->dn[list->count] =
-                               talloc_strdup(list->dn, (char *)el->values[j].data);
-                       if (!list->dn[list->count]) {
-                               talloc_free(msg);
-                               return LDB_ERR_OPERATIONS_ERROR;
-                       }
-                       list->count++;
-               }
-       }
-
-       talloc_free(msg);
-
-       if (list->count > 1) {
-               qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
-       }
-
-       return LDB_SUCCESS;
+       return ret;
 }
 
 
-static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
+static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
 
 /*
   return a list of dn's that might match a leaf indexed search
@@ -591,20 +460,13 @@ static int ltdb_index_dn_leaf(struct ldb_module *module,
                              const struct ldb_message *index_list,
                              struct dn_list *list)
 {
-       struct ldb_context *ldb;
-       ldb = ldb_module_get_ctx(module);
-
        if (ldb_attr_dn(tree->u.equality.attr) == 0) {
-               list->dn = talloc_array(list, char *, 1);
+               list->dn = talloc_array(list, struct ldb_val, 1);
                if (list->dn == NULL) {
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
-               if (list->dn[0] == NULL) {
-                       ldb_oom(ldb);
+                       ldb_module_oom(module);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
+               list->dn[0] = tree->u.equality.value;
                list->count = 1;
                return LDB_SUCCESS;
        }
@@ -615,89 +477,109 @@ static int ltdb_index_dn_leaf(struct ldb_module *module,
 /*
   list intersection
   list = list & list2
-  relies on the lists being sorted
 */
-static int list_intersect(struct ldb_context *ldb,
-                         struct dn_list *list, const struct dn_list *list2)
+static bool list_intersect(struct ldb_context *ldb,
+                          struct dn_list *list, const struct dn_list *list2)
 {
        struct dn_list *list3;
        unsigned int i;
 
-       if (list->count == 0 || list2->count == 0) {
+       if (list->count == 0) {
                /* 0 & X == 0 */
-               return LDB_ERR_NO_SUCH_OBJECT;
+               return true;
+       }
+       if (list2->count == 0) {
+               /* X & 0 == 0 */
+               list->count = 0;
+               list->dn = NULL;
+               return true;
        }
 
-       list3 = talloc(ldb, struct dn_list);
+       /* the indexing code is allowed to return a longer list than
+          what really matches, as all results are filtered by the
+          full expression at the end - this shortcut avoids a lot of
+          work in some cases */
+       if (list->count < 2 && list2->count > 10) {
+               return true;
+       }
+       if (list2->count < 2 && list->count > 10) {
+               list->count = list2->count;
+               list->dn = list2->dn;
+               /* note that list2 may not be the parent of list2->dn,
+                  as list2->dn may be owned by ltdb->idxptr. In that
+                  case we expect this reparent call to fail, which is
+                  OK */
+               talloc_reparent(list2, list, list2->dn);
+               return true;
+       }
+
+       list3 = talloc_zero(list, struct dn_list);
        if (list3 == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
+               return false;
        }
 
-       list3->dn = talloc_array(list3, char *, list->count);
+       list3->dn = talloc_array(list3, struct ldb_val, list->count);
        if (!list3->dn) {
                talloc_free(list3);
-               return LDB_ERR_OPERATIONS_ERROR;
+               return false;
        }
        list3->count = 0;
 
        for (i=0;i<list->count;i++) {
-               if (ldb_list_find(list->dn[i], list2->dn, list2->count,
-                             sizeof(char *), (comparison_fn_t)strcmp) != -1) {
-                       list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
+               if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
+                       list3->dn[list3->count] = list->dn[i];
                        list3->count++;
-               } else {
-                       talloc_free(list->dn[i]);
                }
        }
 
-       talloc_free(list->dn);
-       list->dn = talloc_move(list, &list3->dn);
+       list->dn = talloc_steal(list, list3->dn);
        list->count = list3->count;
        talloc_free(list3);
 
-       return LDB_ERR_NO_SUCH_OBJECT;
+       return true;
 }
 
 
 /*
   list union
   list = list | list2
-  relies on the lists being sorted
 */
-static int list_union(struct ldb_context *ldb,
-                     struct dn_list *list, const struct dn_list *list2)
+static bool list_union(struct ldb_context *ldb,
+                      struct dn_list *list, const struct dn_list *list2)
 {
-       unsigned int i;
-       char **d;
-       unsigned int count = list->count;
+       struct ldb_val *dn3;
 
-       if (list->count == 0 && list2->count == 0) {
-               /* 0 | 0 == 0 */
-               return LDB_ERR_NO_SUCH_OBJECT;
+       if (list2->count == 0) {
+               /* X | 0 == X */
+               return true;
        }
 
-       d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
-       if (!d) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (list->count == 0) {
+               /* 0 | X == X */
+               list->count = list2->count;
+               list->dn = list2->dn;
+               /* note that list2 may not be the parent of list2->dn,
+                  as list2->dn may be owned by ltdb->idxptr. In that
+                  case we expect this reparent call to fail, which is
+                  OK */
+               talloc_reparent(list2, list, list2->dn);
+               return true;
        }
-       list->dn = d;
 
-       for (i=0;i<list2->count;i++) {
-               if (ldb_list_find(list2->dn[i], list->dn, count,
-                             sizeof(char *), (comparison_fn_t)strcmp) == -1) {
-                       list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
-                       if (!list->dn[list->count]) {
-                               return LDB_ERR_OPERATIONS_ERROR;
-                       }
-                       list->count++;
-               }
+       dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
+       if (!dn3) {
+               ldb_oom(ldb);
+               return false;
        }
 
-       if (list->count != count) {
-               qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
-       }
+       /* we allow for duplicates here, and get rid of them later */
+       memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
+       memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
+
+       list->dn = dn3;
+       list->count += list2->count;
 
-       return LDB_ERR_NO_SUCH_OBJECT;
+       return true;
 }
 
 static int ltdb_index_dn(struct ldb_module *module,
@@ -707,7 +589,7 @@ static int ltdb_index_dn(struct ldb_module *module,
 
 
 /*
-  OR two index results
+  process an OR list (a union)
  */
 static int ltdb_index_dn_or(struct ldb_module *module,
                            const struct ldb_parse_tree *tree,
@@ -716,60 +598,46 @@ static int ltdb_index_dn_or(struct ldb_module *module,
 {
        struct ldb_context *ldb;
        unsigned int i;
-       int ret;
 
        ldb = ldb_module_get_ctx(module);
 
-       ret = LDB_ERR_OPERATIONS_ERROR;
        list->dn = NULL;
        list->count = 0;
 
-       for (i=0;i<tree->u.list.num_elements;i++) {
+       for (i=0; i<tree->u.list.num_elements; i++) {
                struct dn_list *list2;
-               int v;
+               int ret;
 
-               list2 = talloc(module, struct dn_list);
+               list2 = talloc_zero(list, struct dn_list);
                if (list2 == NULL) {
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
+               ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
 
-               if (v == LDB_ERR_NO_SUCH_OBJECT) {
-                       /* 0 || X == X */
-                       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
-                               ret = v;
-                       }
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       /* X || 0 == X */
                        talloc_free(list2);
                        continue;
                }
 
-               if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
-                       /* 1 || X == 1 */
-                       talloc_free(list->dn);
+               if (ret != LDB_SUCCESS) {
+                       /* X || * == * */
                        talloc_free(list2);
-                       return v;
+                       return ret;
                }
 
-               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
-                       ret = LDB_SUCCESS;
-                       list->dn = talloc_move(list, &list2->dn);
-                       list->count = list2->count;
-               } else {
-                       if (list_union(ldb, list, list2) == -1) {
-                               talloc_free(list2);
-                               return LDB_ERR_OPERATIONS_ERROR;
-                       }
-                       ret = LDB_SUCCESS;
+               if (!list_union(ldb, list, list2)) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-               talloc_free(list2);
        }
 
        if (list->count == 0) {
                return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       return ret;
+       return LDB_SUCCESS;
 }
 
 
@@ -792,8 +660,20 @@ static int ltdb_index_dn_not(struct ldb_module *module,
        return LDB_ERR_OPERATIONS_ERROR;
 }
 
+
+static bool ltdb_index_unique(struct ldb_context *ldb,
+                             const char *attr)
+{
+       const struct ldb_schema_attribute *a;
+       a = ldb_schema_attribute_by_name(ldb, attr);
+       if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
+               return true;
+       }
+       return false;
+}
+
 /*
-  AND two index results
+  process an AND expression (intersection)
  */
 static int ltdb_index_dn_and(struct ldb_module *module,
                             const struct ldb_parse_tree *tree,
@@ -802,156 +682,131 @@ static int ltdb_index_dn_and(struct ldb_module *module,
 {
        struct ldb_context *ldb;
        unsigned int i;
-       int ret;
+       bool found;
 
        ldb = ldb_module_get_ctx(module);
 
-       ret = LDB_ERR_OPERATIONS_ERROR;
        list->dn = NULL;
        list->count = 0;
 
-       for (i=0;i<tree->u.list.num_elements;i++) {
+       /* in the first pass we only look for unique simple
+          equality tests, in the hope of avoiding having to look
+          at any others */
+       for (i=0; i<tree->u.list.num_elements; i++) {
+               const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
+               int ret;
+
+               if (subtree->operation != LDB_OP_EQUALITY ||
+                   !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
+                       continue;
+               }
+               
+               ret = ltdb_index_dn(module, subtree, index_list, list);
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       /* 0 && X == 0 */
+                       return LDB_ERR_NO_SUCH_OBJECT;
+               }
+               if (ret == LDB_SUCCESS) {
+                       /* a unique index match means we can
+                        * stop. Note that we don't care if we return
+                        * a few too many objects, due to later
+                        * filtering */
+                       return LDB_SUCCESS;
+               }
+       }       
+
+       /* now do a full intersection */
+       found = false;
+
+       for (i=0; i<tree->u.list.num_elements; i++) {
+               const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
                struct dn_list *list2;
-               int v;
+               int ret;
 
-               list2 = talloc(module, struct dn_list);
+               list2 = talloc_zero(list, struct dn_list);
                if (list2 == NULL) {
+                       ldb_module_oom(module);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
+                       
+               ret = ltdb_index_dn(module, subtree, index_list, list2);
 
-               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
-
-               if (v == LDB_ERR_NO_SUCH_OBJECT) {
-                       /* 0 && X == 0 */
-                       talloc_free(list->dn);
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       /* X && 0 == 0 */
+                       list->dn = NULL;
+                       list->count = 0;
                        talloc_free(list2);
                        return LDB_ERR_NO_SUCH_OBJECT;
                }
-
-               if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
+               
+               if (ret != LDB_SUCCESS) {
+                       /* this didn't adding anything */
                        talloc_free(list2);
                        continue;
                }
 
-               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
-                       ret = LDB_SUCCESS;
-                       talloc_free(list->dn);
-                       list->dn = talloc_move(list, &list2->dn);
+               if (!found) {
+                       talloc_reparent(list2, list, list->dn);
+                       list->dn = list2->dn;
                        list->count = list2->count;
-               } else {
-                       if (list_intersect(ldb, list, list2) == -1) {
-                               talloc_free(list2);
-                               return LDB_ERR_OPERATIONS_ERROR;
-                       }
+                       found = true;
+               } else if (!list_intersect(ldb, list, list2)) {
+                       talloc_free(list2);
+                       return LDB_ERR_OPERATIONS_ERROR;
                }
-
-               talloc_free(list2);
-
+                       
                if (list->count == 0) {
-                       talloc_free(list->dn);
+                       list->dn = NULL;
                        return LDB_ERR_NO_SUCH_OBJECT;
                }
+                       
+               if (list->count < 2) {
+                       /* it isn't worth loading the next part of the tree */
+                       return LDB_SUCCESS;
+               }
+       }       
+
+       if (!found) {
+               /* none of the attributes were indexed */
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       return ret;
+       return LDB_SUCCESS;
 }
-
+       
 /*
-  AND index results and ONE level special index
+  return a list of matching objects using a one-level index
  */
 static int ltdb_index_dn_one(struct ldb_module *module,
                             struct ldb_dn *parent_dn,
                             struct dn_list *list)
 {
        struct ldb_context *ldb;
-       struct dn_list *list2;
-       struct ldb_message *msg;
        struct ldb_dn *key;
        struct ldb_val val;
-       unsigned int i, j;
        int ret;
 
        ldb = ldb_module_get_ctx(module);
 
-       list2 = talloc_zero(module, struct dn_list);
-       if (list2 == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       /* the attribute is indexed. Pull the list of DNs that match the
-          search criterion */
+       /* work out the index key from the parent DN */
        val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
        val.length = strlen((char *)val.data);
-       key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
+       key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
        if (!key) {
-               talloc_free(list2);
+               ldb_oom(ldb);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       msg = talloc(list2, struct ldb_message);
-       if (msg == NULL) {
-               talloc_free(list2);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       ret = ltdb_search_dn1_index(module, key, msg);
+       ret = ltdb_dn_list_load(module, key, list);
        talloc_free(key);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
 
-       for (i = 0; i < msg->num_elements; i++) {
-               struct ldb_message_element *el;
-
-               if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
-                       continue;
-               }
-
-               el = &msg->elements[i];
-
-               list2->dn = talloc_array(list2, char *, el->num_values);
-               if (!list2->dn) {
-                       talloc_free(list2);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-               for (j = 0; j < el->num_values; j++) {
-                       list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
-                       if (!list2->dn[list2->count]) {
-                               talloc_free(list2);
-                               return LDB_ERR_OPERATIONS_ERROR;
-                       }
-                       list2->count++;
-               }
-       }
-
-       if (list2->count == 0) {
-               talloc_free(list2);
+       if (list->count == 0) {
                return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       if (list2->count > 1) {
-               qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
-       }
-
-       if (list->count > 0) {
-               if (list_intersect(ldb, list, list2) == -1) {
-                       talloc_free(list2);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-               if (list->count == 0) {
-                       talloc_free(list->dn);
-                       talloc_free(list2);
-                       return LDB_ERR_NO_SUCH_OBJECT;
-               }
-       } else {
-               list->dn = talloc_move(list, &list2->dn);
-               list->count = list2->count;
-       }
-
-       talloc_free(list2);
-
        return LDB_SUCCESS;
 }
 
@@ -1002,7 +857,8 @@ static int ltdb_index_dn(struct ldb_module *module,
   extracting just the given attributes
 */
 static int ltdb_index_filter(const struct dn_list *dn_list,
-                            struct ltdb_context *ac)
+                            struct ltdb_context *ac, 
+                            uint32_t *match_count)
 {
        struct ldb_context *ldb;
        struct ldb_message *msg;
@@ -1019,7 +875,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               dn = ldb_dn_new(msg, ldb, dn_list->dn[i]);
+               dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
                if (dn == NULL) {
                        talloc_free(msg);
                        return LDB_ERR_OPERATIONS_ERROR;
@@ -1058,156 +914,114 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
                        ac->request_terminated = true;
                        return ret;
                }
+
+               (*match_count)++;
        }
 
        return LDB_SUCCESS;
 }
 
+/*
+  remove any duplicated entries in a indexed result
+ */
+static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
+{
+       int i, new_count;
+
+       if (list->count < 2) {
+               return;
+       }
+
+       qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
+
+       new_count = 1;
+       for (i=1; i<list->count; i++) {
+               if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
+                       if (new_count != i) {
+                               list->dn[new_count] = list->dn[i];
+                       }
+                       new_count++;
+               }
+       }
+       
+       list->count = new_count;
+}
+
 /*
   search the database with a LDAP-like expression using indexes
   returns -1 if an indexed search is not possible, in which
   case the caller should call ltdb_search_full()
 */
-int ltdb_search_indexed(struct ltdb_context *ac)
+int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
 {
-       struct ldb_context *ldb;
-       void *data = ldb_module_get_private(ac->module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
        struct dn_list *dn_list;
-       int ret, idxattr, idxone;
-
-       ldb = ldb_module_get_ctx(ac->module);
-
-       idxattr = idxone = 0;
-       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
-       if (ret == 0 ) {
-               idxattr = 1;
-       }
-
-       /* We do one level indexing only if requested */
-       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
-       if (ret == 0 ) {
-               idxone = 1;
-       }
+       int ret;
 
-       if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
-           (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
-               /* no indexes? must do full search */
+       /* see if indexing is enabled */
+       if (!ltdb->cache->attribute_indexes && 
+           !ltdb->cache->one_level_indexes &&
+           ac->scope != LDB_SCOPE_BASE) {
+               /* fallback to a full search */
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = LDB_ERR_OPERATIONS_ERROR;
-
        dn_list = talloc_zero(ac, struct dn_list);
        if (dn_list == NULL) {
+               ldb_module_oom(ac->module);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (ac->scope == LDB_SCOPE_BASE) {
-               /* with BASE searches only one DN can match */
-               dn_list->dn = talloc_array(dn_list, char *, 1);
+       switch (ac->scope) {
+       case LDB_SCOPE_BASE:
+               dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
                if (dn_list->dn == NULL) {
-                       ldb_oom(ldb);
+                       ldb_module_oom(ac->module);
+                       talloc_free(dn_list);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
-               dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
-               if (dn_list->dn[0] == NULL) {
-                       ldb_oom(ldb);
+               dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
+               if (dn_list->dn[0].data == NULL) {
+                       ldb_module_oom(ac->module);
+                       talloc_free(dn_list);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
+               dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
                dn_list->count = 1;
-               ret = LDB_SUCCESS;
-       }
-
-       if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
-               ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
-       }
+               break;          
 
-       if (ret == LDB_ERR_OPERATIONS_ERROR &&
-           ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
+       case LDB_SCOPE_ONELEVEL:
+               if (!ltdb->cache->one_level_indexes) {
+                       talloc_free(dn_list);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
                ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
-       }
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(dn_list);
+                       return ret;
+               }
+               break;
 
-       if (ret == LDB_SUCCESS) {
-               /* we've got a candidate list - now filter by the full tree
-                  and extract the needed attributes */
-               ret = ltdb_index_filter(dn_list, ac);
+       case LDB_SCOPE_SUBTREE:
+       case LDB_SCOPE_DEFAULT:
+               if (!ltdb->cache->attribute_indexes) {
+                       talloc_free(dn_list);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(dn_list);
+                       return ret;
+               }
+               ltdb_dn_list_remove_duplicates(dn_list);
+               break;
        }
 
+       ret = ltdb_index_filter(dn_list, ac, match_count);
        talloc_free(dn_list);
-
        return ret;
 }
 
-/*
-  add a index element where this is the first indexed DN for this value
-*/
-static int ltdb_index_add1_new(struct ldb_context *ldb,
-                              struct ldb_message *msg,
-                              const char *dn)
-{
-       struct ldb_message_element *el;
-
-       /* add another entry */
-       el = talloc_realloc(msg, msg->elements,
-                              struct ldb_message_element, msg->num_elements+1);
-       if (!el) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       msg->elements = el;
-       msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
-       if (!msg->elements[msg->num_elements].name) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       msg->elements[msg->num_elements].num_values = 0;
-       msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
-       if (!msg->elements[msg->num_elements].values) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       msg->elements[msg->num_elements].values[0].length = strlen(dn);
-       msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
-       msg->elements[msg->num_elements].num_values = 1;
-       msg->num_elements++;
-
-       return LDB_SUCCESS;
-}
-
-
-/*
-  add a index element where this is not the first indexed DN for this
-  value
-*/
-static int ltdb_index_add1_add(struct ldb_context *ldb,
-                              struct ldb_message *msg,
-                              int idx,
-                              const char *dn)
-{
-       struct ldb_val *v2;
-       unsigned int i;
-
-       /* for multi-valued attributes we can end up with repeats */
-       for (i=0;i<msg->elements[idx].num_values;i++) {
-               if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
-                       return LDB_SUCCESS;
-               }
-       }
-
-       v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
-                             struct ldb_val,
-                             msg->elements[idx].num_values+1);
-       if (!v2) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       msg->elements[idx].values = v2;
-
-       msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
-       msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
-       msg->elements[idx].num_values++;
-
-       return LDB_SUCCESS;
-}
-
 /*
   add an index entry for one message element
 */
@@ -1215,66 +1029,82 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
                           struct ldb_message_element *el, int v_idx)
 {
        struct ldb_context *ldb;
-       struct ldb_message *msg;
        struct ldb_dn *dn_key;
        int ret;
-       unsigned int i;
+       const struct ldb_schema_attribute *a;
+       struct dn_list *list;
 
        ldb = ldb_module_get_ctx(module);
 
-       msg = talloc(module, struct ldb_message);
-       if (msg == NULL) {
-               errno = ENOMEM;
+       list = talloc_zero(module, struct dn_list);
+       if (list == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
        if (!dn_key) {
-               talloc_free(msg);
+               talloc_free(list);
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       talloc_steal(msg, dn_key);
+       talloc_steal(list, dn_key);
 
-       ret = ltdb_search_dn1_index(module, dn_key, msg);
+       ret = ltdb_dn_list_load(module, dn_key, list);
        if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
-               talloc_free(msg);
+               talloc_free(list);
                return ret;
        }
 
-       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
-               msg->dn = dn_key;
-               msg->num_elements = 0;
-               msg->elements = NULL;
+       if (ltdb_dn_list_find_str(list, dn) != -1) {
+               talloc_free(list);
+               return LDB_SUCCESS;
        }
 
-       for (i=0;i<msg->num_elements;i++) {
-               if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
-                       break;
-               }
+       if (list->count > 0 &&
+           a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
+               return LDB_ERR_ENTRY_ALREADY_EXISTS;            
        }
 
-       if (i == msg->num_elements) {
-               ret = ltdb_index_add1_new(ldb, msg, dn);
-       } else {
-               ret = ltdb_index_add1_add(ldb, msg, i, dn);
+       list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count+1);
+       if (list->dn == NULL) {
+               talloc_free(list);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
+       list->dn[list->count].data = discard_const_p(unsigned char, dn);
+       list->dn[list->count].length = strlen(dn);
+       list->count++;
 
-       if (ret == LDB_SUCCESS) {
-               ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
-       }
+       ret = ltdb_dn_list_store(module, dn_key, list);
 
-       talloc_free(msg);
+       talloc_free(list);
 
        return ret;
 }
 
-static int ltdb_index_add0(struct ldb_module *module, const char *dn,
-                          struct ldb_message_element *elements, int num_el)
+/*
+  add index entries for one elements in a message
+ */
+static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
+                            struct ldb_message_element *el)
 {
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       int ret;
-       unsigned int i, j;
+       unsigned int i;
+       for (i = 0; i < el->num_values; i++) {
+               int ret = ltdb_index_add1(module, dn, el, i);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
+
+/*
+  add index entries for all elements in a message
+ */
+static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
+                             struct ldb_message_element *elements, int num_el)
+{
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       unsigned int i;
 
        if (dn[0] == '@') {
                return LDB_SUCCESS;
@@ -1286,40 +1116,108 @@ static int ltdb_index_add0(struct ldb_module *module, const char *dn,
        }
 
        for (i = 0; i < num_el; i++) {
-               ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
-                                      NULL, LTDB_IDXATTR);
-               if (ret == -1) {
+               int ret;
+               if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
                        continue;
                }
-               for (j = 0; j < elements[i].num_values; j++) {
-                       ret = ltdb_index_add1(module, dn, &elements[i], j);
-                       if (ret != LDB_SUCCESS) {
-                               return ret;
-                       }
+               ret = ltdb_index_add_el(module, dn, &elements[i]);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
        }
 
        return LDB_SUCCESS;
 }
 
+
 /*
-  add the index entries for a new record
+  insert a one level index for a message
 */
-int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
+static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
 {
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
+       struct ldb_message_element el;
+       struct ldb_val val;
+       struct ldb_dn *pdn;
        const char *dn;
        int ret;
 
+       /* We index for ONE Level only if requested */
+       if (!ltdb->cache->one_level_indexes) {
+               return LDB_SUCCESS;
+       }
+
+       pdn = ldb_dn_get_parent(module, msg->dn);
+       if (pdn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
        dn = ldb_dn_get_linearized(msg->dn);
        if (dn == NULL) {
+               talloc_free(pdn);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
+       if (val.data == NULL) {
+               talloc_free(pdn);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
+       val.length = strlen((char *)val.data);
+       el.name = LTDB_IDXONE;
+       el.values = &val;
+       el.num_values = 1;
+
+       if (add) {
+               ret = ltdb_index_add1(module, dn, &el, 0);
+       } else { /* delete */
+               ret = ltdb_index_del_value(module, dn, &el, 0);
+       }
+
+       talloc_free(pdn);
 
        return ret;
 }
 
+/*
+  add the index entries for a new element in a record
+  The caller guarantees that these element values are not yet indexed
+*/
+int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
+                          struct ldb_message_element *el)
+{
+       if (ldb_dn_is_special(dn)) {
+               return LDB_SUCCESS;
+       }
+       return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
+}
+
+/*
+  add the index entries for a new record
+*/
+int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
+{
+       const char *dn;
+       int ret;
+
+       if (ldb_dn_is_special(msg->dn)) {
+               return LDB_SUCCESS;
+       }
+
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       return ltdb_index_onelevel(module, msg, 1);
+}
+
 
 /*
   delete an index entry for one message element
@@ -1328,10 +1226,9 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                         struct ldb_message_element *el, int v_idx)
 {
        struct ldb_context *ldb;
-       struct ldb_message *msg;
        struct ldb_dn *dn_key;
        int ret, i;
-       unsigned int j;
+       struct dn_list *list;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -1339,23 +1236,18 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                return LDB_SUCCESS;
        }
 
-       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
        if (!dn_key) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       msg = talloc(dn_key, struct ldb_message);
-       if (msg == NULL) {
+       list = talloc_zero(dn_key, struct dn_list);
+       if (list == NULL) {
                talloc_free(dn_key);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_search_dn1_index(module, dn_key, msg);
-       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
-               talloc_free(dn_key);
-               return ret;
-       }
-
+       ret = ltdb_dn_list_load(module, dn_key, list);
        if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                /* it wasn't indexed. Did we have an earlier error? If we did then
                   its gone now */
@@ -1363,35 +1255,25 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                return LDB_SUCCESS;
        }
 
-       i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
-       if (i == -1) {
-               struct ldb_ldif ldif;
-
-               ldb_debug(ldb, LDB_DEBUG_ERROR,
-                               "ERROR: dn %s not found in %s\n", dn,
-                               ldb_dn_get_linearized(dn_key));
-               ldif.changetype = LDB_CHANGETYPE_NONE;
-               ldif.msg = msg;
-               ldb_ldif_write_file(ldb, stdout, &ldif);
-               sleep(100);
-               /* it ain't there. hmmm */
+       if (ret != LDB_SUCCESS) {
                talloc_free(dn_key);
-               return LDB_SUCCESS;
+               return ret;
        }
 
-       if (j != msg->elements[i].num_values - 1) {
-               memmove(&msg->elements[i].values[j],
-                       &msg->elements[i].values[j+1],
-                       (msg->elements[i].num_values-(j+1)) *
-                       sizeof(msg->elements[i].values[0]));
+       i = ltdb_dn_list_find_str(list, dn);
+       if (i == -1) {
+               /* nothing to delete */
+               talloc_free(dn_key);
+               return LDB_SUCCESS;             
        }
-       msg->elements[i].num_values--;
 
-       if (msg->elements[i].num_values == 0) {
-               ret = ltdb_delete_noindex(module, dn_key);
-       } else {
-               ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
+       if (i != list->count-1) {
+               memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
        }
+       list->count--;
+       list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
+
+       ret = ltdb_dn_list_store(module, dn_key, list);
 
        talloc_free(dn_key);
 
@@ -1399,43 +1281,31 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
 }
 
 /*
-  delete the index entries for a record
+  delete the index entries for a element
   return -1 on failure
 */
-int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
+int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
 {
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
-       const char *dn;
-       unsigned int i, j;
+       unsigned int i;
 
-       /* find the list of indexed fields */
-       if (ltdb->cache->indexlist->num_elements == 0) {
+       if (!ltdb->cache->attribute_indexes) {
                /* no indexed fields */
                return LDB_SUCCESS;
        }
 
-       if (ldb_dn_is_special(msg->dn)) {
+       if (dn[0] == '@') {
                return LDB_SUCCESS;
        }
 
-       dn = ldb_dn_get_linearized(msg->dn);
-       if (dn == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
+               return LDB_SUCCESS;
        }
-
-       for (i = 0; i < msg->num_elements; i++) {
-               ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
-                                      NULL, LTDB_IDXATTR);
-               if (ret == -1) {
-                       continue;
-               }
-               for (j = 0; j < msg->elements[i].num_values; j++) {
-                       ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
-                       if (ret != LDB_SUCCESS) {
-                               return ret;
-                       }
+       for (i = 0; i < el->num_values; i++) {
+               ret = ltdb_index_del_value(module, dn, el, i);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
        }
 
@@ -1443,55 +1313,43 @@ int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
 }
 
 /*
-  handle special index for one level searches
+  delete the index entries for a record
+  return -1 on failure
 */
-int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
+int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
 {
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-       struct ldb_message_element el;
-       struct ldb_val val;
-       struct ldb_dn *pdn;
-       const char *dn;
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
+       const char *dn;
+       unsigned int i;
 
-       /* We index for ONE Level only if requested */
-       ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
-       if (ret != 0) {
+       if (ldb_dn_is_special(msg->dn)) {
                return LDB_SUCCESS;
        }
 
-       pdn = ldb_dn_get_parent(module, msg->dn);
-       if (pdn == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       ret = ltdb_index_onelevel(module, msg, 0);
+       if (ret != LDB_SUCCESS) {
+               return ret;
        }
 
-       dn = ldb_dn_get_linearized(msg->dn);
-       if (dn == NULL) {
-               talloc_free(pdn);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (!ltdb->cache->attribute_indexes) {
+               /* no indexed fields */
+               return LDB_SUCCESS;
        }
 
-       val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
-       if (val.data == NULL) {
-               talloc_free(pdn);
+       dn = ldb_dn_get_linearized(msg->dn);
+       if (dn == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       val.length = strlen((char *)val.data);
-       el.name = LTDB_IDXONE;
-       el.values = &val;
-       el.num_values = 1;
-
-       if (add) {
-               ret = ltdb_index_add1(module, dn, &el, 0);
-       } else { /* delete */
-               ret = ltdb_index_del_value(module, dn, &el, 0);
+       for (i = 0; i < msg->num_elements; i++) {
+               ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
        }
 
-       talloc_free(pdn);
-
-       return ret;
+       return LDB_SUCCESS;
 }
 
 
@@ -1533,6 +1391,8 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 
        ret = ltdb_unpack_data(module, &data, msg);
        if (ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
+                         ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return -1;
        }
@@ -1542,7 +1402,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        key2 = ltdb_key(module, msg->dn);
        if (key2.dptr == NULL) {
                /* probably a corrupt record ... darn */
-               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
                                                        ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return 0;
@@ -1559,15 +1419,17 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
                dn = ldb_dn_get_linearized(msg->dn);
        }
 
-       ret = ltdb_index_one(module, msg, 1);
-       if (ret == LDB_SUCCESS) {
-               ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
-       } else {
+       ret = ltdb_index_onelevel(module, msg, 1);
+       if (ret != LDB_SUCCESS) {
                ldb_debug(ldb, LDB_DEBUG_ERROR,
-                       "Adding special ONE LEVEL index failed (%s)!\n",
+                       "Adding special ONE LEVEL index failed (%s)!",
                        ldb_dn_get_linearized(msg->dn));
+               talloc_free(msg);
+               return -1;
        }
 
+       ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
+
        talloc_free(msg);
 
        if (ret != LDB_SUCCESS) return -1;
@@ -1580,8 +1442,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 */
 int ltdb_reindex(struct ldb_module *module)
 {
-       void *data = ldb_module_get_private(module);
-       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+       struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
        int ret;
 
        if (ltdb_cache_reload(module) != 0) {