s4-ldb: delete empty index records
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         bool repack;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         return LDB_SUCCESS;
58 }
59
60 /* compare two DN entries in a dn_list. Take account of possible
61  * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 {
64         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65         if (ret != 0) return ret;
66         if (v2->length > v1->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return 0;
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 {
100         struct dn_list *list;
101         if (rec.dsize != sizeof(void *)) {
102                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
103                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
104                 return NULL;
105         }
106         
107         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
108         if (list == NULL) {
109                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
110                                        "Bad type '%s' for idxptr", 
111                                        talloc_get_name(*(struct dn_list **)rec.dptr));
112                 return NULL;
113         }
114         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
116                                        "Bad parent '%s' for idxptr", 
117                                        talloc_get_name(talloc_parent(list->dn)));
118                 return NULL;
119         }
120         return list;
121 }
122
123 /*
124   return the @IDX list in an index entry for a dn as a 
125   struct dn_list
126  */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128                              struct ldb_dn *dn, struct dn_list *list)
129 {
130         struct ldb_message *msg;
131         int ret;
132         struct ldb_message_element *el;
133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134         TDB_DATA rec;
135         struct dn_list *list2;
136         TDB_DATA key;
137
138         list->dn = NULL;
139         list->count = 0;
140
141         /* see if we have any in-memory index entries */
142         if (ltdb->idxptr == NULL ||
143             ltdb->idxptr->itdb == NULL) {
144                 goto normal_index;
145         }
146
147         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148         key.dsize = strlen((char *)key.dptr);
149
150         rec = tdb_fetch(ltdb->idxptr->itdb, key);
151         if (rec.dptr == NULL) {
152                 goto normal_index;
153         }
154
155         /* we've found an in-memory index entry */
156         list2 = ltdb_index_idxptr(module, rec, true);
157         if (list2 == NULL) {
158                 free(rec.dptr);
159                 return LDB_ERR_OPERATIONS_ERROR;
160         }
161         free(rec.dptr);
162
163         *list = *list2;
164         return LDB_SUCCESS;
165
166 normal_index:
167         msg = ldb_msg_new(list);
168         if (msg == NULL) {
169                 return LDB_ERR_OPERATIONS_ERROR;
170         }
171
172         ret = ltdb_search_dn1(module, dn, msg);
173         if (ret != LDB_SUCCESS) {
174                 return ret;
175         }
176
177         /* TODO: check indexing version number */
178
179         el = ldb_msg_find_element(msg, LTDB_IDX);
180         if (!el) {
181                 talloc_free(msg);
182                 return LDB_SUCCESS;
183         }
184
185         /* we avoid copying the strings by stealing the list */
186         list->dn = talloc_steal(list, el->values);
187         list->count = el->num_values;
188
189         return LDB_SUCCESS;
190 }
191
192
193 /*
194   save a dn_list into a full @IDX style record
195  */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
197                                    struct dn_list *list)
198 {
199         struct ldb_message *msg;
200         int ret;
201
202         if (list->count == 0) {
203                 ret = ltdb_delete_noindex(module, dn);
204                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
205                         return LDB_SUCCESS;
206                 }
207                 return ret;
208         }
209
210         msg = ldb_msg_new(module);
211         if (!msg) {
212                 ldb_module_oom(module);
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217         if (ret != LDB_SUCCESS) {
218                 ldb_module_oom(module);
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         msg->dn = dn;
223         if (list->count > 0) {
224                 struct ldb_message_element *el;
225
226                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
227                 if (ret != LDB_SUCCESS) {
228                         ldb_module_oom(module);
229                         talloc_free(msg);
230                         return LDB_ERR_OPERATIONS_ERROR;
231                 }
232                 el->values = list->dn;
233                 el->num_values = list->count;
234         }
235
236         ret = ltdb_store(module, msg, TDB_REPLACE);
237         talloc_free(msg);
238         return ret;
239 }
240
241 /*
242   save a dn_list into the database, in either @IDX or internal format
243  */
244 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
245                               struct dn_list *list)
246 {
247         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
248         TDB_DATA rec, key;
249         int ret;
250         struct dn_list *list2;
251
252         if (ltdb->idxptr == NULL) {
253                 return ltdb_dn_list_store_full(module, dn, list);
254         }
255
256         if (ltdb->idxptr->itdb == NULL) {
257                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
258                 if (ltdb->idxptr->itdb == NULL) {
259                         return LDB_ERR_OPERATIONS_ERROR;
260                 }
261         }
262
263         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
264         key.dsize = strlen((char *)key.dptr);
265
266         rec = tdb_fetch(ltdb->idxptr->itdb, key);
267         if (rec.dptr != NULL) {
268                 list2 = ltdb_index_idxptr(module, rec, false);
269                 if (list2 == NULL) {
270                         free(rec.dptr);
271                         return LDB_ERR_OPERATIONS_ERROR;
272                 }
273                 free(rec.dptr);
274                 list2->dn = talloc_steal(list2, list->dn);
275                 list2->count = list->count;
276                 return LDB_SUCCESS;
277         }
278
279         list2 = talloc(ltdb->idxptr, struct dn_list);
280         if (list2 == NULL) {
281                 return LDB_ERR_OPERATIONS_ERROR;
282         }
283         list2->dn = talloc_steal(list2, list->dn);
284         list2->count = list->count;
285
286         rec.dptr = (uint8_t *)&list2;
287         rec.dsize = sizeof(void *);
288
289         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
290
291         return ret;     
292 }
293
294 /*
295   traverse function for storing the in-memory index entries on disk
296  */
297 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
298 {
299         struct ldb_module *module = state;
300         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
301         struct ldb_dn *dn;
302         struct ldb_context *ldb = ldb_module_get_ctx(module);
303         struct ldb_val v;
304         struct dn_list *list;
305
306         list = ltdb_index_idxptr(module, data, true);
307         if (list == NULL) {
308                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
309                 return -1;
310         }
311
312         v.data = key.dptr;
313         v.length = key.dsize;
314
315         dn = ldb_dn_from_ldb_val(module, ldb, &v);
316         if (dn == NULL) {
317                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
318                 return -1;
319         }
320
321         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
322         talloc_free(dn);
323         return ltdb->idxptr->error;
324 }
325
326 /* cleanup the idxptr mode when transaction commits */
327 int ltdb_index_transaction_commit(struct ldb_module *module)
328 {
329         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
330         int ret;
331
332         if (ltdb->idxptr->itdb) {
333                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
334                 tdb_close(ltdb->idxptr->itdb);
335         }
336
337         ret = ltdb->idxptr->error;
338
339         if (ret != LDB_SUCCESS) {
340                 struct ldb_context *ldb = ldb_module_get_ctx(module);
341                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
342         }
343
344         talloc_free(ltdb->idxptr);
345         ltdb->idxptr = NULL;
346         return ret;
347 }
348
349 /* cleanup the idxptr mode when transaction cancels */
350 int ltdb_index_transaction_cancel(struct ldb_module *module)
351 {
352         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
353         if (ltdb->idxptr && ltdb->idxptr->itdb) {
354                 tdb_close(ltdb->idxptr->itdb);
355         }
356         talloc_free(ltdb->idxptr);
357         ltdb->idxptr = NULL;
358         return LDB_SUCCESS;
359 }
360
361
362 /*
363   return the dn key to be used for an index
364   the caller is responsible for freeing
365 */
366 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
367                                      const char *attr, const struct ldb_val *value,
368                                      const struct ldb_schema_attribute **ap)
369 {
370         struct ldb_dn *ret;
371         struct ldb_val v;
372         const struct ldb_schema_attribute *a;
373         char *attr_folded;
374         int r;
375
376         attr_folded = ldb_attr_casefold(ldb, attr);
377         if (!attr_folded) {
378                 return NULL;
379         }
380
381         a = ldb_schema_attribute_by_name(ldb, attr);
382         if (ap) {
383                 *ap = a;
384         }
385         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
386         if (r != LDB_SUCCESS) {
387                 const char *errstr = ldb_errstring(ldb);
388                 /* canonicalisation can be refused. For example, 
389                    a attribute that takes wildcards will refuse to canonicalise
390                    if the value contains a wildcard */
391                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
392                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
393                 talloc_free(attr_folded);
394                 return NULL;
395         }
396         if (ldb_should_b64_encode(ldb, &v)) {
397                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
398                 if (!vstr) return NULL;
399                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
400                 talloc_free(vstr);
401         } else {
402                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
403         }
404
405         if (v.data != value->data) {
406                 talloc_free(v.data);
407         }
408         talloc_free(attr_folded);
409
410         return ret;
411 }
412
413 /*
414   see if a attribute value is in the list of indexed attributes
415 */
416 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
417 {
418         unsigned int i;
419         struct ldb_message_element *el;
420
421         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
422         if (el == NULL) {
423                 return false;
424         }
425         for (i=0; i<el->num_values; i++) {
426                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
427                         return true;
428                 }
429         }
430         return false;
431 }
432
433 /*
434   in the following logic functions, the return value is treated as
435   follows:
436
437      LDB_SUCCESS: we found some matching index values
438
439      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
440
441      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
442                                we'll need a full search
443  */
444
445 /*
446   return a list of dn's that might match a simple indexed search (an
447   equality search only)
448  */
449 static int ltdb_index_dn_simple(struct ldb_module *module,
450                                 const struct ldb_parse_tree *tree,
451                                 const struct ldb_message *index_list,
452                                 struct dn_list *list)
453 {
454         struct ldb_context *ldb;
455         struct ldb_dn *dn;
456         int ret;
457
458         ldb = ldb_module_get_ctx(module);
459
460         list->count = 0;
461         list->dn = NULL;
462
463         /* if the attribute isn't in the list of indexed attributes then
464            this node needs a full search */
465         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
466                 return LDB_ERR_OPERATIONS_ERROR;
467         }
468
469         /* the attribute is indexed. Pull the list of DNs that match the 
470            search criterion */
471         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
472         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
473
474         ret = ltdb_dn_list_load(module, dn, list);
475         talloc_free(dn);
476         return ret;
477 }
478
479
480 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
481
482 /*
483   return a list of dn's that might match a leaf indexed search
484  */
485 static int ltdb_index_dn_leaf(struct ldb_module *module,
486                               const struct ldb_parse_tree *tree,
487                               const struct ldb_message *index_list,
488                               struct dn_list *list)
489 {
490         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
491                 list->dn = talloc_array(list, struct ldb_val, 1);
492                 if (list->dn == NULL) {
493                         ldb_module_oom(module);
494                         return LDB_ERR_OPERATIONS_ERROR;
495                 }
496                 list->dn[0] = tree->u.equality.value;
497                 list->count = 1;
498                 return LDB_SUCCESS;
499         }
500         return ltdb_index_dn_simple(module, tree, index_list, list);
501 }
502
503
504 /*
505   list intersection
506   list = list & list2
507 */
508 static bool list_intersect(struct ldb_context *ldb,
509                            struct dn_list *list, const struct dn_list *list2)
510 {
511         struct dn_list *list3;
512         unsigned int i;
513
514         if (list->count == 0) {
515                 /* 0 & X == 0 */
516                 return true;
517         }
518         if (list2->count == 0) {
519                 /* X & 0 == 0 */
520                 list->count = 0;
521                 list->dn = NULL;
522                 return true;
523         }
524
525         /* the indexing code is allowed to return a longer list than
526            what really matches, as all results are filtered by the
527            full expression at the end - this shortcut avoids a lot of
528            work in some cases */
529         if (list->count < 2 && list2->count > 10) {
530                 return true;
531         }
532         if (list2->count < 2 && list->count > 10) {
533                 list->count = list2->count;
534                 list->dn = list2->dn;
535                 /* note that list2 may not be the parent of list2->dn,
536                    as list2->dn may be owned by ltdb->idxptr. In that
537                    case we expect this reparent call to fail, which is
538                    OK */
539                 talloc_reparent(list2, list, list2->dn);
540                 return true;
541         }
542
543         list3 = talloc_zero(list, struct dn_list);
544         if (list3 == NULL) {
545                 return false;
546         }
547
548         list3->dn = talloc_array(list3, struct ldb_val, list->count);
549         if (!list3->dn) {
550                 talloc_free(list3);
551                 return false;
552         }
553         list3->count = 0;
554
555         for (i=0;i<list->count;i++) {
556                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
557                         list3->dn[list3->count] = list->dn[i];
558                         list3->count++;
559                 }
560         }
561
562         list->dn = talloc_steal(list, list3->dn);
563         list->count = list3->count;
564         talloc_free(list3);
565
566         return true;
567 }
568
569
570 /*
571   list union
572   list = list | list2
573 */
574 static bool list_union(struct ldb_context *ldb,
575                        struct dn_list *list, const struct dn_list *list2)
576 {
577         struct ldb_val *dn3;
578
579         if (list2->count == 0) {
580                 /* X | 0 == X */
581                 return true;
582         }
583
584         if (list->count == 0) {
585                 /* 0 | X == X */
586                 list->count = list2->count;
587                 list->dn = list2->dn;
588                 /* note that list2 may not be the parent of list2->dn,
589                    as list2->dn may be owned by ltdb->idxptr. In that
590                    case we expect this reparent call to fail, which is
591                    OK */
592                 talloc_reparent(list2, list, list2->dn);
593                 return true;
594         }
595
596         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
597         if (!dn3) {
598                 ldb_oom(ldb);
599                 return false;
600         }
601
602         /* we allow for duplicates here, and get rid of them later */
603         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
604         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
605
606         list->dn = dn3;
607         list->count += list2->count;
608
609         return true;
610 }
611
612 static int ltdb_index_dn(struct ldb_module *module,
613                          const struct ldb_parse_tree *tree,
614                          const struct ldb_message *index_list,
615                          struct dn_list *list);
616
617
618 /*
619   process an OR list (a union)
620  */
621 static int ltdb_index_dn_or(struct ldb_module *module,
622                             const struct ldb_parse_tree *tree,
623                             const struct ldb_message *index_list,
624                             struct dn_list *list)
625 {
626         struct ldb_context *ldb;
627         unsigned int i;
628
629         ldb = ldb_module_get_ctx(module);
630
631         list->dn = NULL;
632         list->count = 0;
633
634         for (i=0; i<tree->u.list.num_elements; i++) {
635                 struct dn_list *list2;
636                 int ret;
637
638                 list2 = talloc_zero(list, struct dn_list);
639                 if (list2 == NULL) {
640                         return LDB_ERR_OPERATIONS_ERROR;
641                 }
642
643                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
644
645                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
646                         /* X || 0 == X */
647                         talloc_free(list2);
648                         continue;
649                 }
650
651                 if (ret != LDB_SUCCESS) {
652                         /* X || * == * */
653                         talloc_free(list2);
654                         return ret;
655                 }
656
657                 if (!list_union(ldb, list, list2)) {
658                         talloc_free(list2);
659                         return LDB_ERR_OPERATIONS_ERROR;
660                 }
661         }
662
663         if (list->count == 0) {
664                 return LDB_ERR_NO_SUCH_OBJECT;
665         }
666
667         return LDB_SUCCESS;
668 }
669
670
671 /*
672   NOT an index results
673  */
674 static int ltdb_index_dn_not(struct ldb_module *module,
675                              const struct ldb_parse_tree *tree,
676                              const struct ldb_message *index_list,
677                              struct dn_list *list)
678 {
679         /* the only way to do an indexed not would be if we could
680            negate the not via another not or if we knew the total
681            number of database elements so we could know that the
682            existing expression covered the whole database.
683
684            instead, we just give up, and rely on a full index scan
685            (unless an outer & manages to reduce the list)
686         */
687         return LDB_ERR_OPERATIONS_ERROR;
688 }
689
690
691 static bool ltdb_index_unique(struct ldb_context *ldb,
692                               const char *attr)
693 {
694         const struct ldb_schema_attribute *a;
695         a = ldb_schema_attribute_by_name(ldb, attr);
696         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
697                 return true;
698         }
699         return false;
700 }
701
702 /*
703   process an AND expression (intersection)
704  */
705 static int ltdb_index_dn_and(struct ldb_module *module,
706                              const struct ldb_parse_tree *tree,
707                              const struct ldb_message *index_list,
708                              struct dn_list *list)
709 {
710         struct ldb_context *ldb;
711         unsigned int i;
712         bool found;
713
714         ldb = ldb_module_get_ctx(module);
715
716         list->dn = NULL;
717         list->count = 0;
718
719         /* in the first pass we only look for unique simple
720            equality tests, in the hope of avoiding having to look
721            at any others */
722         for (i=0; i<tree->u.list.num_elements; i++) {
723                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
724                 int ret;
725
726                 if (subtree->operation != LDB_OP_EQUALITY ||
727                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
728                         continue;
729                 }
730                 
731                 ret = ltdb_index_dn(module, subtree, index_list, list);
732                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
733                         /* 0 && X == 0 */
734                         return LDB_ERR_NO_SUCH_OBJECT;
735                 }
736                 if (ret == LDB_SUCCESS) {
737                         /* a unique index match means we can
738                          * stop. Note that we don't care if we return
739                          * a few too many objects, due to later
740                          * filtering */
741                         return LDB_SUCCESS;
742                 }
743         }       
744
745         /* now do a full intersection */
746         found = false;
747
748         for (i=0; i<tree->u.list.num_elements; i++) {
749                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
750                 struct dn_list *list2;
751                 int ret;
752
753                 list2 = talloc_zero(list, struct dn_list);
754                 if (list2 == NULL) {
755                         ldb_module_oom(module);
756                         return LDB_ERR_OPERATIONS_ERROR;
757                 }
758                         
759                 ret = ltdb_index_dn(module, subtree, index_list, list2);
760
761                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
762                         /* X && 0 == 0 */
763                         list->dn = NULL;
764                         list->count = 0;
765                         talloc_free(list2);
766                         return LDB_ERR_NO_SUCH_OBJECT;
767                 }
768                 
769                 if (ret != LDB_SUCCESS) {
770                         /* this didn't adding anything */
771                         talloc_free(list2);
772                         continue;
773                 }
774
775                 if (!found) {
776                         talloc_reparent(list2, list, list->dn);
777                         list->dn = list2->dn;
778                         list->count = list2->count;
779                         found = true;
780                 } else if (!list_intersect(ldb, list, list2)) {
781                         talloc_free(list2);
782                         return LDB_ERR_OPERATIONS_ERROR;
783                 }
784                         
785                 if (list->count == 0) {
786                         list->dn = NULL;
787                         return LDB_ERR_NO_SUCH_OBJECT;
788                 }
789                         
790                 if (list->count < 2) {
791                         /* it isn't worth loading the next part of the tree */
792                         return LDB_SUCCESS;
793                 }
794         }       
795
796         if (!found) {
797                 /* none of the attributes were indexed */
798                 return LDB_ERR_OPERATIONS_ERROR;
799         }
800
801         return LDB_SUCCESS;
802 }
803         
804 /*
805   return a list of matching objects using a one-level index
806  */
807 static int ltdb_index_dn_one(struct ldb_module *module,
808                              struct ldb_dn *parent_dn,
809                              struct dn_list *list)
810 {
811         struct ldb_context *ldb;
812         struct ldb_dn *key;
813         struct ldb_val val;
814         int ret;
815
816         ldb = ldb_module_get_ctx(module);
817
818         /* work out the index key from the parent DN */
819         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
820         val.length = strlen((char *)val.data);
821         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
822         if (!key) {
823                 ldb_oom(ldb);
824                 return LDB_ERR_OPERATIONS_ERROR;
825         }
826
827         ret = ltdb_dn_list_load(module, key, list);
828         talloc_free(key);
829         if (ret != LDB_SUCCESS) {
830                 return ret;
831         }
832
833         if (list->count == 0) {
834                 return LDB_ERR_NO_SUCH_OBJECT;
835         }
836
837         return LDB_SUCCESS;
838 }
839
840 /*
841   return a list of dn's that might match a indexed search or
842   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
843  */
844 static int ltdb_index_dn(struct ldb_module *module,
845                          const struct ldb_parse_tree *tree,
846                          const struct ldb_message *index_list,
847                          struct dn_list *list)
848 {
849         int ret = LDB_ERR_OPERATIONS_ERROR;
850
851         switch (tree->operation) {
852         case LDB_OP_AND:
853                 ret = ltdb_index_dn_and(module, tree, index_list, list);
854                 break;
855
856         case LDB_OP_OR:
857                 ret = ltdb_index_dn_or(module, tree, index_list, list);
858                 break;
859
860         case LDB_OP_NOT:
861                 ret = ltdb_index_dn_not(module, tree, index_list, list);
862                 break;
863
864         case LDB_OP_EQUALITY:
865                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
866                 break;
867
868         case LDB_OP_SUBSTRING:
869         case LDB_OP_GREATER:
870         case LDB_OP_LESS:
871         case LDB_OP_PRESENT:
872         case LDB_OP_APPROX:
873         case LDB_OP_EXTENDED:
874                 /* we can't index with fancy bitops yet */
875                 ret = LDB_ERR_OPERATIONS_ERROR;
876                 break;
877         }
878
879         return ret;
880 }
881
882 /*
883   filter a candidate dn_list from an indexed search into a set of results
884   extracting just the given attributes
885 */
886 static int ltdb_index_filter(const struct dn_list *dn_list,
887                              struct ltdb_context *ac, 
888                              uint32_t *match_count)
889 {
890         struct ldb_context *ldb;
891         struct ldb_message *msg;
892         unsigned int i;
893
894         ldb = ldb_module_get_ctx(ac->module);
895
896         for (i = 0; i < dn_list->count; i++) {
897                 struct ldb_dn *dn;
898                 int ret;
899
900                 msg = ldb_msg_new(ac);
901                 if (!msg) {
902                         return LDB_ERR_OPERATIONS_ERROR;
903                 }
904
905                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
906                 if (dn == NULL) {
907                         talloc_free(msg);
908                         return LDB_ERR_OPERATIONS_ERROR;
909                 }
910
911                 ret = ltdb_search_dn1(ac->module, dn, msg);
912                 talloc_free(dn);
913                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
914                         /* the record has disappeared? yes, this can happen */
915                         talloc_free(msg);
916                         continue;
917                 }
918
919                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
920                         /* an internal error */
921                         talloc_free(msg);
922                         return LDB_ERR_OPERATIONS_ERROR;
923                 }
924
925                 if (!ldb_match_msg(ldb, msg,
926                                    ac->tree, ac->base, ac->scope)) {
927                         talloc_free(msg);
928                         continue;
929                 }
930
931                 /* filter the attributes that the user wants */
932                 ret = ltdb_filter_attrs(msg, ac->attrs);
933
934                 if (ret == -1) {
935                         talloc_free(msg);
936                         return LDB_ERR_OPERATIONS_ERROR;
937                 }
938
939                 ret = ldb_module_send_entry(ac->req, msg, NULL);
940                 if (ret != LDB_SUCCESS) {
941                         ac->request_terminated = true;
942                         return ret;
943                 }
944
945                 (*match_count)++;
946         }
947
948         return LDB_SUCCESS;
949 }
950
951 /*
952   remove any duplicated entries in a indexed result
953  */
954 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
955 {
956         int i, new_count;
957
958         if (list->count < 2) {
959                 return;
960         }
961
962         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
963
964         new_count = 1;
965         for (i=1; i<list->count; i++) {
966                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
967                         if (new_count != i) {
968                                 list->dn[new_count] = list->dn[i];
969                         }
970                         new_count++;
971                 }
972         }
973         
974         list->count = new_count;
975 }
976
977 /*
978   search the database with a LDAP-like expression using indexes
979   returns -1 if an indexed search is not possible, in which
980   case the caller should call ltdb_search_full()
981 */
982 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
983 {
984         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
985         struct dn_list *dn_list;
986         int ret;
987
988         /* see if indexing is enabled */
989         if (!ltdb->cache->attribute_indexes && 
990             !ltdb->cache->one_level_indexes &&
991             ac->scope != LDB_SCOPE_BASE) {
992                 /* fallback to a full search */
993                 return LDB_ERR_OPERATIONS_ERROR;
994         }
995
996         dn_list = talloc_zero(ac, struct dn_list);
997         if (dn_list == NULL) {
998                 ldb_module_oom(ac->module);
999                 return LDB_ERR_OPERATIONS_ERROR;
1000         }
1001
1002         switch (ac->scope) {
1003         case LDB_SCOPE_BASE:
1004                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1005                 if (dn_list->dn == NULL) {
1006                         ldb_module_oom(ac->module);
1007                         talloc_free(dn_list);
1008                         return LDB_ERR_OPERATIONS_ERROR;
1009                 }
1010                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1011                 if (dn_list->dn[0].data == NULL) {
1012                         ldb_module_oom(ac->module);
1013                         talloc_free(dn_list);
1014                         return LDB_ERR_OPERATIONS_ERROR;
1015                 }
1016                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1017                 dn_list->count = 1;
1018                 break;          
1019
1020         case LDB_SCOPE_ONELEVEL:
1021                 if (!ltdb->cache->one_level_indexes) {
1022                         talloc_free(dn_list);
1023                         return LDB_ERR_OPERATIONS_ERROR;
1024                 }
1025                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1026                 if (ret != LDB_SUCCESS) {
1027                         talloc_free(dn_list);
1028                         return ret;
1029                 }
1030                 break;
1031
1032         case LDB_SCOPE_SUBTREE:
1033         case LDB_SCOPE_DEFAULT:
1034                 if (!ltdb->cache->attribute_indexes) {
1035                         talloc_free(dn_list);
1036                         return LDB_ERR_OPERATIONS_ERROR;
1037                 }
1038                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1039                 if (ret != LDB_SUCCESS) {
1040                         talloc_free(dn_list);
1041                         return ret;
1042                 }
1043                 ltdb_dn_list_remove_duplicates(dn_list);
1044                 break;
1045         }
1046
1047         ret = ltdb_index_filter(dn_list, ac, match_count);
1048         talloc_free(dn_list);
1049         return ret;
1050 }
1051
1052 /*
1053   add an index entry for one message element
1054 */
1055 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1056                            struct ldb_message_element *el, int v_idx)
1057 {
1058         struct ldb_context *ldb;
1059         struct ldb_dn *dn_key;
1060         int ret;
1061         const struct ldb_schema_attribute *a;
1062         struct dn_list *list;
1063
1064         ldb = ldb_module_get_ctx(module);
1065
1066         list = talloc_zero(module, struct dn_list);
1067         if (list == NULL) {
1068                 return LDB_ERR_OPERATIONS_ERROR;
1069         }
1070
1071         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1072         if (!dn_key) {
1073                 talloc_free(list);
1074                 return LDB_ERR_OPERATIONS_ERROR;
1075         }
1076         talloc_steal(list, dn_key);
1077
1078         ret = ltdb_dn_list_load(module, dn_key, list);
1079         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1080                 talloc_free(list);
1081                 return ret;
1082         }
1083
1084         if (ltdb_dn_list_find_str(list, dn) != -1) {
1085                 talloc_free(list);
1086                 return LDB_SUCCESS;
1087         }
1088
1089         if (list->count > 0 &&
1090             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1091                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1092         }
1093
1094         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count+1);
1095         if (list->dn == NULL) {
1096                 talloc_free(list);
1097                 return LDB_ERR_OPERATIONS_ERROR;
1098         }
1099         list->dn[list->count].data = discard_const_p(unsigned char, dn);
1100         list->dn[list->count].length = strlen(dn);
1101         list->count++;
1102
1103         ret = ltdb_dn_list_store(module, dn_key, list);
1104
1105         talloc_free(list);
1106
1107         return ret;
1108 }
1109
1110 /*
1111   add index entries for one elements in a message
1112  */
1113 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1114                              struct ldb_message_element *el)
1115 {
1116         unsigned int i;
1117         for (i = 0; i < el->num_values; i++) {
1118                 int ret = ltdb_index_add1(module, dn, el, i);
1119                 if (ret != LDB_SUCCESS) {
1120                         return ret;
1121                 }
1122         }
1123
1124         return LDB_SUCCESS;
1125 }
1126
1127 /*
1128   add index entries for all elements in a message
1129  */
1130 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1131                               struct ldb_message_element *elements, int num_el)
1132 {
1133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1134         unsigned int i;
1135
1136         if (dn[0] == '@') {
1137                 return LDB_SUCCESS;
1138         }
1139
1140         if (ltdb->cache->indexlist->num_elements == 0) {
1141                 /* no indexed fields */
1142                 return LDB_SUCCESS;
1143         }
1144
1145         for (i = 0; i < num_el; i++) {
1146                 int ret;
1147                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1148                         continue;
1149                 }
1150                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1151                 if (ret != LDB_SUCCESS) {
1152                         return ret;
1153                 }
1154         }
1155
1156         return LDB_SUCCESS;
1157 }
1158
1159
1160 /*
1161   insert a one level index for a message
1162 */
1163 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1164 {
1165         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1166         struct ldb_message_element el;
1167         struct ldb_val val;
1168         struct ldb_dn *pdn;
1169         const char *dn;
1170         int ret;
1171
1172         /* We index for ONE Level only if requested */
1173         if (!ltdb->cache->one_level_indexes) {
1174                 return LDB_SUCCESS;
1175         }
1176
1177         pdn = ldb_dn_get_parent(module, msg->dn);
1178         if (pdn == NULL) {
1179                 return LDB_ERR_OPERATIONS_ERROR;
1180         }
1181
1182         dn = ldb_dn_get_linearized(msg->dn);
1183         if (dn == NULL) {
1184                 talloc_free(pdn);
1185                 return LDB_ERR_OPERATIONS_ERROR;
1186         }
1187
1188         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1189         if (val.data == NULL) {
1190                 talloc_free(pdn);
1191                 return LDB_ERR_OPERATIONS_ERROR;
1192         }
1193
1194         val.length = strlen((char *)val.data);
1195         el.name = LTDB_IDXONE;
1196         el.values = &val;
1197         el.num_values = 1;
1198
1199         if (add) {
1200                 ret = ltdb_index_add1(module, dn, &el, 0);
1201         } else { /* delete */
1202                 ret = ltdb_index_del_value(module, dn, &el, 0);
1203         }
1204
1205         talloc_free(pdn);
1206
1207         return ret;
1208 }
1209
1210 /*
1211   add the index entries for a new element in a record
1212   The caller guarantees that these element values are not yet indexed
1213 */
1214 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1215                            struct ldb_message_element *el)
1216 {
1217         if (ldb_dn_is_special(dn)) {
1218                 return LDB_SUCCESS;
1219         }
1220         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1221 }
1222
1223 /*
1224   add the index entries for a new record
1225 */
1226 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1227 {
1228         const char *dn;
1229         int ret;
1230
1231         if (ldb_dn_is_special(msg->dn)) {
1232                 return LDB_SUCCESS;
1233         }
1234
1235         dn = ldb_dn_get_linearized(msg->dn);
1236         if (dn == NULL) {
1237                 return LDB_ERR_OPERATIONS_ERROR;
1238         }
1239
1240         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1241         if (ret != LDB_SUCCESS) {
1242                 return ret;
1243         }
1244
1245         return ltdb_index_onelevel(module, msg, 1);
1246 }
1247
1248
1249 /*
1250   delete an index entry for one message element
1251 */
1252 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1253                          struct ldb_message_element *el, int v_idx)
1254 {
1255         struct ldb_context *ldb;
1256         struct ldb_dn *dn_key;
1257         int ret, i;
1258         struct dn_list *list;
1259
1260         ldb = ldb_module_get_ctx(module);
1261
1262         if (dn[0] == '@') {
1263                 return LDB_SUCCESS;
1264         }
1265
1266         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1267         if (!dn_key) {
1268                 return LDB_ERR_OPERATIONS_ERROR;
1269         }
1270
1271         list = talloc_zero(dn_key, struct dn_list);
1272         if (list == NULL) {
1273                 talloc_free(dn_key);
1274                 return LDB_ERR_OPERATIONS_ERROR;
1275         }
1276
1277         ret = ltdb_dn_list_load(module, dn_key, list);
1278         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1279                 /* it wasn't indexed. Did we have an earlier error? If we did then
1280                    its gone now */
1281                 talloc_free(dn_key);
1282                 return LDB_SUCCESS;
1283         }
1284
1285         if (ret != LDB_SUCCESS) {
1286                 talloc_free(dn_key);
1287                 return ret;
1288         }
1289
1290         i = ltdb_dn_list_find_str(list, dn);
1291         if (i == -1) {
1292                 /* nothing to delete */
1293                 talloc_free(dn_key);
1294                 return LDB_SUCCESS;             
1295         }
1296
1297         if (i != list->count-1) {
1298                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1299         }
1300         list->count--;
1301         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1302
1303         ret = ltdb_dn_list_store(module, dn_key, list);
1304
1305         talloc_free(dn_key);
1306
1307         return ret;
1308 }
1309
1310 /*
1311   delete the index entries for a element
1312   return -1 on failure
1313 */
1314 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1315 {
1316         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1317         int ret;
1318         unsigned int i;
1319
1320         if (!ltdb->cache->attribute_indexes) {
1321                 /* no indexed fields */
1322                 return LDB_SUCCESS;
1323         }
1324
1325         if (dn[0] == '@') {
1326                 return LDB_SUCCESS;
1327         }
1328
1329         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1330                 return LDB_SUCCESS;
1331         }
1332         for (i = 0; i < el->num_values; i++) {
1333                 ret = ltdb_index_del_value(module, dn, el, i);
1334                 if (ret != LDB_SUCCESS) {
1335                         return ret;
1336                 }
1337         }
1338
1339         return LDB_SUCCESS;
1340 }
1341
1342 /*
1343   delete the index entries for a record
1344   return -1 on failure
1345 */
1346 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1347 {
1348         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1349         int ret;
1350         const char *dn;
1351         unsigned int i;
1352
1353         if (ldb_dn_is_special(msg->dn)) {
1354                 return LDB_SUCCESS;
1355         }
1356
1357         ret = ltdb_index_onelevel(module, msg, 0);
1358         if (ret != LDB_SUCCESS) {
1359                 return ret;
1360         }
1361
1362         if (!ltdb->cache->attribute_indexes) {
1363                 /* no indexed fields */
1364                 return LDB_SUCCESS;
1365         }
1366
1367         dn = ldb_dn_get_linearized(msg->dn);
1368         if (dn == NULL) {
1369                 return LDB_ERR_OPERATIONS_ERROR;
1370         }
1371
1372         for (i = 0; i < msg->num_elements; i++) {
1373                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1374                 if (ret != LDB_SUCCESS) {
1375                         return ret;
1376                 }
1377         }
1378
1379         return LDB_SUCCESS;
1380 }
1381
1382
1383 /*
1384   traversal function that deletes all @INDEX records
1385 */
1386 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1387 {
1388         const char *dn = "DN=" LTDB_INDEX ":";
1389         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1390                 return tdb_delete(tdb, key);
1391         }
1392         return 0;
1393 }
1394
1395 /*
1396   traversal function that adds @INDEX records during a re index
1397 */
1398 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1399 {
1400         struct ldb_context *ldb;
1401         struct ldb_module *module = (struct ldb_module *)state;
1402         struct ldb_message *msg;
1403         const char *dn = NULL;
1404         int ret;
1405         TDB_DATA key2;
1406
1407         ldb = ldb_module_get_ctx(module);
1408
1409         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1410             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1411                 return 0;
1412         }
1413
1414         msg = talloc(module, struct ldb_message);
1415         if (msg == NULL) {
1416                 return -1;
1417         }
1418
1419         ret = ltdb_unpack_data(module, &data, msg);
1420         if (ret != 0) {
1421                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1422                           ldb_dn_get_linearized(msg->dn));
1423                 talloc_free(msg);
1424                 return -1;
1425         }
1426
1427         /* check if the DN key has changed, perhaps due to the
1428            case insensitivity of an element changing */
1429         key2 = ltdb_key(module, msg->dn);
1430         if (key2.dptr == NULL) {
1431                 /* probably a corrupt record ... darn */
1432                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1433                                                         ldb_dn_get_linearized(msg->dn));
1434                 talloc_free(msg);
1435                 return 0;
1436         }
1437         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1438                 tdb_delete(tdb, key);
1439                 tdb_store(tdb, key2, data, 0);
1440         }
1441         talloc_free(key2.dptr);
1442
1443         if (msg->dn == NULL) {
1444                 dn = (char *)key.dptr + 3;
1445         } else {
1446                 dn = ldb_dn_get_linearized(msg->dn);
1447         }
1448
1449         ret = ltdb_index_onelevel(module, msg, 1);
1450         if (ret != LDB_SUCCESS) {
1451                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1452                         "Adding special ONE LEVEL index failed (%s)!",
1453                         ldb_dn_get_linearized(msg->dn));
1454                 talloc_free(msg);
1455                 return -1;
1456         }
1457
1458         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1459
1460         talloc_free(msg);
1461
1462         if (ret != LDB_SUCCESS) return -1;
1463
1464         return 0;
1465 }
1466
1467 /*
1468   force a complete reindex of the database
1469 */
1470 int ltdb_reindex(struct ldb_module *module)
1471 {
1472         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1473         int ret;
1474
1475         if (ltdb_cache_reload(module) != 0) {
1476                 return LDB_ERR_OPERATIONS_ERROR;
1477         }
1478
1479         /* first traverse the database deleting any @INDEX records */
1480         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1481         if (ret == -1) {
1482                 return LDB_ERR_OPERATIONS_ERROR;
1483         }
1484
1485         /* if we don't have indexes we have nothing todo */
1486         if (ltdb->cache->indexlist->num_elements == 0) {
1487                 return LDB_SUCCESS;
1488         }
1489
1490         /* now traverse adding any indexes for normal LDB records */
1491         ret = tdb_traverse(ltdb->tdb, re_index, module);
1492         if (ret == -1) {
1493                 return LDB_ERR_OPERATIONS_ERROR;
1494         }
1495
1496         if (ltdb->idxptr) {
1497                 ltdb->idxptr->repack = true;
1498         }
1499
1500         return LDB_SUCCESS;
1501 }