s4:ldb Add detail to failures in the indexing code
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         int error;
44 };
45
46 /* we put a @IDXVERSION attribute on index entries. This
47    allows us to tell if it was written by an older version
48 */
49 #define LTDB_INDEXING_VERSION 2
50
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
53 {
54         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56         return LDB_SUCCESS;
57 }
58
59 /* compare two DN entries in a dn_list. Take account of possible
60  * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
62 {
63         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
64         if (ret != 0) return ret;
65         if (v2->length > v1->length && v2->data[v1->length] != 0) {
66                 return 1;
67         }
68         return 0;
69 }
70
71
72 /*
73   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
74   comparison with the dn returns -1 if not found
75  */
76 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
77 {
78         int i;
79         for (i=0; i<list->count; i++) {
80                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
81         }
82         return -1;
83 }
84
85 /*
86   find a entry in a dn_list. Uses a case sensitive comparison with the dn
87   returns -1 if not found
88  */
89 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
90 {
91         struct ldb_val v;
92         v.data = discard_const_p(unsigned char, dn);
93         v.length = strlen(dn);
94         return ltdb_dn_list_find_val(list, &v);
95 }
96
97 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
98 {
99         struct dn_list *list;
100         if (rec.dsize != sizeof(void *)) {
101                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
102                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
103                 return NULL;
104         }
105         
106         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
107         if (list == NULL) {
108                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
109                                        "Bad type '%s' for idxptr", 
110                                        talloc_get_name(*(struct dn_list **)rec.dptr));
111                 return NULL;
112         }
113         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
114                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
115                                        "Bad parent '%s' for idxptr", 
116                                        talloc_get_name(talloc_parent(list->dn)));
117                 return NULL;
118         }
119         return list;
120 }
121
122 /*
123   return the @IDX list in an index entry for a dn as a 
124   struct dn_list
125  */
126 static int ltdb_dn_list_load(struct ldb_module *module,
127                              struct ldb_dn *dn, struct dn_list *list)
128 {
129         struct ldb_message *msg;
130         int ret;
131         struct ldb_message_element *el;
132         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
133         TDB_DATA rec;
134         struct dn_list *list2;
135         TDB_DATA key;
136
137         list->dn = NULL;
138         list->count = 0;
139
140         /* see if we have any in-memory index entries */
141         if (ltdb->idxptr == NULL ||
142             ltdb->idxptr->itdb == NULL) {
143                 goto normal_index;
144         }
145
146         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
147         key.dsize = strlen((char *)key.dptr);
148
149         rec = tdb_fetch(ltdb->idxptr->itdb, key);
150         if (rec.dptr == NULL) {
151                 goto normal_index;
152         }
153
154         /* we've found an in-memory index entry */
155         list2 = ltdb_index_idxptr(module, rec, true);
156         if (list2 == NULL) {
157                 free(rec.dptr);
158                 return LDB_ERR_OPERATIONS_ERROR;
159         }
160         free(rec.dptr);
161
162         *list = *list2;
163         return LDB_SUCCESS;
164
165 normal_index:
166         msg = ldb_msg_new(list);
167         if (msg == NULL) {
168                 return LDB_ERR_OPERATIONS_ERROR;
169         }
170
171         ret = ltdb_search_dn1(module, dn, msg);
172         if (ret != LDB_SUCCESS) {
173                 return ret;
174         }
175
176         /* TODO: check indexing version number */
177
178         el = ldb_msg_find_element(msg, LTDB_IDX);
179         if (!el) {
180                 talloc_free(msg);
181                 return LDB_SUCCESS;
182         }
183
184         /* we avoid copying the strings by stealing the list */
185         list->dn = talloc_steal(list, el->values);
186         list->count = el->num_values;
187
188         return LDB_SUCCESS;
189 }
190
191
192 /*
193   save a dn_list into a full @IDX style record
194  */
195 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
196                                    struct dn_list *list)
197 {
198         struct ldb_message *msg;
199         int ret;
200
201         if (list->count == 0) {
202                 ret = ltdb_delete_noindex(module, dn);
203                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
204                         return LDB_SUCCESS;
205                 }
206                 return ret;
207         }
208
209         msg = ldb_msg_new(module);
210         if (!msg) {
211                 ldb_module_oom(module);
212                 return LDB_ERR_OPERATIONS_ERROR;
213         }
214
215         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
216         if (ret != LDB_SUCCESS) {
217                 talloc_free(msg);
218                 ldb_module_oom(module);
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         msg->dn = dn;
223         if (list->count > 0) {
224                 struct ldb_message_element *el;
225
226                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
227                 if (ret != LDB_SUCCESS) {
228                         ldb_module_oom(module);
229                         talloc_free(msg);
230                         return LDB_ERR_OPERATIONS_ERROR;
231                 }
232                 el->values = list->dn;
233                 el->num_values = list->count;
234         }
235
236         ret = ltdb_store(module, msg, TDB_REPLACE);
237         talloc_free(msg);
238         return ret;
239 }
240
241 /*
242   save a dn_list into the database, in either @IDX or internal format
243  */
244 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
245                               struct dn_list *list)
246 {
247         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
248         TDB_DATA rec, key;
249         int ret;
250         struct dn_list *list2;
251
252         if (ltdb->idxptr == NULL) {
253                 return ltdb_dn_list_store_full(module, dn, list);
254         }
255
256         if (ltdb->idxptr->itdb == NULL) {
257                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
258                 if (ltdb->idxptr->itdb == NULL) {
259                         return LDB_ERR_OPERATIONS_ERROR;
260                 }
261         }
262
263         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
264         key.dsize = strlen((char *)key.dptr);
265
266         rec = tdb_fetch(ltdb->idxptr->itdb, key);
267         if (rec.dptr != NULL) {
268                 list2 = ltdb_index_idxptr(module, rec, false);
269                 if (list2 == NULL) {
270                         free(rec.dptr);
271                         return LDB_ERR_OPERATIONS_ERROR;
272                 }
273                 free(rec.dptr);
274                 list2->dn = talloc_steal(list2, list->dn);
275                 list2->count = list->count;
276                 return LDB_SUCCESS;
277         }
278
279         list2 = talloc(ltdb->idxptr, struct dn_list);
280         if (list2 == NULL) {
281                 return LDB_ERR_OPERATIONS_ERROR;
282         }
283         list2->dn = talloc_steal(list2, list->dn);
284         list2->count = list->count;
285
286         rec.dptr = (uint8_t *)&list2;
287         rec.dsize = sizeof(void *);
288
289         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
290         if (ret == -1) {
291                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
292         }
293         return LDB_SUCCESS;
294 }
295
296 /*
297   traverse function for storing the in-memory index entries on disk
298  */
299 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
300 {
301         struct ldb_module *module = state;
302         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
303         struct ldb_dn *dn;
304         struct ldb_context *ldb = ldb_module_get_ctx(module);
305         struct ldb_val v;
306         struct dn_list *list;
307
308         list = ltdb_index_idxptr(module, data, true);
309         if (list == NULL) {
310                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
311                 return -1;
312         }
313
314         v.data = key.dptr;
315         v.length = strnlen((char *)key.dptr, key.dsize);
316
317         dn = ldb_dn_from_ldb_val(module, ldb, &v);
318         if (dn == NULL) {
319                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
320                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
321                 return -1;
322         }
323
324         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
325         talloc_free(dn);
326         if (ltdb->idxptr->error != 0) {
327                 return -1;
328         }
329         return 0;
330 }
331
332 /* cleanup the idxptr mode when transaction commits */
333 int ltdb_index_transaction_commit(struct ldb_module *module)
334 {
335         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
336         int ret;
337
338         struct ldb_context *ldb = ldb_module_get_ctx(module);
339
340         ldb_reset_err_string(ldb);
341         if (ltdb->idxptr->itdb) {
342                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
343                 tdb_close(ltdb->idxptr->itdb);
344         }
345
346         ret = ltdb->idxptr->error;
347
348         if (ret != LDB_SUCCESS) {
349                 if (!ldb_errstring(ldb)) {
350                         ldb_set_errstring(ldb, ldb_strerror(ret));
351                 }
352                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
353         }
354
355         talloc_free(ltdb->idxptr);
356         ltdb->idxptr = NULL;
357         return ret;
358 }
359
360 /* cleanup the idxptr mode when transaction cancels */
361 int ltdb_index_transaction_cancel(struct ldb_module *module)
362 {
363         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
364         if (ltdb->idxptr && ltdb->idxptr->itdb) {
365                 tdb_close(ltdb->idxptr->itdb);
366         }
367         talloc_free(ltdb->idxptr);
368         ltdb->idxptr = NULL;
369         return LDB_SUCCESS;
370 }
371
372
373 /*
374   return the dn key to be used for an index
375   the caller is responsible for freeing
376 */
377 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
378                                      const char *attr, const struct ldb_val *value,
379                                      const struct ldb_schema_attribute **ap)
380 {
381         struct ldb_dn *ret;
382         struct ldb_val v;
383         const struct ldb_schema_attribute *a;
384         char *attr_folded;
385         int r;
386
387         attr_folded = ldb_attr_casefold(ldb, attr);
388         if (!attr_folded) {
389                 return NULL;
390         }
391
392         a = ldb_schema_attribute_by_name(ldb, attr);
393         if (ap) {
394                 *ap = a;
395         }
396         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
397         if (r != LDB_SUCCESS) {
398                 const char *errstr = ldb_errstring(ldb);
399                 /* canonicalisation can be refused. For example, 
400                    a attribute that takes wildcards will refuse to canonicalise
401                    if the value contains a wildcard */
402                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
403                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
404                 talloc_free(attr_folded);
405                 return NULL;
406         }
407         if (ldb_should_b64_encode(ldb, &v)) {
408                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
409                 if (!vstr) return NULL;
410                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
411                 talloc_free(vstr);
412         } else {
413                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
414         }
415
416         if (v.data != value->data) {
417                 talloc_free(v.data);
418         }
419         talloc_free(attr_folded);
420
421         return ret;
422 }
423
424 /*
425   see if a attribute value is in the list of indexed attributes
426 */
427 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
428 {
429         unsigned int i;
430         struct ldb_message_element *el;
431
432         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
433         if (el == NULL) {
434                 return false;
435         }
436
437         /* TODO: this is too expensive! At least use a binary search */
438         for (i=0; i<el->num_values; i++) {
439                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
440                         return true;
441                 }
442         }
443         return false;
444 }
445
446 /*
447   in the following logic functions, the return value is treated as
448   follows:
449
450      LDB_SUCCESS: we found some matching index values
451
452      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
453
454      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
455                                we'll need a full search
456  */
457
458 /*
459   return a list of dn's that might match a simple indexed search (an
460   equality search only)
461  */
462 static int ltdb_index_dn_simple(struct ldb_module *module,
463                                 const struct ldb_parse_tree *tree,
464                                 const struct ldb_message *index_list,
465                                 struct dn_list *list)
466 {
467         struct ldb_context *ldb;
468         struct ldb_dn *dn;
469         int ret;
470
471         ldb = ldb_module_get_ctx(module);
472
473         list->count = 0;
474         list->dn = NULL;
475
476         /* if the attribute isn't in the list of indexed attributes then
477            this node needs a full search */
478         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
479                 return LDB_ERR_OPERATIONS_ERROR;
480         }
481
482         /* the attribute is indexed. Pull the list of DNs that match the 
483            search criterion */
484         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
485         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
486
487         ret = ltdb_dn_list_load(module, dn, list);
488         talloc_free(dn);
489         return ret;
490 }
491
492
493 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
494
495 /*
496   return a list of dn's that might match a leaf indexed search
497  */
498 static int ltdb_index_dn_leaf(struct ldb_module *module,
499                               const struct ldb_parse_tree *tree,
500                               const struct ldb_message *index_list,
501                               struct dn_list *list)
502 {
503         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
504                 list->dn = talloc_array(list, struct ldb_val, 1);
505                 if (list->dn == NULL) {
506                         ldb_module_oom(module);
507                         return LDB_ERR_OPERATIONS_ERROR;
508                 }
509                 list->dn[0] = tree->u.equality.value;
510                 list->count = 1;
511                 return LDB_SUCCESS;
512         }
513         return ltdb_index_dn_simple(module, tree, index_list, list);
514 }
515
516
517 /*
518   list intersection
519   list = list & list2
520 */
521 static bool list_intersect(struct ldb_context *ldb,
522                            struct dn_list *list, const struct dn_list *list2)
523 {
524         struct dn_list *list3;
525         unsigned int i;
526
527         if (list->count == 0) {
528                 /* 0 & X == 0 */
529                 return true;
530         }
531         if (list2->count == 0) {
532                 /* X & 0 == 0 */
533                 list->count = 0;
534                 list->dn = NULL;
535                 return true;
536         }
537
538         /* the indexing code is allowed to return a longer list than
539            what really matches, as all results are filtered by the
540            full expression at the end - this shortcut avoids a lot of
541            work in some cases */
542         if (list->count < 2 && list2->count > 10) {
543                 return true;
544         }
545         if (list2->count < 2 && list->count > 10) {
546                 list->count = list2->count;
547                 list->dn = list2->dn;
548                 /* note that list2 may not be the parent of list2->dn,
549                    as list2->dn may be owned by ltdb->idxptr. In that
550                    case we expect this reparent call to fail, which is
551                    OK */
552                 talloc_reparent(list2, list, list2->dn);
553                 return true;
554         }
555
556         list3 = talloc_zero(list, struct dn_list);
557         if (list3 == NULL) {
558                 return false;
559         }
560
561         list3->dn = talloc_array(list3, struct ldb_val, list->count);
562         if (!list3->dn) {
563                 talloc_free(list3);
564                 return false;
565         }
566         list3->count = 0;
567
568         for (i=0;i<list->count;i++) {
569                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
570                         list3->dn[list3->count] = list->dn[i];
571                         list3->count++;
572                 }
573         }
574
575         list->dn = talloc_steal(list, list3->dn);
576         list->count = list3->count;
577         talloc_free(list3);
578
579         return true;
580 }
581
582
583 /*
584   list union
585   list = list | list2
586 */
587 static bool list_union(struct ldb_context *ldb,
588                        struct dn_list *list, const struct dn_list *list2)
589 {
590         struct ldb_val *dn3;
591
592         if (list2->count == 0) {
593                 /* X | 0 == X */
594                 return true;
595         }
596
597         if (list->count == 0) {
598                 /* 0 | X == X */
599                 list->count = list2->count;
600                 list->dn = list2->dn;
601                 /* note that list2 may not be the parent of list2->dn,
602                    as list2->dn may be owned by ltdb->idxptr. In that
603                    case we expect this reparent call to fail, which is
604                    OK */
605                 talloc_reparent(list2, list, list2->dn);
606                 return true;
607         }
608
609         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
610         if (!dn3) {
611                 ldb_oom(ldb);
612                 return false;
613         }
614
615         /* we allow for duplicates here, and get rid of them later */
616         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
617         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
618
619         list->dn = dn3;
620         list->count += list2->count;
621
622         return true;
623 }
624
625 static int ltdb_index_dn(struct ldb_module *module,
626                          const struct ldb_parse_tree *tree,
627                          const struct ldb_message *index_list,
628                          struct dn_list *list);
629
630
631 /*
632   process an OR list (a union)
633  */
634 static int ltdb_index_dn_or(struct ldb_module *module,
635                             const struct ldb_parse_tree *tree,
636                             const struct ldb_message *index_list,
637                             struct dn_list *list)
638 {
639         struct ldb_context *ldb;
640         unsigned int i;
641
642         ldb = ldb_module_get_ctx(module);
643
644         list->dn = NULL;
645         list->count = 0;
646
647         for (i=0; i<tree->u.list.num_elements; i++) {
648                 struct dn_list *list2;
649                 int ret;
650
651                 list2 = talloc_zero(list, struct dn_list);
652                 if (list2 == NULL) {
653                         return LDB_ERR_OPERATIONS_ERROR;
654                 }
655
656                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
657
658                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
659                         /* X || 0 == X */
660                         talloc_free(list2);
661                         continue;
662                 }
663
664                 if (ret != LDB_SUCCESS) {
665                         /* X || * == * */
666                         talloc_free(list2);
667                         return ret;
668                 }
669
670                 if (!list_union(ldb, list, list2)) {
671                         talloc_free(list2);
672                         return LDB_ERR_OPERATIONS_ERROR;
673                 }
674         }
675
676         if (list->count == 0) {
677                 return LDB_ERR_NO_SUCH_OBJECT;
678         }
679
680         return LDB_SUCCESS;
681 }
682
683
684 /*
685   NOT an index results
686  */
687 static int ltdb_index_dn_not(struct ldb_module *module,
688                              const struct ldb_parse_tree *tree,
689                              const struct ldb_message *index_list,
690                              struct dn_list *list)
691 {
692         /* the only way to do an indexed not would be if we could
693            negate the not via another not or if we knew the total
694            number of database elements so we could know that the
695            existing expression covered the whole database.
696
697            instead, we just give up, and rely on a full index scan
698            (unless an outer & manages to reduce the list)
699         */
700         return LDB_ERR_OPERATIONS_ERROR;
701 }
702
703
704 static bool ltdb_index_unique(struct ldb_context *ldb,
705                               const char *attr)
706 {
707         const struct ldb_schema_attribute *a;
708         a = ldb_schema_attribute_by_name(ldb, attr);
709         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
710                 return true;
711         }
712         return false;
713 }
714
715 /*
716   process an AND expression (intersection)
717  */
718 static int ltdb_index_dn_and(struct ldb_module *module,
719                              const struct ldb_parse_tree *tree,
720                              const struct ldb_message *index_list,
721                              struct dn_list *list)
722 {
723         struct ldb_context *ldb;
724         unsigned int i;
725         bool found;
726
727         ldb = ldb_module_get_ctx(module);
728
729         list->dn = NULL;
730         list->count = 0;
731
732         /* in the first pass we only look for unique simple
733            equality tests, in the hope of avoiding having to look
734            at any others */
735         for (i=0; i<tree->u.list.num_elements; i++) {
736                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
737                 int ret;
738
739                 if (subtree->operation != LDB_OP_EQUALITY ||
740                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
741                         continue;
742                 }
743                 
744                 ret = ltdb_index_dn(module, subtree, index_list, list);
745                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
746                         /* 0 && X == 0 */
747                         return LDB_ERR_NO_SUCH_OBJECT;
748                 }
749                 if (ret == LDB_SUCCESS) {
750                         /* a unique index match means we can
751                          * stop. Note that we don't care if we return
752                          * a few too many objects, due to later
753                          * filtering */
754                         return LDB_SUCCESS;
755                 }
756         }       
757
758         /* now do a full intersection */
759         found = false;
760
761         for (i=0; i<tree->u.list.num_elements; i++) {
762                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
763                 struct dn_list *list2;
764                 int ret;
765
766                 list2 = talloc_zero(list, struct dn_list);
767                 if (list2 == NULL) {
768                         ldb_module_oom(module);
769                         return LDB_ERR_OPERATIONS_ERROR;
770                 }
771                         
772                 ret = ltdb_index_dn(module, subtree, index_list, list2);
773
774                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
775                         /* X && 0 == 0 */
776                         list->dn = NULL;
777                         list->count = 0;
778                         talloc_free(list2);
779                         return LDB_ERR_NO_SUCH_OBJECT;
780                 }
781                 
782                 if (ret != LDB_SUCCESS) {
783                         /* this didn't adding anything */
784                         talloc_free(list2);
785                         continue;
786                 }
787
788                 if (!found) {
789                         talloc_reparent(list2, list, list->dn);
790                         list->dn = list2->dn;
791                         list->count = list2->count;
792                         found = true;
793                 } else if (!list_intersect(ldb, list, list2)) {
794                         talloc_free(list2);
795                         return LDB_ERR_OPERATIONS_ERROR;
796                 }
797                         
798                 if (list->count == 0) {
799                         list->dn = NULL;
800                         return LDB_ERR_NO_SUCH_OBJECT;
801                 }
802                         
803                 if (list->count < 2) {
804                         /* it isn't worth loading the next part of the tree */
805                         return LDB_SUCCESS;
806                 }
807         }       
808
809         if (!found) {
810                 /* none of the attributes were indexed */
811                 return LDB_ERR_OPERATIONS_ERROR;
812         }
813
814         return LDB_SUCCESS;
815 }
816         
817 /*
818   return a list of matching objects using a one-level index
819  */
820 static int ltdb_index_dn_one(struct ldb_module *module,
821                              struct ldb_dn *parent_dn,
822                              struct dn_list *list)
823 {
824         struct ldb_context *ldb;
825         struct ldb_dn *key;
826         struct ldb_val val;
827         int ret;
828
829         ldb = ldb_module_get_ctx(module);
830
831         /* work out the index key from the parent DN */
832         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
833         val.length = strlen((char *)val.data);
834         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
835         if (!key) {
836                 ldb_oom(ldb);
837                 return LDB_ERR_OPERATIONS_ERROR;
838         }
839
840         ret = ltdb_dn_list_load(module, key, list);
841         talloc_free(key);
842         if (ret != LDB_SUCCESS) {
843                 return ret;
844         }
845
846         if (list->count == 0) {
847                 return LDB_ERR_NO_SUCH_OBJECT;
848         }
849
850         return LDB_SUCCESS;
851 }
852
853 /*
854   return a list of dn's that might match a indexed search or
855   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
856  */
857 static int ltdb_index_dn(struct ldb_module *module,
858                          const struct ldb_parse_tree *tree,
859                          const struct ldb_message *index_list,
860                          struct dn_list *list)
861 {
862         int ret = LDB_ERR_OPERATIONS_ERROR;
863
864         switch (tree->operation) {
865         case LDB_OP_AND:
866                 ret = ltdb_index_dn_and(module, tree, index_list, list);
867                 break;
868
869         case LDB_OP_OR:
870                 ret = ltdb_index_dn_or(module, tree, index_list, list);
871                 break;
872
873         case LDB_OP_NOT:
874                 ret = ltdb_index_dn_not(module, tree, index_list, list);
875                 break;
876
877         case LDB_OP_EQUALITY:
878                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
879                 break;
880
881         case LDB_OP_SUBSTRING:
882         case LDB_OP_GREATER:
883         case LDB_OP_LESS:
884         case LDB_OP_PRESENT:
885         case LDB_OP_APPROX:
886         case LDB_OP_EXTENDED:
887                 /* we can't index with fancy bitops yet */
888                 ret = LDB_ERR_OPERATIONS_ERROR;
889                 break;
890         }
891
892         return ret;
893 }
894
895 /*
896   filter a candidate dn_list from an indexed search into a set of results
897   extracting just the given attributes
898 */
899 static int ltdb_index_filter(const struct dn_list *dn_list,
900                              struct ltdb_context *ac, 
901                              uint32_t *match_count)
902 {
903         struct ldb_context *ldb;
904         struct ldb_message *msg;
905         unsigned int i;
906
907         ldb = ldb_module_get_ctx(ac->module);
908
909         for (i = 0; i < dn_list->count; i++) {
910                 struct ldb_dn *dn;
911                 int ret;
912
913                 msg = ldb_msg_new(ac);
914                 if (!msg) {
915                         return LDB_ERR_OPERATIONS_ERROR;
916                 }
917
918                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
919                 if (dn == NULL) {
920                         talloc_free(msg);
921                         return LDB_ERR_OPERATIONS_ERROR;
922                 }
923
924                 ret = ltdb_search_dn1(ac->module, dn, msg);
925                 talloc_free(dn);
926                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
927                         /* the record has disappeared? yes, this can happen */
928                         talloc_free(msg);
929                         continue;
930                 }
931
932                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
933                         /* an internal error */
934                         talloc_free(msg);
935                         return LDB_ERR_OPERATIONS_ERROR;
936                 }
937
938                 if (!ldb_match_msg(ldb, msg,
939                                    ac->tree, ac->base, ac->scope)) {
940                         talloc_free(msg);
941                         continue;
942                 }
943
944                 /* filter the attributes that the user wants */
945                 ret = ltdb_filter_attrs(msg, ac->attrs);
946
947                 if (ret == -1) {
948                         talloc_free(msg);
949                         return LDB_ERR_OPERATIONS_ERROR;
950                 }
951
952                 ret = ldb_module_send_entry(ac->req, msg, NULL);
953                 if (ret != LDB_SUCCESS) {
954                         ac->request_terminated = true;
955                         return ret;
956                 }
957
958                 (*match_count)++;
959         }
960
961         return LDB_SUCCESS;
962 }
963
964 /*
965   remove any duplicated entries in a indexed result
966  */
967 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
968 {
969         int i, new_count;
970
971         if (list->count < 2) {
972                 return;
973         }
974
975         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
976
977         new_count = 1;
978         for (i=1; i<list->count; i++) {
979                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
980                         if (new_count != i) {
981                                 list->dn[new_count] = list->dn[i];
982                         }
983                         new_count++;
984                 }
985         }
986         
987         list->count = new_count;
988 }
989
990 /*
991   search the database with a LDAP-like expression using indexes
992   returns -1 if an indexed search is not possible, in which
993   case the caller should call ltdb_search_full()
994 */
995 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
996 {
997         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
998         struct dn_list *dn_list;
999         int ret;
1000
1001         /* see if indexing is enabled */
1002         if (!ltdb->cache->attribute_indexes && 
1003             !ltdb->cache->one_level_indexes &&
1004             ac->scope != LDB_SCOPE_BASE) {
1005                 /* fallback to a full search */
1006                 return LDB_ERR_OPERATIONS_ERROR;
1007         }
1008
1009         dn_list = talloc_zero(ac, struct dn_list);
1010         if (dn_list == NULL) {
1011                 ldb_module_oom(ac->module);
1012                 return LDB_ERR_OPERATIONS_ERROR;
1013         }
1014
1015         switch (ac->scope) {
1016         case LDB_SCOPE_BASE:
1017                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1018                 if (dn_list->dn == NULL) {
1019                         ldb_module_oom(ac->module);
1020                         talloc_free(dn_list);
1021                         return LDB_ERR_OPERATIONS_ERROR;
1022                 }
1023                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1024                 if (dn_list->dn[0].data == NULL) {
1025                         ldb_module_oom(ac->module);
1026                         talloc_free(dn_list);
1027                         return LDB_ERR_OPERATIONS_ERROR;
1028                 }
1029                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1030                 dn_list->count = 1;
1031                 break;          
1032
1033         case LDB_SCOPE_ONELEVEL:
1034                 if (!ltdb->cache->one_level_indexes) {
1035                         talloc_free(dn_list);
1036                         return LDB_ERR_OPERATIONS_ERROR;
1037                 }
1038                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1039                 if (ret != LDB_SUCCESS) {
1040                         talloc_free(dn_list);
1041                         return ret;
1042                 }
1043                 break;
1044
1045         case LDB_SCOPE_SUBTREE:
1046         case LDB_SCOPE_DEFAULT:
1047                 if (!ltdb->cache->attribute_indexes) {
1048                         talloc_free(dn_list);
1049                         return LDB_ERR_OPERATIONS_ERROR;
1050                 }
1051                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1052                 if (ret != LDB_SUCCESS) {
1053                         talloc_free(dn_list);
1054                         return ret;
1055                 }
1056                 ltdb_dn_list_remove_duplicates(dn_list);
1057                 break;
1058         }
1059
1060         ret = ltdb_index_filter(dn_list, ac, match_count);
1061         talloc_free(dn_list);
1062         return ret;
1063 }
1064
1065 /*
1066   add an index entry for one message element
1067 */
1068 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1069                            struct ldb_message_element *el, int v_idx)
1070 {
1071         struct ldb_context *ldb;
1072         struct ldb_dn *dn_key;
1073         int ret;
1074         const struct ldb_schema_attribute *a;
1075         struct dn_list *list;
1076         unsigned alloc_len;
1077
1078         ldb = ldb_module_get_ctx(module);
1079
1080         list = talloc_zero(module, struct dn_list);
1081         if (list == NULL) {
1082                 return LDB_ERR_OPERATIONS_ERROR;
1083         }
1084
1085         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1086         if (!dn_key) {
1087                 talloc_free(list);
1088                 return LDB_ERR_OPERATIONS_ERROR;
1089         }
1090         talloc_steal(list, dn_key);
1091
1092         ret = ltdb_dn_list_load(module, dn_key, list);
1093         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1094                 talloc_free(list);
1095                 return ret;
1096         }
1097
1098         if (ltdb_dn_list_find_str(list, dn) != -1) {
1099                 talloc_free(list);
1100                 return LDB_SUCCESS;
1101         }
1102
1103         if (list->count > 0 &&
1104             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1105                 talloc_free(list);
1106                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1107         }
1108
1109         /* overallocate the list a bit, to reduce the number of
1110          * realloc trigered copies */    
1111         alloc_len = ((list->count+1)+7) & ~7;
1112         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1113         if (list->dn == NULL) {
1114                 talloc_free(list);
1115                 return LDB_ERR_OPERATIONS_ERROR;
1116         }
1117         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1118         list->dn[list->count].length = strlen(dn);
1119         list->count++;
1120
1121         ret = ltdb_dn_list_store(module, dn_key, list);
1122
1123         talloc_free(list);
1124
1125         return ret;
1126 }
1127
1128 /*
1129   add index entries for one elements in a message
1130  */
1131 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1132                              struct ldb_message_element *el)
1133 {
1134         unsigned int i;
1135         for (i = 0; i < el->num_values; i++) {
1136                 int ret = ltdb_index_add1(module, dn, el, i);
1137                 if (ret != LDB_SUCCESS) {
1138                         return ret;
1139                 }
1140         }
1141
1142         return LDB_SUCCESS;
1143 }
1144
1145 /*
1146   add index entries for all elements in a message
1147  */
1148 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1149                               struct ldb_message_element *elements, int num_el)
1150 {
1151         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1152         unsigned int i;
1153
1154         if (dn[0] == '@') {
1155                 return LDB_SUCCESS;
1156         }
1157
1158         if (ltdb->cache->indexlist->num_elements == 0) {
1159                 /* no indexed fields */
1160                 return LDB_SUCCESS;
1161         }
1162
1163         for (i = 0; i < num_el; i++) {
1164                 int ret;
1165                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1166                         continue;
1167                 }
1168                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1169                 if (ret != LDB_SUCCESS) {
1170                         return ret;
1171                 }
1172         }
1173
1174         return LDB_SUCCESS;
1175 }
1176
1177
1178 /*
1179   insert a one level index for a message
1180 */
1181 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1182 {
1183         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1184         struct ldb_message_element el;
1185         struct ldb_val val;
1186         struct ldb_dn *pdn;
1187         const char *dn;
1188         int ret;
1189
1190         /* We index for ONE Level only if requested */
1191         if (!ltdb->cache->one_level_indexes) {
1192                 return LDB_SUCCESS;
1193         }
1194
1195         pdn = ldb_dn_get_parent(module, msg->dn);
1196         if (pdn == NULL) {
1197                 return LDB_ERR_OPERATIONS_ERROR;
1198         }
1199
1200         dn = ldb_dn_get_linearized(msg->dn);
1201         if (dn == NULL) {
1202                 talloc_free(pdn);
1203                 return LDB_ERR_OPERATIONS_ERROR;
1204         }
1205
1206         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1207         if (val.data == NULL) {
1208                 talloc_free(pdn);
1209                 return LDB_ERR_OPERATIONS_ERROR;
1210         }
1211
1212         val.length = strlen((char *)val.data);
1213         el.name = LTDB_IDXONE;
1214         el.values = &val;
1215         el.num_values = 1;
1216
1217         if (add) {
1218                 ret = ltdb_index_add1(module, dn, &el, 0);
1219         } else { /* delete */
1220                 ret = ltdb_index_del_value(module, dn, &el, 0);
1221         }
1222
1223         talloc_free(pdn);
1224
1225         return ret;
1226 }
1227
1228 /*
1229   add the index entries for a new element in a record
1230   The caller guarantees that these element values are not yet indexed
1231 */
1232 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1233                            struct ldb_message_element *el)
1234 {
1235         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1236         if (ldb_dn_is_special(dn)) {
1237                 return LDB_SUCCESS;
1238         }
1239         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1240                 return LDB_SUCCESS;
1241         }
1242         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1243 }
1244
1245 /*
1246   add the index entries for a new record
1247 */
1248 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1249 {
1250         const char *dn;
1251         int ret;
1252
1253         if (ldb_dn_is_special(msg->dn)) {
1254                 return LDB_SUCCESS;
1255         }
1256
1257         dn = ldb_dn_get_linearized(msg->dn);
1258         if (dn == NULL) {
1259                 return LDB_ERR_OPERATIONS_ERROR;
1260         }
1261
1262         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1263         if (ret != LDB_SUCCESS) {
1264                 return ret;
1265         }
1266
1267         return ltdb_index_onelevel(module, msg, 1);
1268 }
1269
1270
1271 /*
1272   delete an index entry for one message element
1273 */
1274 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1275                          struct ldb_message_element *el, int v_idx)
1276 {
1277         struct ldb_context *ldb;
1278         struct ldb_dn *dn_key;
1279         int ret, i;
1280         struct dn_list *list;
1281
1282         ldb = ldb_module_get_ctx(module);
1283
1284         if (dn[0] == '@') {
1285                 return LDB_SUCCESS;
1286         }
1287
1288         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1289         if (!dn_key) {
1290                 return LDB_ERR_OPERATIONS_ERROR;
1291         }
1292
1293         list = talloc_zero(dn_key, struct dn_list);
1294         if (list == NULL) {
1295                 talloc_free(dn_key);
1296                 return LDB_ERR_OPERATIONS_ERROR;
1297         }
1298
1299         ret = ltdb_dn_list_load(module, dn_key, list);
1300         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1301                 /* it wasn't indexed. Did we have an earlier error? If we did then
1302                    its gone now */
1303                 talloc_free(dn_key);
1304                 return LDB_SUCCESS;
1305         }
1306
1307         if (ret != LDB_SUCCESS) {
1308                 talloc_free(dn_key);
1309                 return ret;
1310         }
1311
1312         i = ltdb_dn_list_find_str(list, dn);
1313         if (i == -1) {
1314                 /* nothing to delete */
1315                 talloc_free(dn_key);
1316                 return LDB_SUCCESS;             
1317         }
1318
1319         if (i != list->count-1) {
1320                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1321         }
1322         list->count--;
1323         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1324
1325         ret = ltdb_dn_list_store(module, dn_key, list);
1326
1327         talloc_free(dn_key);
1328
1329         return ret;
1330 }
1331
1332 /*
1333   delete the index entries for a element
1334   return -1 on failure
1335 */
1336 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1337 {
1338         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1339         int ret;
1340         unsigned int i;
1341
1342         if (!ltdb->cache->attribute_indexes) {
1343                 /* no indexed fields */
1344                 return LDB_SUCCESS;
1345         }
1346
1347         if (dn[0] == '@') {
1348                 return LDB_SUCCESS;
1349         }
1350
1351         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1352                 return LDB_SUCCESS;
1353         }
1354         for (i = 0; i < el->num_values; i++) {
1355                 ret = ltdb_index_del_value(module, dn, el, i);
1356                 if (ret != LDB_SUCCESS) {
1357                         return ret;
1358                 }
1359         }
1360
1361         return LDB_SUCCESS;
1362 }
1363
1364 /*
1365   delete the index entries for a record
1366   return -1 on failure
1367 */
1368 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1369 {
1370         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1371         int ret;
1372         const char *dn;
1373         unsigned int i;
1374
1375         if (ldb_dn_is_special(msg->dn)) {
1376                 return LDB_SUCCESS;
1377         }
1378
1379         ret = ltdb_index_onelevel(module, msg, 0);
1380         if (ret != LDB_SUCCESS) {
1381                 return ret;
1382         }
1383
1384         if (!ltdb->cache->attribute_indexes) {
1385                 /* no indexed fields */
1386                 return LDB_SUCCESS;
1387         }
1388
1389         dn = ldb_dn_get_linearized(msg->dn);
1390         if (dn == NULL) {
1391                 return LDB_ERR_OPERATIONS_ERROR;
1392         }
1393
1394         for (i = 0; i < msg->num_elements; i++) {
1395                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1396                 if (ret != LDB_SUCCESS) {
1397                         return ret;
1398                 }
1399         }
1400
1401         return LDB_SUCCESS;
1402 }
1403
1404
1405 /*
1406   traversal function that deletes all @INDEX records
1407 */
1408 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1409 {
1410         struct ldb_module *module = state;
1411         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1412         const char *dnstr = "DN=" LTDB_INDEX ":";
1413         struct dn_list list;
1414         struct ldb_dn *dn;
1415         struct ldb_val v;
1416         int ret;
1417
1418         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1419                 return 0;
1420         }
1421         /* we need to put a empty list in the internal tdb for this
1422          * index entry */
1423         list.dn = NULL;
1424         list.count = 0;
1425         v.data = key.dptr;
1426         v.length = strnlen((char *)key.dptr, key.dsize);
1427
1428         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1429         ret = ltdb_dn_list_store(module, dn, &list);
1430         if (ret != LDB_SUCCESS) {
1431                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1432                                        "Unable to store null index for %s\n",
1433                                        ldb_dn_get_linearized(dn));
1434                 talloc_free(dn);
1435                 return -1;
1436         }
1437         talloc_free(dn);
1438         return 0;
1439 }
1440
1441 /*
1442   traversal function that adds @INDEX records during a re index
1443 */
1444 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1445 {
1446         struct ldb_context *ldb;
1447         struct ldb_module *module = (struct ldb_module *)state;
1448         struct ldb_message *msg;
1449         const char *dn = NULL;
1450         int ret;
1451         TDB_DATA key2;
1452
1453         ldb = ldb_module_get_ctx(module);
1454
1455         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1456             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1457                 return 0;
1458         }
1459
1460         msg = ldb_msg_new(module);
1461         if (msg == NULL) {
1462                 return -1;
1463         }
1464
1465         ret = ltdb_unpack_data(module, &data, msg);
1466         if (ret != 0) {
1467                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1468                           ldb_dn_get_linearized(msg->dn));
1469                 talloc_free(msg);
1470                 return -1;
1471         }
1472
1473         /* check if the DN key has changed, perhaps due to the
1474            case insensitivity of an element changing */
1475         key2 = ltdb_key(module, msg->dn);
1476         if (key2.dptr == NULL) {
1477                 /* probably a corrupt record ... darn */
1478                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1479                                                         ldb_dn_get_linearized(msg->dn));
1480                 talloc_free(msg);
1481                 return 0;
1482         }
1483         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1484                 tdb_delete(tdb, key);
1485                 tdb_store(tdb, key2, data, 0);
1486         }
1487         talloc_free(key2.dptr);
1488
1489         if (msg->dn == NULL) {
1490                 dn = (char *)key.dptr + 3;
1491         } else {
1492                 dn = ldb_dn_get_linearized(msg->dn);
1493         }
1494
1495         ret = ltdb_index_onelevel(module, msg, 1);
1496         if (ret != LDB_SUCCESS) {
1497                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1498                         "Adding special ONE LEVEL index failed (%s)!",
1499                         ldb_dn_get_linearized(msg->dn));
1500                 talloc_free(msg);
1501                 return -1;
1502         }
1503
1504         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1505
1506         talloc_free(msg);
1507
1508         if (ret != LDB_SUCCESS) return -1;
1509
1510         return 0;
1511 }
1512
1513 /*
1514   force a complete reindex of the database
1515 */
1516 int ltdb_reindex(struct ldb_module *module)
1517 {
1518         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1519         int ret;
1520
1521         if (ltdb_cache_reload(module) != 0) {
1522                 return LDB_ERR_OPERATIONS_ERROR;
1523         }
1524
1525         /* first traverse the database deleting any @INDEX records by
1526          * putting NULL entries in the in-memory tdb
1527          */
1528         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1529         if (ret == -1) {
1530                 return LDB_ERR_OPERATIONS_ERROR;
1531         }
1532
1533         /* if we don't have indexes we have nothing todo */
1534         if (ltdb->cache->indexlist->num_elements == 0) {
1535                 return LDB_SUCCESS;
1536         }
1537
1538         /* now traverse adding any indexes for normal LDB records */
1539         ret = tdb_traverse(ltdb->tdb, re_index, module);
1540         if (ret == -1) {
1541                 return LDB_ERR_OPERATIONS_ERROR;
1542         }
1543
1544         return LDB_SUCCESS;
1545 }