c02e74b61db55d53bf9e3c7f9f75f0483cfe6e2a
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         bool repack;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         return LDB_SUCCESS;
58 }
59
60 /* compare two DN entries in a dn_list. Take account of possible
61  * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 {
64         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65         if (ret != 0) return ret;
66         if (v2->length > v1->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return 0;
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 {
100         struct dn_list *list;
101         if (rec.dsize != sizeof(void *)) {
102                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
103                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
104                 return NULL;
105         }
106         
107         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
108         if (list == NULL) {
109                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
110                                        "Bad type '%s' for idxptr", 
111                                        talloc_get_name(*(struct dn_list **)rec.dptr));
112                 return NULL;
113         }
114         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
116                                        "Bad parent '%s' for idxptr", 
117                                        talloc_get_name(talloc_parent(list->dn)));
118                 return NULL;
119         }
120         return list;
121 }
122
123 /*
124   return the @IDX list in an index entry for a dn as a 
125   struct dn_list
126  */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128                              struct ldb_dn *dn, struct dn_list *list)
129 {
130         struct ldb_message *msg;
131         int ret;
132         struct ldb_message_element *el;
133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134         TDB_DATA rec;
135         struct dn_list *list2;
136         TDB_DATA key;
137
138         list->dn = NULL;
139         list->count = 0;
140
141         /* see if we have any in-memory index entries */
142         if (ltdb->idxptr == NULL ||
143             ltdb->idxptr->itdb == NULL) {
144                 goto normal_index;
145         }
146
147         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148         key.dsize = strlen((char *)key.dptr);
149
150         rec = tdb_fetch(ltdb->idxptr->itdb, key);
151         if (rec.dptr == NULL) {
152                 goto normal_index;
153         }
154
155         /* we've found an in-memory index entry */
156         list2 = ltdb_index_idxptr(module, rec, true);
157         if (list2 == NULL) {
158                 free(rec.dptr);
159                 return LDB_ERR_OPERATIONS_ERROR;
160         }
161         free(rec.dptr);
162
163         *list = *list2;
164         return LDB_SUCCESS;
165
166 normal_index:
167         msg = ldb_msg_new(list);
168         if (msg == NULL) {
169                 return LDB_ERR_OPERATIONS_ERROR;
170         }
171
172         ret = ltdb_search_dn1(module, dn, msg);
173         if (ret != LDB_SUCCESS) {
174                 return ret;
175         }
176
177         /* TODO: check indexing version number */
178
179         el = ldb_msg_find_element(msg, LTDB_IDX);
180         if (!el) {
181                 talloc_free(msg);
182                 return LDB_SUCCESS;
183         }
184
185         /* we avoid copying the strings by stealing the list */
186         list->dn = talloc_steal(list, el->values);
187         list->count = el->num_values;
188
189         return LDB_SUCCESS;
190 }
191
192
193 /*
194   save a dn_list into a full @IDX style record
195  */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
197                                    struct dn_list *list)
198 {
199         struct ldb_message *msg;
200         int ret;
201
202         msg = ldb_msg_new(module);
203         if (!msg) {
204                 ldb_module_oom(module);
205                 return LDB_ERR_OPERATIONS_ERROR;
206         }
207
208         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
209         if (ret != LDB_SUCCESS) {
210                 ldb_module_oom(module);
211                 return LDB_ERR_OPERATIONS_ERROR;
212         }
213
214         msg->dn = dn;
215         if (list->count > 0) {
216                 struct ldb_message_element *el;
217
218                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
219                 if (ret != LDB_SUCCESS) {
220                         ldb_module_oom(module);
221                         talloc_free(msg);
222                         return LDB_ERR_OPERATIONS_ERROR;
223                 }
224                 el->values = list->dn;
225                 el->num_values = list->count;
226         }
227
228         ret = ltdb_store(module, msg, TDB_REPLACE);
229         talloc_free(msg);
230         return ret;
231 }
232
233 /*
234   save a dn_list into the database, in either @IDX or internal format
235  */
236 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
237                               struct dn_list *list)
238 {
239         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
240         TDB_DATA rec, key;
241         int ret;
242         struct dn_list *list2;
243
244         if (ltdb->idxptr == NULL) {
245                 return ltdb_dn_list_store_full(module, dn, list);
246         }
247
248         if (ltdb->idxptr->itdb == NULL) {
249                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
250                 if (ltdb->idxptr->itdb == NULL) {
251                         return LDB_ERR_OPERATIONS_ERROR;
252                 }
253         }
254
255         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
256         key.dsize = strlen((char *)key.dptr);
257
258         rec = tdb_fetch(ltdb->idxptr->itdb, key);
259         if (rec.dptr != NULL) {
260                 list2 = ltdb_index_idxptr(module, rec, false);
261                 if (list2 == NULL) {
262                         free(rec.dptr);
263                         return LDB_ERR_OPERATIONS_ERROR;
264                 }
265                 free(rec.dptr);
266                 list2->dn = talloc_steal(list2, list->dn);
267                 list2->count = list->count;
268                 return LDB_SUCCESS;
269         }
270
271         list2 = talloc(ltdb->idxptr, struct dn_list);
272         if (list2 == NULL) {
273                 return LDB_ERR_OPERATIONS_ERROR;
274         }
275         list2->dn = talloc_steal(list2, list->dn);
276         list2->count = list->count;
277
278         rec.dptr = (uint8_t *)&list2;
279         rec.dsize = sizeof(void *);
280
281         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
282
283         return ret;     
284 }
285
286 /*
287   traverse function for storing the in-memory index entries on disk
288  */
289 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
290 {
291         struct ldb_module *module = state;
292         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
293         struct ldb_dn *dn;
294         struct ldb_context *ldb = ldb_module_get_ctx(module);
295         struct ldb_val v;
296         struct dn_list *list;
297
298         list = ltdb_index_idxptr(module, data, true);
299         if (list == NULL) {
300                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
301                 return -1;
302         }
303
304         v.data = key.dptr;
305         v.length = key.dsize;
306
307         dn = ldb_dn_from_ldb_val(module, ldb, &v);
308         if (dn == NULL) {
309                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
310                 return -1;
311         }
312
313         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
314         talloc_free(dn);
315         return ltdb->idxptr->error;
316 }
317
318 /* cleanup the idxptr mode when transaction commits */
319 int ltdb_index_transaction_commit(struct ldb_module *module)
320 {
321         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
322         int ret;
323
324         if (ltdb->idxptr->itdb) {
325                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
326                 tdb_close(ltdb->idxptr->itdb);
327         }
328
329         ret = ltdb->idxptr->error;
330
331         if (ret != LDB_SUCCESS) {
332                 struct ldb_context *ldb = ldb_module_get_ctx(module);
333                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
334         }
335
336         talloc_free(ltdb->idxptr);
337         ltdb->idxptr = NULL;
338         return ret;
339 }
340
341 /* cleanup the idxptr mode when transaction cancels */
342 int ltdb_index_transaction_cancel(struct ldb_module *module)
343 {
344         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
345         if (ltdb->idxptr && ltdb->idxptr->itdb) {
346                 tdb_close(ltdb->idxptr->itdb);
347         }
348         talloc_free(ltdb->idxptr);
349         ltdb->idxptr = NULL;
350         return LDB_SUCCESS;
351 }
352
353
354 /*
355   return the dn key to be used for an index
356   the caller is responsible for freeing
357 */
358 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
359                                      const char *attr, const struct ldb_val *value,
360                                      const struct ldb_schema_attribute **ap)
361 {
362         struct ldb_dn *ret;
363         struct ldb_val v;
364         const struct ldb_schema_attribute *a;
365         char *attr_folded;
366         int r;
367
368         attr_folded = ldb_attr_casefold(ldb, attr);
369         if (!attr_folded) {
370                 return NULL;
371         }
372
373         a = ldb_schema_attribute_by_name(ldb, attr);
374         if (ap) {
375                 *ap = a;
376         }
377         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
378         if (r != LDB_SUCCESS) {
379                 const char *errstr = ldb_errstring(ldb);
380                 /* canonicalisation can be refused. For example, 
381                    a attribute that takes wildcards will refuse to canonicalise
382                    if the value contains a wildcard */
383                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
384                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
385                 talloc_free(attr_folded);
386                 return NULL;
387         }
388         if (ldb_should_b64_encode(ldb, &v)) {
389                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
390                 if (!vstr) return NULL;
391                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
392                 talloc_free(vstr);
393         } else {
394                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
395         }
396
397         if (v.data != value->data) {
398                 talloc_free(v.data);
399         }
400         talloc_free(attr_folded);
401
402         return ret;
403 }
404
405 /*
406   see if a attribute value is in the list of indexed attributes
407 */
408 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
409 {
410         unsigned int i;
411         struct ldb_message_element *el;
412
413         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
414         if (el == NULL) {
415                 return false;
416         }
417         for (i=0; i<el->num_values; i++) {
418                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
419                         return true;
420                 }
421         }
422         return false;
423 }
424
425 /*
426   in the following logic functions, the return value is treated as
427   follows:
428
429      LDB_SUCCESS: we found some matching index values
430
431      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
432
433      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
434                                we'll need a full search
435  */
436
437 /*
438   return a list of dn's that might match a simple indexed search (an
439   equality search only)
440  */
441 static int ltdb_index_dn_simple(struct ldb_module *module,
442                                 const struct ldb_parse_tree *tree,
443                                 const struct ldb_message *index_list,
444                                 struct dn_list *list)
445 {
446         struct ldb_context *ldb;
447         struct ldb_dn *dn;
448         int ret;
449
450         ldb = ldb_module_get_ctx(module);
451
452         list->count = 0;
453         list->dn = NULL;
454
455         /* if the attribute isn't in the list of indexed attributes then
456            this node needs a full search */
457         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
458                 return LDB_ERR_OPERATIONS_ERROR;
459         }
460
461         /* the attribute is indexed. Pull the list of DNs that match the 
462            search criterion */
463         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
464         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
465
466         ret = ltdb_dn_list_load(module, dn, list);
467         talloc_free(dn);
468         return ret;
469 }
470
471
472 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
473
474 /*
475   return a list of dn's that might match a leaf indexed search
476  */
477 static int ltdb_index_dn_leaf(struct ldb_module *module,
478                               const struct ldb_parse_tree *tree,
479                               const struct ldb_message *index_list,
480                               struct dn_list *list)
481 {
482         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
483                 list->dn = talloc_array(list, struct ldb_val, 1);
484                 if (list->dn == NULL) {
485                         ldb_module_oom(module);
486                         return LDB_ERR_OPERATIONS_ERROR;
487                 }
488                 list->dn[0] = tree->u.equality.value;
489                 list->count = 1;
490                 return LDB_SUCCESS;
491         }
492         return ltdb_index_dn_simple(module, tree, index_list, list);
493 }
494
495
496 /*
497   list intersection
498   list = list & list2
499 */
500 static bool list_intersect(struct ldb_context *ldb,
501                            struct dn_list *list, const struct dn_list *list2)
502 {
503         struct dn_list *list3;
504         unsigned int i;
505
506         if (list->count == 0) {
507                 /* 0 & X == 0 */
508                 return true;
509         }
510         if (list2->count == 0) {
511                 /* X & 0 == 0 */
512                 list->count = 0;
513                 list->dn = NULL;
514                 return true;
515         }
516
517         /* the indexing code is allowed to return a longer list than
518            what really matches, as all results are filtered by the
519            full expression at the end - this shortcut avoids a lot of
520            work in some cases */
521         if (list->count < 2 && list2->count > 10) {
522                 return true;
523         }
524         if (list2->count < 2 && list->count > 10) {
525                 list->count = list2->count;
526                 list->dn = list2->dn;
527                 /* note that list2 may not be the parent of list2->dn,
528                    as list2->dn may be owned by ltdb->idxptr. In that
529                    case we expect this reparent call to fail, which is
530                    OK */
531                 talloc_reparent(list2, list, list2->dn);
532                 return true;
533         }
534
535         list3 = talloc_zero(list, struct dn_list);
536         if (list3 == NULL) {
537                 return false;
538         }
539
540         list3->dn = talloc_array(list3, struct ldb_val, list->count);
541         if (!list3->dn) {
542                 talloc_free(list3);
543                 return false;
544         }
545         list3->count = 0;
546
547         for (i=0;i<list->count;i++) {
548                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
549                         list3->dn[list3->count] = list->dn[i];
550                         list3->count++;
551                 }
552         }
553
554         list->dn = talloc_steal(list, list3->dn);
555         list->count = list3->count;
556         talloc_free(list3);
557
558         return true;
559 }
560
561
562 /*
563   list union
564   list = list | list2
565 */
566 static bool list_union(struct ldb_context *ldb,
567                        struct dn_list *list, const struct dn_list *list2)
568 {
569         struct ldb_val *dn3;
570
571         if (list2->count == 0) {
572                 /* X | 0 == X */
573                 return true;
574         }
575
576         if (list->count == 0) {
577                 /* 0 | X == X */
578                 list->count = list2->count;
579                 list->dn = list2->dn;
580                 /* note that list2 may not be the parent of list2->dn,
581                    as list2->dn may be owned by ltdb->idxptr. In that
582                    case we expect this reparent call to fail, which is
583                    OK */
584                 talloc_reparent(list2, list, list2->dn);
585                 return true;
586         }
587
588         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
589         if (!dn3) {
590                 ldb_oom(ldb);
591                 return false;
592         }
593
594         /* we allow for duplicates here, and get rid of them later */
595         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
596         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
597
598         list->dn = dn3;
599         list->count += list2->count;
600
601         return true;
602 }
603
604 static int ltdb_index_dn(struct ldb_module *module,
605                          const struct ldb_parse_tree *tree,
606                          const struct ldb_message *index_list,
607                          struct dn_list *list);
608
609
610 /*
611   process an OR list (a union)
612  */
613 static int ltdb_index_dn_or(struct ldb_module *module,
614                             const struct ldb_parse_tree *tree,
615                             const struct ldb_message *index_list,
616                             struct dn_list *list)
617 {
618         struct ldb_context *ldb;
619         unsigned int i;
620
621         ldb = ldb_module_get_ctx(module);
622
623         list->dn = NULL;
624         list->count = 0;
625
626         for (i=0; i<tree->u.list.num_elements; i++) {
627                 struct dn_list *list2;
628                 int ret;
629
630                 list2 = talloc_zero(list, struct dn_list);
631                 if (list2 == NULL) {
632                         return LDB_ERR_OPERATIONS_ERROR;
633                 }
634
635                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
636
637                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
638                         /* X || 0 == X */
639                         talloc_free(list2);
640                         continue;
641                 }
642
643                 if (ret != LDB_SUCCESS) {
644                         /* X || * == * */
645                         talloc_free(list2);
646                         return ret;
647                 }
648
649                 if (!list_union(ldb, list, list2)) {
650                         talloc_free(list2);
651                         return LDB_ERR_OPERATIONS_ERROR;
652                 }
653         }
654
655         if (list->count == 0) {
656                 return LDB_ERR_NO_SUCH_OBJECT;
657         }
658
659         return LDB_SUCCESS;
660 }
661
662
663 /*
664   NOT an index results
665  */
666 static int ltdb_index_dn_not(struct ldb_module *module,
667                              const struct ldb_parse_tree *tree,
668                              const struct ldb_message *index_list,
669                              struct dn_list *list)
670 {
671         /* the only way to do an indexed not would be if we could
672            negate the not via another not or if we knew the total
673            number of database elements so we could know that the
674            existing expression covered the whole database.
675
676            instead, we just give up, and rely on a full index scan
677            (unless an outer & manages to reduce the list)
678         */
679         return LDB_ERR_OPERATIONS_ERROR;
680 }
681
682
683 static bool ltdb_index_unique(struct ldb_context *ldb,
684                               const char *attr)
685 {
686         const struct ldb_schema_attribute *a;
687         a = ldb_schema_attribute_by_name(ldb, attr);
688         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
689                 return true;
690         }
691         return false;
692 }
693
694 /*
695   process an AND expression (intersection)
696  */
697 static int ltdb_index_dn_and(struct ldb_module *module,
698                              const struct ldb_parse_tree *tree,
699                              const struct ldb_message *index_list,
700                              struct dn_list *list)
701 {
702         struct ldb_context *ldb;
703         unsigned int i;
704         bool found;
705
706         ldb = ldb_module_get_ctx(module);
707
708         list->dn = NULL;
709         list->count = 0;
710
711         /* in the first pass we only look for unique simple
712            equality tests, in the hope of avoiding having to look
713            at any others */
714         for (i=0; i<tree->u.list.num_elements; i++) {
715                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
716                 int ret;
717
718                 if (subtree->operation != LDB_OP_EQUALITY ||
719                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
720                         continue;
721                 }
722                 
723                 ret = ltdb_index_dn(module, subtree, index_list, list);
724                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
725                         /* 0 && X == 0 */
726                         return LDB_ERR_NO_SUCH_OBJECT;
727                 }
728                 if (ret == LDB_SUCCESS) {
729                         /* a unique index match means we can
730                          * stop. Note that we don't care if we return
731                          * a few too many objects, due to later
732                          * filtering */
733                         return LDB_SUCCESS;
734                 }
735         }       
736
737         /* now do a full intersection */
738         found = false;
739
740         for (i=0; i<tree->u.list.num_elements; i++) {
741                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
742                 struct dn_list *list2;
743                 int ret;
744
745                 list2 = talloc_zero(list, struct dn_list);
746                 if (list2 == NULL) {
747                         ldb_module_oom(module);
748                         return LDB_ERR_OPERATIONS_ERROR;
749                 }
750                         
751                 ret = ltdb_index_dn(module, subtree, index_list, list2);
752
753                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
754                         /* X && 0 == 0 */
755                         list->dn = NULL;
756                         list->count = 0;
757                         talloc_free(list2);
758                         return LDB_ERR_NO_SUCH_OBJECT;
759                 }
760                 
761                 if (ret != LDB_SUCCESS) {
762                         /* this didn't adding anything */
763                         talloc_free(list2);
764                         continue;
765                 }
766
767                 if (!found) {
768                         talloc_reparent(list2, list, list->dn);
769                         list->dn = list2->dn;
770                         list->count = list2->count;
771                         found = true;
772                 } else if (!list_intersect(ldb, list, list2)) {
773                         talloc_free(list2);
774                         return LDB_ERR_OPERATIONS_ERROR;
775                 }
776                         
777                 if (list->count == 0) {
778                         list->dn = NULL;
779                         return LDB_ERR_NO_SUCH_OBJECT;
780                 }
781                         
782                 if (list->count < 2) {
783                         /* it isn't worth loading the next part of the tree */
784                         return LDB_SUCCESS;
785                 }
786         }       
787
788         if (!found) {
789                 /* none of the attributes were indexed */
790                 return LDB_ERR_OPERATIONS_ERROR;
791         }
792
793         return LDB_SUCCESS;
794 }
795         
796 /*
797   return a list of matching objects using a one-level index
798  */
799 static int ltdb_index_dn_one(struct ldb_module *module,
800                              struct ldb_dn *parent_dn,
801                              struct dn_list *list)
802 {
803         struct ldb_context *ldb;
804         struct ldb_dn *key;
805         struct ldb_val val;
806         int ret;
807
808         ldb = ldb_module_get_ctx(module);
809
810         /* work out the index key from the parent DN */
811         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
812         val.length = strlen((char *)val.data);
813         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
814         if (!key) {
815                 ldb_oom(ldb);
816                 return LDB_ERR_OPERATIONS_ERROR;
817         }
818
819         ret = ltdb_dn_list_load(module, key, list);
820         talloc_free(key);
821         if (ret != LDB_SUCCESS) {
822                 return ret;
823         }
824
825         if (list->count == 0) {
826                 return LDB_ERR_NO_SUCH_OBJECT;
827         }
828
829         return LDB_SUCCESS;
830 }
831
832 /*
833   return a list of dn's that might match a indexed search or
834   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
835  */
836 static int ltdb_index_dn(struct ldb_module *module,
837                          const struct ldb_parse_tree *tree,
838                          const struct ldb_message *index_list,
839                          struct dn_list *list)
840 {
841         int ret = LDB_ERR_OPERATIONS_ERROR;
842
843         switch (tree->operation) {
844         case LDB_OP_AND:
845                 ret = ltdb_index_dn_and(module, tree, index_list, list);
846                 break;
847
848         case LDB_OP_OR:
849                 ret = ltdb_index_dn_or(module, tree, index_list, list);
850                 break;
851
852         case LDB_OP_NOT:
853                 ret = ltdb_index_dn_not(module, tree, index_list, list);
854                 break;
855
856         case LDB_OP_EQUALITY:
857                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
858                 break;
859
860         case LDB_OP_SUBSTRING:
861         case LDB_OP_GREATER:
862         case LDB_OP_LESS:
863         case LDB_OP_PRESENT:
864         case LDB_OP_APPROX:
865         case LDB_OP_EXTENDED:
866                 /* we can't index with fancy bitops yet */
867                 ret = LDB_ERR_OPERATIONS_ERROR;
868                 break;
869         }
870
871         return ret;
872 }
873
874 /*
875   filter a candidate dn_list from an indexed search into a set of results
876   extracting just the given attributes
877 */
878 static int ltdb_index_filter(const struct dn_list *dn_list,
879                              struct ltdb_context *ac, 
880                              uint32_t *match_count)
881 {
882         struct ldb_context *ldb;
883         struct ldb_message *msg;
884         unsigned int i;
885
886         ldb = ldb_module_get_ctx(ac->module);
887
888         for (i = 0; i < dn_list->count; i++) {
889                 struct ldb_dn *dn;
890                 int ret;
891
892                 msg = ldb_msg_new(ac);
893                 if (!msg) {
894                         return LDB_ERR_OPERATIONS_ERROR;
895                 }
896
897                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
898                 if (dn == NULL) {
899                         talloc_free(msg);
900                         return LDB_ERR_OPERATIONS_ERROR;
901                 }
902
903                 ret = ltdb_search_dn1(ac->module, dn, msg);
904                 talloc_free(dn);
905                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
906                         /* the record has disappeared? yes, this can happen */
907                         talloc_free(msg);
908                         continue;
909                 }
910
911                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
912                         /* an internal error */
913                         talloc_free(msg);
914                         return LDB_ERR_OPERATIONS_ERROR;
915                 }
916
917                 if (!ldb_match_msg(ldb, msg,
918                                    ac->tree, ac->base, ac->scope)) {
919                         talloc_free(msg);
920                         continue;
921                 }
922
923                 /* filter the attributes that the user wants */
924                 ret = ltdb_filter_attrs(msg, ac->attrs);
925
926                 if (ret == -1) {
927                         talloc_free(msg);
928                         return LDB_ERR_OPERATIONS_ERROR;
929                 }
930
931                 ret = ldb_module_send_entry(ac->req, msg, NULL);
932                 if (ret != LDB_SUCCESS) {
933                         ac->request_terminated = true;
934                         return ret;
935                 }
936
937                 (*match_count)++;
938         }
939
940         return LDB_SUCCESS;
941 }
942
943 /*
944   remove any duplicated entries in a indexed result
945  */
946 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
947 {
948         int i, new_count;
949
950         if (list->count < 2) {
951                 return;
952         }
953
954         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
955
956         new_count = 1;
957         for (i=1; i<list->count; i++) {
958                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
959                         if (new_count != i) {
960                                 list->dn[new_count] = list->dn[i];
961                         }
962                         new_count++;
963                 }
964         }
965         
966         list->count = new_count;
967 }
968
969 /*
970   search the database with a LDAP-like expression using indexes
971   returns -1 if an indexed search is not possible, in which
972   case the caller should call ltdb_search_full()
973 */
974 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
975 {
976         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
977         struct dn_list *dn_list;
978         int ret;
979
980         /* see if indexing is enabled */
981         if (!ltdb->cache->attribute_indexes && 
982             !ltdb->cache->one_level_indexes &&
983             ac->scope != LDB_SCOPE_BASE) {
984                 /* fallback to a full search */
985                 return LDB_ERR_OPERATIONS_ERROR;
986         }
987
988         dn_list = talloc_zero(ac, struct dn_list);
989         if (dn_list == NULL) {
990                 ldb_module_oom(ac->module);
991                 return LDB_ERR_OPERATIONS_ERROR;
992         }
993
994         switch (ac->scope) {
995         case LDB_SCOPE_BASE:
996                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
997                 if (dn_list->dn == NULL) {
998                         ldb_module_oom(ac->module);
999                         talloc_free(dn_list);
1000                         return LDB_ERR_OPERATIONS_ERROR;
1001                 }
1002                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1003                 if (dn_list->dn[0].data == NULL) {
1004                         ldb_module_oom(ac->module);
1005                         talloc_free(dn_list);
1006                         return LDB_ERR_OPERATIONS_ERROR;
1007                 }
1008                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1009                 dn_list->count = 1;
1010                 break;          
1011
1012         case LDB_SCOPE_ONELEVEL:
1013                 if (!ltdb->cache->one_level_indexes) {
1014                         talloc_free(dn_list);
1015                         return LDB_ERR_OPERATIONS_ERROR;
1016                 }
1017                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1018                 if (ret != LDB_SUCCESS) {
1019                         talloc_free(dn_list);
1020                         return ret;
1021                 }
1022                 break;
1023
1024         case LDB_SCOPE_SUBTREE:
1025         case LDB_SCOPE_DEFAULT:
1026                 if (!ltdb->cache->attribute_indexes) {
1027                         talloc_free(dn_list);
1028                         return LDB_ERR_OPERATIONS_ERROR;
1029                 }
1030                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1031                 if (ret != LDB_SUCCESS) {
1032                         talloc_free(dn_list);
1033                         return ret;
1034                 }
1035                 ltdb_dn_list_remove_duplicates(dn_list);
1036                 break;
1037         }
1038
1039         ret = ltdb_index_filter(dn_list, ac, match_count);
1040         talloc_free(dn_list);
1041         return ret;
1042 }
1043
1044 /*
1045   add an index entry for one message element
1046 */
1047 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1048                            struct ldb_message_element *el, int v_idx)
1049 {
1050         struct ldb_context *ldb;
1051         struct ldb_dn *dn_key;
1052         int ret;
1053         const struct ldb_schema_attribute *a;
1054         struct dn_list *list;
1055
1056         ldb = ldb_module_get_ctx(module);
1057
1058         list = talloc_zero(module, struct dn_list);
1059         if (list == NULL) {
1060                 return LDB_ERR_OPERATIONS_ERROR;
1061         }
1062
1063         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1064         if (!dn_key) {
1065                 talloc_free(list);
1066                 return LDB_ERR_OPERATIONS_ERROR;
1067         }
1068         talloc_steal(list, dn_key);
1069
1070         ret = ltdb_dn_list_load(module, dn_key, list);
1071         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1072                 talloc_free(list);
1073                 return ret;
1074         }
1075
1076         if (ltdb_dn_list_find_str(list, dn) != -1) {
1077                 talloc_free(list);
1078                 return LDB_SUCCESS;
1079         }
1080
1081         if (list->count > 0 &&
1082             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1083                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1084         }
1085
1086         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count+1);
1087         if (list->dn == NULL) {
1088                 talloc_free(list);
1089                 return LDB_ERR_OPERATIONS_ERROR;
1090         }
1091         list->dn[list->count].data = discard_const_p(unsigned char, dn);
1092         list->dn[list->count].length = strlen(dn);
1093         list->count++;
1094
1095         ret = ltdb_dn_list_store(module, dn_key, list);
1096
1097         talloc_free(list);
1098
1099         return ret;
1100 }
1101
1102 /*
1103   add index entries for one elements in a message
1104  */
1105 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1106                              struct ldb_message_element *el)
1107 {
1108         unsigned int i;
1109         for (i = 0; i < el->num_values; i++) {
1110                 int ret = ltdb_index_add1(module, dn, el, i);
1111                 if (ret != LDB_SUCCESS) {
1112                         return ret;
1113                 }
1114         }
1115
1116         return LDB_SUCCESS;
1117 }
1118
1119 /*
1120   add index entries for all elements in a message
1121  */
1122 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1123                               struct ldb_message_element *elements, int num_el)
1124 {
1125         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1126         unsigned int i;
1127
1128         if (dn[0] == '@') {
1129                 return LDB_SUCCESS;
1130         }
1131
1132         if (ltdb->cache->indexlist->num_elements == 0) {
1133                 /* no indexed fields */
1134                 return LDB_SUCCESS;
1135         }
1136
1137         for (i = 0; i < num_el; i++) {
1138                 int ret;
1139                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1140                         continue;
1141                 }
1142                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1143                 if (ret != LDB_SUCCESS) {
1144                         return ret;
1145                 }
1146         }
1147
1148         return LDB_SUCCESS;
1149 }
1150
1151
1152 /*
1153   insert a one level index for a message
1154 */
1155 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1156 {
1157         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1158         struct ldb_message_element el;
1159         struct ldb_val val;
1160         struct ldb_dn *pdn;
1161         const char *dn;
1162         int ret;
1163
1164         /* We index for ONE Level only if requested */
1165         if (!ltdb->cache->one_level_indexes) {
1166                 return LDB_SUCCESS;
1167         }
1168
1169         pdn = ldb_dn_get_parent(module, msg->dn);
1170         if (pdn == NULL) {
1171                 return LDB_ERR_OPERATIONS_ERROR;
1172         }
1173
1174         dn = ldb_dn_get_linearized(msg->dn);
1175         if (dn == NULL) {
1176                 talloc_free(pdn);
1177                 return LDB_ERR_OPERATIONS_ERROR;
1178         }
1179
1180         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1181         if (val.data == NULL) {
1182                 talloc_free(pdn);
1183                 return LDB_ERR_OPERATIONS_ERROR;
1184         }
1185
1186         val.length = strlen((char *)val.data);
1187         el.name = LTDB_IDXONE;
1188         el.values = &val;
1189         el.num_values = 1;
1190
1191         if (add) {
1192                 ret = ltdb_index_add1(module, dn, &el, 0);
1193         } else { /* delete */
1194                 ret = ltdb_index_del_value(module, dn, &el, 0);
1195         }
1196
1197         talloc_free(pdn);
1198
1199         return ret;
1200 }
1201
1202 /*
1203   add the index entries for a new element in a record
1204   The caller guarantees that these element values are not yet indexed
1205 */
1206 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1207                            struct ldb_message_element *el)
1208 {
1209         if (ldb_dn_is_special(dn)) {
1210                 return LDB_SUCCESS;
1211         }
1212         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1213 }
1214
1215 /*
1216   add the index entries for a new record
1217 */
1218 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1219 {
1220         const char *dn;
1221         int ret;
1222
1223         if (ldb_dn_is_special(msg->dn)) {
1224                 return LDB_SUCCESS;
1225         }
1226
1227         dn = ldb_dn_get_linearized(msg->dn);
1228         if (dn == NULL) {
1229                 return LDB_ERR_OPERATIONS_ERROR;
1230         }
1231
1232         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1233         if (ret != LDB_SUCCESS) {
1234                 return ret;
1235         }
1236
1237         return ltdb_index_onelevel(module, msg, 1);
1238 }
1239
1240
1241 /*
1242   delete an index entry for one message element
1243 */
1244 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1245                          struct ldb_message_element *el, int v_idx)
1246 {
1247         struct ldb_context *ldb;
1248         struct ldb_dn *dn_key;
1249         int ret, i;
1250         struct dn_list *list;
1251
1252         ldb = ldb_module_get_ctx(module);
1253
1254         if (dn[0] == '@') {
1255                 return LDB_SUCCESS;
1256         }
1257
1258         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1259         if (!dn_key) {
1260                 return LDB_ERR_OPERATIONS_ERROR;
1261         }
1262
1263         list = talloc_zero(dn_key, struct dn_list);
1264         if (list == NULL) {
1265                 talloc_free(dn_key);
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268
1269         ret = ltdb_dn_list_load(module, dn_key, list);
1270         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1271                 /* it wasn't indexed. Did we have an earlier error? If we did then
1272                    its gone now */
1273                 talloc_free(dn_key);
1274                 return LDB_SUCCESS;
1275         }
1276
1277         if (ret != LDB_SUCCESS) {
1278                 talloc_free(dn_key);
1279                 return ret;
1280         }
1281
1282         i = ltdb_dn_list_find_str(list, dn);
1283         if (i == -1) {
1284                 /* nothing to delete */
1285                 talloc_free(dn_key);
1286                 return LDB_SUCCESS;             
1287         }
1288
1289         if (i != list->count-1) {
1290                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1291         }
1292         list->count--;
1293         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1294
1295         ret = ltdb_dn_list_store(module, dn_key, list);
1296
1297         talloc_free(dn_key);
1298
1299         return ret;
1300 }
1301
1302 /*
1303   delete the index entries for a element
1304   return -1 on failure
1305 */
1306 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1307 {
1308         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1309         int ret;
1310         unsigned int i;
1311
1312         if (!ltdb->cache->attribute_indexes) {
1313                 /* no indexed fields */
1314                 return LDB_SUCCESS;
1315         }
1316
1317         if (dn[0] == '@') {
1318                 return LDB_SUCCESS;
1319         }
1320
1321         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1322                 return LDB_SUCCESS;
1323         }
1324         for (i = 0; i < el->num_values; i++) {
1325                 ret = ltdb_index_del_value(module, dn, el, i);
1326                 if (ret != LDB_SUCCESS) {
1327                         return ret;
1328                 }
1329         }
1330
1331         return LDB_SUCCESS;
1332 }
1333
1334 /*
1335   delete the index entries for a record
1336   return -1 on failure
1337 */
1338 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1339 {
1340         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1341         int ret;
1342         const char *dn;
1343         unsigned int i;
1344
1345         if (ldb_dn_is_special(msg->dn)) {
1346                 return LDB_SUCCESS;
1347         }
1348
1349         ret = ltdb_index_onelevel(module, msg, 0);
1350         if (ret != LDB_SUCCESS) {
1351                 return ret;
1352         }
1353
1354         if (!ltdb->cache->attribute_indexes) {
1355                 /* no indexed fields */
1356                 return LDB_SUCCESS;
1357         }
1358
1359         dn = ldb_dn_get_linearized(msg->dn);
1360         if (dn == NULL) {
1361                 return LDB_ERR_OPERATIONS_ERROR;
1362         }
1363
1364         for (i = 0; i < msg->num_elements; i++) {
1365                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1366                 if (ret != LDB_SUCCESS) {
1367                         return ret;
1368                 }
1369         }
1370
1371         return LDB_SUCCESS;
1372 }
1373
1374
1375 /*
1376   traversal function that deletes all @INDEX records
1377 */
1378 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1379 {
1380         const char *dn = "DN=" LTDB_INDEX ":";
1381         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1382                 return tdb_delete(tdb, key);
1383         }
1384         return 0;
1385 }
1386
1387 /*
1388   traversal function that adds @INDEX records during a re index
1389 */
1390 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1391 {
1392         struct ldb_context *ldb;
1393         struct ldb_module *module = (struct ldb_module *)state;
1394         struct ldb_message *msg;
1395         const char *dn = NULL;
1396         int ret;
1397         TDB_DATA key2;
1398
1399         ldb = ldb_module_get_ctx(module);
1400
1401         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1402             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1403                 return 0;
1404         }
1405
1406         msg = talloc(module, struct ldb_message);
1407         if (msg == NULL) {
1408                 return -1;
1409         }
1410
1411         ret = ltdb_unpack_data(module, &data, msg);
1412         if (ret != 0) {
1413                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1414                           ldb_dn_get_linearized(msg->dn));
1415                 talloc_free(msg);
1416                 return -1;
1417         }
1418
1419         /* check if the DN key has changed, perhaps due to the
1420            case insensitivity of an element changing */
1421         key2 = ltdb_key(module, msg->dn);
1422         if (key2.dptr == NULL) {
1423                 /* probably a corrupt record ... darn */
1424                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1425                                                         ldb_dn_get_linearized(msg->dn));
1426                 talloc_free(msg);
1427                 return 0;
1428         }
1429         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1430                 tdb_delete(tdb, key);
1431                 tdb_store(tdb, key2, data, 0);
1432         }
1433         talloc_free(key2.dptr);
1434
1435         if (msg->dn == NULL) {
1436                 dn = (char *)key.dptr + 3;
1437         } else {
1438                 dn = ldb_dn_get_linearized(msg->dn);
1439         }
1440
1441         ret = ltdb_index_onelevel(module, msg, 1);
1442         if (ret != LDB_SUCCESS) {
1443                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1444                         "Adding special ONE LEVEL index failed (%s)!",
1445                         ldb_dn_get_linearized(msg->dn));
1446                 talloc_free(msg);
1447                 return -1;
1448         }
1449
1450         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1451
1452         talloc_free(msg);
1453
1454         if (ret != LDB_SUCCESS) return -1;
1455
1456         return 0;
1457 }
1458
1459 /*
1460   force a complete reindex of the database
1461 */
1462 int ltdb_reindex(struct ldb_module *module)
1463 {
1464         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1465         int ret;
1466
1467         if (ltdb_cache_reload(module) != 0) {
1468                 return LDB_ERR_OPERATIONS_ERROR;
1469         }
1470
1471         /* first traverse the database deleting any @INDEX records */
1472         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1473         if (ret == -1) {
1474                 return LDB_ERR_OPERATIONS_ERROR;
1475         }
1476
1477         /* if we don't have indexes we have nothing todo */
1478         if (ltdb->cache->indexlist->num_elements == 0) {
1479                 return LDB_SUCCESS;
1480         }
1481
1482         /* now traverse adding any indexes for normal LDB records */
1483         ret = tdb_traverse(ltdb->tdb, re_index, module);
1484         if (ret == -1) {
1485                 return LDB_ERR_OPERATIONS_ERROR;
1486         }
1487
1488         if (ltdb->idxptr) {
1489                 ltdb->idxptr->repack = true;
1490         }
1491
1492         return LDB_SUCCESS;
1493 }