s4-ldb: added a TODO about checking the indexlist
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         bool repack;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         return LDB_SUCCESS;
58 }
59
60 /* compare two DN entries in a dn_list. Take account of possible
61  * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 {
64         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65         if (ret != 0) return ret;
66         if (v2->length > v1->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return 0;
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 {
100         struct dn_list *list;
101         if (rec.dsize != sizeof(void *)) {
102                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
103                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
104                 return NULL;
105         }
106         
107         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
108         if (list == NULL) {
109                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
110                                        "Bad type '%s' for idxptr", 
111                                        talloc_get_name(*(struct dn_list **)rec.dptr));
112                 return NULL;
113         }
114         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
116                                        "Bad parent '%s' for idxptr", 
117                                        talloc_get_name(talloc_parent(list->dn)));
118                 return NULL;
119         }
120         return list;
121 }
122
123 /*
124   return the @IDX list in an index entry for a dn as a 
125   struct dn_list
126  */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128                              struct ldb_dn *dn, struct dn_list *list)
129 {
130         struct ldb_message *msg;
131         int ret;
132         struct ldb_message_element *el;
133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134         TDB_DATA rec;
135         struct dn_list *list2;
136         TDB_DATA key;
137
138         list->dn = NULL;
139         list->count = 0;
140
141         /* see if we have any in-memory index entries */
142         if (ltdb->idxptr == NULL ||
143             ltdb->idxptr->itdb == NULL) {
144                 goto normal_index;
145         }
146
147         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148         key.dsize = strlen((char *)key.dptr);
149
150         rec = tdb_fetch(ltdb->idxptr->itdb, key);
151         if (rec.dptr == NULL) {
152                 goto normal_index;
153         }
154
155         /* we've found an in-memory index entry */
156         list2 = ltdb_index_idxptr(module, rec, true);
157         if (list2 == NULL) {
158                 free(rec.dptr);
159                 return LDB_ERR_OPERATIONS_ERROR;
160         }
161         free(rec.dptr);
162
163         *list = *list2;
164         return LDB_SUCCESS;
165
166 normal_index:
167         msg = ldb_msg_new(list);
168         if (msg == NULL) {
169                 return LDB_ERR_OPERATIONS_ERROR;
170         }
171
172         ret = ltdb_search_dn1(module, dn, msg);
173         if (ret != LDB_SUCCESS) {
174                 return ret;
175         }
176
177         /* TODO: check indexing version number */
178
179         el = ldb_msg_find_element(msg, LTDB_IDX);
180         if (!el) {
181                 talloc_free(msg);
182                 return LDB_SUCCESS;
183         }
184
185         /* we avoid copying the strings by stealing the list */
186         list->dn = talloc_steal(list, el->values);
187         list->count = el->num_values;
188
189         return LDB_SUCCESS;
190 }
191
192
193 /*
194   save a dn_list into a full @IDX style record
195  */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
197                                    struct dn_list *list)
198 {
199         struct ldb_message *msg;
200         int ret;
201
202         if (list->count == 0) {
203                 ret = ltdb_delete_noindex(module, dn);
204                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
205                         return LDB_SUCCESS;
206                 }
207                 return ret;
208         }
209
210         msg = ldb_msg_new(module);
211         if (!msg) {
212                 ldb_module_oom(module);
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217         if (ret != LDB_SUCCESS) {
218                 talloc_free(msg);
219                 ldb_module_oom(module);
220                 return LDB_ERR_OPERATIONS_ERROR;
221         }
222
223         msg->dn = dn;
224         if (list->count > 0) {
225                 struct ldb_message_element *el;
226
227                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
228                 if (ret != LDB_SUCCESS) {
229                         ldb_module_oom(module);
230                         talloc_free(msg);
231                         return LDB_ERR_OPERATIONS_ERROR;
232                 }
233                 el->values = list->dn;
234                 el->num_values = list->count;
235         }
236
237         ret = ltdb_store(module, msg, TDB_REPLACE);
238         talloc_free(msg);
239         return ret;
240 }
241
242 /*
243   save a dn_list into the database, in either @IDX or internal format
244  */
245 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
246                               struct dn_list *list)
247 {
248         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
249         TDB_DATA rec, key;
250         int ret;
251         struct dn_list *list2;
252
253         if (ltdb->idxptr == NULL) {
254                 return ltdb_dn_list_store_full(module, dn, list);
255         }
256
257         if (ltdb->idxptr->itdb == NULL) {
258                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
259                 if (ltdb->idxptr->itdb == NULL) {
260                         return LDB_ERR_OPERATIONS_ERROR;
261                 }
262         }
263
264         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
265         key.dsize = strlen((char *)key.dptr);
266
267         rec = tdb_fetch(ltdb->idxptr->itdb, key);
268         if (rec.dptr != NULL) {
269                 list2 = ltdb_index_idxptr(module, rec, false);
270                 if (list2 == NULL) {
271                         free(rec.dptr);
272                         return LDB_ERR_OPERATIONS_ERROR;
273                 }
274                 free(rec.dptr);
275                 list2->dn = talloc_steal(list2, list->dn);
276                 list2->count = list->count;
277                 return LDB_SUCCESS;
278         }
279
280         list2 = talloc(ltdb->idxptr, struct dn_list);
281         if (list2 == NULL) {
282                 return LDB_ERR_OPERATIONS_ERROR;
283         }
284         list2->dn = talloc_steal(list2, list->dn);
285         list2->count = list->count;
286
287         rec.dptr = (uint8_t *)&list2;
288         rec.dsize = sizeof(void *);
289
290         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
291         if (ret == -1) {
292                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
293         }
294         return LDB_SUCCESS;
295 }
296
297 /*
298   traverse function for storing the in-memory index entries on disk
299  */
300 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
301 {
302         struct ldb_module *module = state;
303         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
304         struct ldb_dn *dn;
305         struct ldb_context *ldb = ldb_module_get_ctx(module);
306         struct ldb_val v;
307         struct dn_list *list;
308
309         list = ltdb_index_idxptr(module, data, true);
310         if (list == NULL) {
311                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
312                 return -1;
313         }
314
315         v.data = key.dptr;
316         v.length = key.dsize;
317
318         dn = ldb_dn_from_ldb_val(module, ldb, &v);
319         if (dn == NULL) {
320                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
321                 return -1;
322         }
323
324         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
325         talloc_free(dn);
326         if (ltdb->idxptr->error != 0) {
327                 return -1;
328         }
329         return 0;
330 }
331
332 /* cleanup the idxptr mode when transaction commits */
333 int ltdb_index_transaction_commit(struct ldb_module *module)
334 {
335         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
336         int ret;
337
338         if (ltdb->idxptr->itdb) {
339                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
340                 tdb_close(ltdb->idxptr->itdb);
341         }
342
343         ret = ltdb->idxptr->error;
344
345         if (ret != LDB_SUCCESS) {
346                 struct ldb_context *ldb = ldb_module_get_ctx(module);
347                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
348         }
349
350         talloc_free(ltdb->idxptr);
351         ltdb->idxptr = NULL;
352         return ret;
353 }
354
355 /* cleanup the idxptr mode when transaction cancels */
356 int ltdb_index_transaction_cancel(struct ldb_module *module)
357 {
358         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
359         if (ltdb->idxptr && ltdb->idxptr->itdb) {
360                 tdb_close(ltdb->idxptr->itdb);
361         }
362         talloc_free(ltdb->idxptr);
363         ltdb->idxptr = NULL;
364         return LDB_SUCCESS;
365 }
366
367
368 /*
369   return the dn key to be used for an index
370   the caller is responsible for freeing
371 */
372 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
373                                      const char *attr, const struct ldb_val *value,
374                                      const struct ldb_schema_attribute **ap)
375 {
376         struct ldb_dn *ret;
377         struct ldb_val v;
378         const struct ldb_schema_attribute *a;
379         char *attr_folded;
380         int r;
381
382         attr_folded = ldb_attr_casefold(ldb, attr);
383         if (!attr_folded) {
384                 return NULL;
385         }
386
387         a = ldb_schema_attribute_by_name(ldb, attr);
388         if (ap) {
389                 *ap = a;
390         }
391         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
392         if (r != LDB_SUCCESS) {
393                 const char *errstr = ldb_errstring(ldb);
394                 /* canonicalisation can be refused. For example, 
395                    a attribute that takes wildcards will refuse to canonicalise
396                    if the value contains a wildcard */
397                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
398                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
399                 talloc_free(attr_folded);
400                 return NULL;
401         }
402         if (ldb_should_b64_encode(ldb, &v)) {
403                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
404                 if (!vstr) return NULL;
405                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
406                 talloc_free(vstr);
407         } else {
408                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
409         }
410
411         if (v.data != value->data) {
412                 talloc_free(v.data);
413         }
414         talloc_free(attr_folded);
415
416         return ret;
417 }
418
419 /*
420   see if a attribute value is in the list of indexed attributes
421 */
422 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
423 {
424         unsigned int i;
425         struct ldb_message_element *el;
426
427         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
428         if (el == NULL) {
429                 return false;
430         }
431
432         /* TODO: this is too expensive! At least use a binary search */
433         for (i=0; i<el->num_values; i++) {
434                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
435                         return true;
436                 }
437         }
438         return false;
439 }
440
441 /*
442   in the following logic functions, the return value is treated as
443   follows:
444
445      LDB_SUCCESS: we found some matching index values
446
447      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
448
449      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
450                                we'll need a full search
451  */
452
453 /*
454   return a list of dn's that might match a simple indexed search (an
455   equality search only)
456  */
457 static int ltdb_index_dn_simple(struct ldb_module *module,
458                                 const struct ldb_parse_tree *tree,
459                                 const struct ldb_message *index_list,
460                                 struct dn_list *list)
461 {
462         struct ldb_context *ldb;
463         struct ldb_dn *dn;
464         int ret;
465
466         ldb = ldb_module_get_ctx(module);
467
468         list->count = 0;
469         list->dn = NULL;
470
471         /* if the attribute isn't in the list of indexed attributes then
472            this node needs a full search */
473         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
474                 return LDB_ERR_OPERATIONS_ERROR;
475         }
476
477         /* the attribute is indexed. Pull the list of DNs that match the 
478            search criterion */
479         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
480         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
481
482         ret = ltdb_dn_list_load(module, dn, list);
483         talloc_free(dn);
484         return ret;
485 }
486
487
488 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
489
490 /*
491   return a list of dn's that might match a leaf indexed search
492  */
493 static int ltdb_index_dn_leaf(struct ldb_module *module,
494                               const struct ldb_parse_tree *tree,
495                               const struct ldb_message *index_list,
496                               struct dn_list *list)
497 {
498         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
499                 list->dn = talloc_array(list, struct ldb_val, 1);
500                 if (list->dn == NULL) {
501                         ldb_module_oom(module);
502                         return LDB_ERR_OPERATIONS_ERROR;
503                 }
504                 list->dn[0] = tree->u.equality.value;
505                 list->count = 1;
506                 return LDB_SUCCESS;
507         }
508         return ltdb_index_dn_simple(module, tree, index_list, list);
509 }
510
511
512 /*
513   list intersection
514   list = list & list2
515 */
516 static bool list_intersect(struct ldb_context *ldb,
517                            struct dn_list *list, const struct dn_list *list2)
518 {
519         struct dn_list *list3;
520         unsigned int i;
521
522         if (list->count == 0) {
523                 /* 0 & X == 0 */
524                 return true;
525         }
526         if (list2->count == 0) {
527                 /* X & 0 == 0 */
528                 list->count = 0;
529                 list->dn = NULL;
530                 return true;
531         }
532
533         /* the indexing code is allowed to return a longer list than
534            what really matches, as all results are filtered by the
535            full expression at the end - this shortcut avoids a lot of
536            work in some cases */
537         if (list->count < 2 && list2->count > 10) {
538                 return true;
539         }
540         if (list2->count < 2 && list->count > 10) {
541                 list->count = list2->count;
542                 list->dn = list2->dn;
543                 /* note that list2 may not be the parent of list2->dn,
544                    as list2->dn may be owned by ltdb->idxptr. In that
545                    case we expect this reparent call to fail, which is
546                    OK */
547                 talloc_reparent(list2, list, list2->dn);
548                 return true;
549         }
550
551         list3 = talloc_zero(list, struct dn_list);
552         if (list3 == NULL) {
553                 return false;
554         }
555
556         list3->dn = talloc_array(list3, struct ldb_val, list->count);
557         if (!list3->dn) {
558                 talloc_free(list3);
559                 return false;
560         }
561         list3->count = 0;
562
563         for (i=0;i<list->count;i++) {
564                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
565                         list3->dn[list3->count] = list->dn[i];
566                         list3->count++;
567                 }
568         }
569
570         list->dn = talloc_steal(list, list3->dn);
571         list->count = list3->count;
572         talloc_free(list3);
573
574         return true;
575 }
576
577
578 /*
579   list union
580   list = list | list2
581 */
582 static bool list_union(struct ldb_context *ldb,
583                        struct dn_list *list, const struct dn_list *list2)
584 {
585         struct ldb_val *dn3;
586
587         if (list2->count == 0) {
588                 /* X | 0 == X */
589                 return true;
590         }
591
592         if (list->count == 0) {
593                 /* 0 | X == X */
594                 list->count = list2->count;
595                 list->dn = list2->dn;
596                 /* note that list2 may not be the parent of list2->dn,
597                    as list2->dn may be owned by ltdb->idxptr. In that
598                    case we expect this reparent call to fail, which is
599                    OK */
600                 talloc_reparent(list2, list, list2->dn);
601                 return true;
602         }
603
604         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
605         if (!dn3) {
606                 ldb_oom(ldb);
607                 return false;
608         }
609
610         /* we allow for duplicates here, and get rid of them later */
611         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
612         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
613
614         list->dn = dn3;
615         list->count += list2->count;
616
617         return true;
618 }
619
620 static int ltdb_index_dn(struct ldb_module *module,
621                          const struct ldb_parse_tree *tree,
622                          const struct ldb_message *index_list,
623                          struct dn_list *list);
624
625
626 /*
627   process an OR list (a union)
628  */
629 static int ltdb_index_dn_or(struct ldb_module *module,
630                             const struct ldb_parse_tree *tree,
631                             const struct ldb_message *index_list,
632                             struct dn_list *list)
633 {
634         struct ldb_context *ldb;
635         unsigned int i;
636
637         ldb = ldb_module_get_ctx(module);
638
639         list->dn = NULL;
640         list->count = 0;
641
642         for (i=0; i<tree->u.list.num_elements; i++) {
643                 struct dn_list *list2;
644                 int ret;
645
646                 list2 = talloc_zero(list, struct dn_list);
647                 if (list2 == NULL) {
648                         return LDB_ERR_OPERATIONS_ERROR;
649                 }
650
651                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
652
653                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
654                         /* X || 0 == X */
655                         talloc_free(list2);
656                         continue;
657                 }
658
659                 if (ret != LDB_SUCCESS) {
660                         /* X || * == * */
661                         talloc_free(list2);
662                         return ret;
663                 }
664
665                 if (!list_union(ldb, list, list2)) {
666                         talloc_free(list2);
667                         return LDB_ERR_OPERATIONS_ERROR;
668                 }
669         }
670
671         if (list->count == 0) {
672                 return LDB_ERR_NO_SUCH_OBJECT;
673         }
674
675         return LDB_SUCCESS;
676 }
677
678
679 /*
680   NOT an index results
681  */
682 static int ltdb_index_dn_not(struct ldb_module *module,
683                              const struct ldb_parse_tree *tree,
684                              const struct ldb_message *index_list,
685                              struct dn_list *list)
686 {
687         /* the only way to do an indexed not would be if we could
688            negate the not via another not or if we knew the total
689            number of database elements so we could know that the
690            existing expression covered the whole database.
691
692            instead, we just give up, and rely on a full index scan
693            (unless an outer & manages to reduce the list)
694         */
695         return LDB_ERR_OPERATIONS_ERROR;
696 }
697
698
699 static bool ltdb_index_unique(struct ldb_context *ldb,
700                               const char *attr)
701 {
702         const struct ldb_schema_attribute *a;
703         a = ldb_schema_attribute_by_name(ldb, attr);
704         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
705                 return true;
706         }
707         return false;
708 }
709
710 /*
711   process an AND expression (intersection)
712  */
713 static int ltdb_index_dn_and(struct ldb_module *module,
714                              const struct ldb_parse_tree *tree,
715                              const struct ldb_message *index_list,
716                              struct dn_list *list)
717 {
718         struct ldb_context *ldb;
719         unsigned int i;
720         bool found;
721
722         ldb = ldb_module_get_ctx(module);
723
724         list->dn = NULL;
725         list->count = 0;
726
727         /* in the first pass we only look for unique simple
728            equality tests, in the hope of avoiding having to look
729            at any others */
730         for (i=0; i<tree->u.list.num_elements; i++) {
731                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
732                 int ret;
733
734                 if (subtree->operation != LDB_OP_EQUALITY ||
735                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
736                         continue;
737                 }
738                 
739                 ret = ltdb_index_dn(module, subtree, index_list, list);
740                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
741                         /* 0 && X == 0 */
742                         return LDB_ERR_NO_SUCH_OBJECT;
743                 }
744                 if (ret == LDB_SUCCESS) {
745                         /* a unique index match means we can
746                          * stop. Note that we don't care if we return
747                          * a few too many objects, due to later
748                          * filtering */
749                         return LDB_SUCCESS;
750                 }
751         }       
752
753         /* now do a full intersection */
754         found = false;
755
756         for (i=0; i<tree->u.list.num_elements; i++) {
757                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
758                 struct dn_list *list2;
759                 int ret;
760
761                 list2 = talloc_zero(list, struct dn_list);
762                 if (list2 == NULL) {
763                         ldb_module_oom(module);
764                         return LDB_ERR_OPERATIONS_ERROR;
765                 }
766                         
767                 ret = ltdb_index_dn(module, subtree, index_list, list2);
768
769                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
770                         /* X && 0 == 0 */
771                         list->dn = NULL;
772                         list->count = 0;
773                         talloc_free(list2);
774                         return LDB_ERR_NO_SUCH_OBJECT;
775                 }
776                 
777                 if (ret != LDB_SUCCESS) {
778                         /* this didn't adding anything */
779                         talloc_free(list2);
780                         continue;
781                 }
782
783                 if (!found) {
784                         talloc_reparent(list2, list, list->dn);
785                         list->dn = list2->dn;
786                         list->count = list2->count;
787                         found = true;
788                 } else if (!list_intersect(ldb, list, list2)) {
789                         talloc_free(list2);
790                         return LDB_ERR_OPERATIONS_ERROR;
791                 }
792                         
793                 if (list->count == 0) {
794                         list->dn = NULL;
795                         return LDB_ERR_NO_SUCH_OBJECT;
796                 }
797                         
798                 if (list->count < 2) {
799                         /* it isn't worth loading the next part of the tree */
800                         return LDB_SUCCESS;
801                 }
802         }       
803
804         if (!found) {
805                 /* none of the attributes were indexed */
806                 return LDB_ERR_OPERATIONS_ERROR;
807         }
808
809         return LDB_SUCCESS;
810 }
811         
812 /*
813   return a list of matching objects using a one-level index
814  */
815 static int ltdb_index_dn_one(struct ldb_module *module,
816                              struct ldb_dn *parent_dn,
817                              struct dn_list *list)
818 {
819         struct ldb_context *ldb;
820         struct ldb_dn *key;
821         struct ldb_val val;
822         int ret;
823
824         ldb = ldb_module_get_ctx(module);
825
826         /* work out the index key from the parent DN */
827         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
828         val.length = strlen((char *)val.data);
829         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
830         if (!key) {
831                 ldb_oom(ldb);
832                 return LDB_ERR_OPERATIONS_ERROR;
833         }
834
835         ret = ltdb_dn_list_load(module, key, list);
836         talloc_free(key);
837         if (ret != LDB_SUCCESS) {
838                 return ret;
839         }
840
841         if (list->count == 0) {
842                 return LDB_ERR_NO_SUCH_OBJECT;
843         }
844
845         return LDB_SUCCESS;
846 }
847
848 /*
849   return a list of dn's that might match a indexed search or
850   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
851  */
852 static int ltdb_index_dn(struct ldb_module *module,
853                          const struct ldb_parse_tree *tree,
854                          const struct ldb_message *index_list,
855                          struct dn_list *list)
856 {
857         int ret = LDB_ERR_OPERATIONS_ERROR;
858
859         switch (tree->operation) {
860         case LDB_OP_AND:
861                 ret = ltdb_index_dn_and(module, tree, index_list, list);
862                 break;
863
864         case LDB_OP_OR:
865                 ret = ltdb_index_dn_or(module, tree, index_list, list);
866                 break;
867
868         case LDB_OP_NOT:
869                 ret = ltdb_index_dn_not(module, tree, index_list, list);
870                 break;
871
872         case LDB_OP_EQUALITY:
873                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
874                 break;
875
876         case LDB_OP_SUBSTRING:
877         case LDB_OP_GREATER:
878         case LDB_OP_LESS:
879         case LDB_OP_PRESENT:
880         case LDB_OP_APPROX:
881         case LDB_OP_EXTENDED:
882                 /* we can't index with fancy bitops yet */
883                 ret = LDB_ERR_OPERATIONS_ERROR;
884                 break;
885         }
886
887         return ret;
888 }
889
890 /*
891   filter a candidate dn_list from an indexed search into a set of results
892   extracting just the given attributes
893 */
894 static int ltdb_index_filter(const struct dn_list *dn_list,
895                              struct ltdb_context *ac, 
896                              uint32_t *match_count)
897 {
898         struct ldb_context *ldb;
899         struct ldb_message *msg;
900         unsigned int i;
901
902         ldb = ldb_module_get_ctx(ac->module);
903
904         for (i = 0; i < dn_list->count; i++) {
905                 struct ldb_dn *dn;
906                 int ret;
907
908                 msg = ldb_msg_new(ac);
909                 if (!msg) {
910                         return LDB_ERR_OPERATIONS_ERROR;
911                 }
912
913                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
914                 if (dn == NULL) {
915                         talloc_free(msg);
916                         return LDB_ERR_OPERATIONS_ERROR;
917                 }
918
919                 ret = ltdb_search_dn1(ac->module, dn, msg);
920                 talloc_free(dn);
921                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
922                         /* the record has disappeared? yes, this can happen */
923                         talloc_free(msg);
924                         continue;
925                 }
926
927                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
928                         /* an internal error */
929                         talloc_free(msg);
930                         return LDB_ERR_OPERATIONS_ERROR;
931                 }
932
933                 if (!ldb_match_msg(ldb, msg,
934                                    ac->tree, ac->base, ac->scope)) {
935                         talloc_free(msg);
936                         continue;
937                 }
938
939                 /* filter the attributes that the user wants */
940                 ret = ltdb_filter_attrs(msg, ac->attrs);
941
942                 if (ret == -1) {
943                         talloc_free(msg);
944                         return LDB_ERR_OPERATIONS_ERROR;
945                 }
946
947                 ret = ldb_module_send_entry(ac->req, msg, NULL);
948                 if (ret != LDB_SUCCESS) {
949                         ac->request_terminated = true;
950                         return ret;
951                 }
952
953                 (*match_count)++;
954         }
955
956         return LDB_SUCCESS;
957 }
958
959 /*
960   remove any duplicated entries in a indexed result
961  */
962 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
963 {
964         int i, new_count;
965
966         if (list->count < 2) {
967                 return;
968         }
969
970         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
971
972         new_count = 1;
973         for (i=1; i<list->count; i++) {
974                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
975                         if (new_count != i) {
976                                 list->dn[new_count] = list->dn[i];
977                         }
978                         new_count++;
979                 }
980         }
981         
982         list->count = new_count;
983 }
984
985 /*
986   search the database with a LDAP-like expression using indexes
987   returns -1 if an indexed search is not possible, in which
988   case the caller should call ltdb_search_full()
989 */
990 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
991 {
992         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
993         struct dn_list *dn_list;
994         int ret;
995
996         /* see if indexing is enabled */
997         if (!ltdb->cache->attribute_indexes && 
998             !ltdb->cache->one_level_indexes &&
999             ac->scope != LDB_SCOPE_BASE) {
1000                 /* fallback to a full search */
1001                 return LDB_ERR_OPERATIONS_ERROR;
1002         }
1003
1004         dn_list = talloc_zero(ac, struct dn_list);
1005         if (dn_list == NULL) {
1006                 ldb_module_oom(ac->module);
1007                 return LDB_ERR_OPERATIONS_ERROR;
1008         }
1009
1010         switch (ac->scope) {
1011         case LDB_SCOPE_BASE:
1012                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1013                 if (dn_list->dn == NULL) {
1014                         ldb_module_oom(ac->module);
1015                         talloc_free(dn_list);
1016                         return LDB_ERR_OPERATIONS_ERROR;
1017                 }
1018                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1019                 if (dn_list->dn[0].data == NULL) {
1020                         ldb_module_oom(ac->module);
1021                         talloc_free(dn_list);
1022                         return LDB_ERR_OPERATIONS_ERROR;
1023                 }
1024                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1025                 dn_list->count = 1;
1026                 break;          
1027
1028         case LDB_SCOPE_ONELEVEL:
1029                 if (!ltdb->cache->one_level_indexes) {
1030                         talloc_free(dn_list);
1031                         return LDB_ERR_OPERATIONS_ERROR;
1032                 }
1033                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1034                 if (ret != LDB_SUCCESS) {
1035                         talloc_free(dn_list);
1036                         return ret;
1037                 }
1038                 break;
1039
1040         case LDB_SCOPE_SUBTREE:
1041         case LDB_SCOPE_DEFAULT:
1042                 if (!ltdb->cache->attribute_indexes) {
1043                         talloc_free(dn_list);
1044                         return LDB_ERR_OPERATIONS_ERROR;
1045                 }
1046                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1047                 if (ret != LDB_SUCCESS) {
1048                         talloc_free(dn_list);
1049                         return ret;
1050                 }
1051                 ltdb_dn_list_remove_duplicates(dn_list);
1052                 break;
1053         }
1054
1055         ret = ltdb_index_filter(dn_list, ac, match_count);
1056         talloc_free(dn_list);
1057         return ret;
1058 }
1059
1060 /*
1061   add an index entry for one message element
1062 */
1063 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1064                            struct ldb_message_element *el, int v_idx)
1065 {
1066         struct ldb_context *ldb;
1067         struct ldb_dn *dn_key;
1068         int ret;
1069         const struct ldb_schema_attribute *a;
1070         struct dn_list *list;
1071         unsigned alloc_len;
1072
1073         ldb = ldb_module_get_ctx(module);
1074
1075         list = talloc_zero(module, struct dn_list);
1076         if (list == NULL) {
1077                 return LDB_ERR_OPERATIONS_ERROR;
1078         }
1079
1080         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1081         if (!dn_key) {
1082                 talloc_free(list);
1083                 return LDB_ERR_OPERATIONS_ERROR;
1084         }
1085         talloc_steal(list, dn_key);
1086
1087         ret = ltdb_dn_list_load(module, dn_key, list);
1088         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1089                 talloc_free(list);
1090                 return ret;
1091         }
1092
1093         if (ltdb_dn_list_find_str(list, dn) != -1) {
1094                 talloc_free(list);
1095                 return LDB_SUCCESS;
1096         }
1097
1098         if (list->count > 0 &&
1099             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1100                 talloc_free(list);
1101                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1102         }
1103
1104         /* overallocate the list a bit, to reduce the number of
1105          * realloc trigered copies */    
1106         alloc_len = ((list->count+1)+7) & ~7;
1107         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1108         if (list->dn == NULL) {
1109                 talloc_free(list);
1110                 return LDB_ERR_OPERATIONS_ERROR;
1111         }
1112         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1113         list->dn[list->count].length = strlen(dn);
1114         list->count++;
1115
1116         ret = ltdb_dn_list_store(module, dn_key, list);
1117
1118         talloc_free(list);
1119
1120         return ret;
1121 }
1122
1123 /*
1124   add index entries for one elements in a message
1125  */
1126 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1127                              struct ldb_message_element *el)
1128 {
1129         unsigned int i;
1130         for (i = 0; i < el->num_values; i++) {
1131                 int ret = ltdb_index_add1(module, dn, el, i);
1132                 if (ret != LDB_SUCCESS) {
1133                         return ret;
1134                 }
1135         }
1136
1137         return LDB_SUCCESS;
1138 }
1139
1140 /*
1141   add index entries for all elements in a message
1142  */
1143 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1144                               struct ldb_message_element *elements, int num_el)
1145 {
1146         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1147         unsigned int i;
1148
1149         if (dn[0] == '@') {
1150                 return LDB_SUCCESS;
1151         }
1152
1153         if (ltdb->cache->indexlist->num_elements == 0) {
1154                 /* no indexed fields */
1155                 return LDB_SUCCESS;
1156         }
1157
1158         for (i = 0; i < num_el; i++) {
1159                 int ret;
1160                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1161                         continue;
1162                 }
1163                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1164                 if (ret != LDB_SUCCESS) {
1165                         return ret;
1166                 }
1167         }
1168
1169         return LDB_SUCCESS;
1170 }
1171
1172
1173 /*
1174   insert a one level index for a message
1175 */
1176 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1177 {
1178         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1179         struct ldb_message_element el;
1180         struct ldb_val val;
1181         struct ldb_dn *pdn;
1182         const char *dn;
1183         int ret;
1184
1185         /* We index for ONE Level only if requested */
1186         if (!ltdb->cache->one_level_indexes) {
1187                 return LDB_SUCCESS;
1188         }
1189
1190         pdn = ldb_dn_get_parent(module, msg->dn);
1191         if (pdn == NULL) {
1192                 return LDB_ERR_OPERATIONS_ERROR;
1193         }
1194
1195         dn = ldb_dn_get_linearized(msg->dn);
1196         if (dn == NULL) {
1197                 talloc_free(pdn);
1198                 return LDB_ERR_OPERATIONS_ERROR;
1199         }
1200
1201         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1202         if (val.data == NULL) {
1203                 talloc_free(pdn);
1204                 return LDB_ERR_OPERATIONS_ERROR;
1205         }
1206
1207         val.length = strlen((char *)val.data);
1208         el.name = LTDB_IDXONE;
1209         el.values = &val;
1210         el.num_values = 1;
1211
1212         if (add) {
1213                 ret = ltdb_index_add1(module, dn, &el, 0);
1214         } else { /* delete */
1215                 ret = ltdb_index_del_value(module, dn, &el, 0);
1216         }
1217
1218         talloc_free(pdn);
1219
1220         return ret;
1221 }
1222
1223 /*
1224   add the index entries for a new element in a record
1225   The caller guarantees that these element values are not yet indexed
1226 */
1227 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1228                            struct ldb_message_element *el)
1229 {
1230         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1231         if (ldb_dn_is_special(dn)) {
1232                 return LDB_SUCCESS;
1233         }
1234         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1235                 return LDB_SUCCESS;
1236         }
1237         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1238 }
1239
1240 /*
1241   add the index entries for a new record
1242 */
1243 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1244 {
1245         const char *dn;
1246         int ret;
1247
1248         if (ldb_dn_is_special(msg->dn)) {
1249                 return LDB_SUCCESS;
1250         }
1251
1252         dn = ldb_dn_get_linearized(msg->dn);
1253         if (dn == NULL) {
1254                 return LDB_ERR_OPERATIONS_ERROR;
1255         }
1256
1257         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1258         if (ret != LDB_SUCCESS) {
1259                 return ret;
1260         }
1261
1262         return ltdb_index_onelevel(module, msg, 1);
1263 }
1264
1265
1266 /*
1267   delete an index entry for one message element
1268 */
1269 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1270                          struct ldb_message_element *el, int v_idx)
1271 {
1272         struct ldb_context *ldb;
1273         struct ldb_dn *dn_key;
1274         int ret, i;
1275         struct dn_list *list;
1276
1277         ldb = ldb_module_get_ctx(module);
1278
1279         if (dn[0] == '@') {
1280                 return LDB_SUCCESS;
1281         }
1282
1283         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1284         if (!dn_key) {
1285                 return LDB_ERR_OPERATIONS_ERROR;
1286         }
1287
1288         list = talloc_zero(dn_key, struct dn_list);
1289         if (list == NULL) {
1290                 talloc_free(dn_key);
1291                 return LDB_ERR_OPERATIONS_ERROR;
1292         }
1293
1294         ret = ltdb_dn_list_load(module, dn_key, list);
1295         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1296                 /* it wasn't indexed. Did we have an earlier error? If we did then
1297                    its gone now */
1298                 talloc_free(dn_key);
1299                 return LDB_SUCCESS;
1300         }
1301
1302         if (ret != LDB_SUCCESS) {
1303                 talloc_free(dn_key);
1304                 return ret;
1305         }
1306
1307         i = ltdb_dn_list_find_str(list, dn);
1308         if (i == -1) {
1309                 /* nothing to delete */
1310                 talloc_free(dn_key);
1311                 return LDB_SUCCESS;             
1312         }
1313
1314         if (i != list->count-1) {
1315                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1316         }
1317         list->count--;
1318         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1319
1320         ret = ltdb_dn_list_store(module, dn_key, list);
1321
1322         talloc_free(dn_key);
1323
1324         return ret;
1325 }
1326
1327 /*
1328   delete the index entries for a element
1329   return -1 on failure
1330 */
1331 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1332 {
1333         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1334         int ret;
1335         unsigned int i;
1336
1337         if (!ltdb->cache->attribute_indexes) {
1338                 /* no indexed fields */
1339                 return LDB_SUCCESS;
1340         }
1341
1342         if (dn[0] == '@') {
1343                 return LDB_SUCCESS;
1344         }
1345
1346         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1347                 return LDB_SUCCESS;
1348         }
1349         for (i = 0; i < el->num_values; i++) {
1350                 ret = ltdb_index_del_value(module, dn, el, i);
1351                 if (ret != LDB_SUCCESS) {
1352                         return ret;
1353                 }
1354         }
1355
1356         return LDB_SUCCESS;
1357 }
1358
1359 /*
1360   delete the index entries for a record
1361   return -1 on failure
1362 */
1363 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1364 {
1365         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1366         int ret;
1367         const char *dn;
1368         unsigned int i;
1369
1370         if (ldb_dn_is_special(msg->dn)) {
1371                 return LDB_SUCCESS;
1372         }
1373
1374         ret = ltdb_index_onelevel(module, msg, 0);
1375         if (ret != LDB_SUCCESS) {
1376                 return ret;
1377         }
1378
1379         if (!ltdb->cache->attribute_indexes) {
1380                 /* no indexed fields */
1381                 return LDB_SUCCESS;
1382         }
1383
1384         dn = ldb_dn_get_linearized(msg->dn);
1385         if (dn == NULL) {
1386                 return LDB_ERR_OPERATIONS_ERROR;
1387         }
1388
1389         for (i = 0; i < msg->num_elements; i++) {
1390                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1391                 if (ret != LDB_SUCCESS) {
1392                         return ret;
1393                 }
1394         }
1395
1396         return LDB_SUCCESS;
1397 }
1398
1399
1400 /*
1401   traversal function that deletes all @INDEX records
1402 */
1403 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1404 {
1405         const char *dn = "DN=" LTDB_INDEX ":";
1406         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1407                 return tdb_delete(tdb, key);
1408         }
1409         return 0;
1410 }
1411
1412 /*
1413   traversal function that adds @INDEX records during a re index
1414 */
1415 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1416 {
1417         struct ldb_context *ldb;
1418         struct ldb_module *module = (struct ldb_module *)state;
1419         struct ldb_message *msg;
1420         const char *dn = NULL;
1421         int ret;
1422         TDB_DATA key2;
1423
1424         ldb = ldb_module_get_ctx(module);
1425
1426         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1427             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1428                 return 0;
1429         }
1430
1431         msg = ldb_msg_new(module);
1432         if (msg == NULL) {
1433                 return -1;
1434         }
1435
1436         ret = ltdb_unpack_data(module, &data, msg);
1437         if (ret != 0) {
1438                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1439                           ldb_dn_get_linearized(msg->dn));
1440                 talloc_free(msg);
1441                 return -1;
1442         }
1443
1444         /* check if the DN key has changed, perhaps due to the
1445            case insensitivity of an element changing */
1446         key2 = ltdb_key(module, msg->dn);
1447         if (key2.dptr == NULL) {
1448                 /* probably a corrupt record ... darn */
1449                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1450                                                         ldb_dn_get_linearized(msg->dn));
1451                 talloc_free(msg);
1452                 return 0;
1453         }
1454         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1455                 tdb_delete(tdb, key);
1456                 tdb_store(tdb, key2, data, 0);
1457         }
1458         talloc_free(key2.dptr);
1459
1460         if (msg->dn == NULL) {
1461                 dn = (char *)key.dptr + 3;
1462         } else {
1463                 dn = ldb_dn_get_linearized(msg->dn);
1464         }
1465
1466         ret = ltdb_index_onelevel(module, msg, 1);
1467         if (ret != LDB_SUCCESS) {
1468                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1469                         "Adding special ONE LEVEL index failed (%s)!",
1470                         ldb_dn_get_linearized(msg->dn));
1471                 talloc_free(msg);
1472                 return -1;
1473         }
1474
1475         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1476
1477         talloc_free(msg);
1478
1479         if (ret != LDB_SUCCESS) return -1;
1480
1481         return 0;
1482 }
1483
1484 /*
1485   force a complete reindex of the database
1486 */
1487 int ltdb_reindex(struct ldb_module *module)
1488 {
1489         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1490         int ret;
1491
1492         if (ltdb_cache_reload(module) != 0) {
1493                 return LDB_ERR_OPERATIONS_ERROR;
1494         }
1495
1496         /* first traverse the database deleting any @INDEX records */
1497         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1498         if (ret == -1) {
1499                 return LDB_ERR_OPERATIONS_ERROR;
1500         }
1501
1502         /* if we don't have indexes we have nothing todo */
1503         if (ltdb->cache->indexlist->num_elements == 0) {
1504                 return LDB_SUCCESS;
1505         }
1506
1507         /* now traverse adding any indexes for normal LDB records */
1508         ret = tdb_traverse(ltdb->tdb, re_index, module);
1509         if (ret == -1) {
1510                 return LDB_ERR_OPERATIONS_ERROR;
1511         }
1512
1513         if (ltdb->idxptr) {
1514                 ltdb->idxptr->repack = true;
1515         }
1516
1517         return LDB_SUCCESS;
1518 }