04c3c7acce69a9130e134ff59a5c863aba9fe81a
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         bool repack;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         return LDB_SUCCESS;
58 }
59
60 /* compare two DN entries in a dn_list. Take account of possible
61  * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 {
64         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65         if (ret != 0) return ret;
66         if (v2->length > v1->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return 0;
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 {
100         struct dn_list *list;
101         if (rec.dsize != sizeof(void *)) {
102                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
103                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
104                 return NULL;
105         }
106         
107         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
108         if (list == NULL) {
109                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
110                                        "Bad type '%s' for idxptr", 
111                                        talloc_get_name(*(struct dn_list **)rec.dptr));
112                 return NULL;
113         }
114         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
116                                        "Bad parent '%s' for idxptr", 
117                                        talloc_get_name(talloc_parent(list->dn)));
118                 return NULL;
119         }
120         return list;
121 }
122
123 /*
124   return the @IDX list in an index entry for a dn as a 
125   struct dn_list
126  */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128                              struct ldb_dn *dn, struct dn_list *list)
129 {
130         struct ldb_message *msg;
131         int ret;
132         struct ldb_message_element *el;
133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134         TDB_DATA rec;
135         struct dn_list *list2;
136         TDB_DATA key;
137
138         list->dn = NULL;
139         list->count = 0;
140
141         /* see if we have any in-memory index entries */
142         if (ltdb->idxptr == NULL ||
143             ltdb->idxptr->itdb == NULL) {
144                 goto normal_index;
145         }
146
147         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148         key.dsize = strlen((char *)key.dptr);
149
150         rec = tdb_fetch(ltdb->idxptr->itdb, key);
151         if (rec.dptr == NULL) {
152                 goto normal_index;
153         }
154
155         /* we've found an in-memory index entry */
156         list2 = ltdb_index_idxptr(module, rec, true);
157         if (list2 == NULL) {
158                 free(rec.dptr);
159                 return LDB_ERR_OPERATIONS_ERROR;
160         }
161         free(rec.dptr);
162
163         *list = *list2;
164         return LDB_SUCCESS;
165
166 normal_index:
167         msg = ldb_msg_new(list);
168         if (msg == NULL) {
169                 return LDB_ERR_OPERATIONS_ERROR;
170         }
171
172         ret = ltdb_search_dn1(module, dn, msg);
173         if (ret != LDB_SUCCESS) {
174                 return ret;
175         }
176
177         /* TODO: check indexing version number */
178
179         el = ldb_msg_find_element(msg, LTDB_IDX);
180         if (!el) {
181                 talloc_free(msg);
182                 return LDB_SUCCESS;
183         }
184
185         /* we avoid copying the strings by stealing the list */
186         list->dn = talloc_steal(list, el->values);
187         list->count = el->num_values;
188
189         return LDB_SUCCESS;
190 }
191
192
193 /*
194   save a dn_list into a full @IDX style record
195  */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
197                                    struct dn_list *list)
198 {
199         struct ldb_message *msg;
200         int ret;
201
202         if (list->count == 0) {
203                 ret = ltdb_delete_noindex(module, dn);
204                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
205                         return LDB_SUCCESS;
206                 }
207                 return ret;
208         }
209
210         msg = ldb_msg_new(module);
211         if (!msg) {
212                 ldb_module_oom(module);
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217         if (ret != LDB_SUCCESS) {
218                 talloc_free(msg);
219                 ldb_module_oom(module);
220                 return LDB_ERR_OPERATIONS_ERROR;
221         }
222
223         msg->dn = dn;
224         if (list->count > 0) {
225                 struct ldb_message_element *el;
226
227                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
228                 if (ret != LDB_SUCCESS) {
229                         ldb_module_oom(module);
230                         talloc_free(msg);
231                         return LDB_ERR_OPERATIONS_ERROR;
232                 }
233                 el->values = list->dn;
234                 el->num_values = list->count;
235         }
236
237         ret = ltdb_store(module, msg, TDB_REPLACE);
238         talloc_free(msg);
239         return ret;
240 }
241
242 /*
243   save a dn_list into the database, in either @IDX or internal format
244  */
245 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
246                               struct dn_list *list)
247 {
248         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
249         TDB_DATA rec, key;
250         int ret;
251         struct dn_list *list2;
252
253         if (ltdb->idxptr == NULL) {
254                 return ltdb_dn_list_store_full(module, dn, list);
255         }
256
257         if (ltdb->idxptr->itdb == NULL) {
258                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
259                 if (ltdb->idxptr->itdb == NULL) {
260                         return LDB_ERR_OPERATIONS_ERROR;
261                 }
262         }
263
264         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
265         key.dsize = strlen((char *)key.dptr);
266
267         rec = tdb_fetch(ltdb->idxptr->itdb, key);
268         if (rec.dptr != NULL) {
269                 list2 = ltdb_index_idxptr(module, rec, false);
270                 if (list2 == NULL) {
271                         free(rec.dptr);
272                         return LDB_ERR_OPERATIONS_ERROR;
273                 }
274                 free(rec.dptr);
275                 list2->dn = talloc_steal(list2, list->dn);
276                 list2->count = list->count;
277                 return LDB_SUCCESS;
278         }
279
280         list2 = talloc(ltdb->idxptr, struct dn_list);
281         if (list2 == NULL) {
282                 return LDB_ERR_OPERATIONS_ERROR;
283         }
284         list2->dn = talloc_steal(list2, list->dn);
285         list2->count = list->count;
286
287         rec.dptr = (uint8_t *)&list2;
288         rec.dsize = sizeof(void *);
289
290         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
291         if (ret == -1) {
292                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
293         }
294         return LDB_SUCCESS;
295 }
296
297 /*
298   traverse function for storing the in-memory index entries on disk
299  */
300 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
301 {
302         struct ldb_module *module = state;
303         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
304         struct ldb_dn *dn;
305         struct ldb_context *ldb = ldb_module_get_ctx(module);
306         struct ldb_val v;
307         struct dn_list *list;
308
309         list = ltdb_index_idxptr(module, data, true);
310         if (list == NULL) {
311                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
312                 return -1;
313         }
314
315         v.data = key.dptr;
316         v.length = key.dsize;
317
318         dn = ldb_dn_from_ldb_val(module, ldb, &v);
319         if (dn == NULL) {
320                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
321                 return -1;
322         }
323
324         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
325         talloc_free(dn);
326         if (ltdb->idxptr->error != 0) {
327                 return -1;
328         }
329         return 0;
330 }
331
332 /* cleanup the idxptr mode when transaction commits */
333 int ltdb_index_transaction_commit(struct ldb_module *module)
334 {
335         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
336         int ret;
337
338         if (ltdb->idxptr->itdb) {
339                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
340                 tdb_close(ltdb->idxptr->itdb);
341         }
342
343         ret = ltdb->idxptr->error;
344
345         if (ret != LDB_SUCCESS) {
346                 struct ldb_context *ldb = ldb_module_get_ctx(module);
347                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
348         }
349
350         talloc_free(ltdb->idxptr);
351         ltdb->idxptr = NULL;
352         return ret;
353 }
354
355 /* cleanup the idxptr mode when transaction cancels */
356 int ltdb_index_transaction_cancel(struct ldb_module *module)
357 {
358         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
359         if (ltdb->idxptr && ltdb->idxptr->itdb) {
360                 tdb_close(ltdb->idxptr->itdb);
361         }
362         talloc_free(ltdb->idxptr);
363         ltdb->idxptr = NULL;
364         return LDB_SUCCESS;
365 }
366
367
368 /*
369   return the dn key to be used for an index
370   the caller is responsible for freeing
371 */
372 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
373                                      const char *attr, const struct ldb_val *value,
374                                      const struct ldb_schema_attribute **ap)
375 {
376         struct ldb_dn *ret;
377         struct ldb_val v;
378         const struct ldb_schema_attribute *a;
379         char *attr_folded;
380         int r;
381
382         attr_folded = ldb_attr_casefold(ldb, attr);
383         if (!attr_folded) {
384                 return NULL;
385         }
386
387         a = ldb_schema_attribute_by_name(ldb, attr);
388         if (ap) {
389                 *ap = a;
390         }
391         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
392         if (r != LDB_SUCCESS) {
393                 const char *errstr = ldb_errstring(ldb);
394                 /* canonicalisation can be refused. For example, 
395                    a attribute that takes wildcards will refuse to canonicalise
396                    if the value contains a wildcard */
397                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
398                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
399                 talloc_free(attr_folded);
400                 return NULL;
401         }
402         if (ldb_should_b64_encode(ldb, &v)) {
403                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
404                 if (!vstr) return NULL;
405                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
406                 talloc_free(vstr);
407         } else {
408                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
409         }
410
411         if (v.data != value->data) {
412                 talloc_free(v.data);
413         }
414         talloc_free(attr_folded);
415
416         return ret;
417 }
418
419 /*
420   see if a attribute value is in the list of indexed attributes
421 */
422 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
423 {
424         unsigned int i;
425         struct ldb_message_element *el;
426
427         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
428         if (el == NULL) {
429                 return false;
430         }
431         for (i=0; i<el->num_values; i++) {
432                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
433                         return true;
434                 }
435         }
436         return false;
437 }
438
439 /*
440   in the following logic functions, the return value is treated as
441   follows:
442
443      LDB_SUCCESS: we found some matching index values
444
445      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
446
447      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
448                                we'll need a full search
449  */
450
451 /*
452   return a list of dn's that might match a simple indexed search (an
453   equality search only)
454  */
455 static int ltdb_index_dn_simple(struct ldb_module *module,
456                                 const struct ldb_parse_tree *tree,
457                                 const struct ldb_message *index_list,
458                                 struct dn_list *list)
459 {
460         struct ldb_context *ldb;
461         struct ldb_dn *dn;
462         int ret;
463
464         ldb = ldb_module_get_ctx(module);
465
466         list->count = 0;
467         list->dn = NULL;
468
469         /* if the attribute isn't in the list of indexed attributes then
470            this node needs a full search */
471         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
472                 return LDB_ERR_OPERATIONS_ERROR;
473         }
474
475         /* the attribute is indexed. Pull the list of DNs that match the 
476            search criterion */
477         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
478         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
479
480         ret = ltdb_dn_list_load(module, dn, list);
481         talloc_free(dn);
482         return ret;
483 }
484
485
486 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
487
488 /*
489   return a list of dn's that might match a leaf indexed search
490  */
491 static int ltdb_index_dn_leaf(struct ldb_module *module,
492                               const struct ldb_parse_tree *tree,
493                               const struct ldb_message *index_list,
494                               struct dn_list *list)
495 {
496         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
497                 list->dn = talloc_array(list, struct ldb_val, 1);
498                 if (list->dn == NULL) {
499                         ldb_module_oom(module);
500                         return LDB_ERR_OPERATIONS_ERROR;
501                 }
502                 list->dn[0] = tree->u.equality.value;
503                 list->count = 1;
504                 return LDB_SUCCESS;
505         }
506         return ltdb_index_dn_simple(module, tree, index_list, list);
507 }
508
509
510 /*
511   list intersection
512   list = list & list2
513 */
514 static bool list_intersect(struct ldb_context *ldb,
515                            struct dn_list *list, const struct dn_list *list2)
516 {
517         struct dn_list *list3;
518         unsigned int i;
519
520         if (list->count == 0) {
521                 /* 0 & X == 0 */
522                 return true;
523         }
524         if (list2->count == 0) {
525                 /* X & 0 == 0 */
526                 list->count = 0;
527                 list->dn = NULL;
528                 return true;
529         }
530
531         /* the indexing code is allowed to return a longer list than
532            what really matches, as all results are filtered by the
533            full expression at the end - this shortcut avoids a lot of
534            work in some cases */
535         if (list->count < 2 && list2->count > 10) {
536                 return true;
537         }
538         if (list2->count < 2 && list->count > 10) {
539                 list->count = list2->count;
540                 list->dn = list2->dn;
541                 /* note that list2 may not be the parent of list2->dn,
542                    as list2->dn may be owned by ltdb->idxptr. In that
543                    case we expect this reparent call to fail, which is
544                    OK */
545                 talloc_reparent(list2, list, list2->dn);
546                 return true;
547         }
548
549         list3 = talloc_zero(list, struct dn_list);
550         if (list3 == NULL) {
551                 return false;
552         }
553
554         list3->dn = talloc_array(list3, struct ldb_val, list->count);
555         if (!list3->dn) {
556                 talloc_free(list3);
557                 return false;
558         }
559         list3->count = 0;
560
561         for (i=0;i<list->count;i++) {
562                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
563                         list3->dn[list3->count] = list->dn[i];
564                         list3->count++;
565                 }
566         }
567
568         list->dn = talloc_steal(list, list3->dn);
569         list->count = list3->count;
570         talloc_free(list3);
571
572         return true;
573 }
574
575
576 /*
577   list union
578   list = list | list2
579 */
580 static bool list_union(struct ldb_context *ldb,
581                        struct dn_list *list, const struct dn_list *list2)
582 {
583         struct ldb_val *dn3;
584
585         if (list2->count == 0) {
586                 /* X | 0 == X */
587                 return true;
588         }
589
590         if (list->count == 0) {
591                 /* 0 | X == X */
592                 list->count = list2->count;
593                 list->dn = list2->dn;
594                 /* note that list2 may not be the parent of list2->dn,
595                    as list2->dn may be owned by ltdb->idxptr. In that
596                    case we expect this reparent call to fail, which is
597                    OK */
598                 talloc_reparent(list2, list, list2->dn);
599                 return true;
600         }
601
602         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
603         if (!dn3) {
604                 ldb_oom(ldb);
605                 return false;
606         }
607
608         /* we allow for duplicates here, and get rid of them later */
609         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
610         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
611
612         list->dn = dn3;
613         list->count += list2->count;
614
615         return true;
616 }
617
618 static int ltdb_index_dn(struct ldb_module *module,
619                          const struct ldb_parse_tree *tree,
620                          const struct ldb_message *index_list,
621                          struct dn_list *list);
622
623
624 /*
625   process an OR list (a union)
626  */
627 static int ltdb_index_dn_or(struct ldb_module *module,
628                             const struct ldb_parse_tree *tree,
629                             const struct ldb_message *index_list,
630                             struct dn_list *list)
631 {
632         struct ldb_context *ldb;
633         unsigned int i;
634
635         ldb = ldb_module_get_ctx(module);
636
637         list->dn = NULL;
638         list->count = 0;
639
640         for (i=0; i<tree->u.list.num_elements; i++) {
641                 struct dn_list *list2;
642                 int ret;
643
644                 list2 = talloc_zero(list, struct dn_list);
645                 if (list2 == NULL) {
646                         return LDB_ERR_OPERATIONS_ERROR;
647                 }
648
649                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
650
651                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
652                         /* X || 0 == X */
653                         talloc_free(list2);
654                         continue;
655                 }
656
657                 if (ret != LDB_SUCCESS) {
658                         /* X || * == * */
659                         talloc_free(list2);
660                         return ret;
661                 }
662
663                 if (!list_union(ldb, list, list2)) {
664                         talloc_free(list2);
665                         return LDB_ERR_OPERATIONS_ERROR;
666                 }
667         }
668
669         if (list->count == 0) {
670                 return LDB_ERR_NO_SUCH_OBJECT;
671         }
672
673         return LDB_SUCCESS;
674 }
675
676
677 /*
678   NOT an index results
679  */
680 static int ltdb_index_dn_not(struct ldb_module *module,
681                              const struct ldb_parse_tree *tree,
682                              const struct ldb_message *index_list,
683                              struct dn_list *list)
684 {
685         /* the only way to do an indexed not would be if we could
686            negate the not via another not or if we knew the total
687            number of database elements so we could know that the
688            existing expression covered the whole database.
689
690            instead, we just give up, and rely on a full index scan
691            (unless an outer & manages to reduce the list)
692         */
693         return LDB_ERR_OPERATIONS_ERROR;
694 }
695
696
697 static bool ltdb_index_unique(struct ldb_context *ldb,
698                               const char *attr)
699 {
700         const struct ldb_schema_attribute *a;
701         a = ldb_schema_attribute_by_name(ldb, attr);
702         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
703                 return true;
704         }
705         return false;
706 }
707
708 /*
709   process an AND expression (intersection)
710  */
711 static int ltdb_index_dn_and(struct ldb_module *module,
712                              const struct ldb_parse_tree *tree,
713                              const struct ldb_message *index_list,
714                              struct dn_list *list)
715 {
716         struct ldb_context *ldb;
717         unsigned int i;
718         bool found;
719
720         ldb = ldb_module_get_ctx(module);
721
722         list->dn = NULL;
723         list->count = 0;
724
725         /* in the first pass we only look for unique simple
726            equality tests, in the hope of avoiding having to look
727            at any others */
728         for (i=0; i<tree->u.list.num_elements; i++) {
729                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
730                 int ret;
731
732                 if (subtree->operation != LDB_OP_EQUALITY ||
733                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
734                         continue;
735                 }
736                 
737                 ret = ltdb_index_dn(module, subtree, index_list, list);
738                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
739                         /* 0 && X == 0 */
740                         return LDB_ERR_NO_SUCH_OBJECT;
741                 }
742                 if (ret == LDB_SUCCESS) {
743                         /* a unique index match means we can
744                          * stop. Note that we don't care if we return
745                          * a few too many objects, due to later
746                          * filtering */
747                         return LDB_SUCCESS;
748                 }
749         }       
750
751         /* now do a full intersection */
752         found = false;
753
754         for (i=0; i<tree->u.list.num_elements; i++) {
755                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
756                 struct dn_list *list2;
757                 int ret;
758
759                 list2 = talloc_zero(list, struct dn_list);
760                 if (list2 == NULL) {
761                         ldb_module_oom(module);
762                         return LDB_ERR_OPERATIONS_ERROR;
763                 }
764                         
765                 ret = ltdb_index_dn(module, subtree, index_list, list2);
766
767                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
768                         /* X && 0 == 0 */
769                         list->dn = NULL;
770                         list->count = 0;
771                         talloc_free(list2);
772                         return LDB_ERR_NO_SUCH_OBJECT;
773                 }
774                 
775                 if (ret != LDB_SUCCESS) {
776                         /* this didn't adding anything */
777                         talloc_free(list2);
778                         continue;
779                 }
780
781                 if (!found) {
782                         talloc_reparent(list2, list, list->dn);
783                         list->dn = list2->dn;
784                         list->count = list2->count;
785                         found = true;
786                 } else if (!list_intersect(ldb, list, list2)) {
787                         talloc_free(list2);
788                         return LDB_ERR_OPERATIONS_ERROR;
789                 }
790                         
791                 if (list->count == 0) {
792                         list->dn = NULL;
793                         return LDB_ERR_NO_SUCH_OBJECT;
794                 }
795                         
796                 if (list->count < 2) {
797                         /* it isn't worth loading the next part of the tree */
798                         return LDB_SUCCESS;
799                 }
800         }       
801
802         if (!found) {
803                 /* none of the attributes were indexed */
804                 return LDB_ERR_OPERATIONS_ERROR;
805         }
806
807         return LDB_SUCCESS;
808 }
809         
810 /*
811   return a list of matching objects using a one-level index
812  */
813 static int ltdb_index_dn_one(struct ldb_module *module,
814                              struct ldb_dn *parent_dn,
815                              struct dn_list *list)
816 {
817         struct ldb_context *ldb;
818         struct ldb_dn *key;
819         struct ldb_val val;
820         int ret;
821
822         ldb = ldb_module_get_ctx(module);
823
824         /* work out the index key from the parent DN */
825         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
826         val.length = strlen((char *)val.data);
827         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
828         if (!key) {
829                 ldb_oom(ldb);
830                 return LDB_ERR_OPERATIONS_ERROR;
831         }
832
833         ret = ltdb_dn_list_load(module, key, list);
834         talloc_free(key);
835         if (ret != LDB_SUCCESS) {
836                 return ret;
837         }
838
839         if (list->count == 0) {
840                 return LDB_ERR_NO_SUCH_OBJECT;
841         }
842
843         return LDB_SUCCESS;
844 }
845
846 /*
847   return a list of dn's that might match a indexed search or
848   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
849  */
850 static int ltdb_index_dn(struct ldb_module *module,
851                          const struct ldb_parse_tree *tree,
852                          const struct ldb_message *index_list,
853                          struct dn_list *list)
854 {
855         int ret = LDB_ERR_OPERATIONS_ERROR;
856
857         switch (tree->operation) {
858         case LDB_OP_AND:
859                 ret = ltdb_index_dn_and(module, tree, index_list, list);
860                 break;
861
862         case LDB_OP_OR:
863                 ret = ltdb_index_dn_or(module, tree, index_list, list);
864                 break;
865
866         case LDB_OP_NOT:
867                 ret = ltdb_index_dn_not(module, tree, index_list, list);
868                 break;
869
870         case LDB_OP_EQUALITY:
871                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
872                 break;
873
874         case LDB_OP_SUBSTRING:
875         case LDB_OP_GREATER:
876         case LDB_OP_LESS:
877         case LDB_OP_PRESENT:
878         case LDB_OP_APPROX:
879         case LDB_OP_EXTENDED:
880                 /* we can't index with fancy bitops yet */
881                 ret = LDB_ERR_OPERATIONS_ERROR;
882                 break;
883         }
884
885         return ret;
886 }
887
888 /*
889   filter a candidate dn_list from an indexed search into a set of results
890   extracting just the given attributes
891 */
892 static int ltdb_index_filter(const struct dn_list *dn_list,
893                              struct ltdb_context *ac, 
894                              uint32_t *match_count)
895 {
896         struct ldb_context *ldb;
897         struct ldb_message *msg;
898         unsigned int i;
899
900         ldb = ldb_module_get_ctx(ac->module);
901
902         for (i = 0; i < dn_list->count; i++) {
903                 struct ldb_dn *dn;
904                 int ret;
905
906                 msg = ldb_msg_new(ac);
907                 if (!msg) {
908                         return LDB_ERR_OPERATIONS_ERROR;
909                 }
910
911                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
912                 if (dn == NULL) {
913                         talloc_free(msg);
914                         return LDB_ERR_OPERATIONS_ERROR;
915                 }
916
917                 ret = ltdb_search_dn1(ac->module, dn, msg);
918                 talloc_free(dn);
919                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
920                         /* the record has disappeared? yes, this can happen */
921                         talloc_free(msg);
922                         continue;
923                 }
924
925                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
926                         /* an internal error */
927                         talloc_free(msg);
928                         return LDB_ERR_OPERATIONS_ERROR;
929                 }
930
931                 if (!ldb_match_msg(ldb, msg,
932                                    ac->tree, ac->base, ac->scope)) {
933                         talloc_free(msg);
934                         continue;
935                 }
936
937                 /* filter the attributes that the user wants */
938                 ret = ltdb_filter_attrs(msg, ac->attrs);
939
940                 if (ret == -1) {
941                         talloc_free(msg);
942                         return LDB_ERR_OPERATIONS_ERROR;
943                 }
944
945                 ret = ldb_module_send_entry(ac->req, msg, NULL);
946                 if (ret != LDB_SUCCESS) {
947                         ac->request_terminated = true;
948                         return ret;
949                 }
950
951                 (*match_count)++;
952         }
953
954         return LDB_SUCCESS;
955 }
956
957 /*
958   remove any duplicated entries in a indexed result
959  */
960 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
961 {
962         int i, new_count;
963
964         if (list->count < 2) {
965                 return;
966         }
967
968         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
969
970         new_count = 1;
971         for (i=1; i<list->count; i++) {
972                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
973                         if (new_count != i) {
974                                 list->dn[new_count] = list->dn[i];
975                         }
976                         new_count++;
977                 }
978         }
979         
980         list->count = new_count;
981 }
982
983 /*
984   search the database with a LDAP-like expression using indexes
985   returns -1 if an indexed search is not possible, in which
986   case the caller should call ltdb_search_full()
987 */
988 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
989 {
990         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
991         struct dn_list *dn_list;
992         int ret;
993
994         /* see if indexing is enabled */
995         if (!ltdb->cache->attribute_indexes && 
996             !ltdb->cache->one_level_indexes &&
997             ac->scope != LDB_SCOPE_BASE) {
998                 /* fallback to a full search */
999                 return LDB_ERR_OPERATIONS_ERROR;
1000         }
1001
1002         dn_list = talloc_zero(ac, struct dn_list);
1003         if (dn_list == NULL) {
1004                 ldb_module_oom(ac->module);
1005                 return LDB_ERR_OPERATIONS_ERROR;
1006         }
1007
1008         switch (ac->scope) {
1009         case LDB_SCOPE_BASE:
1010                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1011                 if (dn_list->dn == NULL) {
1012                         ldb_module_oom(ac->module);
1013                         talloc_free(dn_list);
1014                         return LDB_ERR_OPERATIONS_ERROR;
1015                 }
1016                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1017                 if (dn_list->dn[0].data == NULL) {
1018                         ldb_module_oom(ac->module);
1019                         talloc_free(dn_list);
1020                         return LDB_ERR_OPERATIONS_ERROR;
1021                 }
1022                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1023                 dn_list->count = 1;
1024                 break;          
1025
1026         case LDB_SCOPE_ONELEVEL:
1027                 if (!ltdb->cache->one_level_indexes) {
1028                         talloc_free(dn_list);
1029                         return LDB_ERR_OPERATIONS_ERROR;
1030                 }
1031                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1032                 if (ret != LDB_SUCCESS) {
1033                         talloc_free(dn_list);
1034                         return ret;
1035                 }
1036                 break;
1037
1038         case LDB_SCOPE_SUBTREE:
1039         case LDB_SCOPE_DEFAULT:
1040                 if (!ltdb->cache->attribute_indexes) {
1041                         talloc_free(dn_list);
1042                         return LDB_ERR_OPERATIONS_ERROR;
1043                 }
1044                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1045                 if (ret != LDB_SUCCESS) {
1046                         talloc_free(dn_list);
1047                         return ret;
1048                 }
1049                 ltdb_dn_list_remove_duplicates(dn_list);
1050                 break;
1051         }
1052
1053         ret = ltdb_index_filter(dn_list, ac, match_count);
1054         talloc_free(dn_list);
1055         return ret;
1056 }
1057
1058 /*
1059   add an index entry for one message element
1060 */
1061 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1062                            struct ldb_message_element *el, int v_idx)
1063 {
1064         struct ldb_context *ldb;
1065         struct ldb_dn *dn_key;
1066         int ret;
1067         const struct ldb_schema_attribute *a;
1068         struct dn_list *list;
1069         unsigned alloc_len;
1070
1071         ldb = ldb_module_get_ctx(module);
1072
1073         list = talloc_zero(module, struct dn_list);
1074         if (list == NULL) {
1075                 return LDB_ERR_OPERATIONS_ERROR;
1076         }
1077
1078         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1079         if (!dn_key) {
1080                 talloc_free(list);
1081                 return LDB_ERR_OPERATIONS_ERROR;
1082         }
1083         talloc_steal(list, dn_key);
1084
1085         ret = ltdb_dn_list_load(module, dn_key, list);
1086         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1087                 talloc_free(list);
1088                 return ret;
1089         }
1090
1091         if (ltdb_dn_list_find_str(list, dn) != -1) {
1092                 talloc_free(list);
1093                 return LDB_SUCCESS;
1094         }
1095
1096         if (list->count > 0 &&
1097             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1098                 talloc_free(list);
1099                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1100         }
1101
1102         /* overallocate the list a bit, to reduce the number of
1103          * realloc trigered copies */    
1104         alloc_len = ((list->count+1)+7) & ~7;
1105         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1106         if (list->dn == NULL) {
1107                 talloc_free(list);
1108                 return LDB_ERR_OPERATIONS_ERROR;
1109         }
1110         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1111         list->dn[list->count].length = strlen(dn);
1112         list->count++;
1113
1114         ret = ltdb_dn_list_store(module, dn_key, list);
1115
1116         talloc_free(list);
1117
1118         return ret;
1119 }
1120
1121 /*
1122   add index entries for one elements in a message
1123  */
1124 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1125                              struct ldb_message_element *el)
1126 {
1127         unsigned int i;
1128         for (i = 0; i < el->num_values; i++) {
1129                 int ret = ltdb_index_add1(module, dn, el, i);
1130                 if (ret != LDB_SUCCESS) {
1131                         return ret;
1132                 }
1133         }
1134
1135         return LDB_SUCCESS;
1136 }
1137
1138 /*
1139   add index entries for all elements in a message
1140  */
1141 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1142                               struct ldb_message_element *elements, int num_el)
1143 {
1144         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1145         unsigned int i;
1146
1147         if (dn[0] == '@') {
1148                 return LDB_SUCCESS;
1149         }
1150
1151         if (ltdb->cache->indexlist->num_elements == 0) {
1152                 /* no indexed fields */
1153                 return LDB_SUCCESS;
1154         }
1155
1156         for (i = 0; i < num_el; i++) {
1157                 int ret;
1158                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1159                         continue;
1160                 }
1161                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1162                 if (ret != LDB_SUCCESS) {
1163                         return ret;
1164                 }
1165         }
1166
1167         return LDB_SUCCESS;
1168 }
1169
1170
1171 /*
1172   insert a one level index for a message
1173 */
1174 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1175 {
1176         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1177         struct ldb_message_element el;
1178         struct ldb_val val;
1179         struct ldb_dn *pdn;
1180         const char *dn;
1181         int ret;
1182
1183         /* We index for ONE Level only if requested */
1184         if (!ltdb->cache->one_level_indexes) {
1185                 return LDB_SUCCESS;
1186         }
1187
1188         pdn = ldb_dn_get_parent(module, msg->dn);
1189         if (pdn == NULL) {
1190                 return LDB_ERR_OPERATIONS_ERROR;
1191         }
1192
1193         dn = ldb_dn_get_linearized(msg->dn);
1194         if (dn == NULL) {
1195                 talloc_free(pdn);
1196                 return LDB_ERR_OPERATIONS_ERROR;
1197         }
1198
1199         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1200         if (val.data == NULL) {
1201                 talloc_free(pdn);
1202                 return LDB_ERR_OPERATIONS_ERROR;
1203         }
1204
1205         val.length = strlen((char *)val.data);
1206         el.name = LTDB_IDXONE;
1207         el.values = &val;
1208         el.num_values = 1;
1209
1210         if (add) {
1211                 ret = ltdb_index_add1(module, dn, &el, 0);
1212         } else { /* delete */
1213                 ret = ltdb_index_del_value(module, dn, &el, 0);
1214         }
1215
1216         talloc_free(pdn);
1217
1218         return ret;
1219 }
1220
1221 /*
1222   add the index entries for a new element in a record
1223   The caller guarantees that these element values are not yet indexed
1224 */
1225 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1226                            struct ldb_message_element *el)
1227 {
1228         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1229         if (ldb_dn_is_special(dn)) {
1230                 return LDB_SUCCESS;
1231         }
1232         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1233                 return LDB_SUCCESS;
1234         }
1235         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1236 }
1237
1238 /*
1239   add the index entries for a new record
1240 */
1241 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1242 {
1243         const char *dn;
1244         int ret;
1245
1246         if (ldb_dn_is_special(msg->dn)) {
1247                 return LDB_SUCCESS;
1248         }
1249
1250         dn = ldb_dn_get_linearized(msg->dn);
1251         if (dn == NULL) {
1252                 return LDB_ERR_OPERATIONS_ERROR;
1253         }
1254
1255         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1256         if (ret != LDB_SUCCESS) {
1257                 return ret;
1258         }
1259
1260         return ltdb_index_onelevel(module, msg, 1);
1261 }
1262
1263
1264 /*
1265   delete an index entry for one message element
1266 */
1267 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1268                          struct ldb_message_element *el, int v_idx)
1269 {
1270         struct ldb_context *ldb;
1271         struct ldb_dn *dn_key;
1272         int ret, i;
1273         struct dn_list *list;
1274
1275         ldb = ldb_module_get_ctx(module);
1276
1277         if (dn[0] == '@') {
1278                 return LDB_SUCCESS;
1279         }
1280
1281         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1282         if (!dn_key) {
1283                 return LDB_ERR_OPERATIONS_ERROR;
1284         }
1285
1286         list = talloc_zero(dn_key, struct dn_list);
1287         if (list == NULL) {
1288                 talloc_free(dn_key);
1289                 return LDB_ERR_OPERATIONS_ERROR;
1290         }
1291
1292         ret = ltdb_dn_list_load(module, dn_key, list);
1293         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1294                 /* it wasn't indexed. Did we have an earlier error? If we did then
1295                    its gone now */
1296                 talloc_free(dn_key);
1297                 return LDB_SUCCESS;
1298         }
1299
1300         if (ret != LDB_SUCCESS) {
1301                 talloc_free(dn_key);
1302                 return ret;
1303         }
1304
1305         i = ltdb_dn_list_find_str(list, dn);
1306         if (i == -1) {
1307                 /* nothing to delete */
1308                 talloc_free(dn_key);
1309                 return LDB_SUCCESS;             
1310         }
1311
1312         if (i != list->count-1) {
1313                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1314         }
1315         list->count--;
1316         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1317
1318         ret = ltdb_dn_list_store(module, dn_key, list);
1319
1320         talloc_free(dn_key);
1321
1322         return ret;
1323 }
1324
1325 /*
1326   delete the index entries for a element
1327   return -1 on failure
1328 */
1329 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1330 {
1331         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1332         int ret;
1333         unsigned int i;
1334
1335         if (!ltdb->cache->attribute_indexes) {
1336                 /* no indexed fields */
1337                 return LDB_SUCCESS;
1338         }
1339
1340         if (dn[0] == '@') {
1341                 return LDB_SUCCESS;
1342         }
1343
1344         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1345                 return LDB_SUCCESS;
1346         }
1347         for (i = 0; i < el->num_values; i++) {
1348                 ret = ltdb_index_del_value(module, dn, el, i);
1349                 if (ret != LDB_SUCCESS) {
1350                         return ret;
1351                 }
1352         }
1353
1354         return LDB_SUCCESS;
1355 }
1356
1357 /*
1358   delete the index entries for a record
1359   return -1 on failure
1360 */
1361 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1362 {
1363         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1364         int ret;
1365         const char *dn;
1366         unsigned int i;
1367
1368         if (ldb_dn_is_special(msg->dn)) {
1369                 return LDB_SUCCESS;
1370         }
1371
1372         ret = ltdb_index_onelevel(module, msg, 0);
1373         if (ret != LDB_SUCCESS) {
1374                 return ret;
1375         }
1376
1377         if (!ltdb->cache->attribute_indexes) {
1378                 /* no indexed fields */
1379                 return LDB_SUCCESS;
1380         }
1381
1382         dn = ldb_dn_get_linearized(msg->dn);
1383         if (dn == NULL) {
1384                 return LDB_ERR_OPERATIONS_ERROR;
1385         }
1386
1387         for (i = 0; i < msg->num_elements; i++) {
1388                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1389                 if (ret != LDB_SUCCESS) {
1390                         return ret;
1391                 }
1392         }
1393
1394         return LDB_SUCCESS;
1395 }
1396
1397
1398 /*
1399   traversal function that deletes all @INDEX records
1400 */
1401 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1402 {
1403         const char *dn = "DN=" LTDB_INDEX ":";
1404         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1405                 return tdb_delete(tdb, key);
1406         }
1407         return 0;
1408 }
1409
1410 /*
1411   traversal function that adds @INDEX records during a re index
1412 */
1413 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1414 {
1415         struct ldb_context *ldb;
1416         struct ldb_module *module = (struct ldb_module *)state;
1417         struct ldb_message *msg;
1418         const char *dn = NULL;
1419         int ret;
1420         TDB_DATA key2;
1421
1422         ldb = ldb_module_get_ctx(module);
1423
1424         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1425             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1426                 return 0;
1427         }
1428
1429         msg = ldb_msg_new(module);
1430         if (msg == NULL) {
1431                 return -1;
1432         }
1433
1434         ret = ltdb_unpack_data(module, &data, msg);
1435         if (ret != 0) {
1436                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1437                           ldb_dn_get_linearized(msg->dn));
1438                 talloc_free(msg);
1439                 return -1;
1440         }
1441
1442         /* check if the DN key has changed, perhaps due to the
1443            case insensitivity of an element changing */
1444         key2 = ltdb_key(module, msg->dn);
1445         if (key2.dptr == NULL) {
1446                 /* probably a corrupt record ... darn */
1447                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1448                                                         ldb_dn_get_linearized(msg->dn));
1449                 talloc_free(msg);
1450                 return 0;
1451         }
1452         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1453                 tdb_delete(tdb, key);
1454                 tdb_store(tdb, key2, data, 0);
1455         }
1456         talloc_free(key2.dptr);
1457
1458         if (msg->dn == NULL) {
1459                 dn = (char *)key.dptr + 3;
1460         } else {
1461                 dn = ldb_dn_get_linearized(msg->dn);
1462         }
1463
1464         ret = ltdb_index_onelevel(module, msg, 1);
1465         if (ret != LDB_SUCCESS) {
1466                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1467                         "Adding special ONE LEVEL index failed (%s)!",
1468                         ldb_dn_get_linearized(msg->dn));
1469                 talloc_free(msg);
1470                 return -1;
1471         }
1472
1473         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1474
1475         talloc_free(msg);
1476
1477         if (ret != LDB_SUCCESS) return -1;
1478
1479         return 0;
1480 }
1481
1482 /*
1483   force a complete reindex of the database
1484 */
1485 int ltdb_reindex(struct ldb_module *module)
1486 {
1487         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1488         int ret;
1489
1490         if (ltdb_cache_reload(module) != 0) {
1491                 return LDB_ERR_OPERATIONS_ERROR;
1492         }
1493
1494         /* first traverse the database deleting any @INDEX records */
1495         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1496         if (ret == -1) {
1497                 return LDB_ERR_OPERATIONS_ERROR;
1498         }
1499
1500         /* if we don't have indexes we have nothing todo */
1501         if (ltdb->cache->indexlist->num_elements == 0) {
1502                 return LDB_SUCCESS;
1503         }
1504
1505         /* now traverse adding any indexes for normal LDB records */
1506         ret = tdb_traverse(ltdb->tdb, re_index, module);
1507         if (ret == -1) {
1508                 return LDB_ERR_OPERATIONS_ERROR;
1509         }
1510
1511         if (ltdb->idxptr) {
1512                 ltdb->idxptr->repack = true;
1513         }
1514
1515         return LDB_SUCCESS;
1516 }