s4-ldb: fixed re-index during a complex transaction
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         int error;
44 };
45
46 /* we put a @IDXVERSION attribute on index entries. This
47    allows us to tell if it was written by an older version
48 */
49 #define LTDB_INDEXING_VERSION 2
50
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
53 {
54         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56         return LDB_SUCCESS;
57 }
58
59 /* compare two DN entries in a dn_list. Take account of possible
60  * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
62 {
63         int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
64         if (ret != 0) return ret;
65         if (v2->length > v1->length && v2->data[v1->length] != 0) {
66                 return 1;
67         }
68         return 0;
69 }
70
71
72 /*
73   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
74   comparison with the dn returns -1 if not found
75  */
76 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
77 {
78         int i;
79         for (i=0; i<list->count; i++) {
80                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
81         }
82         return -1;
83 }
84
85 /*
86   find a entry in a dn_list. Uses a case sensitive comparison with the dn
87   returns -1 if not found
88  */
89 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
90 {
91         struct ldb_val v;
92         v.data = discard_const_p(unsigned char, dn);
93         v.length = strlen(dn);
94         return ltdb_dn_list_find_val(list, &v);
95 }
96
97 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
98 {
99         struct dn_list *list;
100         if (rec.dsize != sizeof(void *)) {
101                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
102                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
103                 return NULL;
104         }
105         
106         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
107         if (list == NULL) {
108                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
109                                        "Bad type '%s' for idxptr", 
110                                        talloc_get_name(*(struct dn_list **)rec.dptr));
111                 return NULL;
112         }
113         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
114                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
115                                        "Bad parent '%s' for idxptr", 
116                                        talloc_get_name(talloc_parent(list->dn)));
117                 return NULL;
118         }
119         return list;
120 }
121
122 /*
123   return the @IDX list in an index entry for a dn as a 
124   struct dn_list
125  */
126 static int ltdb_dn_list_load(struct ldb_module *module,
127                              struct ldb_dn *dn, struct dn_list *list)
128 {
129         struct ldb_message *msg;
130         int ret;
131         struct ldb_message_element *el;
132         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
133         TDB_DATA rec;
134         struct dn_list *list2;
135         TDB_DATA key;
136
137         list->dn = NULL;
138         list->count = 0;
139
140         /* see if we have any in-memory index entries */
141         if (ltdb->idxptr == NULL ||
142             ltdb->idxptr->itdb == NULL) {
143                 goto normal_index;
144         }
145
146         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
147         key.dsize = strlen((char *)key.dptr);
148
149         rec = tdb_fetch(ltdb->idxptr->itdb, key);
150         if (rec.dptr == NULL) {
151                 goto normal_index;
152         }
153
154         /* we've found an in-memory index entry */
155         list2 = ltdb_index_idxptr(module, rec, true);
156         if (list2 == NULL) {
157                 free(rec.dptr);
158                 return LDB_ERR_OPERATIONS_ERROR;
159         }
160         free(rec.dptr);
161
162         *list = *list2;
163         return LDB_SUCCESS;
164
165 normal_index:
166         msg = ldb_msg_new(list);
167         if (msg == NULL) {
168                 return LDB_ERR_OPERATIONS_ERROR;
169         }
170
171         ret = ltdb_search_dn1(module, dn, msg);
172         if (ret != LDB_SUCCESS) {
173                 return ret;
174         }
175
176         /* TODO: check indexing version number */
177
178         el = ldb_msg_find_element(msg, LTDB_IDX);
179         if (!el) {
180                 talloc_free(msg);
181                 return LDB_SUCCESS;
182         }
183
184         /* we avoid copying the strings by stealing the list */
185         list->dn = talloc_steal(list, el->values);
186         list->count = el->num_values;
187
188         return LDB_SUCCESS;
189 }
190
191
192 /*
193   save a dn_list into a full @IDX style record
194  */
195 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
196                                    struct dn_list *list)
197 {
198         struct ldb_message *msg;
199         int ret;
200
201         if (list->count == 0) {
202                 ret = ltdb_delete_noindex(module, dn);
203                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
204                         return LDB_SUCCESS;
205                 }
206                 return ret;
207         }
208
209         msg = ldb_msg_new(module);
210         if (!msg) {
211                 ldb_module_oom(module);
212                 return LDB_ERR_OPERATIONS_ERROR;
213         }
214
215         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
216         if (ret != LDB_SUCCESS) {
217                 talloc_free(msg);
218                 ldb_module_oom(module);
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         msg->dn = dn;
223         if (list->count > 0) {
224                 struct ldb_message_element *el;
225
226                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
227                 if (ret != LDB_SUCCESS) {
228                         ldb_module_oom(module);
229                         talloc_free(msg);
230                         return LDB_ERR_OPERATIONS_ERROR;
231                 }
232                 el->values = list->dn;
233                 el->num_values = list->count;
234         }
235
236         ret = ltdb_store(module, msg, TDB_REPLACE);
237         talloc_free(msg);
238         return ret;
239 }
240
241 /*
242   save a dn_list into the database, in either @IDX or internal format
243  */
244 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
245                               struct dn_list *list)
246 {
247         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
248         TDB_DATA rec, key;
249         int ret;
250         struct dn_list *list2;
251
252         if (ltdb->idxptr == NULL) {
253                 return ltdb_dn_list_store_full(module, dn, list);
254         }
255
256         if (ltdb->idxptr->itdb == NULL) {
257                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
258                 if (ltdb->idxptr->itdb == NULL) {
259                         return LDB_ERR_OPERATIONS_ERROR;
260                 }
261         }
262
263         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
264         key.dsize = strlen((char *)key.dptr);
265
266         rec = tdb_fetch(ltdb->idxptr->itdb, key);
267         if (rec.dptr != NULL) {
268                 list2 = ltdb_index_idxptr(module, rec, false);
269                 if (list2 == NULL) {
270                         free(rec.dptr);
271                         return LDB_ERR_OPERATIONS_ERROR;
272                 }
273                 free(rec.dptr);
274                 list2->dn = talloc_steal(list2, list->dn);
275                 list2->count = list->count;
276                 return LDB_SUCCESS;
277         }
278
279         list2 = talloc(ltdb->idxptr, struct dn_list);
280         if (list2 == NULL) {
281                 return LDB_ERR_OPERATIONS_ERROR;
282         }
283         list2->dn = talloc_steal(list2, list->dn);
284         list2->count = list->count;
285
286         rec.dptr = (uint8_t *)&list2;
287         rec.dsize = sizeof(void *);
288
289         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
290         if (ret == -1) {
291                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
292         }
293         return LDB_SUCCESS;
294 }
295
296 /*
297   traverse function for storing the in-memory index entries on disk
298  */
299 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
300 {
301         struct ldb_module *module = state;
302         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
303         struct ldb_dn *dn;
304         struct ldb_context *ldb = ldb_module_get_ctx(module);
305         struct ldb_val v;
306         struct dn_list *list;
307
308         list = ltdb_index_idxptr(module, data, true);
309         if (list == NULL) {
310                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
311                 return -1;
312         }
313
314         v.data = key.dptr;
315         v.length = key.dsize;
316
317         dn = ldb_dn_from_ldb_val(module, ldb, &v);
318         if (dn == NULL) {
319                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
320                 return -1;
321         }
322
323         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
324         talloc_free(dn);
325         if (ltdb->idxptr->error != 0) {
326                 return -1;
327         }
328         return 0;
329 }
330
331 /* cleanup the idxptr mode when transaction commits */
332 int ltdb_index_transaction_commit(struct ldb_module *module)
333 {
334         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
335         int ret;
336
337         if (ltdb->idxptr->itdb) {
338                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
339                 tdb_close(ltdb->idxptr->itdb);
340         }
341
342         ret = ltdb->idxptr->error;
343
344         if (ret != LDB_SUCCESS) {
345                 struct ldb_context *ldb = ldb_module_get_ctx(module);
346                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
347         }
348
349         talloc_free(ltdb->idxptr);
350         ltdb->idxptr = NULL;
351         return ret;
352 }
353
354 /* cleanup the idxptr mode when transaction cancels */
355 int ltdb_index_transaction_cancel(struct ldb_module *module)
356 {
357         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
358         if (ltdb->idxptr && ltdb->idxptr->itdb) {
359                 tdb_close(ltdb->idxptr->itdb);
360         }
361         talloc_free(ltdb->idxptr);
362         ltdb->idxptr = NULL;
363         return LDB_SUCCESS;
364 }
365
366
367 /*
368   return the dn key to be used for an index
369   the caller is responsible for freeing
370 */
371 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
372                                      const char *attr, const struct ldb_val *value,
373                                      const struct ldb_schema_attribute **ap)
374 {
375         struct ldb_dn *ret;
376         struct ldb_val v;
377         const struct ldb_schema_attribute *a;
378         char *attr_folded;
379         int r;
380
381         attr_folded = ldb_attr_casefold(ldb, attr);
382         if (!attr_folded) {
383                 return NULL;
384         }
385
386         a = ldb_schema_attribute_by_name(ldb, attr);
387         if (ap) {
388                 *ap = a;
389         }
390         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
391         if (r != LDB_SUCCESS) {
392                 const char *errstr = ldb_errstring(ldb);
393                 /* canonicalisation can be refused. For example, 
394                    a attribute that takes wildcards will refuse to canonicalise
395                    if the value contains a wildcard */
396                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
397                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
398                 talloc_free(attr_folded);
399                 return NULL;
400         }
401         if (ldb_should_b64_encode(ldb, &v)) {
402                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
403                 if (!vstr) return NULL;
404                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
405                 talloc_free(vstr);
406         } else {
407                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
408         }
409
410         if (v.data != value->data) {
411                 talloc_free(v.data);
412         }
413         talloc_free(attr_folded);
414
415         return ret;
416 }
417
418 /*
419   see if a attribute value is in the list of indexed attributes
420 */
421 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
422 {
423         unsigned int i;
424         struct ldb_message_element *el;
425
426         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
427         if (el == NULL) {
428                 return false;
429         }
430
431         /* TODO: this is too expensive! At least use a binary search */
432         for (i=0; i<el->num_values; i++) {
433                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
434                         return true;
435                 }
436         }
437         return false;
438 }
439
440 /*
441   in the following logic functions, the return value is treated as
442   follows:
443
444      LDB_SUCCESS: we found some matching index values
445
446      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
447
448      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
449                                we'll need a full search
450  */
451
452 /*
453   return a list of dn's that might match a simple indexed search (an
454   equality search only)
455  */
456 static int ltdb_index_dn_simple(struct ldb_module *module,
457                                 const struct ldb_parse_tree *tree,
458                                 const struct ldb_message *index_list,
459                                 struct dn_list *list)
460 {
461         struct ldb_context *ldb;
462         struct ldb_dn *dn;
463         int ret;
464
465         ldb = ldb_module_get_ctx(module);
466
467         list->count = 0;
468         list->dn = NULL;
469
470         /* if the attribute isn't in the list of indexed attributes then
471            this node needs a full search */
472         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
473                 return LDB_ERR_OPERATIONS_ERROR;
474         }
475
476         /* the attribute is indexed. Pull the list of DNs that match the 
477            search criterion */
478         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
479         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
480
481         ret = ltdb_dn_list_load(module, dn, list);
482         talloc_free(dn);
483         return ret;
484 }
485
486
487 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
488
489 /*
490   return a list of dn's that might match a leaf indexed search
491  */
492 static int ltdb_index_dn_leaf(struct ldb_module *module,
493                               const struct ldb_parse_tree *tree,
494                               const struct ldb_message *index_list,
495                               struct dn_list *list)
496 {
497         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
498                 list->dn = talloc_array(list, struct ldb_val, 1);
499                 if (list->dn == NULL) {
500                         ldb_module_oom(module);
501                         return LDB_ERR_OPERATIONS_ERROR;
502                 }
503                 list->dn[0] = tree->u.equality.value;
504                 list->count = 1;
505                 return LDB_SUCCESS;
506         }
507         return ltdb_index_dn_simple(module, tree, index_list, list);
508 }
509
510
511 /*
512   list intersection
513   list = list & list2
514 */
515 static bool list_intersect(struct ldb_context *ldb,
516                            struct dn_list *list, const struct dn_list *list2)
517 {
518         struct dn_list *list3;
519         unsigned int i;
520
521         if (list->count == 0) {
522                 /* 0 & X == 0 */
523                 return true;
524         }
525         if (list2->count == 0) {
526                 /* X & 0 == 0 */
527                 list->count = 0;
528                 list->dn = NULL;
529                 return true;
530         }
531
532         /* the indexing code is allowed to return a longer list than
533            what really matches, as all results are filtered by the
534            full expression at the end - this shortcut avoids a lot of
535            work in some cases */
536         if (list->count < 2 && list2->count > 10) {
537                 return true;
538         }
539         if (list2->count < 2 && list->count > 10) {
540                 list->count = list2->count;
541                 list->dn = list2->dn;
542                 /* note that list2 may not be the parent of list2->dn,
543                    as list2->dn may be owned by ltdb->idxptr. In that
544                    case we expect this reparent call to fail, which is
545                    OK */
546                 talloc_reparent(list2, list, list2->dn);
547                 return true;
548         }
549
550         list3 = talloc_zero(list, struct dn_list);
551         if (list3 == NULL) {
552                 return false;
553         }
554
555         list3->dn = talloc_array(list3, struct ldb_val, list->count);
556         if (!list3->dn) {
557                 talloc_free(list3);
558                 return false;
559         }
560         list3->count = 0;
561
562         for (i=0;i<list->count;i++) {
563                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
564                         list3->dn[list3->count] = list->dn[i];
565                         list3->count++;
566                 }
567         }
568
569         list->dn = talloc_steal(list, list3->dn);
570         list->count = list3->count;
571         talloc_free(list3);
572
573         return true;
574 }
575
576
577 /*
578   list union
579   list = list | list2
580 */
581 static bool list_union(struct ldb_context *ldb,
582                        struct dn_list *list, const struct dn_list *list2)
583 {
584         struct ldb_val *dn3;
585
586         if (list2->count == 0) {
587                 /* X | 0 == X */
588                 return true;
589         }
590
591         if (list->count == 0) {
592                 /* 0 | X == X */
593                 list->count = list2->count;
594                 list->dn = list2->dn;
595                 /* note that list2 may not be the parent of list2->dn,
596                    as list2->dn may be owned by ltdb->idxptr. In that
597                    case we expect this reparent call to fail, which is
598                    OK */
599                 talloc_reparent(list2, list, list2->dn);
600                 return true;
601         }
602
603         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
604         if (!dn3) {
605                 ldb_oom(ldb);
606                 return false;
607         }
608
609         /* we allow for duplicates here, and get rid of them later */
610         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
611         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
612
613         list->dn = dn3;
614         list->count += list2->count;
615
616         return true;
617 }
618
619 static int ltdb_index_dn(struct ldb_module *module,
620                          const struct ldb_parse_tree *tree,
621                          const struct ldb_message *index_list,
622                          struct dn_list *list);
623
624
625 /*
626   process an OR list (a union)
627  */
628 static int ltdb_index_dn_or(struct ldb_module *module,
629                             const struct ldb_parse_tree *tree,
630                             const struct ldb_message *index_list,
631                             struct dn_list *list)
632 {
633         struct ldb_context *ldb;
634         unsigned int i;
635
636         ldb = ldb_module_get_ctx(module);
637
638         list->dn = NULL;
639         list->count = 0;
640
641         for (i=0; i<tree->u.list.num_elements; i++) {
642                 struct dn_list *list2;
643                 int ret;
644
645                 list2 = talloc_zero(list, struct dn_list);
646                 if (list2 == NULL) {
647                         return LDB_ERR_OPERATIONS_ERROR;
648                 }
649
650                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
651
652                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
653                         /* X || 0 == X */
654                         talloc_free(list2);
655                         continue;
656                 }
657
658                 if (ret != LDB_SUCCESS) {
659                         /* X || * == * */
660                         talloc_free(list2);
661                         return ret;
662                 }
663
664                 if (!list_union(ldb, list, list2)) {
665                         talloc_free(list2);
666                         return LDB_ERR_OPERATIONS_ERROR;
667                 }
668         }
669
670         if (list->count == 0) {
671                 return LDB_ERR_NO_SUCH_OBJECT;
672         }
673
674         return LDB_SUCCESS;
675 }
676
677
678 /*
679   NOT an index results
680  */
681 static int ltdb_index_dn_not(struct ldb_module *module,
682                              const struct ldb_parse_tree *tree,
683                              const struct ldb_message *index_list,
684                              struct dn_list *list)
685 {
686         /* the only way to do an indexed not would be if we could
687            negate the not via another not or if we knew the total
688            number of database elements so we could know that the
689            existing expression covered the whole database.
690
691            instead, we just give up, and rely on a full index scan
692            (unless an outer & manages to reduce the list)
693         */
694         return LDB_ERR_OPERATIONS_ERROR;
695 }
696
697
698 static bool ltdb_index_unique(struct ldb_context *ldb,
699                               const char *attr)
700 {
701         const struct ldb_schema_attribute *a;
702         a = ldb_schema_attribute_by_name(ldb, attr);
703         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
704                 return true;
705         }
706         return false;
707 }
708
709 /*
710   process an AND expression (intersection)
711  */
712 static int ltdb_index_dn_and(struct ldb_module *module,
713                              const struct ldb_parse_tree *tree,
714                              const struct ldb_message *index_list,
715                              struct dn_list *list)
716 {
717         struct ldb_context *ldb;
718         unsigned int i;
719         bool found;
720
721         ldb = ldb_module_get_ctx(module);
722
723         list->dn = NULL;
724         list->count = 0;
725
726         /* in the first pass we only look for unique simple
727            equality tests, in the hope of avoiding having to look
728            at any others */
729         for (i=0; i<tree->u.list.num_elements; i++) {
730                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
731                 int ret;
732
733                 if (subtree->operation != LDB_OP_EQUALITY ||
734                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
735                         continue;
736                 }
737                 
738                 ret = ltdb_index_dn(module, subtree, index_list, list);
739                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
740                         /* 0 && X == 0 */
741                         return LDB_ERR_NO_SUCH_OBJECT;
742                 }
743                 if (ret == LDB_SUCCESS) {
744                         /* a unique index match means we can
745                          * stop. Note that we don't care if we return
746                          * a few too many objects, due to later
747                          * filtering */
748                         return LDB_SUCCESS;
749                 }
750         }       
751
752         /* now do a full intersection */
753         found = false;
754
755         for (i=0; i<tree->u.list.num_elements; i++) {
756                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
757                 struct dn_list *list2;
758                 int ret;
759
760                 list2 = talloc_zero(list, struct dn_list);
761                 if (list2 == NULL) {
762                         ldb_module_oom(module);
763                         return LDB_ERR_OPERATIONS_ERROR;
764                 }
765                         
766                 ret = ltdb_index_dn(module, subtree, index_list, list2);
767
768                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
769                         /* X && 0 == 0 */
770                         list->dn = NULL;
771                         list->count = 0;
772                         talloc_free(list2);
773                         return LDB_ERR_NO_SUCH_OBJECT;
774                 }
775                 
776                 if (ret != LDB_SUCCESS) {
777                         /* this didn't adding anything */
778                         talloc_free(list2);
779                         continue;
780                 }
781
782                 if (!found) {
783                         talloc_reparent(list2, list, list->dn);
784                         list->dn = list2->dn;
785                         list->count = list2->count;
786                         found = true;
787                 } else if (!list_intersect(ldb, list, list2)) {
788                         talloc_free(list2);
789                         return LDB_ERR_OPERATIONS_ERROR;
790                 }
791                         
792                 if (list->count == 0) {
793                         list->dn = NULL;
794                         return LDB_ERR_NO_SUCH_OBJECT;
795                 }
796                         
797                 if (list->count < 2) {
798                         /* it isn't worth loading the next part of the tree */
799                         return LDB_SUCCESS;
800                 }
801         }       
802
803         if (!found) {
804                 /* none of the attributes were indexed */
805                 return LDB_ERR_OPERATIONS_ERROR;
806         }
807
808         return LDB_SUCCESS;
809 }
810         
811 /*
812   return a list of matching objects using a one-level index
813  */
814 static int ltdb_index_dn_one(struct ldb_module *module,
815                              struct ldb_dn *parent_dn,
816                              struct dn_list *list)
817 {
818         struct ldb_context *ldb;
819         struct ldb_dn *key;
820         struct ldb_val val;
821         int ret;
822
823         ldb = ldb_module_get_ctx(module);
824
825         /* work out the index key from the parent DN */
826         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
827         val.length = strlen((char *)val.data);
828         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
829         if (!key) {
830                 ldb_oom(ldb);
831                 return LDB_ERR_OPERATIONS_ERROR;
832         }
833
834         ret = ltdb_dn_list_load(module, key, list);
835         talloc_free(key);
836         if (ret != LDB_SUCCESS) {
837                 return ret;
838         }
839
840         if (list->count == 0) {
841                 return LDB_ERR_NO_SUCH_OBJECT;
842         }
843
844         return LDB_SUCCESS;
845 }
846
847 /*
848   return a list of dn's that might match a indexed search or
849   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
850  */
851 static int ltdb_index_dn(struct ldb_module *module,
852                          const struct ldb_parse_tree *tree,
853                          const struct ldb_message *index_list,
854                          struct dn_list *list)
855 {
856         int ret = LDB_ERR_OPERATIONS_ERROR;
857
858         switch (tree->operation) {
859         case LDB_OP_AND:
860                 ret = ltdb_index_dn_and(module, tree, index_list, list);
861                 break;
862
863         case LDB_OP_OR:
864                 ret = ltdb_index_dn_or(module, tree, index_list, list);
865                 break;
866
867         case LDB_OP_NOT:
868                 ret = ltdb_index_dn_not(module, tree, index_list, list);
869                 break;
870
871         case LDB_OP_EQUALITY:
872                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
873                 break;
874
875         case LDB_OP_SUBSTRING:
876         case LDB_OP_GREATER:
877         case LDB_OP_LESS:
878         case LDB_OP_PRESENT:
879         case LDB_OP_APPROX:
880         case LDB_OP_EXTENDED:
881                 /* we can't index with fancy bitops yet */
882                 ret = LDB_ERR_OPERATIONS_ERROR;
883                 break;
884         }
885
886         return ret;
887 }
888
889 /*
890   filter a candidate dn_list from an indexed search into a set of results
891   extracting just the given attributes
892 */
893 static int ltdb_index_filter(const struct dn_list *dn_list,
894                              struct ltdb_context *ac, 
895                              uint32_t *match_count)
896 {
897         struct ldb_context *ldb;
898         struct ldb_message *msg;
899         unsigned int i;
900
901         ldb = ldb_module_get_ctx(ac->module);
902
903         for (i = 0; i < dn_list->count; i++) {
904                 struct ldb_dn *dn;
905                 int ret;
906
907                 msg = ldb_msg_new(ac);
908                 if (!msg) {
909                         return LDB_ERR_OPERATIONS_ERROR;
910                 }
911
912                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
913                 if (dn == NULL) {
914                         talloc_free(msg);
915                         return LDB_ERR_OPERATIONS_ERROR;
916                 }
917
918                 ret = ltdb_search_dn1(ac->module, dn, msg);
919                 talloc_free(dn);
920                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
921                         /* the record has disappeared? yes, this can happen */
922                         talloc_free(msg);
923                         continue;
924                 }
925
926                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
927                         /* an internal error */
928                         talloc_free(msg);
929                         return LDB_ERR_OPERATIONS_ERROR;
930                 }
931
932                 if (!ldb_match_msg(ldb, msg,
933                                    ac->tree, ac->base, ac->scope)) {
934                         talloc_free(msg);
935                         continue;
936                 }
937
938                 /* filter the attributes that the user wants */
939                 ret = ltdb_filter_attrs(msg, ac->attrs);
940
941                 if (ret == -1) {
942                         talloc_free(msg);
943                         return LDB_ERR_OPERATIONS_ERROR;
944                 }
945
946                 ret = ldb_module_send_entry(ac->req, msg, NULL);
947                 if (ret != LDB_SUCCESS) {
948                         ac->request_terminated = true;
949                         return ret;
950                 }
951
952                 (*match_count)++;
953         }
954
955         return LDB_SUCCESS;
956 }
957
958 /*
959   remove any duplicated entries in a indexed result
960  */
961 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
962 {
963         int i, new_count;
964
965         if (list->count < 2) {
966                 return;
967         }
968
969         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
970
971         new_count = 1;
972         for (i=1; i<list->count; i++) {
973                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
974                         if (new_count != i) {
975                                 list->dn[new_count] = list->dn[i];
976                         }
977                         new_count++;
978                 }
979         }
980         
981         list->count = new_count;
982 }
983
984 /*
985   search the database with a LDAP-like expression using indexes
986   returns -1 if an indexed search is not possible, in which
987   case the caller should call ltdb_search_full()
988 */
989 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
990 {
991         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
992         struct dn_list *dn_list;
993         int ret;
994
995         /* see if indexing is enabled */
996         if (!ltdb->cache->attribute_indexes && 
997             !ltdb->cache->one_level_indexes &&
998             ac->scope != LDB_SCOPE_BASE) {
999                 /* fallback to a full search */
1000                 return LDB_ERR_OPERATIONS_ERROR;
1001         }
1002
1003         dn_list = talloc_zero(ac, struct dn_list);
1004         if (dn_list == NULL) {
1005                 ldb_module_oom(ac->module);
1006                 return LDB_ERR_OPERATIONS_ERROR;
1007         }
1008
1009         switch (ac->scope) {
1010         case LDB_SCOPE_BASE:
1011                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1012                 if (dn_list->dn == NULL) {
1013                         ldb_module_oom(ac->module);
1014                         talloc_free(dn_list);
1015                         return LDB_ERR_OPERATIONS_ERROR;
1016                 }
1017                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1018                 if (dn_list->dn[0].data == NULL) {
1019                         ldb_module_oom(ac->module);
1020                         talloc_free(dn_list);
1021                         return LDB_ERR_OPERATIONS_ERROR;
1022                 }
1023                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1024                 dn_list->count = 1;
1025                 break;          
1026
1027         case LDB_SCOPE_ONELEVEL:
1028                 if (!ltdb->cache->one_level_indexes) {
1029                         talloc_free(dn_list);
1030                         return LDB_ERR_OPERATIONS_ERROR;
1031                 }
1032                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1033                 if (ret != LDB_SUCCESS) {
1034                         talloc_free(dn_list);
1035                         return ret;
1036                 }
1037                 break;
1038
1039         case LDB_SCOPE_SUBTREE:
1040         case LDB_SCOPE_DEFAULT:
1041                 if (!ltdb->cache->attribute_indexes) {
1042                         talloc_free(dn_list);
1043                         return LDB_ERR_OPERATIONS_ERROR;
1044                 }
1045                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1046                 if (ret != LDB_SUCCESS) {
1047                         talloc_free(dn_list);
1048                         return ret;
1049                 }
1050                 ltdb_dn_list_remove_duplicates(dn_list);
1051                 break;
1052         }
1053
1054         ret = ltdb_index_filter(dn_list, ac, match_count);
1055         talloc_free(dn_list);
1056         return ret;
1057 }
1058
1059 /*
1060   add an index entry for one message element
1061 */
1062 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1063                            struct ldb_message_element *el, int v_idx)
1064 {
1065         struct ldb_context *ldb;
1066         struct ldb_dn *dn_key;
1067         int ret;
1068         const struct ldb_schema_attribute *a;
1069         struct dn_list *list;
1070         unsigned alloc_len;
1071
1072         ldb = ldb_module_get_ctx(module);
1073
1074         list = talloc_zero(module, struct dn_list);
1075         if (list == NULL) {
1076                 return LDB_ERR_OPERATIONS_ERROR;
1077         }
1078
1079         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1080         if (!dn_key) {
1081                 talloc_free(list);
1082                 return LDB_ERR_OPERATIONS_ERROR;
1083         }
1084         talloc_steal(list, dn_key);
1085
1086         ret = ltdb_dn_list_load(module, dn_key, list);
1087         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1088                 talloc_free(list);
1089                 return ret;
1090         }
1091
1092         if (ltdb_dn_list_find_str(list, dn) != -1) {
1093                 talloc_free(list);
1094                 return LDB_SUCCESS;
1095         }
1096
1097         if (list->count > 0 &&
1098             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1099                 talloc_free(list);
1100                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1101         }
1102
1103         /* overallocate the list a bit, to reduce the number of
1104          * realloc trigered copies */    
1105         alloc_len = ((list->count+1)+7) & ~7;
1106         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1107         if (list->dn == NULL) {
1108                 talloc_free(list);
1109                 return LDB_ERR_OPERATIONS_ERROR;
1110         }
1111         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1112         list->dn[list->count].length = strlen(dn);
1113         list->count++;
1114
1115         ret = ltdb_dn_list_store(module, dn_key, list);
1116
1117         talloc_free(list);
1118
1119         return ret;
1120 }
1121
1122 /*
1123   add index entries for one elements in a message
1124  */
1125 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1126                              struct ldb_message_element *el)
1127 {
1128         unsigned int i;
1129         for (i = 0; i < el->num_values; i++) {
1130                 int ret = ltdb_index_add1(module, dn, el, i);
1131                 if (ret != LDB_SUCCESS) {
1132                         return ret;
1133                 }
1134         }
1135
1136         return LDB_SUCCESS;
1137 }
1138
1139 /*
1140   add index entries for all elements in a message
1141  */
1142 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1143                               struct ldb_message_element *elements, int num_el)
1144 {
1145         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1146         unsigned int i;
1147
1148         if (dn[0] == '@') {
1149                 return LDB_SUCCESS;
1150         }
1151
1152         if (ltdb->cache->indexlist->num_elements == 0) {
1153                 /* no indexed fields */
1154                 return LDB_SUCCESS;
1155         }
1156
1157         for (i = 0; i < num_el; i++) {
1158                 int ret;
1159                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1160                         continue;
1161                 }
1162                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1163                 if (ret != LDB_SUCCESS) {
1164                         return ret;
1165                 }
1166         }
1167
1168         return LDB_SUCCESS;
1169 }
1170
1171
1172 /*
1173   insert a one level index for a message
1174 */
1175 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1176 {
1177         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1178         struct ldb_message_element el;
1179         struct ldb_val val;
1180         struct ldb_dn *pdn;
1181         const char *dn;
1182         int ret;
1183
1184         /* We index for ONE Level only if requested */
1185         if (!ltdb->cache->one_level_indexes) {
1186                 return LDB_SUCCESS;
1187         }
1188
1189         pdn = ldb_dn_get_parent(module, msg->dn);
1190         if (pdn == NULL) {
1191                 return LDB_ERR_OPERATIONS_ERROR;
1192         }
1193
1194         dn = ldb_dn_get_linearized(msg->dn);
1195         if (dn == NULL) {
1196                 talloc_free(pdn);
1197                 return LDB_ERR_OPERATIONS_ERROR;
1198         }
1199
1200         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1201         if (val.data == NULL) {
1202                 talloc_free(pdn);
1203                 return LDB_ERR_OPERATIONS_ERROR;
1204         }
1205
1206         val.length = strlen((char *)val.data);
1207         el.name = LTDB_IDXONE;
1208         el.values = &val;
1209         el.num_values = 1;
1210
1211         if (add) {
1212                 ret = ltdb_index_add1(module, dn, &el, 0);
1213         } else { /* delete */
1214                 ret = ltdb_index_del_value(module, dn, &el, 0);
1215         }
1216
1217         talloc_free(pdn);
1218
1219         return ret;
1220 }
1221
1222 /*
1223   add the index entries for a new element in a record
1224   The caller guarantees that these element values are not yet indexed
1225 */
1226 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1227                            struct ldb_message_element *el)
1228 {
1229         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1230         if (ldb_dn_is_special(dn)) {
1231                 return LDB_SUCCESS;
1232         }
1233         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1234                 return LDB_SUCCESS;
1235         }
1236         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1237 }
1238
1239 /*
1240   add the index entries for a new record
1241 */
1242 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1243 {
1244         const char *dn;
1245         int ret;
1246
1247         if (ldb_dn_is_special(msg->dn)) {
1248                 return LDB_SUCCESS;
1249         }
1250
1251         dn = ldb_dn_get_linearized(msg->dn);
1252         if (dn == NULL) {
1253                 return LDB_ERR_OPERATIONS_ERROR;
1254         }
1255
1256         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1257         if (ret != LDB_SUCCESS) {
1258                 return ret;
1259         }
1260
1261         return ltdb_index_onelevel(module, msg, 1);
1262 }
1263
1264
1265 /*
1266   delete an index entry for one message element
1267 */
1268 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1269                          struct ldb_message_element *el, int v_idx)
1270 {
1271         struct ldb_context *ldb;
1272         struct ldb_dn *dn_key;
1273         int ret, i;
1274         struct dn_list *list;
1275
1276         ldb = ldb_module_get_ctx(module);
1277
1278         if (dn[0] == '@') {
1279                 return LDB_SUCCESS;
1280         }
1281
1282         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1283         if (!dn_key) {
1284                 return LDB_ERR_OPERATIONS_ERROR;
1285         }
1286
1287         list = talloc_zero(dn_key, struct dn_list);
1288         if (list == NULL) {
1289                 talloc_free(dn_key);
1290                 return LDB_ERR_OPERATIONS_ERROR;
1291         }
1292
1293         ret = ltdb_dn_list_load(module, dn_key, list);
1294         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1295                 /* it wasn't indexed. Did we have an earlier error? If we did then
1296                    its gone now */
1297                 talloc_free(dn_key);
1298                 return LDB_SUCCESS;
1299         }
1300
1301         if (ret != LDB_SUCCESS) {
1302                 talloc_free(dn_key);
1303                 return ret;
1304         }
1305
1306         i = ltdb_dn_list_find_str(list, dn);
1307         if (i == -1) {
1308                 /* nothing to delete */
1309                 talloc_free(dn_key);
1310                 return LDB_SUCCESS;             
1311         }
1312
1313         if (i != list->count-1) {
1314                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1315         }
1316         list->count--;
1317         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1318
1319         ret = ltdb_dn_list_store(module, dn_key, list);
1320
1321         talloc_free(dn_key);
1322
1323         return ret;
1324 }
1325
1326 /*
1327   delete the index entries for a element
1328   return -1 on failure
1329 */
1330 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1331 {
1332         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1333         int ret;
1334         unsigned int i;
1335
1336         if (!ltdb->cache->attribute_indexes) {
1337                 /* no indexed fields */
1338                 return LDB_SUCCESS;
1339         }
1340
1341         if (dn[0] == '@') {
1342                 return LDB_SUCCESS;
1343         }
1344
1345         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1346                 return LDB_SUCCESS;
1347         }
1348         for (i = 0; i < el->num_values; i++) {
1349                 ret = ltdb_index_del_value(module, dn, el, i);
1350                 if (ret != LDB_SUCCESS) {
1351                         return ret;
1352                 }
1353         }
1354
1355         return LDB_SUCCESS;
1356 }
1357
1358 /*
1359   delete the index entries for a record
1360   return -1 on failure
1361 */
1362 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1363 {
1364         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1365         int ret;
1366         const char *dn;
1367         unsigned int i;
1368
1369         if (ldb_dn_is_special(msg->dn)) {
1370                 return LDB_SUCCESS;
1371         }
1372
1373         ret = ltdb_index_onelevel(module, msg, 0);
1374         if (ret != LDB_SUCCESS) {
1375                 return ret;
1376         }
1377
1378         if (!ltdb->cache->attribute_indexes) {
1379                 /* no indexed fields */
1380                 return LDB_SUCCESS;
1381         }
1382
1383         dn = ldb_dn_get_linearized(msg->dn);
1384         if (dn == NULL) {
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         for (i = 0; i < msg->num_elements; i++) {
1389                 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1390                 if (ret != LDB_SUCCESS) {
1391                         return ret;
1392                 }
1393         }
1394
1395         return LDB_SUCCESS;
1396 }
1397
1398
1399 /*
1400   traversal function that deletes all @INDEX records
1401 */
1402 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1403 {
1404         struct ldb_module *module = state;
1405         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1406         const char *dnstr = "DN=" LTDB_INDEX ":";
1407         struct dn_list list;
1408         struct ldb_dn *dn;
1409         struct ldb_val v;
1410         int ret;
1411
1412         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1413                 return 0;
1414         }
1415         /* we need to put a empty list in the internal tdb for this
1416          * index entry */
1417         list.dn = NULL;
1418         list.count = 0;
1419         v.data = key.dptr;
1420         v.length = strlen((char *)key.dptr);
1421
1422         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1423         ret = ltdb_dn_list_store(module, dn, &list);
1424         if (ret != LDB_SUCCESS) {
1425                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1426                                        "Unable to store null index for %s\n",
1427                                        ldb_dn_get_linearized(dn));
1428                 talloc_free(dn);
1429                 return -1;
1430         }
1431         talloc_free(dn);
1432         return 0;
1433 }
1434
1435 /*
1436   traversal function that adds @INDEX records during a re index
1437 */
1438 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1439 {
1440         struct ldb_context *ldb;
1441         struct ldb_module *module = (struct ldb_module *)state;
1442         struct ldb_message *msg;
1443         const char *dn = NULL;
1444         int ret;
1445         TDB_DATA key2;
1446
1447         ldb = ldb_module_get_ctx(module);
1448
1449         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1450             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1451                 return 0;
1452         }
1453
1454         msg = ldb_msg_new(module);
1455         if (msg == NULL) {
1456                 return -1;
1457         }
1458
1459         ret = ltdb_unpack_data(module, &data, msg);
1460         if (ret != 0) {
1461                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1462                           ldb_dn_get_linearized(msg->dn));
1463                 talloc_free(msg);
1464                 return -1;
1465         }
1466
1467         /* check if the DN key has changed, perhaps due to the
1468            case insensitivity of an element changing */
1469         key2 = ltdb_key(module, msg->dn);
1470         if (key2.dptr == NULL) {
1471                 /* probably a corrupt record ... darn */
1472                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1473                                                         ldb_dn_get_linearized(msg->dn));
1474                 talloc_free(msg);
1475                 return 0;
1476         }
1477         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1478                 tdb_delete(tdb, key);
1479                 tdb_store(tdb, key2, data, 0);
1480         }
1481         talloc_free(key2.dptr);
1482
1483         if (msg->dn == NULL) {
1484                 dn = (char *)key.dptr + 3;
1485         } else {
1486                 dn = ldb_dn_get_linearized(msg->dn);
1487         }
1488
1489         ret = ltdb_index_onelevel(module, msg, 1);
1490         if (ret != LDB_SUCCESS) {
1491                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1492                         "Adding special ONE LEVEL index failed (%s)!",
1493                         ldb_dn_get_linearized(msg->dn));
1494                 talloc_free(msg);
1495                 return -1;
1496         }
1497
1498         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1499
1500         talloc_free(msg);
1501
1502         if (ret != LDB_SUCCESS) return -1;
1503
1504         return 0;
1505 }
1506
1507 /*
1508   force a complete reindex of the database
1509 */
1510 int ltdb_reindex(struct ldb_module *module)
1511 {
1512         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1513         int ret;
1514
1515         if (ltdb_cache_reload(module) != 0) {
1516                 return LDB_ERR_OPERATIONS_ERROR;
1517         }
1518
1519         /* first traverse the database deleting any @INDEX records by
1520          * putting NULL entries in the in-memory tdb
1521          */
1522         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1523         if (ret == -1) {
1524                 return LDB_ERR_OPERATIONS_ERROR;
1525         }
1526
1527         /* if we don't have indexes we have nothing todo */
1528         if (ltdb->cache->indexlist->num_elements == 0) {
1529                 return LDB_SUCCESS;
1530         }
1531
1532         /* now traverse adding any indexes for normal LDB records */
1533         ret = tdb_traverse(ltdb->tdb, re_index, module);
1534         if (ret == -1) {
1535                 return LDB_ERR_OPERATIONS_ERROR;
1536         }
1537
1538         return LDB_SUCCESS;
1539 }