a04fb1a8bd2416f1ebbee435a5b70111d0a0ea96
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         bool repack;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         return LDB_SUCCESS;
58 }
59
60 /* compare two DN entries in a dn_list. Take account of possible
61  * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 {
64         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65         if (ret != 0) return ret;
66         if (v2->length > v1->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return 0;
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 {
100         struct dn_list *list;
101         if (rec.dsize != sizeof(void *)) {
102                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
103                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
104                 return NULL;
105         }
106         
107         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
108         if (list == NULL) {
109                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
110                                        "Bad type '%s' for idxptr", 
111                                        talloc_get_name(*(struct dn_list **)rec.dptr));
112                 return NULL;
113         }
114         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
116                                        "Bad parent '%s' for idxptr", 
117                                        talloc_get_name(talloc_parent(list->dn)));
118                 return NULL;
119         }
120         return list;
121 }
122
123 /*
124   return the @IDX list in an index entry for a dn as a 
125   struct dn_list
126  */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128                              struct ldb_dn *dn, struct dn_list *list)
129 {
130         struct ldb_message *msg;
131         int ret;
132         struct ldb_message_element *el;
133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134         TDB_DATA rec;
135         struct dn_list *list2;
136         TDB_DATA key;
137
138         list->dn = NULL;
139         list->count = 0;
140
141         /* see if we have any in-memory index entries */
142         if (ltdb->idxptr == NULL ||
143             ltdb->idxptr->itdb == NULL) {
144                 goto normal_index;
145         }
146
147         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148         key.dsize = strlen((char *)key.dptr);
149
150         rec = tdb_fetch(ltdb->idxptr->itdb, key);
151         if (rec.dptr == NULL) {
152                 goto normal_index;
153         }
154
155         /* we've found an in-memory index entry */
156         list2 = ltdb_index_idxptr(module, rec, true);
157         if (list2 == NULL) {
158                 free(rec.dptr);
159                 return LDB_ERR_OPERATIONS_ERROR;
160         }
161         free(rec.dptr);
162
163         *list = *list2;
164         return LDB_SUCCESS;
165
166 normal_index:
167         msg = ldb_msg_new(list);
168         if (msg == NULL) {
169                 return LDB_ERR_OPERATIONS_ERROR;
170         }
171
172         ret = ltdb_search_dn1(module, dn, msg);
173         if (ret != LDB_SUCCESS) {
174                 return ret;
175         }
176
177         /* TODO: check indexing version number */
178
179         el = ldb_msg_find_element(msg, LTDB_IDX);
180         if (!el) {
181                 talloc_free(msg);
182                 return LDB_SUCCESS;
183         }
184
185         /* we avoid copying the strings by stealing the list */
186         list->dn = talloc_steal(list, el->values);
187         list->count = el->num_values;
188
189         return LDB_SUCCESS;
190 }
191
192
193 /*
194   save a dn_list into a full @IDX style record
195  */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
197                                    struct dn_list *list)
198 {
199         struct ldb_message *msg;
200         int ret;
201
202         if (list->count == 0) {
203                 ret = ltdb_delete_noindex(module, dn);
204                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
205                         return LDB_SUCCESS;
206                 }
207                 return ret;
208         }
209
210         msg = ldb_msg_new(module);
211         if (!msg) {
212                 ldb_module_oom(module);
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217         if (ret != LDB_SUCCESS) {
218                 ldb_module_oom(module);
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         msg->dn = dn;
223         if (list->count > 0) {
224                 struct ldb_message_element *el;
225
226                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
227                 if (ret != LDB_SUCCESS) {
228                         ldb_module_oom(module);
229                         talloc_free(msg);
230                         return LDB_ERR_OPERATIONS_ERROR;
231                 }
232                 el->values = list->dn;
233                 el->num_values = list->count;
234         }
235
236         ret = ltdb_store(module, msg, TDB_REPLACE);
237         talloc_free(msg);
238         return ret;
239 }
240
241 /*
242   save a dn_list into the database, in either @IDX or internal format
243  */
244 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
245                               struct dn_list *list)
246 {
247         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
248         TDB_DATA rec, key;
249         int ret;
250         struct dn_list *list2;
251
252         if (ltdb->idxptr == NULL) {
253                 return ltdb_dn_list_store_full(module, dn, list);
254         }
255
256         if (ltdb->idxptr->itdb == NULL) {
257                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
258                 if (ltdb->idxptr->itdb == NULL) {
259                         return LDB_ERR_OPERATIONS_ERROR;
260                 }
261         }
262
263         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
264         key.dsize = strlen((char *)key.dptr);
265
266         rec = tdb_fetch(ltdb->idxptr->itdb, key);
267         if (rec.dptr != NULL) {
268                 list2 = ltdb_index_idxptr(module, rec, false);
269                 if (list2 == NULL) {
270                         free(rec.dptr);
271                         return LDB_ERR_OPERATIONS_ERROR;
272                 }
273                 free(rec.dptr);
274                 list2->dn = talloc_steal(list2, list->dn);
275                 list2->count = list->count;
276                 return LDB_SUCCESS;
277         }
278
279         list2 = talloc(ltdb->idxptr, struct dn_list);
280         if (list2 == NULL) {
281                 return LDB_ERR_OPERATIONS_ERROR;
282         }
283         list2->dn = talloc_steal(list2, list->dn);
284         list2->count = list->count;
285
286         rec.dptr = (uint8_t *)&list2;
287         rec.dsize = sizeof(void *);
288
289         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
290         if (ret == -1) {
291                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
292         }
293         return LDB_SUCCESS;
294 }
295
296 /*
297   traverse function for storing the in-memory index entries on disk
298  */
299 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
300 {
301         struct ldb_module *module = state;
302         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
303         struct ldb_dn *dn;
304         struct ldb_context *ldb = ldb_module_get_ctx(module);
305         struct ldb_val v;
306         struct dn_list *list;
307
308         list = ltdb_index_idxptr(module, data, true);
309         if (list == NULL) {
310                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
311                 return -1;
312         }
313
314         v.data = key.dptr;
315         v.length = key.dsize;
316
317         dn = ldb_dn_from_ldb_val(module, ldb, &v);
318         if (dn == NULL) {
319                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
320                 return -1;
321         }
322
323         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
324         talloc_free(dn);
325         if (ltdb->idxptr->error != 0) {
326                 return -1;
327         }
328         return 0;
329 }
330
331 /* cleanup the idxptr mode when transaction commits */
332 int ltdb_index_transaction_commit(struct ldb_module *module)
333 {
334         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
335         int ret;
336
337         if (ltdb->idxptr->itdb) {
338                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
339                 tdb_close(ltdb->idxptr->itdb);
340         }
341
342         ret = ltdb->idxptr->error;
343
344         if (ret != LDB_SUCCESS) {
345                 struct ldb_context *ldb = ldb_module_get_ctx(module);
346                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
347         }
348
349         talloc_free(ltdb->idxptr);
350         ltdb->idxptr = NULL;
351         return ret;
352 }
353
354 /* cleanup the idxptr mode when transaction cancels */
355 int ltdb_index_transaction_cancel(struct ldb_module *module)
356 {
357         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
358         if (ltdb->idxptr && ltdb->idxptr->itdb) {
359                 tdb_close(ltdb->idxptr->itdb);
360         }
361         talloc_free(ltdb->idxptr);
362         ltdb->idxptr = NULL;
363         return LDB_SUCCESS;
364 }
365
366
367 /*
368   return the dn key to be used for an index
369   the caller is responsible for freeing
370 */
371 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
372                                      const char *attr, const struct ldb_val *value,
373                                      const struct ldb_schema_attribute **ap)
374 {
375         struct ldb_dn *ret;
376         struct ldb_val v;
377         const struct ldb_schema_attribute *a;
378         char *attr_folded;
379         int r;
380
381         attr_folded = ldb_attr_casefold(ldb, attr);
382         if (!attr_folded) {
383                 return NULL;
384         }
385
386         a = ldb_schema_attribute_by_name(ldb, attr);
387         if (ap) {
388                 *ap = a;
389         }
390         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
391         if (r != LDB_SUCCESS) {
392                 const char *errstr = ldb_errstring(ldb);
393                 /* canonicalisation can be refused. For example, 
394                    a attribute that takes wildcards will refuse to canonicalise
395                    if the value contains a wildcard */
396                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
397                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
398                 talloc_free(attr_folded);
399                 return NULL;
400         }
401         if (ldb_should_b64_encode(ldb, &v)) {
402                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
403                 if (!vstr) return NULL;
404                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
405                 talloc_free(vstr);
406         } else {
407                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
408         }
409
410         if (v.data != value->data) {
411                 talloc_free(v.data);
412         }
413         talloc_free(attr_folded);
414
415         return ret;
416 }
417
418 /*
419   see if a attribute value is in the list of indexed attributes
420 */
421 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
422 {
423         unsigned int i;
424         struct ldb_message_element *el;
425
426         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
427         if (el == NULL) {
428                 return false;
429         }
430         for (i=0; i<el->num_values; i++) {
431                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
432                         return true;
433                 }
434         }
435         return false;
436 }
437
438 /*
439   in the following logic functions, the return value is treated as
440   follows:
441
442      LDB_SUCCESS: we found some matching index values
443
444      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
445
446      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
447                                we'll need a full search
448  */
449
450 /*
451   return a list of dn's that might match a simple indexed search (an
452   equality search only)
453  */
454 static int ltdb_index_dn_simple(struct ldb_module *module,
455                                 const struct ldb_parse_tree *tree,
456                                 const struct ldb_message *index_list,
457                                 struct dn_list *list)
458 {
459         struct ldb_context *ldb;
460         struct ldb_dn *dn;
461         int ret;
462
463         ldb = ldb_module_get_ctx(module);
464
465         list->count = 0;
466         list->dn = NULL;
467
468         /* if the attribute isn't in the list of indexed attributes then
469            this node needs a full search */
470         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
471                 return LDB_ERR_OPERATIONS_ERROR;
472         }
473
474         /* the attribute is indexed. Pull the list of DNs that match the 
475            search criterion */
476         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
477         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
478
479         ret = ltdb_dn_list_load(module, dn, list);
480         talloc_free(dn);
481         return ret;
482 }
483
484
485 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
486
487 /*
488   return a list of dn's that might match a leaf indexed search
489  */
490 static int ltdb_index_dn_leaf(struct ldb_module *module,
491                               const struct ldb_parse_tree *tree,
492                               const struct ldb_message *index_list,
493                               struct dn_list *list)
494 {
495         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
496                 list->dn = talloc_array(list, struct ldb_val, 1);
497                 if (list->dn == NULL) {
498                         ldb_module_oom(module);
499                         return LDB_ERR_OPERATIONS_ERROR;
500                 }
501                 list->dn[0] = tree->u.equality.value;
502                 list->count = 1;
503                 return LDB_SUCCESS;
504         }
505         return ltdb_index_dn_simple(module, tree, index_list, list);
506 }
507
508
509 /*
510   list intersection
511   list = list & list2
512 */
513 static bool list_intersect(struct ldb_context *ldb,
514                            struct dn_list *list, const struct dn_list *list2)
515 {
516         struct dn_list *list3;
517         unsigned int i;
518
519         if (list->count == 0) {
520                 /* 0 & X == 0 */
521                 return true;
522         }
523         if (list2->count == 0) {
524                 /* X & 0 == 0 */
525                 list->count = 0;
526                 list->dn = NULL;
527                 return true;
528         }
529
530         /* the indexing code is allowed to return a longer list than
531            what really matches, as all results are filtered by the
532            full expression at the end - this shortcut avoids a lot of
533            work in some cases */
534         if (list->count < 2 && list2->count > 10) {
535                 return true;
536         }
537         if (list2->count < 2 && list->count > 10) {
538                 list->count = list2->count;
539                 list->dn = list2->dn;
540                 /* note that list2 may not be the parent of list2->dn,
541                    as list2->dn may be owned by ltdb->idxptr. In that
542                    case we expect this reparent call to fail, which is
543                    OK */
544                 talloc_reparent(list2, list, list2->dn);
545                 return true;
546         }
547
548         list3 = talloc_zero(list, struct dn_list);
549         if (list3 == NULL) {
550                 return false;
551         }
552
553         list3->dn = talloc_array(list3, struct ldb_val, list->count);
554         if (!list3->dn) {
555                 talloc_free(list3);
556                 return false;
557         }
558         list3->count = 0;
559
560         for (i=0;i<list->count;i++) {
561                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
562                         list3->dn[list3->count] = list->dn[i];
563                         list3->count++;
564                 }
565         }
566
567         list->dn = talloc_steal(list, list3->dn);
568         list->count = list3->count;
569         talloc_free(list3);
570
571         return true;
572 }
573
574
575 /*
576   list union
577   list = list | list2
578 */
579 static bool list_union(struct ldb_context *ldb,
580                        struct dn_list *list, const struct dn_list *list2)
581 {
582         struct ldb_val *dn3;
583
584         if (list2->count == 0) {
585                 /* X | 0 == X */
586                 return true;
587         }
588
589         if (list->count == 0) {
590                 /* 0 | X == X */
591                 list->count = list2->count;
592                 list->dn = list2->dn;
593                 /* note that list2 may not be the parent of list2->dn,
594                    as list2->dn may be owned by ltdb->idxptr. In that
595                    case we expect this reparent call to fail, which is
596                    OK */
597                 talloc_reparent(list2, list, list2->dn);
598                 return true;
599         }
600
601         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
602         if (!dn3) {
603                 ldb_oom(ldb);
604                 return false;
605         }
606
607         /* we allow for duplicates here, and get rid of them later */
608         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
609         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
610
611         list->dn = dn3;
612         list->count += list2->count;
613
614         return true;
615 }
616
617 static int ltdb_index_dn(struct ldb_module *module,
618                          const struct ldb_parse_tree *tree,
619                          const struct ldb_message *index_list,
620                          struct dn_list *list);
621
622
623 /*
624   process an OR list (a union)
625  */
626 static int ltdb_index_dn_or(struct ldb_module *module,
627                             const struct ldb_parse_tree *tree,
628                             const struct ldb_message *index_list,
629                             struct dn_list *list)
630 {
631         struct ldb_context *ldb;
632         unsigned int i;
633
634         ldb = ldb_module_get_ctx(module);
635
636         list->dn = NULL;
637         list->count = 0;
638
639         for (i=0; i<tree->u.list.num_elements; i++) {
640                 struct dn_list *list2;
641                 int ret;
642
643                 list2 = talloc_zero(list, struct dn_list);
644                 if (list2 == NULL) {
645                         return LDB_ERR_OPERATIONS_ERROR;
646                 }
647
648                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
649
650                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
651                         /* X || 0 == X */
652                         talloc_free(list2);
653                         continue;
654                 }
655
656                 if (ret != LDB_SUCCESS) {
657                         /* X || * == * */
658                         talloc_free(list2);
659                         return ret;
660                 }
661
662                 if (!list_union(ldb, list, list2)) {
663                         talloc_free(list2);
664                         return LDB_ERR_OPERATIONS_ERROR;
665                 }
666         }
667
668         if (list->count == 0) {
669                 return LDB_ERR_NO_SUCH_OBJECT;
670         }
671
672         return LDB_SUCCESS;
673 }
674
675
676 /*
677   NOT an index results
678  */
679 static int ltdb_index_dn_not(struct ldb_module *module,
680                              const struct ldb_parse_tree *tree,
681                              const struct ldb_message *index_list,
682                              struct dn_list *list)
683 {
684         /* the only way to do an indexed not would be if we could
685            negate the not via another not or if we knew the total
686            number of database elements so we could know that the
687            existing expression covered the whole database.
688
689            instead, we just give up, and rely on a full index scan
690            (unless an outer & manages to reduce the list)
691         */
692         return LDB_ERR_OPERATIONS_ERROR;
693 }
694
695
696 static bool ltdb_index_unique(struct ldb_context *ldb,
697                               const char *attr)
698 {
699         const struct ldb_schema_attribute *a;
700         a = ldb_schema_attribute_by_name(ldb, attr);
701         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
702                 return true;
703         }
704         return false;
705 }
706
707 /*
708   process an AND expression (intersection)
709  */
710 static int ltdb_index_dn_and(struct ldb_module *module,
711                              const struct ldb_parse_tree *tree,
712                              const struct ldb_message *index_list,
713                              struct dn_list *list)
714 {
715         struct ldb_context *ldb;
716         unsigned int i;
717         bool found;
718
719         ldb = ldb_module_get_ctx(module);
720
721         list->dn = NULL;
722         list->count = 0;
723
724         /* in the first pass we only look for unique simple
725            equality tests, in the hope of avoiding having to look
726            at any others */
727         for (i=0; i<tree->u.list.num_elements; i++) {
728                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
729                 int ret;
730
731                 if (subtree->operation != LDB_OP_EQUALITY ||
732                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
733                         continue;
734                 }
735                 
736                 ret = ltdb_index_dn(module, subtree, index_list, list);
737                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
738                         /* 0 && X == 0 */
739                         return LDB_ERR_NO_SUCH_OBJECT;
740                 }
741                 if (ret == LDB_SUCCESS) {
742                         /* a unique index match means we can
743                          * stop. Note that we don't care if we return
744                          * a few too many objects, due to later
745                          * filtering */
746                         return LDB_SUCCESS;
747                 }
748         }       
749
750         /* now do a full intersection */
751         found = false;
752
753         for (i=0; i<tree->u.list.num_elements; i++) {
754                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
755                 struct dn_list *list2;
756                 int ret;
757
758                 list2 = talloc_zero(list, struct dn_list);
759                 if (list2 == NULL) {
760                         ldb_module_oom(module);
761                         return LDB_ERR_OPERATIONS_ERROR;
762                 }
763                         
764                 ret = ltdb_index_dn(module, subtree, index_list, list2);
765
766                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
767                         /* X && 0 == 0 */
768                         list->dn = NULL;
769                         list->count = 0;
770                         talloc_free(list2);
771                         return LDB_ERR_NO_SUCH_OBJECT;
772                 }
773                 
774                 if (ret != LDB_SUCCESS) {
775                         /* this didn't adding anything */
776                         talloc_free(list2);
777                         continue;
778                 }
779
780                 if (!found) {
781                         talloc_reparent(list2, list, list->dn);
782                         list->dn = list2->dn;
783                         list->count = list2->count;
784                         found = true;
785                 } else if (!list_intersect(ldb, list, list2)) {
786                         talloc_free(list2);
787                         return LDB_ERR_OPERATIONS_ERROR;
788                 }
789                         
790                 if (list->count == 0) {
791                         list->dn = NULL;
792                         return LDB_ERR_NO_SUCH_OBJECT;
793                 }
794                         
795                 if (list->count < 2) {
796                         /* it isn't worth loading the next part of the tree */
797                         return LDB_SUCCESS;
798                 }
799         }       
800
801         if (!found) {
802                 /* none of the attributes were indexed */
803                 return LDB_ERR_OPERATIONS_ERROR;
804         }
805
806         return LDB_SUCCESS;
807 }
808         
809 /*
810   return a list of matching objects using a one-level index
811  */
812 static int ltdb_index_dn_one(struct ldb_module *module,
813                              struct ldb_dn *parent_dn,
814                              struct dn_list *list)
815 {
816         struct ldb_context *ldb;
817         struct ldb_dn *key;
818         struct ldb_val val;
819         int ret;
820
821         ldb = ldb_module_get_ctx(module);
822
823         /* work out the index key from the parent DN */
824         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
825         val.length = strlen((char *)val.data);
826         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
827         if (!key) {
828                 ldb_oom(ldb);
829                 return LDB_ERR_OPERATIONS_ERROR;
830         }
831
832         ret = ltdb_dn_list_load(module, key, list);
833         talloc_free(key);
834         if (ret != LDB_SUCCESS) {
835                 return ret;
836         }
837
838         if (list->count == 0) {
839                 return LDB_ERR_NO_SUCH_OBJECT;
840         }
841
842         return LDB_SUCCESS;
843 }
844
845 /*
846   return a list of dn's that might match a indexed search or
847   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
848  */
849 static int ltdb_index_dn(struct ldb_module *module,
850                          const struct ldb_parse_tree *tree,
851                          const struct ldb_message *index_list,
852                          struct dn_list *list)
853 {
854         int ret = LDB_ERR_OPERATIONS_ERROR;
855
856         switch (tree->operation) {
857         case LDB_OP_AND:
858                 ret = ltdb_index_dn_and(module, tree, index_list, list);
859                 break;
860
861         case LDB_OP_OR:
862                 ret = ltdb_index_dn_or(module, tree, index_list, list);
863                 break;
864
865         case LDB_OP_NOT:
866                 ret = ltdb_index_dn_not(module, tree, index_list, list);
867                 break;
868
869         case LDB_OP_EQUALITY:
870                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
871                 break;
872
873         case LDB_OP_SUBSTRING:
874         case LDB_OP_GREATER:
875         case LDB_OP_LESS:
876         case LDB_OP_PRESENT:
877         case LDB_OP_APPROX:
878         case LDB_OP_EXTENDED:
879                 /* we can't index with fancy bitops yet */
880                 ret = LDB_ERR_OPERATIONS_ERROR;
881                 break;
882         }
883
884         return ret;
885 }
886
887 /*
888   filter a candidate dn_list from an indexed search into a set of results
889   extracting just the given attributes
890 */
891 static int ltdb_index_filter(const struct dn_list *dn_list,
892                              struct ltdb_context *ac, 
893                              uint32_t *match_count)
894 {
895         struct ldb_context *ldb;
896         struct ldb_message *msg;
897         unsigned int i;
898
899         ldb = ldb_module_get_ctx(ac->module);
900
901         for (i = 0; i < dn_list->count; i++) {
902                 struct ldb_dn *dn;
903                 int ret;
904
905                 msg = ldb_msg_new(ac);
906                 if (!msg) {
907                         return LDB_ERR_OPERATIONS_ERROR;
908                 }
909
910                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
911                 if (dn == NULL) {
912                         talloc_free(msg);
913                         return LDB_ERR_OPERATIONS_ERROR;
914                 }
915
916                 ret = ltdb_search_dn1(ac->module, dn, msg);
917                 talloc_free(dn);
918                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
919                         /* the record has disappeared? yes, this can happen */
920                         talloc_free(msg);
921                         continue;
922                 }
923
924                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
925                         /* an internal error */
926                         talloc_free(msg);
927                         return LDB_ERR_OPERATIONS_ERROR;
928                 }
929
930                 if (!ldb_match_msg(ldb, msg,
931                                    ac->tree, ac->base, ac->scope)) {
932                         talloc_free(msg);
933                         continue;
934                 }
935
936                 /* filter the attributes that the user wants */
937                 ret = ltdb_filter_attrs(msg, ac->attrs);
938
939                 if (ret == -1) {
940                         talloc_free(msg);
941                         return LDB_ERR_OPERATIONS_ERROR;
942                 }
943
944                 ret = ldb_module_send_entry(ac->req, msg, NULL);
945                 if (ret != LDB_SUCCESS) {
946                         ac->request_terminated = true;
947                         return ret;
948                 }
949
950                 (*match_count)++;
951         }
952
953         return LDB_SUCCESS;
954 }
955
956 /*
957   remove any duplicated entries in a indexed result
958  */
959 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
960 {
961         int i, new_count;
962
963         if (list->count < 2) {
964                 return;
965         }
966
967         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
968
969         new_count = 1;
970         for (i=1; i<list->count; i++) {
971                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
972                         if (new_count != i) {
973                                 list->dn[new_count] = list->dn[i];
974                         }
975                         new_count++;
976                 }
977         }
978         
979         list->count = new_count;
980 }
981
982 /*
983   search the database with a LDAP-like expression using indexes
984   returns -1 if an indexed search is not possible, in which
985   case the caller should call ltdb_search_full()
986 */
987 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
988 {
989         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
990         struct dn_list *dn_list;
991         int ret;
992
993         /* see if indexing is enabled */
994         if (!ltdb->cache->attribute_indexes && 
995             !ltdb->cache->one_level_indexes &&
996             ac->scope != LDB_SCOPE_BASE) {
997                 /* fallback to a full search */
998                 return LDB_ERR_OPERATIONS_ERROR;
999         }
1000
1001         dn_list = talloc_zero(ac, struct dn_list);
1002         if (dn_list == NULL) {
1003                 ldb_module_oom(ac->module);
1004                 return LDB_ERR_OPERATIONS_ERROR;
1005         }
1006
1007         switch (ac->scope) {
1008         case LDB_SCOPE_BASE:
1009                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1010                 if (dn_list->dn == NULL) {
1011                         ldb_module_oom(ac->module);
1012                         talloc_free(dn_list);
1013                         return LDB_ERR_OPERATIONS_ERROR;
1014                 }
1015                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1016                 if (dn_list->dn[0].data == NULL) {
1017                         ldb_module_oom(ac->module);
1018                         talloc_free(dn_list);
1019                         return LDB_ERR_OPERATIONS_ERROR;
1020                 }
1021                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1022                 dn_list->count = 1;
1023                 break;          
1024
1025         case LDB_SCOPE_ONELEVEL:
1026                 if (!ltdb->cache->one_level_indexes) {
1027                         talloc_free(dn_list);
1028                         return LDB_ERR_OPERATIONS_ERROR;
1029                 }
1030                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1031                 if (ret != LDB_SUCCESS) {
1032                         talloc_free(dn_list);
1033                         return ret;
1034                 }
1035                 break;
1036
1037         case LDB_SCOPE_SUBTREE:
1038         case LDB_SCOPE_DEFAULT:
1039                 if (!ltdb->cache->attribute_indexes) {
1040                         talloc_free(dn_list);
1041                         return LDB_ERR_OPERATIONS_ERROR;
1042                 }
1043                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1044                 if (ret != LDB_SUCCESS) {
1045                         talloc_free(dn_list);
1046                         return ret;
1047                 }
1048                 ltdb_dn_list_remove_duplicates(dn_list);
1049                 break;
1050         }
1051
1052         ret = ltdb_index_filter(dn_list, ac, match_count);
1053         talloc_free(dn_list);
1054         return ret;
1055 }
1056
1057 /*
1058   add an index entry for one message element
1059 */
1060 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1061                            struct ldb_message_element *el, int v_idx)
1062 {
1063         struct ldb_context *ldb;
1064         struct ldb_dn *dn_key;
1065         int ret;
1066         const struct ldb_schema_attribute *a;
1067         struct dn_list *list;
1068         unsigned alloc_len;
1069
1070         ldb = ldb_module_get_ctx(module);
1071
1072         list = talloc_zero(module, struct dn_list);
1073         if (list == NULL) {
1074                 return LDB_ERR_OPERATIONS_ERROR;
1075         }
1076
1077         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1078         if (!dn_key) {
1079                 talloc_free(list);
1080                 return LDB_ERR_OPERATIONS_ERROR;
1081         }
1082         talloc_steal(list, dn_key);
1083
1084         ret = ltdb_dn_list_load(module, dn_key, list);
1085         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1086                 talloc_free(list);
1087                 return ret;
1088         }
1089
1090         if (ltdb_dn_list_find_str(list, dn) != -1) {
1091                 talloc_free(list);
1092                 return LDB_SUCCESS;
1093         }
1094
1095         if (list->count > 0 &&
1096             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1097                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1098         }
1099
1100         /* overallocate the list a bit, to reduce the number of
1101          * realloc trigered copies */    
1102         alloc_len = ((list->count+1)+7) & ~7;
1103         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1104         if (list->dn == NULL) {
1105                 talloc_free(list);
1106                 return LDB_ERR_OPERATIONS_ERROR;
1107         }
1108         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1109         list->dn[list->count].length = strlen(dn);
1110         list->count++;
1111
1112         ret = ltdb_dn_list_store(module, dn_key, list);
1113
1114         talloc_free(list);
1115
1116         return ret;
1117 }
1118
1119 /*
1120   add index entries for one elements in a message
1121  */
1122 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1123                              struct ldb_message_element *el)
1124 {
1125         unsigned int i;
1126         for (i = 0; i < el->num_values; i++) {
1127                 int ret = ltdb_index_add1(module, dn, el, i);
1128                 if (ret != LDB_SUCCESS) {
1129                         return ret;
1130                 }
1131         }
1132
1133         return LDB_SUCCESS;
1134 }
1135
1136 /*
1137   add index entries for all elements in a message
1138  */
1139 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1140                               struct ldb_message_element *elements, int num_el)
1141 {
1142         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1143         unsigned int i;
1144
1145         if (dn[0] == '@') {
1146                 return LDB_SUCCESS;
1147         }
1148
1149         if (ltdb->cache->indexlist->num_elements == 0) {
1150                 /* no indexed fields */
1151                 return LDB_SUCCESS;
1152         }
1153
1154         for (i = 0; i < num_el; i++) {
1155                 int ret;
1156                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1157                         continue;
1158                 }
1159                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1160                 if (ret != LDB_SUCCESS) {
1161                         return ret;
1162                 }
1163         }
1164
1165         return LDB_SUCCESS;
1166 }
1167
1168
1169 /*
1170   insert a one level index for a message
1171 */
1172 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1173 {
1174         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1175         struct ldb_message_element el;
1176         struct ldb_val val;
1177         struct ldb_dn *pdn;
1178         const char *dn;
1179         int ret;
1180
1181         /* We index for ONE Level only if requested */
1182         if (!ltdb->cache->one_level_indexes) {
1183                 return LDB_SUCCESS;
1184         }
1185
1186         pdn = ldb_dn_get_parent(module, msg->dn);
1187         if (pdn == NULL) {
1188                 return LDB_ERR_OPERATIONS_ERROR;
1189         }
1190
1191         dn = ldb_dn_get_linearized(msg->dn);
1192         if (dn == NULL) {
1193                 talloc_free(pdn);
1194                 return LDB_ERR_OPERATIONS_ERROR;
1195         }
1196
1197         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1198         if (val.data == NULL) {
1199                 talloc_free(pdn);
1200                 return LDB_ERR_OPERATIONS_ERROR;
1201         }
1202
1203         val.length = strlen((char *)val.data);
1204         el.name = LTDB_IDXONE;
1205         el.values = &val;
1206         el.num_values = 1;
1207
1208         if (add) {
1209                 ret = ltdb_index_add1(module, dn, &el, 0);
1210         } else { /* delete */
1211                 ret = ltdb_index_del_value(module, dn, &el, 0);
1212         }
1213
1214         talloc_free(pdn);
1215
1216         return ret;
1217 }
1218
1219 /*
1220   add the index entries for a new element in a record
1221   The caller guarantees that these element values are not yet indexed
1222 */
1223 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1224                            struct ldb_message_element *el)
1225 {
1226         if (ldb_dn_is_special(dn)) {
1227                 return LDB_SUCCESS;
1228         }
1229         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1230 }
1231
1232 /*
1233   add the index entries for a new record
1234 */
1235 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1236 {
1237         const char *dn;
1238         int ret;
1239
1240         if (ldb_dn_is_special(msg->dn)) {
1241                 return LDB_SUCCESS;
1242         }
1243
1244         dn = ldb_dn_get_linearized(msg->dn);
1245         if (dn == NULL) {
1246                 return LDB_ERR_OPERATIONS_ERROR;
1247         }
1248
1249         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1250         if (ret != LDB_SUCCESS) {
1251                 return ret;
1252         }
1253
1254         return ltdb_index_onelevel(module, msg, 1);
1255 }
1256
1257
1258 /*
1259   delete an index entry for one message element
1260 */
1261 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1262                          struct ldb_message_element *el, int v_idx)
1263 {
1264         struct ldb_context *ldb;
1265         struct ldb_dn *dn_key;
1266         int ret, i;
1267         struct dn_list *list;
1268
1269         ldb = ldb_module_get_ctx(module);
1270
1271         if (dn[0] == '@') {
1272                 return LDB_SUCCESS;
1273         }
1274
1275         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1276         if (!dn_key) {
1277                 return LDB_ERR_OPERATIONS_ERROR;
1278         }
1279
1280         list = talloc_zero(dn_key, struct dn_list);
1281         if (list == NULL) {
1282                 talloc_free(dn_key);
1283                 return LDB_ERR_OPERATIONS_ERROR;
1284         }
1285
1286         ret = ltdb_dn_list_load(module, dn_key, list);
1287         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1288                 /* it wasn't indexed. Did we have an earlier error? If we did then
1289                    its gone now */
1290                 talloc_free(dn_key);
1291                 return LDB_SUCCESS;
1292         }
1293
1294         if (ret != LDB_SUCCESS) {
1295                 talloc_free(dn_key);
1296                 return ret;
1297         }
1298
1299         i = ltdb_dn_list_find_str(list, dn);
1300         if (i == -1) {
1301                 /* nothing to delete */
1302                 talloc_free(dn_key);
1303                 return LDB_SUCCESS;             
1304         }
1305
1306         if (i != list->count-1) {
1307                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1308         }
1309         list->count--;
1310         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1311
1312         ret = ltdb_dn_list_store(module, dn_key, list);
1313
1314         talloc_free(dn_key);
1315
1316         return ret;
1317 }
1318
1319 /*
1320   delete the index entries for a element
1321   return -1 on failure
1322 */
1323 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1324 {
1325         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1326         int ret;
1327         unsigned int i;
1328
1329         if (!ltdb->cache->attribute_indexes) {
1330                 /* no indexed fields */
1331                 return LDB_SUCCESS;
1332         }
1333
1334         if (dn[0] == '@') {
1335                 return LDB_SUCCESS;
1336         }
1337
1338         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1339                 return LDB_SUCCESS;
1340         }
1341         for (i = 0; i < el->num_values; i++) {
1342                 ret = ltdb_index_del_value(module, dn, el, i);
1343                 if (ret != LDB_SUCCESS) {
1344                         return ret;
1345                 }
1346         }
1347
1348         return LDB_SUCCESS;
1349 }
1350
1351 /*
1352   delete the index entries for a record
1353   return -1 on failure
1354 */
1355 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1356 {
1357         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1358         int ret;
1359         const char *dn;
1360         unsigned int i;
1361
1362         if (ldb_dn_is_special(msg->dn)) {
1363                 return LDB_SUCCESS;
1364         }
1365
1366         ret = ltdb_index_onelevel(module, msg, 0);
1367         if (ret != LDB_SUCCESS) {
1368                 return ret;
1369         }
1370
1371         if (!ltdb->cache->attribute_indexes) {
1372                 /* no indexed fields */
1373                 return LDB_SUCCESS;
1374         }
1375
1376         dn = ldb_dn_get_linearized(msg->dn);
1377         if (dn == NULL) {
1378                 return LDB_ERR_OPERATIONS_ERROR;
1379         }
1380
1381         for (i = 0; i < msg->num_elements; i++) {
1382                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1383                 if (ret != LDB_SUCCESS) {
1384                         return ret;
1385                 }
1386         }
1387
1388         return LDB_SUCCESS;
1389 }
1390
1391
1392 /*
1393   traversal function that deletes all @INDEX records
1394 */
1395 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1396 {
1397         const char *dn = "DN=" LTDB_INDEX ":";
1398         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1399                 return tdb_delete(tdb, key);
1400         }
1401         return 0;
1402 }
1403
1404 /*
1405   traversal function that adds @INDEX records during a re index
1406 */
1407 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1408 {
1409         struct ldb_context *ldb;
1410         struct ldb_module *module = (struct ldb_module *)state;
1411         struct ldb_message *msg;
1412         const char *dn = NULL;
1413         int ret;
1414         TDB_DATA key2;
1415
1416         ldb = ldb_module_get_ctx(module);
1417
1418         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1419             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1420                 return 0;
1421         }
1422
1423         msg = talloc(module, struct ldb_message);
1424         if (msg == NULL) {
1425                 return -1;
1426         }
1427
1428         ret = ltdb_unpack_data(module, &data, msg);
1429         if (ret != 0) {
1430                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1431                           ldb_dn_get_linearized(msg->dn));
1432                 talloc_free(msg);
1433                 return -1;
1434         }
1435
1436         /* check if the DN key has changed, perhaps due to the
1437            case insensitivity of an element changing */
1438         key2 = ltdb_key(module, msg->dn);
1439         if (key2.dptr == NULL) {
1440                 /* probably a corrupt record ... darn */
1441                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1442                                                         ldb_dn_get_linearized(msg->dn));
1443                 talloc_free(msg);
1444                 return 0;
1445         }
1446         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1447                 tdb_delete(tdb, key);
1448                 tdb_store(tdb, key2, data, 0);
1449         }
1450         talloc_free(key2.dptr);
1451
1452         if (msg->dn == NULL) {
1453                 dn = (char *)key.dptr + 3;
1454         } else {
1455                 dn = ldb_dn_get_linearized(msg->dn);
1456         }
1457
1458         ret = ltdb_index_onelevel(module, msg, 1);
1459         if (ret != LDB_SUCCESS) {
1460                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1461                         "Adding special ONE LEVEL index failed (%s)!",
1462                         ldb_dn_get_linearized(msg->dn));
1463                 talloc_free(msg);
1464                 return -1;
1465         }
1466
1467         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1468
1469         talloc_free(msg);
1470
1471         if (ret != LDB_SUCCESS) return -1;
1472
1473         return 0;
1474 }
1475
1476 /*
1477   force a complete reindex of the database
1478 */
1479 int ltdb_reindex(struct ldb_module *module)
1480 {
1481         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1482         int ret;
1483
1484         if (ltdb_cache_reload(module) != 0) {
1485                 return LDB_ERR_OPERATIONS_ERROR;
1486         }
1487
1488         /* first traverse the database deleting any @INDEX records */
1489         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1490         if (ret == -1) {
1491                 return LDB_ERR_OPERATIONS_ERROR;
1492         }
1493
1494         /* if we don't have indexes we have nothing todo */
1495         if (ltdb->cache->indexlist->num_elements == 0) {
1496                 return LDB_SUCCESS;
1497         }
1498
1499         /* now traverse adding any indexes for normal LDB records */
1500         ret = tdb_traverse(ltdb->tdb, re_index, module);
1501         if (ret == -1) {
1502                 return LDB_ERR_OPERATIONS_ERROR;
1503         }
1504
1505         if (ltdb->idxptr) {
1506                 ltdb->idxptr->repack = true;
1507         }
1508
1509         return LDB_SUCCESS;
1510 }