52f9f00c58db953e3c59f108c60cb3393d7eeab7
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         int error;
44 };
45
46 /* we put a @IDXVERSION attribute on index entries. This
47    allows us to tell if it was written by an older version
48 */
49 #define LTDB_INDEXING_VERSION 2
50
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
53 {
54         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56         return LDB_SUCCESS;
57 }
58
59 /* compare two DN entries in a dn_list. Take account of possible
60  * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
62 {
63         if (v1->length > v2->length && v1->data[v2->length] != 0) {
64                 return -1;
65         }
66         if (v1->length < v2->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 {
100         struct dn_list *list;
101         if (rec.dsize != sizeof(void *)) {
102                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
103                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
104                 return NULL;
105         }
106         
107         list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
108         if (list == NULL) {
109                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
110                                        "Bad type '%s' for idxptr", 
111                                        talloc_get_name(*(struct dn_list **)rec.dptr));
112                 return NULL;
113         }
114         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
116                                        "Bad parent '%s' for idxptr", 
117                                        talloc_get_name(talloc_parent(list->dn)));
118                 return NULL;
119         }
120         return list;
121 }
122
123 /*
124   return the @IDX list in an index entry for a dn as a 
125   struct dn_list
126  */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128                              struct ldb_dn *dn, struct dn_list *list)
129 {
130         struct ldb_message *msg;
131         int ret;
132         struct ldb_message_element *el;
133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134         TDB_DATA rec;
135         struct dn_list *list2;
136         TDB_DATA key;
137
138         list->dn = NULL;
139         list->count = 0;
140
141         /* see if we have any in-memory index entries */
142         if (ltdb->idxptr == NULL ||
143             ltdb->idxptr->itdb == NULL) {
144                 goto normal_index;
145         }
146
147         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148         key.dsize = strlen((char *)key.dptr);
149
150         rec = tdb_fetch(ltdb->idxptr->itdb, key);
151         if (rec.dptr == NULL) {
152                 goto normal_index;
153         }
154
155         /* we've found an in-memory index entry */
156         list2 = ltdb_index_idxptr(module, rec, true);
157         if (list2 == NULL) {
158                 free(rec.dptr);
159                 return LDB_ERR_OPERATIONS_ERROR;
160         }
161         free(rec.dptr);
162
163         *list = *list2;
164         return LDB_SUCCESS;
165
166 normal_index:
167         msg = ldb_msg_new(list);
168         if (msg == NULL) {
169                 return LDB_ERR_OPERATIONS_ERROR;
170         }
171
172         ret = ltdb_search_dn1(module, dn, msg);
173         if (ret != LDB_SUCCESS) {
174                 return ret;
175         }
176
177         /* TODO: check indexing version number */
178
179         el = ldb_msg_find_element(msg, LTDB_IDX);
180         if (!el) {
181                 talloc_free(msg);
182                 return LDB_SUCCESS;
183         }
184
185         /* we avoid copying the strings by stealing the list */
186         list->dn = talloc_steal(list, el->values);
187         list->count = el->num_values;
188
189         return LDB_SUCCESS;
190 }
191
192
193 /*
194   save a dn_list into a full @IDX style record
195  */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
197                                    struct dn_list *list)
198 {
199         struct ldb_message *msg;
200         int ret;
201
202         if (list->count == 0) {
203                 ret = ltdb_delete_noindex(module, dn);
204                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
205                         return LDB_SUCCESS;
206                 }
207                 return ret;
208         }
209
210         msg = ldb_msg_new(module);
211         if (!msg) {
212                 ldb_module_oom(module);
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217         if (ret != LDB_SUCCESS) {
218                 talloc_free(msg);
219                 ldb_module_oom(module);
220                 return LDB_ERR_OPERATIONS_ERROR;
221         }
222
223         msg->dn = dn;
224         if (list->count > 0) {
225                 struct ldb_message_element *el;
226
227                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
228                 if (ret != LDB_SUCCESS) {
229                         ldb_module_oom(module);
230                         talloc_free(msg);
231                         return LDB_ERR_OPERATIONS_ERROR;
232                 }
233                 el->values = list->dn;
234                 el->num_values = list->count;
235         }
236
237         ret = ltdb_store(module, msg, TDB_REPLACE);
238         talloc_free(msg);
239         return ret;
240 }
241
242 /*
243   save a dn_list into the database, in either @IDX or internal format
244  */
245 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
246                               struct dn_list *list)
247 {
248         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
249         TDB_DATA rec, key;
250         int ret;
251         struct dn_list *list2;
252
253         if (ltdb->idxptr == NULL) {
254                 return ltdb_dn_list_store_full(module, dn, list);
255         }
256
257         if (ltdb->idxptr->itdb == NULL) {
258                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
259                 if (ltdb->idxptr->itdb == NULL) {
260                         return LDB_ERR_OPERATIONS_ERROR;
261                 }
262         }
263
264         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
265         key.dsize = strlen((char *)key.dptr);
266
267         rec = tdb_fetch(ltdb->idxptr->itdb, key);
268         if (rec.dptr != NULL) {
269                 list2 = ltdb_index_idxptr(module, rec, false);
270                 if (list2 == NULL) {
271                         free(rec.dptr);
272                         return LDB_ERR_OPERATIONS_ERROR;
273                 }
274                 free(rec.dptr);
275                 list2->dn = talloc_steal(list2, list->dn);
276                 list2->count = list->count;
277                 return LDB_SUCCESS;
278         }
279
280         list2 = talloc(ltdb->idxptr, struct dn_list);
281         if (list2 == NULL) {
282                 return LDB_ERR_OPERATIONS_ERROR;
283         }
284         list2->dn = talloc_steal(list2, list->dn);
285         list2->count = list->count;
286
287         rec.dptr = (uint8_t *)&list2;
288         rec.dsize = sizeof(void *);
289
290         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
291         if (ret == -1) {
292                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
293         }
294         return LDB_SUCCESS;
295 }
296
297 /*
298   traverse function for storing the in-memory index entries on disk
299  */
300 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
301 {
302         struct ldb_module *module = state;
303         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
304         struct ldb_dn *dn;
305         struct ldb_context *ldb = ldb_module_get_ctx(module);
306         struct ldb_val v;
307         struct dn_list *list;
308
309         list = ltdb_index_idxptr(module, data, true);
310         if (list == NULL) {
311                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
312                 return -1;
313         }
314
315         v.data = key.dptr;
316         v.length = strnlen((char *)key.dptr, key.dsize);
317
318         dn = ldb_dn_from_ldb_val(module, ldb, &v);
319         if (dn == NULL) {
320                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
321                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
322                 return -1;
323         }
324
325         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
326         talloc_free(dn);
327         if (ltdb->idxptr->error != 0) {
328                 return -1;
329         }
330         return 0;
331 }
332
333 /* cleanup the idxptr mode when transaction commits */
334 int ltdb_index_transaction_commit(struct ldb_module *module)
335 {
336         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
337         int ret;
338
339         struct ldb_context *ldb = ldb_module_get_ctx(module);
340
341         ldb_reset_err_string(ldb);
342         if (ltdb->idxptr->itdb) {
343                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
344                 tdb_close(ltdb->idxptr->itdb);
345         }
346
347         ret = ltdb->idxptr->error;
348
349         if (ret != LDB_SUCCESS) {
350                 if (!ldb_errstring(ldb)) {
351                         ldb_set_errstring(ldb, ldb_strerror(ret));
352                 }
353                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
354         }
355
356         talloc_free(ltdb->idxptr);
357         ltdb->idxptr = NULL;
358         return ret;
359 }
360
361 /* cleanup the idxptr mode when transaction cancels */
362 int ltdb_index_transaction_cancel(struct ldb_module *module)
363 {
364         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
365         if (ltdb->idxptr && ltdb->idxptr->itdb) {
366                 tdb_close(ltdb->idxptr->itdb);
367         }
368         talloc_free(ltdb->idxptr);
369         ltdb->idxptr = NULL;
370         return LDB_SUCCESS;
371 }
372
373
374 /*
375   return the dn key to be used for an index
376   the caller is responsible for freeing
377 */
378 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
379                                      const char *attr, const struct ldb_val *value,
380                                      const struct ldb_schema_attribute **ap)
381 {
382         struct ldb_dn *ret;
383         struct ldb_val v;
384         const struct ldb_schema_attribute *a;
385         char *attr_folded;
386         int r;
387
388         attr_folded = ldb_attr_casefold(ldb, attr);
389         if (!attr_folded) {
390                 return NULL;
391         }
392
393         a = ldb_schema_attribute_by_name(ldb, attr);
394         if (ap) {
395                 *ap = a;
396         }
397         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
398         if (r != LDB_SUCCESS) {
399                 const char *errstr = ldb_errstring(ldb);
400                 /* canonicalisation can be refused. For example, 
401                    a attribute that takes wildcards will refuse to canonicalise
402                    if the value contains a wildcard */
403                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
404                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
405                 talloc_free(attr_folded);
406                 return NULL;
407         }
408         if (ldb_should_b64_encode(ldb, &v)) {
409                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
410                 if (!vstr) return NULL;
411                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
412                 talloc_free(vstr);
413         } else {
414                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
415         }
416
417         if (v.data != value->data) {
418                 talloc_free(v.data);
419         }
420         talloc_free(attr_folded);
421
422         return ret;
423 }
424
425 /*
426   see if a attribute value is in the list of indexed attributes
427 */
428 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
429 {
430         unsigned int i;
431         struct ldb_message_element *el;
432
433         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
434         if (el == NULL) {
435                 return false;
436         }
437
438         /* TODO: this is too expensive! At least use a binary search */
439         for (i=0; i<el->num_values; i++) {
440                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
441                         return true;
442                 }
443         }
444         return false;
445 }
446
447 /*
448   in the following logic functions, the return value is treated as
449   follows:
450
451      LDB_SUCCESS: we found some matching index values
452
453      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
454
455      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
456                                we'll need a full search
457  */
458
459 /*
460   return a list of dn's that might match a simple indexed search (an
461   equality search only)
462  */
463 static int ltdb_index_dn_simple(struct ldb_module *module,
464                                 const struct ldb_parse_tree *tree,
465                                 const struct ldb_message *index_list,
466                                 struct dn_list *list)
467 {
468         struct ldb_context *ldb;
469         struct ldb_dn *dn;
470         int ret;
471
472         ldb = ldb_module_get_ctx(module);
473
474         list->count = 0;
475         list->dn = NULL;
476
477         /* if the attribute isn't in the list of indexed attributes then
478            this node needs a full search */
479         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
480                 return LDB_ERR_OPERATIONS_ERROR;
481         }
482
483         /* the attribute is indexed. Pull the list of DNs that match the 
484            search criterion */
485         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
486         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
487
488         ret = ltdb_dn_list_load(module, dn, list);
489         talloc_free(dn);
490         return ret;
491 }
492
493
494 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
495
496 /*
497   return a list of dn's that might match a leaf indexed search
498  */
499 static int ltdb_index_dn_leaf(struct ldb_module *module,
500                               const struct ldb_parse_tree *tree,
501                               const struct ldb_message *index_list,
502                               struct dn_list *list)
503 {
504         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
505                 list->dn = talloc_array(list, struct ldb_val, 1);
506                 if (list->dn == NULL) {
507                         ldb_module_oom(module);
508                         return LDB_ERR_OPERATIONS_ERROR;
509                 }
510                 list->dn[0] = tree->u.equality.value;
511                 list->count = 1;
512                 return LDB_SUCCESS;
513         }
514         return ltdb_index_dn_simple(module, tree, index_list, list);
515 }
516
517
518 /*
519   list intersection
520   list = list & list2
521 */
522 static bool list_intersect(struct ldb_context *ldb,
523                            struct dn_list *list, const struct dn_list *list2)
524 {
525         struct dn_list *list3;
526         unsigned int i;
527
528         if (list->count == 0) {
529                 /* 0 & X == 0 */
530                 return true;
531         }
532         if (list2->count == 0) {
533                 /* X & 0 == 0 */
534                 list->count = 0;
535                 list->dn = NULL;
536                 return true;
537         }
538
539         /* the indexing code is allowed to return a longer list than
540            what really matches, as all results are filtered by the
541            full expression at the end - this shortcut avoids a lot of
542            work in some cases */
543         if (list->count < 2 && list2->count > 10) {
544                 return true;
545         }
546         if (list2->count < 2 && list->count > 10) {
547                 list->count = list2->count;
548                 list->dn = list2->dn;
549                 /* note that list2 may not be the parent of list2->dn,
550                    as list2->dn may be owned by ltdb->idxptr. In that
551                    case we expect this reparent call to fail, which is
552                    OK */
553                 talloc_reparent(list2, list, list2->dn);
554                 return true;
555         }
556
557         list3 = talloc_zero(list, struct dn_list);
558         if (list3 == NULL) {
559                 return false;
560         }
561
562         list3->dn = talloc_array(list3, struct ldb_val, list->count);
563         if (!list3->dn) {
564                 talloc_free(list3);
565                 return false;
566         }
567         list3->count = 0;
568
569         for (i=0;i<list->count;i++) {
570                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
571                         list3->dn[list3->count] = list->dn[i];
572                         list3->count++;
573                 }
574         }
575
576         list->dn = talloc_steal(list, list3->dn);
577         list->count = list3->count;
578         talloc_free(list3);
579
580         return true;
581 }
582
583
584 /*
585   list union
586   list = list | list2
587 */
588 static bool list_union(struct ldb_context *ldb,
589                        struct dn_list *list, const struct dn_list *list2)
590 {
591         struct ldb_val *dn3;
592
593         if (list2->count == 0) {
594                 /* X | 0 == X */
595                 return true;
596         }
597
598         if (list->count == 0) {
599                 /* 0 | X == X */
600                 list->count = list2->count;
601                 list->dn = list2->dn;
602                 /* note that list2 may not be the parent of list2->dn,
603                    as list2->dn may be owned by ltdb->idxptr. In that
604                    case we expect this reparent call to fail, which is
605                    OK */
606                 talloc_reparent(list2, list, list2->dn);
607                 return true;
608         }
609
610         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
611         if (!dn3) {
612                 ldb_oom(ldb);
613                 return false;
614         }
615
616         /* we allow for duplicates here, and get rid of them later */
617         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
618         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
619
620         list->dn = dn3;
621         list->count += list2->count;
622
623         return true;
624 }
625
626 static int ltdb_index_dn(struct ldb_module *module,
627                          const struct ldb_parse_tree *tree,
628                          const struct ldb_message *index_list,
629                          struct dn_list *list);
630
631
632 /*
633   process an OR list (a union)
634  */
635 static int ltdb_index_dn_or(struct ldb_module *module,
636                             const struct ldb_parse_tree *tree,
637                             const struct ldb_message *index_list,
638                             struct dn_list *list)
639 {
640         struct ldb_context *ldb;
641         unsigned int i;
642
643         ldb = ldb_module_get_ctx(module);
644
645         list->dn = NULL;
646         list->count = 0;
647
648         for (i=0; i<tree->u.list.num_elements; i++) {
649                 struct dn_list *list2;
650                 int ret;
651
652                 list2 = talloc_zero(list, struct dn_list);
653                 if (list2 == NULL) {
654                         return LDB_ERR_OPERATIONS_ERROR;
655                 }
656
657                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
658
659                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
660                         /* X || 0 == X */
661                         talloc_free(list2);
662                         continue;
663                 }
664
665                 if (ret != LDB_SUCCESS) {
666                         /* X || * == * */
667                         talloc_free(list2);
668                         return ret;
669                 }
670
671                 if (!list_union(ldb, list, list2)) {
672                         talloc_free(list2);
673                         return LDB_ERR_OPERATIONS_ERROR;
674                 }
675         }
676
677         if (list->count == 0) {
678                 return LDB_ERR_NO_SUCH_OBJECT;
679         }
680
681         return LDB_SUCCESS;
682 }
683
684
685 /*
686   NOT an index results
687  */
688 static int ltdb_index_dn_not(struct ldb_module *module,
689                              const struct ldb_parse_tree *tree,
690                              const struct ldb_message *index_list,
691                              struct dn_list *list)
692 {
693         /* the only way to do an indexed not would be if we could
694            negate the not via another not or if we knew the total
695            number of database elements so we could know that the
696            existing expression covered the whole database.
697
698            instead, we just give up, and rely on a full index scan
699            (unless an outer & manages to reduce the list)
700         */
701         return LDB_ERR_OPERATIONS_ERROR;
702 }
703
704
705 static bool ltdb_index_unique(struct ldb_context *ldb,
706                               const char *attr)
707 {
708         const struct ldb_schema_attribute *a;
709         a = ldb_schema_attribute_by_name(ldb, attr);
710         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
711                 return true;
712         }
713         return false;
714 }
715
716 /*
717   process an AND expression (intersection)
718  */
719 static int ltdb_index_dn_and(struct ldb_module *module,
720                              const struct ldb_parse_tree *tree,
721                              const struct ldb_message *index_list,
722                              struct dn_list *list)
723 {
724         struct ldb_context *ldb;
725         unsigned int i;
726         bool found;
727
728         ldb = ldb_module_get_ctx(module);
729
730         list->dn = NULL;
731         list->count = 0;
732
733         /* in the first pass we only look for unique simple
734            equality tests, in the hope of avoiding having to look
735            at any others */
736         for (i=0; i<tree->u.list.num_elements; i++) {
737                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
738                 int ret;
739
740                 if (subtree->operation != LDB_OP_EQUALITY ||
741                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
742                         continue;
743                 }
744                 
745                 ret = ltdb_index_dn(module, subtree, index_list, list);
746                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
747                         /* 0 && X == 0 */
748                         return LDB_ERR_NO_SUCH_OBJECT;
749                 }
750                 if (ret == LDB_SUCCESS) {
751                         /* a unique index match means we can
752                          * stop. Note that we don't care if we return
753                          * a few too many objects, due to later
754                          * filtering */
755                         return LDB_SUCCESS;
756                 }
757         }       
758
759         /* now do a full intersection */
760         found = false;
761
762         for (i=0; i<tree->u.list.num_elements; i++) {
763                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
764                 struct dn_list *list2;
765                 int ret;
766
767                 list2 = talloc_zero(list, struct dn_list);
768                 if (list2 == NULL) {
769                         ldb_module_oom(module);
770                         return LDB_ERR_OPERATIONS_ERROR;
771                 }
772                         
773                 ret = ltdb_index_dn(module, subtree, index_list, list2);
774
775                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
776                         /* X && 0 == 0 */
777                         list->dn = NULL;
778                         list->count = 0;
779                         talloc_free(list2);
780                         return LDB_ERR_NO_SUCH_OBJECT;
781                 }
782                 
783                 if (ret != LDB_SUCCESS) {
784                         /* this didn't adding anything */
785                         talloc_free(list2);
786                         continue;
787                 }
788
789                 if (!found) {
790                         talloc_reparent(list2, list, list->dn);
791                         list->dn = list2->dn;
792                         list->count = list2->count;
793                         found = true;
794                 } else if (!list_intersect(ldb, list, list2)) {
795                         talloc_free(list2);
796                         return LDB_ERR_OPERATIONS_ERROR;
797                 }
798                         
799                 if (list->count == 0) {
800                         list->dn = NULL;
801                         return LDB_ERR_NO_SUCH_OBJECT;
802                 }
803                         
804                 if (list->count < 2) {
805                         /* it isn't worth loading the next part of the tree */
806                         return LDB_SUCCESS;
807                 }
808         }       
809
810         if (!found) {
811                 /* none of the attributes were indexed */
812                 return LDB_ERR_OPERATIONS_ERROR;
813         }
814
815         return LDB_SUCCESS;
816 }
817         
818 /*
819   return a list of matching objects using a one-level index
820  */
821 static int ltdb_index_dn_one(struct ldb_module *module,
822                              struct ldb_dn *parent_dn,
823                              struct dn_list *list)
824 {
825         struct ldb_context *ldb;
826         struct ldb_dn *key;
827         struct ldb_val val;
828         int ret;
829
830         ldb = ldb_module_get_ctx(module);
831
832         /* work out the index key from the parent DN */
833         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
834         val.length = strlen((char *)val.data);
835         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
836         if (!key) {
837                 ldb_oom(ldb);
838                 return LDB_ERR_OPERATIONS_ERROR;
839         }
840
841         ret = ltdb_dn_list_load(module, key, list);
842         talloc_free(key);
843         if (ret != LDB_SUCCESS) {
844                 return ret;
845         }
846
847         if (list->count == 0) {
848                 return LDB_ERR_NO_SUCH_OBJECT;
849         }
850
851         return LDB_SUCCESS;
852 }
853
854 /*
855   return a list of dn's that might match a indexed search or
856   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
857  */
858 static int ltdb_index_dn(struct ldb_module *module,
859                          const struct ldb_parse_tree *tree,
860                          const struct ldb_message *index_list,
861                          struct dn_list *list)
862 {
863         int ret = LDB_ERR_OPERATIONS_ERROR;
864
865         switch (tree->operation) {
866         case LDB_OP_AND:
867                 ret = ltdb_index_dn_and(module, tree, index_list, list);
868                 break;
869
870         case LDB_OP_OR:
871                 ret = ltdb_index_dn_or(module, tree, index_list, list);
872                 break;
873
874         case LDB_OP_NOT:
875                 ret = ltdb_index_dn_not(module, tree, index_list, list);
876                 break;
877
878         case LDB_OP_EQUALITY:
879                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
880                 break;
881
882         case LDB_OP_SUBSTRING:
883         case LDB_OP_GREATER:
884         case LDB_OP_LESS:
885         case LDB_OP_PRESENT:
886         case LDB_OP_APPROX:
887         case LDB_OP_EXTENDED:
888                 /* we can't index with fancy bitops yet */
889                 ret = LDB_ERR_OPERATIONS_ERROR;
890                 break;
891         }
892
893         return ret;
894 }
895
896 /*
897   filter a candidate dn_list from an indexed search into a set of results
898   extracting just the given attributes
899 */
900 static int ltdb_index_filter(const struct dn_list *dn_list,
901                              struct ltdb_context *ac, 
902                              uint32_t *match_count)
903 {
904         struct ldb_context *ldb;
905         struct ldb_message *msg;
906         unsigned int i;
907
908         ldb = ldb_module_get_ctx(ac->module);
909
910         for (i = 0; i < dn_list->count; i++) {
911                 struct ldb_dn *dn;
912                 int ret;
913
914                 msg = ldb_msg_new(ac);
915                 if (!msg) {
916                         return LDB_ERR_OPERATIONS_ERROR;
917                 }
918
919                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
920                 if (dn == NULL) {
921                         talloc_free(msg);
922                         return LDB_ERR_OPERATIONS_ERROR;
923                 }
924
925                 ret = ltdb_search_dn1(ac->module, dn, msg);
926                 talloc_free(dn);
927                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
928                         /* the record has disappeared? yes, this can happen */
929                         talloc_free(msg);
930                         continue;
931                 }
932
933                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
934                         /* an internal error */
935                         talloc_free(msg);
936                         return LDB_ERR_OPERATIONS_ERROR;
937                 }
938
939                 if (!ldb_match_msg(ldb, msg,
940                                    ac->tree, ac->base, ac->scope)) {
941                         talloc_free(msg);
942                         continue;
943                 }
944
945                 /* filter the attributes that the user wants */
946                 ret = ltdb_filter_attrs(msg, ac->attrs);
947
948                 if (ret == -1) {
949                         talloc_free(msg);
950                         return LDB_ERR_OPERATIONS_ERROR;
951                 }
952
953                 ret = ldb_module_send_entry(ac->req, msg, NULL);
954                 if (ret != LDB_SUCCESS) {
955                         ac->request_terminated = true;
956                         return ret;
957                 }
958
959                 (*match_count)++;
960         }
961
962         return LDB_SUCCESS;
963 }
964
965 /*
966   remove any duplicated entries in a indexed result
967  */
968 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
969 {
970         int i, new_count;
971
972         if (list->count < 2) {
973                 return;
974         }
975
976         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
977
978         new_count = 1;
979         for (i=1; i<list->count; i++) {
980                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
981                         if (new_count != i) {
982                                 list->dn[new_count] = list->dn[i];
983                         }
984                         new_count++;
985                 }
986         }
987         
988         list->count = new_count;
989 }
990
991 /*
992   search the database with a LDAP-like expression using indexes
993   returns -1 if an indexed search is not possible, in which
994   case the caller should call ltdb_search_full()
995 */
996 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
997 {
998         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
999         struct dn_list *dn_list;
1000         int ret;
1001
1002         /* see if indexing is enabled */
1003         if (!ltdb->cache->attribute_indexes && 
1004             !ltdb->cache->one_level_indexes &&
1005             ac->scope != LDB_SCOPE_BASE) {
1006                 /* fallback to a full search */
1007                 return LDB_ERR_OPERATIONS_ERROR;
1008         }
1009
1010         dn_list = talloc_zero(ac, struct dn_list);
1011         if (dn_list == NULL) {
1012                 ldb_module_oom(ac->module);
1013                 return LDB_ERR_OPERATIONS_ERROR;
1014         }
1015
1016         switch (ac->scope) {
1017         case LDB_SCOPE_BASE:
1018                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1019                 if (dn_list->dn == NULL) {
1020                         ldb_module_oom(ac->module);
1021                         talloc_free(dn_list);
1022                         return LDB_ERR_OPERATIONS_ERROR;
1023                 }
1024                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1025                 if (dn_list->dn[0].data == NULL) {
1026                         ldb_module_oom(ac->module);
1027                         talloc_free(dn_list);
1028                         return LDB_ERR_OPERATIONS_ERROR;
1029                 }
1030                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1031                 dn_list->count = 1;
1032                 break;          
1033
1034         case LDB_SCOPE_ONELEVEL:
1035                 if (!ltdb->cache->one_level_indexes) {
1036                         talloc_free(dn_list);
1037                         return LDB_ERR_OPERATIONS_ERROR;
1038                 }
1039                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1040                 if (ret != LDB_SUCCESS) {
1041                         talloc_free(dn_list);
1042                         return ret;
1043                 }
1044                 break;
1045
1046         case LDB_SCOPE_SUBTREE:
1047         case LDB_SCOPE_DEFAULT:
1048                 if (!ltdb->cache->attribute_indexes) {
1049                         talloc_free(dn_list);
1050                         return LDB_ERR_OPERATIONS_ERROR;
1051                 }
1052                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1053                 if (ret != LDB_SUCCESS) {
1054                         talloc_free(dn_list);
1055                         return ret;
1056                 }
1057                 ltdb_dn_list_remove_duplicates(dn_list);
1058                 break;
1059         }
1060
1061         ret = ltdb_index_filter(dn_list, ac, match_count);
1062         talloc_free(dn_list);
1063         return ret;
1064 }
1065
1066 /*
1067   add an index entry for one message element
1068 */
1069 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1070                            struct ldb_message_element *el, int v_idx)
1071 {
1072         struct ldb_context *ldb;
1073         struct ldb_dn *dn_key;
1074         int ret;
1075         const struct ldb_schema_attribute *a;
1076         struct dn_list *list;
1077         unsigned alloc_len;
1078
1079         ldb = ldb_module_get_ctx(module);
1080
1081         list = talloc_zero(module, struct dn_list);
1082         if (list == NULL) {
1083                 return LDB_ERR_OPERATIONS_ERROR;
1084         }
1085
1086         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1087         if (!dn_key) {
1088                 talloc_free(list);
1089                 return LDB_ERR_OPERATIONS_ERROR;
1090         }
1091         talloc_steal(list, dn_key);
1092
1093         ret = ltdb_dn_list_load(module, dn_key, list);
1094         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1095                 talloc_free(list);
1096                 return ret;
1097         }
1098
1099         if (ltdb_dn_list_find_str(list, dn) != -1) {
1100                 talloc_free(list);
1101                 return LDB_SUCCESS;
1102         }
1103
1104         if (list->count > 0 &&
1105             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1106                 talloc_free(list);
1107                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1108         }
1109
1110         /* overallocate the list a bit, to reduce the number of
1111          * realloc trigered copies */    
1112         alloc_len = ((list->count+1)+7) & ~7;
1113         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1114         if (list->dn == NULL) {
1115                 talloc_free(list);
1116                 return LDB_ERR_OPERATIONS_ERROR;
1117         }
1118         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1119         list->dn[list->count].length = strlen(dn);
1120         list->count++;
1121
1122         ret = ltdb_dn_list_store(module, dn_key, list);
1123
1124         talloc_free(list);
1125
1126         return ret;
1127 }
1128
1129 /*
1130   add index entries for one elements in a message
1131  */
1132 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1133                              struct ldb_message_element *el)
1134 {
1135         unsigned int i;
1136         for (i = 0; i < el->num_values; i++) {
1137                 int ret = ltdb_index_add1(module, dn, el, i);
1138                 if (ret != LDB_SUCCESS) {
1139                         return ret;
1140                 }
1141         }
1142
1143         return LDB_SUCCESS;
1144 }
1145
1146 /*
1147   add index entries for all elements in a message
1148  */
1149 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1150                               struct ldb_message_element *elements, int num_el)
1151 {
1152         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1153         unsigned int i;
1154
1155         if (dn[0] == '@') {
1156                 return LDB_SUCCESS;
1157         }
1158
1159         if (ltdb->cache->indexlist->num_elements == 0) {
1160                 /* no indexed fields */
1161                 return LDB_SUCCESS;
1162         }
1163
1164         for (i = 0; i < num_el; i++) {
1165                 int ret;
1166                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1167                         continue;
1168                 }
1169                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1170                 if (ret != LDB_SUCCESS) {
1171                         return ret;
1172                 }
1173         }
1174
1175         return LDB_SUCCESS;
1176 }
1177
1178
1179 /*
1180   insert a one level index for a message
1181 */
1182 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1183 {
1184         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1185         struct ldb_message_element el;
1186         struct ldb_val val;
1187         struct ldb_dn *pdn;
1188         const char *dn;
1189         int ret;
1190
1191         /* We index for ONE Level only if requested */
1192         if (!ltdb->cache->one_level_indexes) {
1193                 return LDB_SUCCESS;
1194         }
1195
1196         pdn = ldb_dn_get_parent(module, msg->dn);
1197         if (pdn == NULL) {
1198                 return LDB_ERR_OPERATIONS_ERROR;
1199         }
1200
1201         dn = ldb_dn_get_linearized(msg->dn);
1202         if (dn == NULL) {
1203                 talloc_free(pdn);
1204                 return LDB_ERR_OPERATIONS_ERROR;
1205         }
1206
1207         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1208         if (val.data == NULL) {
1209                 talloc_free(pdn);
1210                 return LDB_ERR_OPERATIONS_ERROR;
1211         }
1212
1213         val.length = strlen((char *)val.data);
1214         el.name = LTDB_IDXONE;
1215         el.values = &val;
1216         el.num_values = 1;
1217
1218         if (add) {
1219                 ret = ltdb_index_add1(module, dn, &el, 0);
1220         } else { /* delete */
1221                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1222         }
1223
1224         talloc_free(pdn);
1225
1226         return ret;
1227 }
1228
1229 /*
1230   add the index entries for a new element in a record
1231   The caller guarantees that these element values are not yet indexed
1232 */
1233 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1234                            struct ldb_message_element *el)
1235 {
1236         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1237         if (ldb_dn_is_special(dn)) {
1238                 return LDB_SUCCESS;
1239         }
1240         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1241                 return LDB_SUCCESS;
1242         }
1243         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1244 }
1245
1246 /*
1247   add the index entries for a new record
1248 */
1249 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1250 {
1251         const char *dn;
1252         int ret;
1253
1254         if (ldb_dn_is_special(msg->dn)) {
1255                 return LDB_SUCCESS;
1256         }
1257
1258         dn = ldb_dn_get_linearized(msg->dn);
1259         if (dn == NULL) {
1260                 return LDB_ERR_OPERATIONS_ERROR;
1261         }
1262
1263         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1264         if (ret != LDB_SUCCESS) {
1265                 return ret;
1266         }
1267
1268         return ltdb_index_onelevel(module, msg, 1);
1269 }
1270
1271
1272 /*
1273   delete an index entry for one message element
1274 */
1275 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1276                          struct ldb_message_element *el, int v_idx)
1277 {
1278         struct ldb_context *ldb;
1279         struct ldb_dn *dn_key;
1280         const char *dn_str;
1281         int ret, i;
1282         struct dn_list *list;
1283
1284         ldb = ldb_module_get_ctx(module);
1285
1286         dn_str = ldb_dn_get_linearized(dn);
1287         if (dn_str == NULL) {
1288                 return LDB_ERR_OPERATIONS_ERROR;
1289         }
1290
1291         if (dn_str[0] == '@') {
1292                 return LDB_SUCCESS;
1293         }
1294
1295         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1296         if (!dn_key) {
1297                 return LDB_ERR_OPERATIONS_ERROR;
1298         }
1299
1300         list = talloc_zero(dn_key, struct dn_list);
1301         if (list == NULL) {
1302                 talloc_free(dn_key);
1303                 return LDB_ERR_OPERATIONS_ERROR;
1304         }
1305
1306         ret = ltdb_dn_list_load(module, dn_key, list);
1307         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1308                 /* it wasn't indexed. Did we have an earlier error? If we did then
1309                    its gone now */
1310                 talloc_free(dn_key);
1311                 return LDB_SUCCESS;
1312         }
1313
1314         if (ret != LDB_SUCCESS) {
1315                 talloc_free(dn_key);
1316                 return ret;
1317         }
1318
1319         i = ltdb_dn_list_find_str(list, dn_str);
1320         if (i == -1) {
1321                 /* nothing to delete */
1322                 talloc_free(dn_key);
1323                 return LDB_SUCCESS;             
1324         }
1325
1326         if (i != list->count-1) {
1327                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1328         }
1329         list->count--;
1330         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1331
1332         ret = ltdb_dn_list_store(module, dn_key, list);
1333
1334         talloc_free(dn_key);
1335
1336         return ret;
1337 }
1338
1339 /*
1340   delete the index entries for a element
1341   return -1 on failure
1342 */
1343 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1344                            struct ldb_message_element *el)
1345 {
1346         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1347         const char *dn_str;
1348         int ret;
1349         unsigned int i;
1350
1351         if (!ltdb->cache->attribute_indexes) {
1352                 /* no indexed fields */
1353                 return LDB_SUCCESS;
1354         }
1355
1356         dn_str = ldb_dn_get_linearized(dn);
1357         if (dn_str == NULL) {
1358                 return LDB_ERR_OPERATIONS_ERROR;
1359         }
1360
1361         if (dn_str[0] == '@') {
1362                 return LDB_SUCCESS;
1363         }
1364
1365         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1366                 return LDB_SUCCESS;
1367         }
1368         for (i = 0; i < el->num_values; i++) {
1369                 ret = ltdb_index_del_value(module, dn, el, i);
1370                 if (ret != LDB_SUCCESS) {
1371                         return ret;
1372                 }
1373         }
1374
1375         return LDB_SUCCESS;
1376 }
1377
1378 /*
1379   delete the index entries for a record
1380   return -1 on failure
1381 */
1382 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1383 {
1384         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1385         int ret;
1386         unsigned int i;
1387
1388         if (ldb_dn_is_special(msg->dn)) {
1389                 return LDB_SUCCESS;
1390         }
1391
1392         ret = ltdb_index_onelevel(module, msg, 0);
1393         if (ret != LDB_SUCCESS) {
1394                 return ret;
1395         }
1396
1397         if (!ltdb->cache->attribute_indexes) {
1398                 /* no indexed fields */
1399                 return LDB_SUCCESS;
1400         }
1401
1402         for (i = 0; i < msg->num_elements; i++) {
1403                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1404                 if (ret != LDB_SUCCESS) {
1405                         return ret;
1406                 }
1407         }
1408
1409         return LDB_SUCCESS;
1410 }
1411
1412
1413 /*
1414   traversal function that deletes all @INDEX records
1415 */
1416 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1417 {
1418         struct ldb_module *module = state;
1419         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1420         const char *dnstr = "DN=" LTDB_INDEX ":";
1421         struct dn_list list;
1422         struct ldb_dn *dn;
1423         struct ldb_val v;
1424         int ret;
1425
1426         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1427                 return 0;
1428         }
1429         /* we need to put a empty list in the internal tdb for this
1430          * index entry */
1431         list.dn = NULL;
1432         list.count = 0;
1433         v.data = key.dptr;
1434         v.length = strnlen((char *)key.dptr, key.dsize);
1435
1436         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1437         ret = ltdb_dn_list_store(module, dn, &list);
1438         if (ret != LDB_SUCCESS) {
1439                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1440                                        "Unable to store null index for %s\n",
1441                                                 ldb_dn_get_linearized(dn));
1442                 talloc_free(dn);
1443                 return -1;
1444         }
1445         talloc_free(dn);
1446         return 0;
1447 }
1448
1449 /*
1450   traversal function that adds @INDEX records during a re index
1451 */
1452 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1453 {
1454         struct ldb_context *ldb;
1455         struct ldb_module *module = (struct ldb_module *)state;
1456         struct ldb_message *msg;
1457         const char *dn = NULL;
1458         int ret;
1459         TDB_DATA key2;
1460
1461         ldb = ldb_module_get_ctx(module);
1462
1463         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1464             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1465                 return 0;
1466         }
1467
1468         msg = ldb_msg_new(module);
1469         if (msg == NULL) {
1470                 return -1;
1471         }
1472
1473         ret = ltdb_unpack_data(module, &data, msg);
1474         if (ret != 0) {
1475                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1476                                                 ldb_dn_get_linearized(msg->dn));
1477                 talloc_free(msg);
1478                 return -1;
1479         }
1480
1481         /* check if the DN key has changed, perhaps due to the
1482            case insensitivity of an element changing */
1483         key2 = ltdb_key(module, msg->dn);
1484         if (key2.dptr == NULL) {
1485                 /* probably a corrupt record ... darn */
1486                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1487                                                 ldb_dn_get_linearized(msg->dn));
1488                 talloc_free(msg);
1489                 return 0;
1490         }
1491         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1492                 tdb_delete(tdb, key);
1493                 tdb_store(tdb, key2, data, 0);
1494         }
1495         talloc_free(key2.dptr);
1496
1497         if (msg->dn == NULL) {
1498                 dn = (char *)key.dptr + 3;
1499         } else {
1500                 dn = ldb_dn_get_linearized(msg->dn);
1501         }
1502
1503         ret = ltdb_index_onelevel(module, msg, 1);
1504         if (ret != LDB_SUCCESS) {
1505                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1506                           "Adding special ONE LEVEL index failed (%s)!",
1507                                                 ldb_dn_get_linearized(msg->dn));
1508                 talloc_free(msg);
1509                 return -1;
1510         }
1511
1512         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1513
1514         talloc_free(msg);
1515
1516         if (ret != LDB_SUCCESS) return -1;
1517
1518         return 0;
1519 }
1520
1521 /*
1522   force a complete reindex of the database
1523 */
1524 int ltdb_reindex(struct ldb_module *module)
1525 {
1526         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1527         int ret;
1528
1529         if (ltdb_cache_reload(module) != 0) {
1530                 return LDB_ERR_OPERATIONS_ERROR;
1531         }
1532
1533         /* first traverse the database deleting any @INDEX records by
1534          * putting NULL entries in the in-memory tdb
1535          */
1536         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1537         if (ret == -1) {
1538                 return LDB_ERR_OPERATIONS_ERROR;
1539         }
1540
1541         /* if we don't have indexes we have nothing todo */
1542         if (ltdb->cache->indexlist->num_elements == 0) {
1543                 return LDB_SUCCESS;
1544         }
1545
1546         /* now traverse adding any indexes for normal LDB records */
1547         ret = tdb_traverse(ltdb->tdb, re_index, module);
1548         if (ret == -1) {
1549                 return LDB_ERR_OPERATIONS_ERROR;
1550         }
1551
1552         return LDB_SUCCESS;
1553 }