4b31021001115e2f32f3e4518479eb0f8bdb119c
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         int error;
44 };
45
46 /* we put a @IDXVERSION attribute on index entries. This
47    allows us to tell if it was written by an older version
48 */
49 #define LTDB_INDEXING_VERSION 2
50
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
53 {
54         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56         return LDB_SUCCESS;
57 }
58
59 /* compare two DN entries in a dn_list. Take account of possible
60  * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
62 {
63         if (v1->length > v2->length && v1->data[v2->length] != 0) {
64                 return -1;
65         }
66         if (v1->length < v2->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 {
100         struct dn_list *list;
101         if (rec.dsize != sizeof(void *)) {
102                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
103                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
104                 return NULL;
105         }
106         memcpy(&list, rec.dptr, sizeof(void *));
107         list = talloc_get_type(list, struct dn_list);
108         if (list == NULL) {
109                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
110                                        "Bad type '%s' for idxptr", 
111                                        talloc_get_name(list));
112                 return NULL;
113         }
114         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
116                                        "Bad parent '%s' for idxptr", 
117                                        talloc_get_name(talloc_parent(list->dn)));
118                 return NULL;
119         }
120         return list;
121 }
122
123 /*
124   return the @IDX list in an index entry for a dn as a 
125   struct dn_list
126  */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128                              struct ldb_dn *dn, struct dn_list *list)
129 {
130         struct ldb_message *msg;
131         int ret;
132         struct ldb_message_element *el;
133         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134         TDB_DATA rec;
135         struct dn_list *list2;
136         TDB_DATA key;
137
138         list->dn = NULL;
139         list->count = 0;
140
141         /* see if we have any in-memory index entries */
142         if (ltdb->idxptr == NULL ||
143             ltdb->idxptr->itdb == NULL) {
144                 goto normal_index;
145         }
146
147         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148         key.dsize = strlen((char *)key.dptr);
149
150         rec = tdb_fetch(ltdb->idxptr->itdb, key);
151         if (rec.dptr == NULL) {
152                 goto normal_index;
153         }
154
155         /* we've found an in-memory index entry */
156         list2 = ltdb_index_idxptr(module, rec, true);
157         if (list2 == NULL) {
158                 free(rec.dptr);
159                 return LDB_ERR_OPERATIONS_ERROR;
160         }
161         free(rec.dptr);
162
163         *list = *list2;
164         return LDB_SUCCESS;
165
166 normal_index:
167         msg = ldb_msg_new(list);
168         if (msg == NULL) {
169                 return LDB_ERR_OPERATIONS_ERROR;
170         }
171
172         ret = ltdb_search_dn1(module, dn, msg);
173         if (ret != LDB_SUCCESS) {
174                 return ret;
175         }
176
177         /* TODO: check indexing version number */
178
179         el = ldb_msg_find_element(msg, LTDB_IDX);
180         if (!el) {
181                 talloc_free(msg);
182                 return LDB_SUCCESS;
183         }
184
185         /* we avoid copying the strings by stealing the list */
186         list->dn = talloc_steal(list, el->values);
187         list->count = el->num_values;
188
189         return LDB_SUCCESS;
190 }
191
192
193 /*
194   save a dn_list into a full @IDX style record
195  */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
197                                    struct dn_list *list)
198 {
199         struct ldb_message *msg;
200         int ret;
201
202         if (list->count == 0) {
203                 ret = ltdb_delete_noindex(module, dn);
204                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
205                         return LDB_SUCCESS;
206                 }
207                 return ret;
208         }
209
210         msg = ldb_msg_new(module);
211         if (!msg) {
212                 ldb_module_oom(module);
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217         if (ret != LDB_SUCCESS) {
218                 talloc_free(msg);
219                 ldb_module_oom(module);
220                 return LDB_ERR_OPERATIONS_ERROR;
221         }
222
223         msg->dn = dn;
224         if (list->count > 0) {
225                 struct ldb_message_element *el;
226
227                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
228                 if (ret != LDB_SUCCESS) {
229                         ldb_module_oom(module);
230                         talloc_free(msg);
231                         return LDB_ERR_OPERATIONS_ERROR;
232                 }
233                 el->values = list->dn;
234                 el->num_values = list->count;
235         }
236
237         ret = ltdb_store(module, msg, TDB_REPLACE);
238         talloc_free(msg);
239         return ret;
240 }
241
242 /*
243   save a dn_list into the database, in either @IDX or internal format
244  */
245 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
246                               struct dn_list *list)
247 {
248         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
249         TDB_DATA rec, key;
250         int ret;
251         struct dn_list *list2;
252
253         if (ltdb->idxptr == NULL) {
254                 return ltdb_dn_list_store_full(module, dn, list);
255         }
256
257         if (ltdb->idxptr->itdb == NULL) {
258                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
259                 if (ltdb->idxptr->itdb == NULL) {
260                         return LDB_ERR_OPERATIONS_ERROR;
261                 }
262         }
263
264         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
265         key.dsize = strlen((char *)key.dptr);
266
267         rec = tdb_fetch(ltdb->idxptr->itdb, key);
268         if (rec.dptr != NULL) {
269                 list2 = ltdb_index_idxptr(module, rec, false);
270                 if (list2 == NULL) {
271                         free(rec.dptr);
272                         return LDB_ERR_OPERATIONS_ERROR;
273                 }
274                 free(rec.dptr);
275                 list2->dn = talloc_steal(list2, list->dn);
276                 list2->count = list->count;
277                 return LDB_SUCCESS;
278         }
279
280         list2 = talloc(ltdb->idxptr, struct dn_list);
281         if (list2 == NULL) {
282                 return LDB_ERR_OPERATIONS_ERROR;
283         }
284         list2->dn = talloc_steal(list2, list->dn);
285         list2->count = list->count;
286
287         rec.dptr = (uint8_t *)&list2;
288         rec.dsize = sizeof(void *);
289
290         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
291         if (ret == -1) {
292                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
293         }
294         return LDB_SUCCESS;
295 }
296
297 /*
298   traverse function for storing the in-memory index entries on disk
299  */
300 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
301 {
302         struct ldb_module *module = state;
303         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
304         struct ldb_dn *dn;
305         struct ldb_context *ldb = ldb_module_get_ctx(module);
306         struct ldb_val v;
307         struct dn_list *list;
308
309         list = ltdb_index_idxptr(module, data, true);
310         if (list == NULL) {
311                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
312                 return -1;
313         }
314
315         v.data = key.dptr;
316         v.length = strnlen((char *)key.dptr, key.dsize);
317
318         dn = ldb_dn_from_ldb_val(module, ldb, &v);
319         if (dn == NULL) {
320                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
321                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
322                 return -1;
323         }
324
325         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
326         talloc_free(dn);
327         if (ltdb->idxptr->error != 0) {
328                 return -1;
329         }
330         return 0;
331 }
332
333 /* cleanup the idxptr mode when transaction commits */
334 int ltdb_index_transaction_commit(struct ldb_module *module)
335 {
336         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
337         int ret;
338
339         struct ldb_context *ldb = ldb_module_get_ctx(module);
340
341         ldb_reset_err_string(ldb);
342         if (ltdb->idxptr->itdb) {
343                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
344                 tdb_close(ltdb->idxptr->itdb);
345         }
346
347         ret = ltdb->idxptr->error;
348
349         if (ret != LDB_SUCCESS) {
350                 if (!ldb_errstring(ldb)) {
351                         ldb_set_errstring(ldb, ldb_strerror(ret));
352                 }
353                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
354         }
355
356         talloc_free(ltdb->idxptr);
357         ltdb->idxptr = NULL;
358         return ret;
359 }
360
361 /* cleanup the idxptr mode when transaction cancels */
362 int ltdb_index_transaction_cancel(struct ldb_module *module)
363 {
364         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
365         if (ltdb->idxptr && ltdb->idxptr->itdb) {
366                 tdb_close(ltdb->idxptr->itdb);
367         }
368         talloc_free(ltdb->idxptr);
369         ltdb->idxptr = NULL;
370         return LDB_SUCCESS;
371 }
372
373
374 /*
375   return the dn key to be used for an index
376   the caller is responsible for freeing
377 */
378 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
379                                      const char *attr, const struct ldb_val *value,
380                                      const struct ldb_schema_attribute **ap)
381 {
382         struct ldb_dn *ret;
383         struct ldb_val v;
384         const struct ldb_schema_attribute *a;
385         char *attr_folded;
386         int r;
387
388         attr_folded = ldb_attr_casefold(ldb, attr);
389         if (!attr_folded) {
390                 return NULL;
391         }
392
393         a = ldb_schema_attribute_by_name(ldb, attr);
394         if (ap) {
395                 *ap = a;
396         }
397         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
398         if (r != LDB_SUCCESS) {
399                 const char *errstr = ldb_errstring(ldb);
400                 /* canonicalisation can be refused. For example, 
401                    a attribute that takes wildcards will refuse to canonicalise
402                    if the value contains a wildcard */
403                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
404                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
405                 talloc_free(attr_folded);
406                 return NULL;
407         }
408         if (ldb_should_b64_encode(ldb, &v)) {
409                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
410                 if (!vstr) return NULL;
411                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
412                 talloc_free(vstr);
413         } else {
414                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
415         }
416
417         if (v.data != value->data) {
418                 talloc_free(v.data);
419         }
420         talloc_free(attr_folded);
421
422         return ret;
423 }
424
425 /*
426   see if a attribute value is in the list of indexed attributes
427 */
428 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
429 {
430         unsigned int i;
431         struct ldb_message_element *el;
432
433         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
434         if (el == NULL) {
435                 return false;
436         }
437
438         /* TODO: this is too expensive! At least use a binary search */
439         for (i=0; i<el->num_values; i++) {
440                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
441                         return true;
442                 }
443         }
444         return false;
445 }
446
447 /*
448   in the following logic functions, the return value is treated as
449   follows:
450
451      LDB_SUCCESS: we found some matching index values
452
453      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
454
455      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
456                                we'll need a full search
457  */
458
459 /*
460   return a list of dn's that might match a simple indexed search (an
461   equality search only)
462  */
463 static int ltdb_index_dn_simple(struct ldb_module *module,
464                                 const struct ldb_parse_tree *tree,
465                                 const struct ldb_message *index_list,
466                                 struct dn_list *list)
467 {
468         struct ldb_context *ldb;
469         struct ldb_dn *dn;
470         int ret;
471
472         ldb = ldb_module_get_ctx(module);
473
474         list->count = 0;
475         list->dn = NULL;
476
477         /* if the attribute isn't in the list of indexed attributes then
478            this node needs a full search */
479         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
480                 return LDB_ERR_OPERATIONS_ERROR;
481         }
482
483         /* the attribute is indexed. Pull the list of DNs that match the 
484            search criterion */
485         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
486         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
487
488         ret = ltdb_dn_list_load(module, dn, list);
489         talloc_free(dn);
490         return ret;
491 }
492
493
494 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
495
496 /*
497   return a list of dn's that might match a leaf indexed search
498  */
499 static int ltdb_index_dn_leaf(struct ldb_module *module,
500                               const struct ldb_parse_tree *tree,
501                               const struct ldb_message *index_list,
502                               struct dn_list *list)
503 {
504         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
505                 list->dn = talloc_array(list, struct ldb_val, 1);
506                 if (list->dn == NULL) {
507                         ldb_module_oom(module);
508                         return LDB_ERR_OPERATIONS_ERROR;
509                 }
510                 list->dn[0] = tree->u.equality.value;
511                 list->count = 1;
512                 return LDB_SUCCESS;
513         }
514         return ltdb_index_dn_simple(module, tree, index_list, list);
515 }
516
517
518 /*
519   list intersection
520   list = list & list2
521 */
522 static bool list_intersect(struct ldb_context *ldb,
523                            struct dn_list *list, const struct dn_list *list2)
524 {
525         struct dn_list *list3;
526         unsigned int i;
527
528         if (list->count == 0) {
529                 /* 0 & X == 0 */
530                 return true;
531         }
532         if (list2->count == 0) {
533                 /* X & 0 == 0 */
534                 list->count = 0;
535                 list->dn = NULL;
536                 return true;
537         }
538
539         /* the indexing code is allowed to return a longer list than
540            what really matches, as all results are filtered by the
541            full expression at the end - this shortcut avoids a lot of
542            work in some cases */
543         if (list->count < 2 && list2->count > 10) {
544                 return true;
545         }
546         if (list2->count < 2 && list->count > 10) {
547                 list->count = list2->count;
548                 list->dn = list2->dn;
549                 /* note that list2 may not be the parent of list2->dn,
550                    as list2->dn may be owned by ltdb->idxptr. In that
551                    case we expect this reparent call to fail, which is
552                    OK */
553                 talloc_reparent(list2, list, list2->dn);
554                 return true;
555         }
556
557         list3 = talloc_zero(list, struct dn_list);
558         if (list3 == NULL) {
559                 return false;
560         }
561
562         list3->dn = talloc_array(list3, struct ldb_val, list->count);
563         if (!list3->dn) {
564                 talloc_free(list3);
565                 return false;
566         }
567         list3->count = 0;
568
569         for (i=0;i<list->count;i++) {
570                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
571                         list3->dn[list3->count] = list->dn[i];
572                         list3->count++;
573                 }
574         }
575
576         list->dn = talloc_steal(list, list3->dn);
577         list->count = list3->count;
578         talloc_free(list3);
579
580         return true;
581 }
582
583
584 /*
585   list union
586   list = list | list2
587 */
588 static bool list_union(struct ldb_context *ldb,
589                        struct dn_list *list, const struct dn_list *list2)
590 {
591         struct ldb_val *dn3;
592
593         if (list2->count == 0) {
594                 /* X | 0 == X */
595                 return true;
596         }
597
598         if (list->count == 0) {
599                 /* 0 | X == X */
600                 list->count = list2->count;
601                 list->dn = list2->dn;
602                 /* note that list2 may not be the parent of list2->dn,
603                    as list2->dn may be owned by ltdb->idxptr. In that
604                    case we expect this reparent call to fail, which is
605                    OK */
606                 talloc_reparent(list2, list, list2->dn);
607                 return true;
608         }
609
610         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
611         if (!dn3) {
612                 ldb_oom(ldb);
613                 return false;
614         }
615
616         /* we allow for duplicates here, and get rid of them later */
617         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
618         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
619
620         list->dn = dn3;
621         list->count += list2->count;
622
623         return true;
624 }
625
626 static int ltdb_index_dn(struct ldb_module *module,
627                          const struct ldb_parse_tree *tree,
628                          const struct ldb_message *index_list,
629                          struct dn_list *list);
630
631
632 /*
633   process an OR list (a union)
634  */
635 static int ltdb_index_dn_or(struct ldb_module *module,
636                             const struct ldb_parse_tree *tree,
637                             const struct ldb_message *index_list,
638                             struct dn_list *list)
639 {
640         struct ldb_context *ldb;
641         unsigned int i;
642
643         ldb = ldb_module_get_ctx(module);
644
645         list->dn = NULL;
646         list->count = 0;
647
648         for (i=0; i<tree->u.list.num_elements; i++) {
649                 struct dn_list *list2;
650                 int ret;
651
652                 list2 = talloc_zero(list, struct dn_list);
653                 if (list2 == NULL) {
654                         return LDB_ERR_OPERATIONS_ERROR;
655                 }
656
657                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
658
659                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
660                         /* X || 0 == X */
661                         talloc_free(list2);
662                         continue;
663                 }
664
665                 if (ret != LDB_SUCCESS) {
666                         /* X || * == * */
667                         talloc_free(list2);
668                         return ret;
669                 }
670
671                 if (!list_union(ldb, list, list2)) {
672                         talloc_free(list2);
673                         return LDB_ERR_OPERATIONS_ERROR;
674                 }
675         }
676
677         if (list->count == 0) {
678                 return LDB_ERR_NO_SUCH_OBJECT;
679         }
680
681         return LDB_SUCCESS;
682 }
683
684
685 /*
686   NOT an index results
687  */
688 static int ltdb_index_dn_not(struct ldb_module *module,
689                              const struct ldb_parse_tree *tree,
690                              const struct ldb_message *index_list,
691                              struct dn_list *list)
692 {
693         /* the only way to do an indexed not would be if we could
694            negate the not via another not or if we knew the total
695            number of database elements so we could know that the
696            existing expression covered the whole database.
697
698            instead, we just give up, and rely on a full index scan
699            (unless an outer & manages to reduce the list)
700         */
701         return LDB_ERR_OPERATIONS_ERROR;
702 }
703
704
705 static bool ltdb_index_unique(struct ldb_context *ldb,
706                               const char *attr)
707 {
708         const struct ldb_schema_attribute *a;
709         a = ldb_schema_attribute_by_name(ldb, attr);
710         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
711                 return true;
712         }
713         return false;
714 }
715
716 /*
717   process an AND expression (intersection)
718  */
719 static int ltdb_index_dn_and(struct ldb_module *module,
720                              const struct ldb_parse_tree *tree,
721                              const struct ldb_message *index_list,
722                              struct dn_list *list)
723 {
724         struct ldb_context *ldb;
725         unsigned int i;
726         bool found;
727
728         ldb = ldb_module_get_ctx(module);
729
730         list->dn = NULL;
731         list->count = 0;
732
733         /* in the first pass we only look for unique simple
734            equality tests, in the hope of avoiding having to look
735            at any others */
736         for (i=0; i<tree->u.list.num_elements; i++) {
737                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
738                 int ret;
739
740                 if (subtree->operation != LDB_OP_EQUALITY ||
741                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
742                         continue;
743                 }
744                 
745                 ret = ltdb_index_dn(module, subtree, index_list, list);
746                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
747                         /* 0 && X == 0 */
748                         return LDB_ERR_NO_SUCH_OBJECT;
749                 }
750                 if (ret == LDB_SUCCESS) {
751                         /* a unique index match means we can
752                          * stop. Note that we don't care if we return
753                          * a few too many objects, due to later
754                          * filtering */
755                         return LDB_SUCCESS;
756                 }
757         }       
758
759         /* now do a full intersection */
760         found = false;
761
762         for (i=0; i<tree->u.list.num_elements; i++) {
763                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
764                 struct dn_list *list2;
765                 int ret;
766
767                 list2 = talloc_zero(list, struct dn_list);
768                 if (list2 == NULL) {
769                         ldb_module_oom(module);
770                         return LDB_ERR_OPERATIONS_ERROR;
771                 }
772                         
773                 ret = ltdb_index_dn(module, subtree, index_list, list2);
774
775                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
776                         /* X && 0 == 0 */
777                         list->dn = NULL;
778                         list->count = 0;
779                         talloc_free(list2);
780                         return LDB_ERR_NO_SUCH_OBJECT;
781                 }
782                 
783                 if (ret != LDB_SUCCESS) {
784                         /* this didn't adding anything */
785                         talloc_free(list2);
786                         continue;
787                 }
788
789                 if (!found) {
790                         talloc_reparent(list2, list, list->dn);
791                         list->dn = list2->dn;
792                         list->count = list2->count;
793                         found = true;
794                 } else if (!list_intersect(ldb, list, list2)) {
795                         talloc_free(list2);
796                         return LDB_ERR_OPERATIONS_ERROR;
797                 }
798                         
799                 if (list->count == 0) {
800                         list->dn = NULL;
801                         return LDB_ERR_NO_SUCH_OBJECT;
802                 }
803                         
804                 if (list->count < 2) {
805                         /* it isn't worth loading the next part of the tree */
806                         return LDB_SUCCESS;
807                 }
808         }       
809
810         if (!found) {
811                 /* none of the attributes were indexed */
812                 return LDB_ERR_OPERATIONS_ERROR;
813         }
814
815         return LDB_SUCCESS;
816 }
817         
818 /*
819   return a list of matching objects using a one-level index
820  */
821 static int ltdb_index_dn_one(struct ldb_module *module,
822                              struct ldb_dn *parent_dn,
823                              struct dn_list *list)
824 {
825         struct ldb_context *ldb;
826         struct ldb_dn *key;
827         struct ldb_val val;
828         int ret;
829
830         ldb = ldb_module_get_ctx(module);
831
832         /* work out the index key from the parent DN */
833         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
834         val.length = strlen((char *)val.data);
835         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
836         if (!key) {
837                 ldb_oom(ldb);
838                 return LDB_ERR_OPERATIONS_ERROR;
839         }
840
841         ret = ltdb_dn_list_load(module, key, list);
842         talloc_free(key);
843         if (ret != LDB_SUCCESS) {
844                 return ret;
845         }
846
847         if (list->count == 0) {
848                 return LDB_ERR_NO_SUCH_OBJECT;
849         }
850
851         return LDB_SUCCESS;
852 }
853
854 /*
855   return a list of dn's that might match a indexed search or
856   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
857  */
858 static int ltdb_index_dn(struct ldb_module *module,
859                          const struct ldb_parse_tree *tree,
860                          const struct ldb_message *index_list,
861                          struct dn_list *list)
862 {
863         int ret = LDB_ERR_OPERATIONS_ERROR;
864
865         switch (tree->operation) {
866         case LDB_OP_AND:
867                 ret = ltdb_index_dn_and(module, tree, index_list, list);
868                 break;
869
870         case LDB_OP_OR:
871                 ret = ltdb_index_dn_or(module, tree, index_list, list);
872                 break;
873
874         case LDB_OP_NOT:
875                 ret = ltdb_index_dn_not(module, tree, index_list, list);
876                 break;
877
878         case LDB_OP_EQUALITY:
879                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
880                 break;
881
882         case LDB_OP_SUBSTRING:
883         case LDB_OP_GREATER:
884         case LDB_OP_LESS:
885         case LDB_OP_PRESENT:
886         case LDB_OP_APPROX:
887         case LDB_OP_EXTENDED:
888                 /* we can't index with fancy bitops yet */
889                 ret = LDB_ERR_OPERATIONS_ERROR;
890                 break;
891         }
892
893         return ret;
894 }
895
896 /*
897   filter a candidate dn_list from an indexed search into a set of results
898   extracting just the given attributes
899 */
900 static int ltdb_index_filter(const struct dn_list *dn_list,
901                              struct ltdb_context *ac, 
902                              uint32_t *match_count)
903 {
904         struct ldb_context *ldb;
905         struct ldb_message *msg;
906         unsigned int i;
907
908         ldb = ldb_module_get_ctx(ac->module);
909
910         for (i = 0; i < dn_list->count; i++) {
911                 struct ldb_dn *dn;
912                 int ret;
913
914                 msg = ldb_msg_new(ac);
915                 if (!msg) {
916                         return LDB_ERR_OPERATIONS_ERROR;
917                 }
918
919                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
920                 if (dn == NULL) {
921                         talloc_free(msg);
922                         return LDB_ERR_OPERATIONS_ERROR;
923                 }
924
925                 ret = ltdb_search_dn1(ac->module, dn, msg);
926                 talloc_free(dn);
927                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
928                         /* the record has disappeared? yes, this can happen */
929                         talloc_free(msg);
930                         continue;
931                 }
932
933                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
934                         /* an internal error */
935                         talloc_free(msg);
936                         return LDB_ERR_OPERATIONS_ERROR;
937                 }
938
939                 if (!ldb_match_msg(ldb, msg,
940                                    ac->tree, ac->base, ac->scope)) {
941                         talloc_free(msg);
942                         continue;
943                 }
944
945                 /* filter the attributes that the user wants */
946                 ret = ltdb_filter_attrs(msg, ac->attrs);
947
948                 if (ret == -1) {
949                         talloc_free(msg);
950                         return LDB_ERR_OPERATIONS_ERROR;
951                 }
952
953                 ret = ldb_module_send_entry(ac->req, msg, NULL);
954                 if (ret != LDB_SUCCESS) {
955                         ac->request_terminated = true;
956                         return ret;
957                 }
958
959                 (*match_count)++;
960         }
961
962         return LDB_SUCCESS;
963 }
964
965 /*
966   remove any duplicated entries in a indexed result
967  */
968 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
969 {
970         int i, new_count;
971
972         if (list->count < 2) {
973                 return;
974         }
975
976         qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
977
978         new_count = 1;
979         for (i=1; i<list->count; i++) {
980                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
981                         if (new_count != i) {
982                                 list->dn[new_count] = list->dn[i];
983                         }
984                         new_count++;
985                 }
986         }
987         
988         list->count = new_count;
989 }
990
991 /*
992   search the database with a LDAP-like expression using indexes
993   returns -1 if an indexed search is not possible, in which
994   case the caller should call ltdb_search_full()
995 */
996 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
997 {
998         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
999         struct dn_list *dn_list;
1000         int ret;
1001
1002         /* see if indexing is enabled */
1003         if (!ltdb->cache->attribute_indexes && 
1004             !ltdb->cache->one_level_indexes &&
1005             ac->scope != LDB_SCOPE_BASE) {
1006                 /* fallback to a full search */
1007                 return LDB_ERR_OPERATIONS_ERROR;
1008         }
1009
1010         dn_list = talloc_zero(ac, struct dn_list);
1011         if (dn_list == NULL) {
1012                 ldb_module_oom(ac->module);
1013                 return LDB_ERR_OPERATIONS_ERROR;
1014         }
1015
1016         switch (ac->scope) {
1017         case LDB_SCOPE_BASE:
1018                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1019                 if (dn_list->dn == NULL) {
1020                         ldb_module_oom(ac->module);
1021                         talloc_free(dn_list);
1022                         return LDB_ERR_OPERATIONS_ERROR;
1023                 }
1024                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1025                 if (dn_list->dn[0].data == NULL) {
1026                         ldb_module_oom(ac->module);
1027                         talloc_free(dn_list);
1028                         return LDB_ERR_OPERATIONS_ERROR;
1029                 }
1030                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1031                 dn_list->count = 1;
1032                 break;          
1033
1034         case LDB_SCOPE_ONELEVEL:
1035                 if (!ltdb->cache->one_level_indexes) {
1036                         talloc_free(dn_list);
1037                         return LDB_ERR_OPERATIONS_ERROR;
1038                 }
1039                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1040                 if (ret != LDB_SUCCESS) {
1041                         talloc_free(dn_list);
1042                         return ret;
1043                 }
1044                 break;
1045
1046         case LDB_SCOPE_SUBTREE:
1047         case LDB_SCOPE_DEFAULT:
1048                 if (!ltdb->cache->attribute_indexes) {
1049                         talloc_free(dn_list);
1050                         return LDB_ERR_OPERATIONS_ERROR;
1051                 }
1052                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1053                 if (ret != LDB_SUCCESS) {
1054                         talloc_free(dn_list);
1055                         return ret;
1056                 }
1057                 ltdb_dn_list_remove_duplicates(dn_list);
1058                 break;
1059         }
1060
1061         ret = ltdb_index_filter(dn_list, ac, match_count);
1062         talloc_free(dn_list);
1063         return ret;
1064 }
1065
1066 /*
1067   add an index entry for one message element
1068 */
1069 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1070                            struct ldb_message_element *el, int v_idx)
1071 {
1072         struct ldb_context *ldb;
1073         struct ldb_dn *dn_key;
1074         int ret;
1075         const struct ldb_schema_attribute *a;
1076         struct dn_list *list;
1077         unsigned alloc_len;
1078
1079         ldb = ldb_module_get_ctx(module);
1080
1081         list = talloc_zero(module, struct dn_list);
1082         if (list == NULL) {
1083                 return LDB_ERR_OPERATIONS_ERROR;
1084         }
1085
1086         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1087         if (!dn_key) {
1088                 talloc_free(list);
1089                 return LDB_ERR_OPERATIONS_ERROR;
1090         }
1091         talloc_steal(list, dn_key);
1092
1093         ret = ltdb_dn_list_load(module, dn_key, list);
1094         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1095                 talloc_free(list);
1096                 return ret;
1097         }
1098
1099         if (ltdb_dn_list_find_str(list, dn) != -1) {
1100                 talloc_free(list);
1101                 return LDB_SUCCESS;
1102         }
1103
1104         if (list->count > 0 &&
1105             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1106                 talloc_free(list);
1107                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1108                                        el->name, dn);
1109                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1110         }
1111
1112         /* overallocate the list a bit, to reduce the number of
1113          * realloc trigered copies */    
1114         alloc_len = ((list->count+1)+7) & ~7;
1115         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1116         if (list->dn == NULL) {
1117                 talloc_free(list);
1118                 return LDB_ERR_OPERATIONS_ERROR;
1119         }
1120         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1121         list->dn[list->count].length = strlen(dn);
1122         list->count++;
1123
1124         ret = ltdb_dn_list_store(module, dn_key, list);
1125
1126         talloc_free(list);
1127
1128         return ret;
1129 }
1130
1131 /*
1132   add index entries for one elements in a message
1133  */
1134 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1135                              struct ldb_message_element *el)
1136 {
1137         unsigned int i;
1138         for (i = 0; i < el->num_values; i++) {
1139                 int ret = ltdb_index_add1(module, dn, el, i);
1140                 if (ret != LDB_SUCCESS) {
1141                         return ret;
1142                 }
1143         }
1144
1145         return LDB_SUCCESS;
1146 }
1147
1148 /*
1149   add index entries for all elements in a message
1150  */
1151 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1152                               struct ldb_message_element *elements, int num_el)
1153 {
1154         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1155         unsigned int i;
1156
1157         if (dn[0] == '@') {
1158                 return LDB_SUCCESS;
1159         }
1160
1161         if (ltdb->cache->indexlist->num_elements == 0) {
1162                 /* no indexed fields */
1163                 return LDB_SUCCESS;
1164         }
1165
1166         for (i = 0; i < num_el; i++) {
1167                 int ret;
1168                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1169                         continue;
1170                 }
1171                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1172                 if (ret != LDB_SUCCESS) {
1173                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1174                         ldb_asprintf_errstring(ldb,
1175                                                __location__ ": Failed to re-index %s in %s - %s",
1176                                                elements[i].name, dn, ldb_errstring(ldb));
1177                         return ret;
1178                 }
1179         }
1180
1181         return LDB_SUCCESS;
1182 }
1183
1184
1185 /*
1186   insert a one level index for a message
1187 */
1188 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1189 {
1190         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1191         struct ldb_message_element el;
1192         struct ldb_val val;
1193         struct ldb_dn *pdn;
1194         const char *dn;
1195         int ret;
1196
1197         /* We index for ONE Level only if requested */
1198         if (!ltdb->cache->one_level_indexes) {
1199                 return LDB_SUCCESS;
1200         }
1201
1202         pdn = ldb_dn_get_parent(module, msg->dn);
1203         if (pdn == NULL) {
1204                 return LDB_ERR_OPERATIONS_ERROR;
1205         }
1206
1207         dn = ldb_dn_get_linearized(msg->dn);
1208         if (dn == NULL) {
1209                 talloc_free(pdn);
1210                 return LDB_ERR_OPERATIONS_ERROR;
1211         }
1212
1213         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1214         if (val.data == NULL) {
1215                 talloc_free(pdn);
1216                 return LDB_ERR_OPERATIONS_ERROR;
1217         }
1218
1219         val.length = strlen((char *)val.data);
1220         el.name = LTDB_IDXONE;
1221         el.values = &val;
1222         el.num_values = 1;
1223
1224         if (add) {
1225                 ret = ltdb_index_add1(module, dn, &el, 0);
1226         } else { /* delete */
1227                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1228         }
1229
1230         talloc_free(pdn);
1231
1232         return ret;
1233 }
1234
1235 /*
1236   add the index entries for a new element in a record
1237   The caller guarantees that these element values are not yet indexed
1238 */
1239 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1240                            struct ldb_message_element *el)
1241 {
1242         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1243         if (ldb_dn_is_special(dn)) {
1244                 return LDB_SUCCESS;
1245         }
1246         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1247                 return LDB_SUCCESS;
1248         }
1249         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1250 }
1251
1252 /*
1253   add the index entries for a new record
1254 */
1255 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1256 {
1257         const char *dn;
1258         int ret;
1259
1260         if (ldb_dn_is_special(msg->dn)) {
1261                 return LDB_SUCCESS;
1262         }
1263
1264         dn = ldb_dn_get_linearized(msg->dn);
1265         if (dn == NULL) {
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268
1269         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1270         if (ret != LDB_SUCCESS) {
1271                 return ret;
1272         }
1273
1274         return ltdb_index_onelevel(module, msg, 1);
1275 }
1276
1277
1278 /*
1279   delete an index entry for one message element
1280 */
1281 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1282                          struct ldb_message_element *el, int v_idx)
1283 {
1284         struct ldb_context *ldb;
1285         struct ldb_dn *dn_key;
1286         const char *dn_str;
1287         int ret, i;
1288         struct dn_list *list;
1289
1290         ldb = ldb_module_get_ctx(module);
1291
1292         dn_str = ldb_dn_get_linearized(dn);
1293         if (dn_str == NULL) {
1294                 return LDB_ERR_OPERATIONS_ERROR;
1295         }
1296
1297         if (dn_str[0] == '@') {
1298                 return LDB_SUCCESS;
1299         }
1300
1301         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1302         if (!dn_key) {
1303                 return LDB_ERR_OPERATIONS_ERROR;
1304         }
1305
1306         list = talloc_zero(dn_key, struct dn_list);
1307         if (list == NULL) {
1308                 talloc_free(dn_key);
1309                 return LDB_ERR_OPERATIONS_ERROR;
1310         }
1311
1312         ret = ltdb_dn_list_load(module, dn_key, list);
1313         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1314                 /* it wasn't indexed. Did we have an earlier error? If we did then
1315                    its gone now */
1316                 talloc_free(dn_key);
1317                 return LDB_SUCCESS;
1318         }
1319
1320         if (ret != LDB_SUCCESS) {
1321                 talloc_free(dn_key);
1322                 return ret;
1323         }
1324
1325         i = ltdb_dn_list_find_str(list, dn_str);
1326         if (i == -1) {
1327                 /* nothing to delete */
1328                 talloc_free(dn_key);
1329                 return LDB_SUCCESS;             
1330         }
1331
1332         if (i != list->count-1) {
1333                 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1334         }
1335         list->count--;
1336         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1337
1338         ret = ltdb_dn_list_store(module, dn_key, list);
1339
1340         talloc_free(dn_key);
1341
1342         return ret;
1343 }
1344
1345 /*
1346   delete the index entries for a element
1347   return -1 on failure
1348 */
1349 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1350                            struct ldb_message_element *el)
1351 {
1352         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1353         const char *dn_str;
1354         int ret;
1355         unsigned int i;
1356
1357         if (!ltdb->cache->attribute_indexes) {
1358                 /* no indexed fields */
1359                 return LDB_SUCCESS;
1360         }
1361
1362         dn_str = ldb_dn_get_linearized(dn);
1363         if (dn_str == NULL) {
1364                 return LDB_ERR_OPERATIONS_ERROR;
1365         }
1366
1367         if (dn_str[0] == '@') {
1368                 return LDB_SUCCESS;
1369         }
1370
1371         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1372                 return LDB_SUCCESS;
1373         }
1374         for (i = 0; i < el->num_values; i++) {
1375                 ret = ltdb_index_del_value(module, dn, el, i);
1376                 if (ret != LDB_SUCCESS) {
1377                         return ret;
1378                 }
1379         }
1380
1381         return LDB_SUCCESS;
1382 }
1383
1384 /*
1385   delete the index entries for a record
1386   return -1 on failure
1387 */
1388 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1389 {
1390         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1391         int ret;
1392         unsigned int i;
1393
1394         if (ldb_dn_is_special(msg->dn)) {
1395                 return LDB_SUCCESS;
1396         }
1397
1398         ret = ltdb_index_onelevel(module, msg, 0);
1399         if (ret != LDB_SUCCESS) {
1400                 return ret;
1401         }
1402
1403         if (!ltdb->cache->attribute_indexes) {
1404                 /* no indexed fields */
1405                 return LDB_SUCCESS;
1406         }
1407
1408         for (i = 0; i < msg->num_elements; i++) {
1409                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1410                 if (ret != LDB_SUCCESS) {
1411                         return ret;
1412                 }
1413         }
1414
1415         return LDB_SUCCESS;
1416 }
1417
1418
1419 /*
1420   traversal function that deletes all @INDEX records
1421 */
1422 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1423 {
1424         struct ldb_module *module = state;
1425         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1426         const char *dnstr = "DN=" LTDB_INDEX ":";
1427         struct dn_list list;
1428         struct ldb_dn *dn;
1429         struct ldb_val v;
1430         int ret;
1431
1432         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1433                 return 0;
1434         }
1435         /* we need to put a empty list in the internal tdb for this
1436          * index entry */
1437         list.dn = NULL;
1438         list.count = 0;
1439         v.data = key.dptr;
1440         v.length = strnlen((char *)key.dptr, key.dsize);
1441
1442         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1443         ret = ltdb_dn_list_store(module, dn, &list);
1444         if (ret != LDB_SUCCESS) {
1445                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1446                                        "Unable to store null index for %s\n",
1447                                                 ldb_dn_get_linearized(dn));
1448                 talloc_free(dn);
1449                 return -1;
1450         }
1451         talloc_free(dn);
1452         return 0;
1453 }
1454
1455 struct ltdb_reindex_context {
1456         struct ldb_module *module;
1457         int error;
1458 };
1459
1460 /*
1461   traversal function that adds @INDEX records during a re index
1462 */
1463 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1464 {
1465         struct ldb_context *ldb;
1466         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1467         struct ldb_module *module = ctx->module;
1468         struct ldb_message *msg;
1469         const char *dn = NULL;
1470         int ret;
1471         TDB_DATA key2;
1472
1473         ldb = ldb_module_get_ctx(module);
1474
1475         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1476             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1477                 return 0;
1478         }
1479
1480         msg = ldb_msg_new(module);
1481         if (msg == NULL) {
1482                 return -1;
1483         }
1484
1485         ret = ltdb_unpack_data(module, &data, msg);
1486         if (ret != 0) {
1487                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1488                                                 ldb_dn_get_linearized(msg->dn));
1489                 talloc_free(msg);
1490                 return -1;
1491         }
1492
1493         /* check if the DN key has changed, perhaps due to the
1494            case insensitivity of an element changing */
1495         key2 = ltdb_key(module, msg->dn);
1496         if (key2.dptr == NULL) {
1497                 /* probably a corrupt record ... darn */
1498                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1499                                                 ldb_dn_get_linearized(msg->dn));
1500                 talloc_free(msg);
1501                 return 0;
1502         }
1503         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1504                 tdb_delete(tdb, key);
1505                 tdb_store(tdb, key2, data, 0);
1506         }
1507         talloc_free(key2.dptr);
1508
1509         if (msg->dn == NULL) {
1510                 dn = (char *)key.dptr + 3;
1511         } else {
1512                 dn = ldb_dn_get_linearized(msg->dn);
1513         }
1514
1515         ret = ltdb_index_onelevel(module, msg, 1);
1516         if (ret != LDB_SUCCESS) {
1517                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1518                           "Adding special ONE LEVEL index failed (%s)!",
1519                                                 ldb_dn_get_linearized(msg->dn));
1520                 talloc_free(msg);
1521                 return -1;
1522         }
1523
1524         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1525
1526         if (ret != LDB_SUCCESS) {
1527                 ctx->error = ret;
1528                 talloc_free(msg);
1529                 return -1;
1530         }
1531
1532         talloc_free(msg);
1533
1534         return 0;
1535 }
1536
1537 /*
1538   force a complete reindex of the database
1539 */
1540 int ltdb_reindex(struct ldb_module *module)
1541 {
1542         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1543         int ret;
1544         struct ltdb_reindex_context ctx;
1545
1546         if (ltdb_cache_reload(module) != 0) {
1547                 return LDB_ERR_OPERATIONS_ERROR;
1548         }
1549
1550         /* first traverse the database deleting any @INDEX records by
1551          * putting NULL entries in the in-memory tdb
1552          */
1553         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1554         if (ret == -1) {
1555                 return LDB_ERR_OPERATIONS_ERROR;
1556         }
1557
1558         /* if we don't have indexes we have nothing todo */
1559         if (ltdb->cache->indexlist->num_elements == 0) {
1560                 return LDB_SUCCESS;
1561         }
1562
1563         ctx.module = module;
1564         ctx.error = 0;
1565
1566         /* now traverse adding any indexes for normal LDB records */
1567         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1568         if (ret == -1) {
1569                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1570                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1571                 return LDB_ERR_OPERATIONS_ERROR;
1572         }
1573
1574         if (ctx.error != LDB_SUCCESS) {
1575                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1576                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1577                 return ctx.error;
1578         }
1579
1580         return LDB_SUCCESS;
1581 }