s4-ldb: when taking a list intersection, the result can be as long as the first list
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         bool repack;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         return LDB_SUCCESS;
58 }
59
60 /* compare two DN entries in a dn_list. Take account of possible
61  * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 {
64         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65         if (ret != 0) return ret;
66         if (v2->length > v1->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return 0;
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 /*
99   return the @IDX list in an index entry for a dn as a 
100   struct dn_list
101  */
102 static int ltdb_dn_list_load(struct ldb_module *module,
103                              struct ldb_dn *dn, struct dn_list *list)
104 {
105         struct ldb_message *msg;
106         int ret;
107         struct ldb_message_element *el;
108         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
109         TDB_DATA rec;
110         struct dn_list *list2;
111         TDB_DATA key;
112
113         list->dn = NULL;
114         list->count = 0;
115
116         /* see if we have any in-memory index entries */
117         if (ltdb->idxptr == NULL ||
118             ltdb->idxptr->itdb == NULL) {
119                 goto normal_index;
120         }
121
122         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
123         key.dsize = strlen((char *)key.dptr);
124
125         rec = tdb_fetch(ltdb->idxptr->itdb, key);
126         if (rec.dptr == NULL) {
127                 goto normal_index;
128         }
129
130         /* we've found an in-memory index entry */
131         if (rec.dsize != sizeof(void *)) {
132                 free(rec.dptr);
133                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
134                                        "Bad internal index size %u", (unsigned)rec.dsize);
135                 return LDB_ERR_OPERATIONS_ERROR;
136         }
137         list2 = *(struct dn_list **)rec.dptr;
138         free(rec.dptr);
139
140         *list = *list2;
141         return LDB_SUCCESS;
142
143 normal_index:
144         msg = ldb_msg_new(list);
145         if (msg == NULL) {
146                 return LDB_ERR_OPERATIONS_ERROR;
147         }
148
149         ret = ltdb_search_dn1(module, dn, msg);
150         if (ret != LDB_SUCCESS) {
151                 return ret;
152         }
153
154         /* TODO: check indexing version number */
155
156         el = ldb_msg_find_element(msg, LTDB_IDX);
157         if (!el) {
158                 talloc_free(msg);
159                 return LDB_SUCCESS;
160         }
161
162         /* we avoid copying the strings by stealing the list */
163         list->dn = talloc_steal(list, el->values);
164         list->count = el->num_values;
165
166         return LDB_SUCCESS;
167 }
168
169
170 /*
171   save a dn_list into a full @IDX style record
172  */
173 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
174                                    struct dn_list *list)
175 {
176         struct ldb_message *msg;
177         int ret;
178
179         msg = ldb_msg_new(module);
180         if (!msg) {
181                 ldb_module_oom(module);
182                 return LDB_ERR_OPERATIONS_ERROR;
183         }
184
185         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
186         if (ret != LDB_SUCCESS) {
187                 ldb_module_oom(module);
188                 return LDB_ERR_OPERATIONS_ERROR;
189         }
190
191         msg->dn = dn;
192         if (list->count > 0) {
193                 struct ldb_message_element *el;
194
195                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
196                 if (ret != LDB_SUCCESS) {
197                         ldb_module_oom(module);
198                         talloc_free(msg);
199                         return LDB_ERR_OPERATIONS_ERROR;
200                 }
201                 el->values = list->dn;
202                 el->num_values = list->count;
203         }
204
205         ret = ltdb_store(module, msg, TDB_REPLACE);
206         talloc_free(msg);
207         return ret;
208 }
209
210 /*
211   save a dn_list into the database, in either @IDX or internal format
212  */
213 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
214                               struct dn_list *list)
215 {
216         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
217         TDB_DATA rec, key;
218         int ret;
219         struct dn_list *list2;
220
221         if (ltdb->idxptr == NULL) {
222                 return ltdb_dn_list_store_full(module, dn, list);
223         }
224
225         if (ltdb->idxptr->itdb == NULL) {
226                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
227                 if (ltdb->idxptr->itdb == NULL) {
228                         return LDB_ERR_OPERATIONS_ERROR;
229                 }
230         }
231
232         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
233         key.dsize = strlen((char *)key.dptr);
234
235         rec = tdb_fetch(ltdb->idxptr->itdb, key);
236         if (rec.dptr != NULL) {
237                 if (rec.dsize != sizeof(void *)) {
238                         free(rec.dptr);
239                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
240                                                "Bad internal index size %u", (unsigned)rec.dsize);
241                         return LDB_ERR_OPERATIONS_ERROR;
242                 }
243                 list2 = *(struct dn_list **)rec.dptr;
244                 free(rec.dptr);
245                 list2->dn = talloc_steal(list2, list->dn);
246                 list2->count = list->count;
247                 return LDB_SUCCESS;
248         }
249
250         list2 = talloc(ltdb->idxptr, struct dn_list);
251         if (list2 == NULL) {
252                 return LDB_ERR_OPERATIONS_ERROR;
253         }
254         list2->dn = talloc_steal(list2, list->dn);
255         list2->count = list->count;
256
257         rec.dptr = (uint8_t *)&list2;
258         rec.dsize = sizeof(void *);
259
260         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
261
262         return ret;     
263 }
264
265 /*
266   traverse function for storing the in-memory index entries on disk
267  */
268 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
269 {
270         struct ldb_module *module = state;
271         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
272         struct ldb_dn *dn;
273         struct ldb_context *ldb = ldb_module_get_ctx(module);
274         struct ldb_val v;
275         struct dn_list *list;
276
277         if (data.dsize != sizeof(void *)) {
278                 ldb_asprintf_errstring(ldb, "Bad internal index size %u", (unsigned)data.dsize);
279                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
280                 return -1;
281         }
282         
283         list = *(struct dn_list **)data.dptr;
284
285         v.data = key.dptr;
286         v.length = key.dsize;
287
288         dn = ldb_dn_from_ldb_val(module, ldb, &v);
289         if (dn == NULL) {
290                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
291                 return -1;
292         }
293
294         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
295         talloc_free(dn);
296         return ltdb->idxptr->error;
297 }
298
299 /* cleanup the idxptr mode when transaction commits */
300 int ltdb_index_transaction_commit(struct ldb_module *module)
301 {
302         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
303         int ret;
304
305         if (ltdb->idxptr->itdb) {
306                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
307                 tdb_close(ltdb->idxptr->itdb);
308         }
309
310         ret = ltdb->idxptr->error;
311
312         if (ret != LDB_SUCCESS) {
313                 struct ldb_context *ldb = ldb_module_get_ctx(module);
314                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
315         }
316
317         talloc_free(ltdb->idxptr);
318         ltdb->idxptr = NULL;
319         return ret;
320 }
321
322 /* cleanup the idxptr mode when transaction cancels */
323 int ltdb_index_transaction_cancel(struct ldb_module *module)
324 {
325         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
326         if (ltdb->idxptr && ltdb->idxptr->itdb) {
327                 tdb_close(ltdb->idxptr->itdb);
328         }
329         talloc_free(ltdb->idxptr);
330         ltdb->idxptr = NULL;
331         return LDB_SUCCESS;
332 }
333
334
335 /*
336   return the dn key to be used for an index
337   the caller is responsible for freeing
338 */
339 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
340                                      const char *attr, const struct ldb_val *value,
341                                      const struct ldb_schema_attribute **ap)
342 {
343         struct ldb_dn *ret;
344         struct ldb_val v;
345         const struct ldb_schema_attribute *a;
346         char *attr_folded;
347         int r;
348
349         attr_folded = ldb_attr_casefold(ldb, attr);
350         if (!attr_folded) {
351                 return NULL;
352         }
353
354         a = ldb_schema_attribute_by_name(ldb, attr);
355         if (ap) {
356                 *ap = a;
357         }
358         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
359         if (r != LDB_SUCCESS) {
360                 const char *errstr = ldb_errstring(ldb);
361                 /* canonicalisation can be refused. For example, 
362                    a attribute that takes wildcards will refuse to canonicalise
363                    if the value contains a wildcard */
364                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
365                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
366                 talloc_free(attr_folded);
367                 return NULL;
368         }
369         if (ldb_should_b64_encode(ldb, &v)) {
370                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
371                 if (!vstr) return NULL;
372                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
373                 talloc_free(vstr);
374         } else {
375                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
376         }
377
378         if (v.data != value->data) {
379                 talloc_free(v.data);
380         }
381         talloc_free(attr_folded);
382
383         return ret;
384 }
385
386 /*
387   see if a attribute value is in the list of indexed attributes
388 */
389 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
390 {
391         unsigned int i;
392         struct ldb_message_element *el;
393
394         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
395         if (el == NULL) {
396                 return false;
397         }
398         for (i=0; i<el->num_values; i++) {
399                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
400                         return true;
401                 }
402         }
403         return false;
404 }
405
406 /*
407   in the following logic functions, the return value is treated as
408   follows:
409
410      LDB_SUCCESS: we found some matching index values
411
412      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
413
414      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
415                                we'll need a full search
416  */
417
418 /*
419   return a list of dn's that might match a simple indexed search (an
420   equality search only)
421  */
422 static int ltdb_index_dn_simple(struct ldb_module *module,
423                                 const struct ldb_parse_tree *tree,
424                                 const struct ldb_message *index_list,
425                                 struct dn_list *list)
426 {
427         struct ldb_context *ldb;
428         struct ldb_dn *dn;
429         int ret;
430
431         ldb = ldb_module_get_ctx(module);
432
433         list->count = 0;
434         list->dn = NULL;
435
436         /* if the attribute isn't in the list of indexed attributes then
437            this node needs a full search */
438         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
439                 return LDB_ERR_OPERATIONS_ERROR;
440         }
441
442         /* the attribute is indexed. Pull the list of DNs that match the 
443            search criterion */
444         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
445         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
446
447         ret = ltdb_dn_list_load(module, dn, list);
448         talloc_free(dn);
449         return ret;
450 }
451
452
453 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
454
455 /*
456   return a list of dn's that might match a leaf indexed search
457  */
458 static int ltdb_index_dn_leaf(struct ldb_module *module,
459                               const struct ldb_parse_tree *tree,
460                               const struct ldb_message *index_list,
461                               struct dn_list *list)
462 {
463         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
464                 list->dn = talloc_array(list, struct ldb_val, 1);
465                 if (list->dn == NULL) {
466                         ldb_module_oom(module);
467                         return LDB_ERR_OPERATIONS_ERROR;
468                 }
469                 list->dn[0] = tree->u.equality.value;
470                 list->count = 1;
471                 return LDB_SUCCESS;
472         }
473         return ltdb_index_dn_simple(module, tree, index_list, list);
474 }
475
476
477 /*
478   list intersection
479   list = list & list2
480 */
481 static bool list_intersect(struct ldb_context *ldb,
482                            struct dn_list *list, const struct dn_list *list2)
483 {
484         struct dn_list *list3;
485         unsigned int i;
486
487         if (list->count == 0) {
488                 /* 0 & X == 0 */
489                 return true;
490         }
491         if (list2->count == 0) {
492                 /* X & 0 == 0 */
493                 list->count = 0;
494                 list->dn = NULL;
495                 return true;
496         }
497
498         /* the indexing code is allowed to return a longer list than
499            what really matches, as all results are filtered by the
500            full expression at the end - this shortcut avoids a lot of
501            work in some cases */
502         if (list->count < 2 && list2->count > 10) {
503                 return true;
504         }
505         if (list2->count < 2 && list->count > 10) {
506                 list->count = list2->count;
507                 list->dn = list2->dn;
508                 /* note that list2 may not be the parent of list2->dn,
509                    as list2->dn may be owned by ltdb->idxptr. In that
510                    case we expect this reparent call to fail, which is
511                    OK */
512                 talloc_reparent(list2, list, list2->dn);
513                 return true;
514         }
515
516         list3 = talloc_zero(list, struct dn_list);
517         if (list3 == NULL) {
518                 return false;
519         }
520
521         list3->dn = talloc_array(list3, struct ldb_val, list->count);
522         if (!list3->dn) {
523                 talloc_free(list3);
524                 return false;
525         }
526         list3->count = 0;
527
528         for (i=0;i<list->count;i++) {
529                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
530                         list3->dn[list3->count] = list->dn[i];
531                         list3->count++;
532                 }
533         }
534
535         list->dn = talloc_steal(list, list3->dn);
536         list->count = list3->count;
537         talloc_free(list3);
538
539         return true;
540 }
541
542
543 /*
544   list union
545   list = list | list2
546 */
547 static bool list_union(struct ldb_context *ldb,
548                        struct dn_list *list, const struct dn_list *list2)
549 {
550         struct ldb_val *dn3;
551
552         if (list2->count == 0) {
553                 /* X | 0 == X */
554                 return true;
555         }
556
557         if (list->count == 0) {
558                 /* 0 | X == X */
559                 list->count = list2->count;
560                 list->dn = list2->dn;
561                 /* note that list2 may not be the parent of list2->dn,
562                    as list2->dn may be owned by ltdb->idxptr. In that
563                    case we expect this reparent call to fail, which is
564                    OK */
565                 talloc_reparent(list2, list, list2->dn);
566                 return true;
567         }
568
569         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
570         if (!dn3) {
571                 ldb_oom(ldb);
572                 return false;
573         }
574
575         /* we allow for duplicates here, and get rid of them later */
576         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
577         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
578
579         list->dn = dn3;
580         list->count += list2->count;
581
582         return true;
583 }
584
585 static int ltdb_index_dn(struct ldb_module *module,
586                          const struct ldb_parse_tree *tree,
587                          const struct ldb_message *index_list,
588                          struct dn_list *list);
589
590
591 /*
592   process an OR list (a union)
593  */
594 static int ltdb_index_dn_or(struct ldb_module *module,
595                             const struct ldb_parse_tree *tree,
596                             const struct ldb_message *index_list,
597                             struct dn_list *list)
598 {
599         struct ldb_context *ldb;
600         unsigned int i;
601
602         ldb = ldb_module_get_ctx(module);
603
604         list->dn = NULL;
605         list->count = 0;
606
607         for (i=0; i<tree->u.list.num_elements; i++) {
608                 struct dn_list *list2;
609                 int ret;
610
611                 list2 = talloc_zero(list, struct dn_list);
612                 if (list2 == NULL) {
613                         return LDB_ERR_OPERATIONS_ERROR;
614                 }
615
616                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
617
618                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
619                         /* X || 0 == X */
620                         talloc_free(list2);
621                         continue;
622                 }
623
624                 if (ret != LDB_SUCCESS) {
625                         /* X || * == * */
626                         talloc_free(list2);
627                         return ret;
628                 }
629
630                 if (!list_union(ldb, list, list2)) {
631                         talloc_free(list2);
632                         return LDB_ERR_OPERATIONS_ERROR;
633                 }
634         }
635
636         if (list->count == 0) {
637                 return LDB_ERR_NO_SUCH_OBJECT;
638         }
639
640         return LDB_SUCCESS;
641 }
642
643
644 /*
645   NOT an index results
646  */
647 static int ltdb_index_dn_not(struct ldb_module *module,
648                              const struct ldb_parse_tree *tree,
649                              const struct ldb_message *index_list,
650                              struct dn_list *list)
651 {
652         /* the only way to do an indexed not would be if we could
653            negate the not via another not or if we knew the total
654            number of database elements so we could know that the
655            existing expression covered the whole database.
656
657            instead, we just give up, and rely on a full index scan
658            (unless an outer & manages to reduce the list)
659         */
660         return LDB_ERR_OPERATIONS_ERROR;
661 }
662
663
664 static bool ltdb_index_unique(struct ldb_context *ldb,
665                               const char *attr)
666 {
667         const struct ldb_schema_attribute *a;
668         a = ldb_schema_attribute_by_name(ldb, attr);
669         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
670                 return true;
671         }
672         return false;
673 }
674
675 /*
676   process an AND expression (intersection)
677  */
678 static int ltdb_index_dn_and(struct ldb_module *module,
679                              const struct ldb_parse_tree *tree,
680                              const struct ldb_message *index_list,
681                              struct dn_list *list)
682 {
683         struct ldb_context *ldb;
684         unsigned int i;
685         bool found;
686
687         ldb = ldb_module_get_ctx(module);
688
689         list->dn = NULL;
690         list->count = 0;
691
692         /* in the first pass we only look for unique simple
693            equality tests, in the hope of avoiding having to look
694            at any others */
695         for (i=0; i<tree->u.list.num_elements; i++) {
696                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
697                 int ret;
698
699                 if (subtree->operation != LDB_OP_EQUALITY ||
700                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
701                         continue;
702                 }
703                 
704                 ret = ltdb_index_dn(module, subtree, index_list, list);
705                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
706                         /* 0 && X == 0 */
707                         return LDB_ERR_NO_SUCH_OBJECT;
708                 }
709                 if (ret == LDB_SUCCESS) {
710                         /* a unique index match means we can
711                          * stop. Note that we don't care if we return
712                          * a few too many objects, due to later
713                          * filtering */
714                         return LDB_SUCCESS;
715                 }
716         }       
717
718         /* now do a full intersection */
719         found = false;
720
721         for (i=0; i<tree->u.list.num_elements; i++) {
722                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
723                 struct dn_list *list2;
724                 int ret;
725
726                 list2 = talloc_zero(list, struct dn_list);
727                 if (list2 == NULL) {
728                         ldb_module_oom(module);
729                         return LDB_ERR_OPERATIONS_ERROR;
730                 }
731                         
732                 ret = ltdb_index_dn(module, subtree, index_list, list2);
733
734                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
735                         /* X && 0 == 0 */
736                         list->dn = NULL;
737                         list->count = 0;
738                         talloc_free(list2);
739                         return LDB_ERR_NO_SUCH_OBJECT;
740                 }
741                 
742                 if (ret != LDB_SUCCESS) {
743                         /* this didn't adding anything */
744                         talloc_free(list2);
745                         continue;
746                 }
747
748                 if (!found) {
749                         talloc_reparent(list2, list, list->dn);
750                         list->dn = list2->dn;
751                         list->count = list2->count;
752                         found = true;
753                 } else if (!list_intersect(ldb, list, list2)) {
754                         talloc_free(list2);
755                         return LDB_ERR_OPERATIONS_ERROR;
756                 }
757                         
758                 if (list->count == 0) {
759                         list->dn = NULL;
760                         return LDB_ERR_NO_SUCH_OBJECT;
761                 }
762                         
763                 if (list->count < 2) {
764                         /* it isn't worth loading the next part of the tree */
765                         return LDB_SUCCESS;
766                 }
767         }       
768
769         if (!found) {
770                 /* none of the attributes were indexed */
771                 return LDB_ERR_OPERATIONS_ERROR;
772         }
773
774         return LDB_SUCCESS;
775 }
776         
777 /*
778   return a list of matching objects using a one-level index
779  */
780 static int ltdb_index_dn_one(struct ldb_module *module,
781                              struct ldb_dn *parent_dn,
782                              struct dn_list *list)
783 {
784         struct ldb_context *ldb;
785         struct ldb_dn *key;
786         struct ldb_val val;
787         int ret;
788
789         ldb = ldb_module_get_ctx(module);
790
791         /* work out the index key from the parent DN */
792         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
793         val.length = strlen((char *)val.data);
794         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
795         if (!key) {
796                 ldb_oom(ldb);
797                 return LDB_ERR_OPERATIONS_ERROR;
798         }
799
800         ret = ltdb_dn_list_load(module, key, list);
801         talloc_free(key);
802         if (ret != LDB_SUCCESS) {
803                 return ret;
804         }
805
806         if (list->count == 0) {
807                 return LDB_ERR_NO_SUCH_OBJECT;
808         }
809
810         return LDB_SUCCESS;
811 }
812
813 /*
814   return a list of dn's that might match a indexed search or
815   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
816  */
817 static int ltdb_index_dn(struct ldb_module *module,
818                          const struct ldb_parse_tree *tree,
819                          const struct ldb_message *index_list,
820                          struct dn_list *list)
821 {
822         int ret = LDB_ERR_OPERATIONS_ERROR;
823
824         switch (tree->operation) {
825         case LDB_OP_AND:
826                 ret = ltdb_index_dn_and(module, tree, index_list, list);
827                 break;
828
829         case LDB_OP_OR:
830                 ret = ltdb_index_dn_or(module, tree, index_list, list);
831                 break;
832
833         case LDB_OP_NOT:
834                 ret = ltdb_index_dn_not(module, tree, index_list, list);
835                 break;
836
837         case LDB_OP_EQUALITY:
838                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
839                 break;
840
841         case LDB_OP_SUBSTRING:
842         case LDB_OP_GREATER:
843         case LDB_OP_LESS:
844         case LDB_OP_PRESENT:
845         case LDB_OP_APPROX:
846         case LDB_OP_EXTENDED:
847                 /* we can't index with fancy bitops yet */
848                 ret = LDB_ERR_OPERATIONS_ERROR;
849                 break;
850         }
851
852         return ret;
853 }
854
855 /*
856   filter a candidate dn_list from an indexed search into a set of results
857   extracting just the given attributes
858 */
859 static int ltdb_index_filter(const struct dn_list *dn_list,
860                              struct ltdb_context *ac, 
861                              uint32_t *match_count)
862 {
863         struct ldb_context *ldb;
864         struct ldb_message *msg;
865         unsigned int i;
866
867         ldb = ldb_module_get_ctx(ac->module);
868
869         for (i = 0; i < dn_list->count; i++) {
870                 struct ldb_dn *dn;
871                 int ret;
872
873                 msg = ldb_msg_new(ac);
874                 if (!msg) {
875                         return LDB_ERR_OPERATIONS_ERROR;
876                 }
877
878                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
879                 if (dn == NULL) {
880                         talloc_free(msg);
881                         return LDB_ERR_OPERATIONS_ERROR;
882                 }
883
884                 ret = ltdb_search_dn1(ac->module, dn, msg);
885                 talloc_free(dn);
886                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
887                         /* the record has disappeared? yes, this can happen */
888                         talloc_free(msg);
889                         continue;
890                 }
891
892                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
893                         /* an internal error */
894                         talloc_free(msg);
895                         return LDB_ERR_OPERATIONS_ERROR;
896                 }
897
898                 if (!ldb_match_msg(ldb, msg,
899                                    ac->tree, ac->base, ac->scope)) {
900                         talloc_free(msg);
901                         continue;
902                 }
903
904                 /* filter the attributes that the user wants */
905                 ret = ltdb_filter_attrs(msg, ac->attrs);
906
907                 if (ret == -1) {
908                         talloc_free(msg);
909                         return LDB_ERR_OPERATIONS_ERROR;
910                 }
911
912                 ret = ldb_module_send_entry(ac->req, msg, NULL);
913                 if (ret != LDB_SUCCESS) {
914                         ac->request_terminated = true;
915                         return ret;
916                 }
917
918                 (*match_count)++;
919         }
920
921         return LDB_SUCCESS;
922 }
923
924 /*
925   remove any duplicated entries in a indexed result
926  */
927 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
928 {
929         int i, new_count;
930
931         if (list->count < 2) {
932                 return;
933         }
934
935         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
936
937         new_count = 1;
938         for (i=1; i<list->count; i++) {
939                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
940                         if (new_count != i) {
941                                 list->dn[new_count] = list->dn[i];
942                         }
943                         new_count++;
944                 }
945         }
946         
947         list->count = new_count;
948 }
949
950 /*
951   search the database with a LDAP-like expression using indexes
952   returns -1 if an indexed search is not possible, in which
953   case the caller should call ltdb_search_full()
954 */
955 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
956 {
957         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
958         struct dn_list *dn_list;
959         int ret;
960
961         /* see if indexing is enabled */
962         if (!ltdb->cache->attribute_indexes && 
963             !ltdb->cache->one_level_indexes &&
964             ac->scope != LDB_SCOPE_BASE) {
965                 /* fallback to a full search */
966                 return LDB_ERR_OPERATIONS_ERROR;
967         }
968
969         dn_list = talloc_zero(ac, struct dn_list);
970         if (dn_list == NULL) {
971                 ldb_module_oom(ac->module);
972                 return LDB_ERR_OPERATIONS_ERROR;
973         }
974
975         switch (ac->scope) {
976         case LDB_SCOPE_BASE:
977                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
978                 if (dn_list->dn == NULL) {
979                         ldb_module_oom(ac->module);
980                         talloc_free(dn_list);
981                         return LDB_ERR_OPERATIONS_ERROR;
982                 }
983                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
984                 if (dn_list->dn[0].data == NULL) {
985                         ldb_module_oom(ac->module);
986                         talloc_free(dn_list);
987                         return LDB_ERR_OPERATIONS_ERROR;
988                 }
989                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
990                 dn_list->count = 1;
991                 break;          
992
993         case LDB_SCOPE_ONELEVEL:
994                 if (!ltdb->cache->one_level_indexes) {
995                         talloc_free(dn_list);
996                         return LDB_ERR_OPERATIONS_ERROR;
997                 }
998                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
999                 if (ret != LDB_SUCCESS) {
1000                         talloc_free(dn_list);
1001                         return ret;
1002                 }
1003                 break;
1004
1005         case LDB_SCOPE_SUBTREE:
1006         case LDB_SCOPE_DEFAULT:
1007                 if (!ltdb->cache->attribute_indexes) {
1008                         talloc_free(dn_list);
1009                         return LDB_ERR_OPERATIONS_ERROR;
1010                 }
1011                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1012                 if (ret != LDB_SUCCESS) {
1013                         talloc_free(dn_list);
1014                         return ret;
1015                 }
1016                 ltdb_dn_list_remove_duplicates(dn_list);
1017                 break;
1018         }
1019
1020         ret = ltdb_index_filter(dn_list, ac, match_count);
1021         talloc_free(dn_list);
1022         return ret;
1023 }
1024
1025 /*
1026   add an index entry for one message element
1027 */
1028 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1029                            struct ldb_message_element *el, int v_idx)
1030 {
1031         struct ldb_context *ldb;
1032         struct ldb_dn *dn_key;
1033         int ret;
1034         const struct ldb_schema_attribute *a;
1035         struct dn_list *list;
1036
1037         ldb = ldb_module_get_ctx(module);
1038
1039         list = talloc_zero(module, struct dn_list);
1040         if (list == NULL) {
1041                 return LDB_ERR_OPERATIONS_ERROR;
1042         }
1043
1044         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1045         if (!dn_key) {
1046                 talloc_free(list);
1047                 return LDB_ERR_OPERATIONS_ERROR;
1048         }
1049         talloc_steal(list, dn_key);
1050
1051         ret = ltdb_dn_list_load(module, dn_key, list);
1052         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1053                 talloc_free(list);
1054                 return ret;
1055         }
1056
1057         if (ltdb_dn_list_find_str(list, dn) != -1) {
1058                 talloc_free(list);
1059                 return LDB_SUCCESS;
1060         }
1061
1062         if (list->count > 0 &&
1063             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1064                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1065         }
1066
1067         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count+1);
1068         if (list->dn == NULL) {
1069                 talloc_free(list);
1070                 return LDB_ERR_OPERATIONS_ERROR;
1071         }
1072         list->dn[list->count].data = discard_const_p(unsigned char, dn);
1073         list->dn[list->count].length = strlen(dn);
1074         list->count++;
1075
1076         ret = ltdb_dn_list_store(module, dn_key, list);
1077
1078         talloc_free(list);
1079
1080         return ret;
1081 }
1082
1083 /*
1084   add index entries for one elements in a message
1085  */
1086 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1087                              struct ldb_message_element *el)
1088 {
1089         unsigned int i;
1090         for (i = 0; i < el->num_values; i++) {
1091                 int ret = ltdb_index_add1(module, dn, el, i);
1092                 if (ret != LDB_SUCCESS) {
1093                         return ret;
1094                 }
1095         }
1096
1097         return LDB_SUCCESS;
1098 }
1099
1100 /*
1101   add index entries for all elements in a message
1102  */
1103 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1104                               struct ldb_message_element *elements, int num_el)
1105 {
1106         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1107         unsigned int i;
1108
1109         if (dn[0] == '@') {
1110                 return LDB_SUCCESS;
1111         }
1112
1113         if (ltdb->cache->indexlist->num_elements == 0) {
1114                 /* no indexed fields */
1115                 return LDB_SUCCESS;
1116         }
1117
1118         for (i = 0; i < num_el; i++) {
1119                 int ret;
1120                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1121                         continue;
1122                 }
1123                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1124                 if (ret != LDB_SUCCESS) {
1125                         return ret;
1126                 }
1127         }
1128
1129         return LDB_SUCCESS;
1130 }
1131
1132
1133 /*
1134   insert a one level index for a message
1135 */
1136 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1137 {
1138         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1139         struct ldb_message_element el;
1140         struct ldb_val val;
1141         struct ldb_dn *pdn;
1142         const char *dn;
1143         int ret;
1144
1145         /* We index for ONE Level only if requested */
1146         if (!ltdb->cache->one_level_indexes) {
1147                 return LDB_SUCCESS;
1148         }
1149
1150         pdn = ldb_dn_get_parent(module, msg->dn);
1151         if (pdn == NULL) {
1152                 return LDB_ERR_OPERATIONS_ERROR;
1153         }
1154
1155         dn = ldb_dn_get_linearized(msg->dn);
1156         if (dn == NULL) {
1157                 talloc_free(pdn);
1158                 return LDB_ERR_OPERATIONS_ERROR;
1159         }
1160
1161         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1162         if (val.data == NULL) {
1163                 talloc_free(pdn);
1164                 return LDB_ERR_OPERATIONS_ERROR;
1165         }
1166
1167         val.length = strlen((char *)val.data);
1168         el.name = LTDB_IDXONE;
1169         el.values = &val;
1170         el.num_values = 1;
1171
1172         if (add) {
1173                 ret = ltdb_index_add1(module, dn, &el, 0);
1174         } else { /* delete */
1175                 ret = ltdb_index_del_value(module, dn, &el, 0);
1176         }
1177
1178         talloc_free(pdn);
1179
1180         return ret;
1181 }
1182
1183 /*
1184   add the index entries for a new element in a record
1185   The caller guarantees that these element values are not yet indexed
1186 */
1187 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1188                            struct ldb_message_element *el)
1189 {
1190         if (ldb_dn_is_special(dn)) {
1191                 return LDB_SUCCESS;
1192         }
1193         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1194 }
1195
1196 /*
1197   add the index entries for a new record
1198 */
1199 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1200 {
1201         const char *dn;
1202         int ret;
1203
1204         if (ldb_dn_is_special(msg->dn)) {
1205                 return LDB_SUCCESS;
1206         }
1207
1208         dn = ldb_dn_get_linearized(msg->dn);
1209         if (dn == NULL) {
1210                 return LDB_ERR_OPERATIONS_ERROR;
1211         }
1212
1213         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1214         if (ret != LDB_SUCCESS) {
1215                 return ret;
1216         }
1217
1218         return ltdb_index_onelevel(module, msg, 1);
1219 }
1220
1221
1222 /*
1223   delete an index entry for one message element
1224 */
1225 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1226                          struct ldb_message_element *el, int v_idx)
1227 {
1228         struct ldb_context *ldb;
1229         struct ldb_dn *dn_key;
1230         int ret, i;
1231         struct dn_list *list;
1232
1233         ldb = ldb_module_get_ctx(module);
1234
1235         if (dn[0] == '@') {
1236                 return LDB_SUCCESS;
1237         }
1238
1239         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1240         if (!dn_key) {
1241                 return LDB_ERR_OPERATIONS_ERROR;
1242         }
1243
1244         list = talloc_zero(dn_key, struct dn_list);
1245         if (list == NULL) {
1246                 talloc_free(dn_key);
1247                 return LDB_ERR_OPERATIONS_ERROR;
1248         }
1249
1250         ret = ltdb_dn_list_load(module, dn_key, list);
1251         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1252                 /* it wasn't indexed. Did we have an earlier error? If we did then
1253                    its gone now */
1254                 talloc_free(dn_key);
1255                 return LDB_SUCCESS;
1256         }
1257
1258         if (ret != LDB_SUCCESS) {
1259                 talloc_free(dn_key);
1260                 return ret;
1261         }
1262
1263         i = ltdb_dn_list_find_str(list, dn);
1264         if (i == -1) {
1265                 /* nothing to delete */
1266                 talloc_free(dn_key);
1267                 return LDB_SUCCESS;             
1268         }
1269
1270         if (i != list->count-1) {
1271                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1272         }
1273         list->count--;
1274         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1275
1276         ret = ltdb_dn_list_store(module, dn_key, list);
1277
1278         talloc_free(dn_key);
1279
1280         return ret;
1281 }
1282
1283 /*
1284   delete the index entries for a element
1285   return -1 on failure
1286 */
1287 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1288 {
1289         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1290         int ret;
1291         unsigned int i;
1292
1293         if (!ltdb->cache->attribute_indexes) {
1294                 /* no indexed fields */
1295                 return LDB_SUCCESS;
1296         }
1297
1298         if (dn[0] == '@') {
1299                 return LDB_SUCCESS;
1300         }
1301
1302         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1303                 return LDB_SUCCESS;
1304         }
1305         for (i = 0; i < el->num_values; i++) {
1306                 ret = ltdb_index_del_value(module, dn, el, i);
1307                 if (ret != LDB_SUCCESS) {
1308                         return ret;
1309                 }
1310         }
1311
1312         return LDB_SUCCESS;
1313 }
1314
1315 /*
1316   delete the index entries for a record
1317   return -1 on failure
1318 */
1319 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1320 {
1321         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1322         int ret;
1323         const char *dn;
1324         unsigned int i;
1325
1326         if (ldb_dn_is_special(msg->dn)) {
1327                 return LDB_SUCCESS;
1328         }
1329
1330         ret = ltdb_index_onelevel(module, msg, 0);
1331         if (ret != LDB_SUCCESS) {
1332                 return ret;
1333         }
1334
1335         if (!ltdb->cache->attribute_indexes) {
1336                 /* no indexed fields */
1337                 return LDB_SUCCESS;
1338         }
1339
1340         dn = ldb_dn_get_linearized(msg->dn);
1341         if (dn == NULL) {
1342                 return LDB_ERR_OPERATIONS_ERROR;
1343         }
1344
1345         for (i = 0; i < msg->num_elements; i++) {
1346                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1347                 if (ret != LDB_SUCCESS) {
1348                         return ret;
1349                 }
1350         }
1351
1352         return LDB_SUCCESS;
1353 }
1354
1355
1356 /*
1357   traversal function that deletes all @INDEX records
1358 */
1359 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1360 {
1361         const char *dn = "DN=" LTDB_INDEX ":";
1362         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1363                 return tdb_delete(tdb, key);
1364         }
1365         return 0;
1366 }
1367
1368 /*
1369   traversal function that adds @INDEX records during a re index
1370 */
1371 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1372 {
1373         struct ldb_context *ldb;
1374         struct ldb_module *module = (struct ldb_module *)state;
1375         struct ldb_message *msg;
1376         const char *dn = NULL;
1377         int ret;
1378         TDB_DATA key2;
1379
1380         ldb = ldb_module_get_ctx(module);
1381
1382         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1383             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1384                 return 0;
1385         }
1386
1387         msg = talloc(module, struct ldb_message);
1388         if (msg == NULL) {
1389                 return -1;
1390         }
1391
1392         ret = ltdb_unpack_data(module, &data, msg);
1393         if (ret != 0) {
1394                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1395                           ldb_dn_get_linearized(msg->dn));
1396                 talloc_free(msg);
1397                 return -1;
1398         }
1399
1400         /* check if the DN key has changed, perhaps due to the
1401            case insensitivity of an element changing */
1402         key2 = ltdb_key(module, msg->dn);
1403         if (key2.dptr == NULL) {
1404                 /* probably a corrupt record ... darn */
1405                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1406                                                         ldb_dn_get_linearized(msg->dn));
1407                 talloc_free(msg);
1408                 return 0;
1409         }
1410         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1411                 tdb_delete(tdb, key);
1412                 tdb_store(tdb, key2, data, 0);
1413         }
1414         talloc_free(key2.dptr);
1415
1416         if (msg->dn == NULL) {
1417                 dn = (char *)key.dptr + 3;
1418         } else {
1419                 dn = ldb_dn_get_linearized(msg->dn);
1420         }
1421
1422         ret = ltdb_index_onelevel(module, msg, 1);
1423         if (ret != LDB_SUCCESS) {
1424                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1425                         "Adding special ONE LEVEL index failed (%s)!",
1426                         ldb_dn_get_linearized(msg->dn));
1427                 talloc_free(msg);
1428                 return -1;
1429         }
1430
1431         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1432
1433         talloc_free(msg);
1434
1435         if (ret != LDB_SUCCESS) return -1;
1436
1437         return 0;
1438 }
1439
1440 /*
1441   force a complete reindex of the database
1442 */
1443 int ltdb_reindex(struct ldb_module *module)
1444 {
1445         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1446         int ret;
1447
1448         if (ltdb_cache_reload(module) != 0) {
1449                 return LDB_ERR_OPERATIONS_ERROR;
1450         }
1451
1452         /* first traverse the database deleting any @INDEX records */
1453         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1454         if (ret == -1) {
1455                 return LDB_ERR_OPERATIONS_ERROR;
1456         }
1457
1458         /* if we don't have indexes we have nothing todo */
1459         if (ltdb->cache->indexlist->num_elements == 0) {
1460                 return LDB_SUCCESS;
1461         }
1462
1463         /* now traverse adding any indexes for normal LDB records */
1464         ret = tdb_traverse(ltdb->tdb, re_index, module);
1465         if (ret == -1) {
1466                 return LDB_ERR_OPERATIONS_ERROR;
1467         }
1468
1469         if (ltdb->idxptr) {
1470                 ltdb->idxptr->repack = true;
1471         }
1472
1473         return LDB_SUCCESS;
1474 }