s4-ldb: add some comments explaining the ltdb_index_idxptr() function
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         int error;
44 };
45
46 /* we put a @IDXVERSION attribute on index entries. This
47    allows us to tell if it was written by an older version
48 */
49 #define LTDB_INDEXING_VERSION 2
50
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
53 {
54         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56         return LDB_SUCCESS;
57 }
58
59 /* compare two DN entries in a dn_list. Take account of possible
60  * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
62 {
63         if (v1->length > v2->length && v1->data[v2->length] != 0) {
64                 return -1;
65         }
66         if (v1->length < v2->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         unsigned int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 /*
99   this is effectively a cast function, but with lots of paranoia
100   checks and also copes with CPUs that are fussy about pointer
101   alignment
102  */
103 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
104 {
105         struct dn_list *list;
106         if (rec.dsize != sizeof(void *)) {
107                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
108                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
109                 return NULL;
110         }
111         /* note that we can't just use a cast here, as rec.dptr may
112            not be aligned sufficiently for a pointer. A cast would cause
113            platforms like some ARM CPUs to crash */
114         memcpy(&list, rec.dptr, sizeof(void *));
115         list = talloc_get_type(list, struct dn_list);
116         if (list == NULL) {
117                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
118                                        "Bad type '%s' for idxptr", 
119                                        talloc_get_name(list));
120                 return NULL;
121         }
122         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
123                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
124                                        "Bad parent '%s' for idxptr", 
125                                        talloc_get_name(talloc_parent(list->dn)));
126                 return NULL;
127         }
128         return list;
129 }
130
131 /*
132   return the @IDX list in an index entry for a dn as a 
133   struct dn_list
134  */
135 static int ltdb_dn_list_load(struct ldb_module *module,
136                              struct ldb_dn *dn, struct dn_list *list)
137 {
138         struct ldb_message *msg;
139         int ret;
140         struct ldb_message_element *el;
141         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
142         TDB_DATA rec;
143         struct dn_list *list2;
144         TDB_DATA key;
145
146         list->dn = NULL;
147         list->count = 0;
148
149         /* see if we have any in-memory index entries */
150         if (ltdb->idxptr == NULL ||
151             ltdb->idxptr->itdb == NULL) {
152                 goto normal_index;
153         }
154
155         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
156         key.dsize = strlen((char *)key.dptr);
157
158         rec = tdb_fetch(ltdb->idxptr->itdb, key);
159         if (rec.dptr == NULL) {
160                 goto normal_index;
161         }
162
163         /* we've found an in-memory index entry */
164         list2 = ltdb_index_idxptr(module, rec, true);
165         if (list2 == NULL) {
166                 free(rec.dptr);
167                 return LDB_ERR_OPERATIONS_ERROR;
168         }
169         free(rec.dptr);
170
171         *list = *list2;
172         return LDB_SUCCESS;
173
174 normal_index:
175         msg = ldb_msg_new(list);
176         if (msg == NULL) {
177                 return LDB_ERR_OPERATIONS_ERROR;
178         }
179
180         ret = ltdb_search_dn1(module, dn, msg);
181         if (ret != LDB_SUCCESS) {
182                 return ret;
183         }
184
185         /* TODO: check indexing version number */
186
187         el = ldb_msg_find_element(msg, LTDB_IDX);
188         if (!el) {
189                 talloc_free(msg);
190                 return LDB_SUCCESS;
191         }
192
193         /* we avoid copying the strings by stealing the list */
194         list->dn = talloc_steal(list, el->values);
195         list->count = el->num_values;
196
197         return LDB_SUCCESS;
198 }
199
200
201 /*
202   save a dn_list into a full @IDX style record
203  */
204 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
205                                    struct dn_list *list)
206 {
207         struct ldb_message *msg;
208         int ret;
209
210         if (list->count == 0) {
211                 ret = ltdb_delete_noindex(module, dn);
212                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
213                         return LDB_SUCCESS;
214                 }
215                 return ret;
216         }
217
218         msg = ldb_msg_new(module);
219         if (!msg) {
220                 ldb_module_oom(module);
221                 return LDB_ERR_OPERATIONS_ERROR;
222         }
223
224         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
225         if (ret != LDB_SUCCESS) {
226                 talloc_free(msg);
227                 ldb_module_oom(module);
228                 return LDB_ERR_OPERATIONS_ERROR;
229         }
230
231         msg->dn = dn;
232         if (list->count > 0) {
233                 struct ldb_message_element *el;
234
235                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
236                 if (ret != LDB_SUCCESS) {
237                         ldb_module_oom(module);
238                         talloc_free(msg);
239                         return LDB_ERR_OPERATIONS_ERROR;
240                 }
241                 el->values = list->dn;
242                 el->num_values = list->count;
243         }
244
245         ret = ltdb_store(module, msg, TDB_REPLACE);
246         talloc_free(msg);
247         return ret;
248 }
249
250 /*
251   save a dn_list into the database, in either @IDX or internal format
252  */
253 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
254                               struct dn_list *list)
255 {
256         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
257         TDB_DATA rec, key;
258         int ret;
259         struct dn_list *list2;
260
261         if (ltdb->idxptr == NULL) {
262                 return ltdb_dn_list_store_full(module, dn, list);
263         }
264
265         if (ltdb->idxptr->itdb == NULL) {
266                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
267                 if (ltdb->idxptr->itdb == NULL) {
268                         return LDB_ERR_OPERATIONS_ERROR;
269                 }
270         }
271
272         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
273         key.dsize = strlen((char *)key.dptr);
274
275         rec = tdb_fetch(ltdb->idxptr->itdb, key);
276         if (rec.dptr != NULL) {
277                 list2 = ltdb_index_idxptr(module, rec, false);
278                 if (list2 == NULL) {
279                         free(rec.dptr);
280                         return LDB_ERR_OPERATIONS_ERROR;
281                 }
282                 free(rec.dptr);
283                 list2->dn = talloc_steal(list2, list->dn);
284                 list2->count = list->count;
285                 return LDB_SUCCESS;
286         }
287
288         list2 = talloc(ltdb->idxptr, struct dn_list);
289         if (list2 == NULL) {
290                 return LDB_ERR_OPERATIONS_ERROR;
291         }
292         list2->dn = talloc_steal(list2, list->dn);
293         list2->count = list->count;
294
295         rec.dptr = (uint8_t *)&list2;
296         rec.dsize = sizeof(void *);
297
298         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
299         if (ret == -1) {
300                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
301         }
302         return LDB_SUCCESS;
303 }
304
305 /*
306   traverse function for storing the in-memory index entries on disk
307  */
308 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
309 {
310         struct ldb_module *module = state;
311         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
312         struct ldb_dn *dn;
313         struct ldb_context *ldb = ldb_module_get_ctx(module);
314         struct ldb_val v;
315         struct dn_list *list;
316
317         list = ltdb_index_idxptr(module, data, true);
318         if (list == NULL) {
319                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
320                 return -1;
321         }
322
323         v.data = key.dptr;
324         v.length = strnlen((char *)key.dptr, key.dsize);
325
326         dn = ldb_dn_from_ldb_val(module, ldb, &v);
327         if (dn == NULL) {
328                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
329                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
330                 return -1;
331         }
332
333         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
334         talloc_free(dn);
335         if (ltdb->idxptr->error != 0) {
336                 return -1;
337         }
338         return 0;
339 }
340
341 /* cleanup the idxptr mode when transaction commits */
342 int ltdb_index_transaction_commit(struct ldb_module *module)
343 {
344         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
345         int ret;
346
347         struct ldb_context *ldb = ldb_module_get_ctx(module);
348
349         ldb_reset_err_string(ldb);
350         if (ltdb->idxptr->itdb) {
351                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
352                 tdb_close(ltdb->idxptr->itdb);
353         }
354
355         ret = ltdb->idxptr->error;
356
357         if (ret != LDB_SUCCESS) {
358                 if (!ldb_errstring(ldb)) {
359                         ldb_set_errstring(ldb, ldb_strerror(ret));
360                 }
361                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
362         }
363
364         talloc_free(ltdb->idxptr);
365         ltdb->idxptr = NULL;
366         return ret;
367 }
368
369 /* cleanup the idxptr mode when transaction cancels */
370 int ltdb_index_transaction_cancel(struct ldb_module *module)
371 {
372         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
373         if (ltdb->idxptr && ltdb->idxptr->itdb) {
374                 tdb_close(ltdb->idxptr->itdb);
375         }
376         talloc_free(ltdb->idxptr);
377         ltdb->idxptr = NULL;
378         return LDB_SUCCESS;
379 }
380
381
382 /*
383   return the dn key to be used for an index
384   the caller is responsible for freeing
385 */
386 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
387                                      const char *attr, const struct ldb_val *value,
388                                      const struct ldb_schema_attribute **ap)
389 {
390         struct ldb_dn *ret;
391         struct ldb_val v;
392         const struct ldb_schema_attribute *a;
393         char *attr_folded;
394         int r;
395
396         attr_folded = ldb_attr_casefold(ldb, attr);
397         if (!attr_folded) {
398                 return NULL;
399         }
400
401         a = ldb_schema_attribute_by_name(ldb, attr);
402         if (ap) {
403                 *ap = a;
404         }
405         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
406         if (r != LDB_SUCCESS) {
407                 const char *errstr = ldb_errstring(ldb);
408                 /* canonicalisation can be refused. For example, 
409                    a attribute that takes wildcards will refuse to canonicalise
410                    if the value contains a wildcard */
411                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
412                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
413                 talloc_free(attr_folded);
414                 return NULL;
415         }
416         if (ldb_should_b64_encode(ldb, &v)) {
417                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
418                 if (!vstr) return NULL;
419                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
420                 talloc_free(vstr);
421         } else {
422                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
423         }
424
425         if (v.data != value->data) {
426                 talloc_free(v.data);
427         }
428         talloc_free(attr_folded);
429
430         return ret;
431 }
432
433 /*
434   see if a attribute value is in the list of indexed attributes
435 */
436 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
437 {
438         unsigned int i;
439         struct ldb_message_element *el;
440
441         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
442         if (el == NULL) {
443                 return false;
444         }
445
446         /* TODO: this is too expensive! At least use a binary search */
447         for (i=0; i<el->num_values; i++) {
448                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
449                         return true;
450                 }
451         }
452         return false;
453 }
454
455 /*
456   in the following logic functions, the return value is treated as
457   follows:
458
459      LDB_SUCCESS: we found some matching index values
460
461      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
462
463      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
464                                we'll need a full search
465  */
466
467 /*
468   return a list of dn's that might match a simple indexed search (an
469   equality search only)
470  */
471 static int ltdb_index_dn_simple(struct ldb_module *module,
472                                 const struct ldb_parse_tree *tree,
473                                 const struct ldb_message *index_list,
474                                 struct dn_list *list)
475 {
476         struct ldb_context *ldb;
477         struct ldb_dn *dn;
478         int ret;
479
480         ldb = ldb_module_get_ctx(module);
481
482         list->count = 0;
483         list->dn = NULL;
484
485         /* if the attribute isn't in the list of indexed attributes then
486            this node needs a full search */
487         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
488                 return LDB_ERR_OPERATIONS_ERROR;
489         }
490
491         /* the attribute is indexed. Pull the list of DNs that match the 
492            search criterion */
493         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
494         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
495
496         ret = ltdb_dn_list_load(module, dn, list);
497         talloc_free(dn);
498         return ret;
499 }
500
501
502 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
503
504 /*
505   return a list of dn's that might match a leaf indexed search
506  */
507 static int ltdb_index_dn_leaf(struct ldb_module *module,
508                               const struct ldb_parse_tree *tree,
509                               const struct ldb_message *index_list,
510                               struct dn_list *list)
511 {
512         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
513                 list->dn = talloc_array(list, struct ldb_val, 1);
514                 if (list->dn == NULL) {
515                         ldb_module_oom(module);
516                         return LDB_ERR_OPERATIONS_ERROR;
517                 }
518                 list->dn[0] = tree->u.equality.value;
519                 list->count = 1;
520                 return LDB_SUCCESS;
521         }
522         return ltdb_index_dn_simple(module, tree, index_list, list);
523 }
524
525
526 /*
527   list intersection
528   list = list & list2
529 */
530 static bool list_intersect(struct ldb_context *ldb,
531                            struct dn_list *list, const struct dn_list *list2)
532 {
533         struct dn_list *list3;
534         unsigned int i;
535
536         if (list->count == 0) {
537                 /* 0 & X == 0 */
538                 return true;
539         }
540         if (list2->count == 0) {
541                 /* X & 0 == 0 */
542                 list->count = 0;
543                 list->dn = NULL;
544                 return true;
545         }
546
547         /* the indexing code is allowed to return a longer list than
548            what really matches, as all results are filtered by the
549            full expression at the end - this shortcut avoids a lot of
550            work in some cases */
551         if (list->count < 2 && list2->count > 10) {
552                 return true;
553         }
554         if (list2->count < 2 && list->count > 10) {
555                 list->count = list2->count;
556                 list->dn = list2->dn;
557                 /* note that list2 may not be the parent of list2->dn,
558                    as list2->dn may be owned by ltdb->idxptr. In that
559                    case we expect this reparent call to fail, which is
560                    OK */
561                 talloc_reparent(list2, list, list2->dn);
562                 return true;
563         }
564
565         list3 = talloc_zero(list, struct dn_list);
566         if (list3 == NULL) {
567                 return false;
568         }
569
570         list3->dn = talloc_array(list3, struct ldb_val, list->count);
571         if (!list3->dn) {
572                 talloc_free(list3);
573                 return false;
574         }
575         list3->count = 0;
576
577         for (i=0;i<list->count;i++) {
578                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
579                         list3->dn[list3->count] = list->dn[i];
580                         list3->count++;
581                 }
582         }
583
584         list->dn = talloc_steal(list, list3->dn);
585         list->count = list3->count;
586         talloc_free(list3);
587
588         return true;
589 }
590
591
592 /*
593   list union
594   list = list | list2
595 */
596 static bool list_union(struct ldb_context *ldb,
597                        struct dn_list *list, const struct dn_list *list2)
598 {
599         struct ldb_val *dn3;
600
601         if (list2->count == 0) {
602                 /* X | 0 == X */
603                 return true;
604         }
605
606         if (list->count == 0) {
607                 /* 0 | X == X */
608                 list->count = list2->count;
609                 list->dn = list2->dn;
610                 /* note that list2 may not be the parent of list2->dn,
611                    as list2->dn may be owned by ltdb->idxptr. In that
612                    case we expect this reparent call to fail, which is
613                    OK */
614                 talloc_reparent(list2, list, list2->dn);
615                 return true;
616         }
617
618         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
619         if (!dn3) {
620                 ldb_oom(ldb);
621                 return false;
622         }
623
624         /* we allow for duplicates here, and get rid of them later */
625         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
626         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
627
628         list->dn = dn3;
629         list->count += list2->count;
630
631         return true;
632 }
633
634 static int ltdb_index_dn(struct ldb_module *module,
635                          const struct ldb_parse_tree *tree,
636                          const struct ldb_message *index_list,
637                          struct dn_list *list);
638
639
640 /*
641   process an OR list (a union)
642  */
643 static int ltdb_index_dn_or(struct ldb_module *module,
644                             const struct ldb_parse_tree *tree,
645                             const struct ldb_message *index_list,
646                             struct dn_list *list)
647 {
648         struct ldb_context *ldb;
649         unsigned int i;
650
651         ldb = ldb_module_get_ctx(module);
652
653         list->dn = NULL;
654         list->count = 0;
655
656         for (i=0; i<tree->u.list.num_elements; i++) {
657                 struct dn_list *list2;
658                 int ret;
659
660                 list2 = talloc_zero(list, struct dn_list);
661                 if (list2 == NULL) {
662                         return LDB_ERR_OPERATIONS_ERROR;
663                 }
664
665                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
666
667                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
668                         /* X || 0 == X */
669                         talloc_free(list2);
670                         continue;
671                 }
672
673                 if (ret != LDB_SUCCESS) {
674                         /* X || * == * */
675                         talloc_free(list2);
676                         return ret;
677                 }
678
679                 if (!list_union(ldb, list, list2)) {
680                         talloc_free(list2);
681                         return LDB_ERR_OPERATIONS_ERROR;
682                 }
683         }
684
685         if (list->count == 0) {
686                 return LDB_ERR_NO_SUCH_OBJECT;
687         }
688
689         return LDB_SUCCESS;
690 }
691
692
693 /*
694   NOT an index results
695  */
696 static int ltdb_index_dn_not(struct ldb_module *module,
697                              const struct ldb_parse_tree *tree,
698                              const struct ldb_message *index_list,
699                              struct dn_list *list)
700 {
701         /* the only way to do an indexed not would be if we could
702            negate the not via another not or if we knew the total
703            number of database elements so we could know that the
704            existing expression covered the whole database.
705
706            instead, we just give up, and rely on a full index scan
707            (unless an outer & manages to reduce the list)
708         */
709         return LDB_ERR_OPERATIONS_ERROR;
710 }
711
712
713 static bool ltdb_index_unique(struct ldb_context *ldb,
714                               const char *attr)
715 {
716         const struct ldb_schema_attribute *a;
717         a = ldb_schema_attribute_by_name(ldb, attr);
718         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
719                 return true;
720         }
721         return false;
722 }
723
724 /*
725   process an AND expression (intersection)
726  */
727 static int ltdb_index_dn_and(struct ldb_module *module,
728                              const struct ldb_parse_tree *tree,
729                              const struct ldb_message *index_list,
730                              struct dn_list *list)
731 {
732         struct ldb_context *ldb;
733         unsigned int i;
734         bool found;
735
736         ldb = ldb_module_get_ctx(module);
737
738         list->dn = NULL;
739         list->count = 0;
740
741         /* in the first pass we only look for unique simple
742            equality tests, in the hope of avoiding having to look
743            at any others */
744         for (i=0; i<tree->u.list.num_elements; i++) {
745                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
746                 int ret;
747
748                 if (subtree->operation != LDB_OP_EQUALITY ||
749                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
750                         continue;
751                 }
752                 
753                 ret = ltdb_index_dn(module, subtree, index_list, list);
754                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
755                         /* 0 && X == 0 */
756                         return LDB_ERR_NO_SUCH_OBJECT;
757                 }
758                 if (ret == LDB_SUCCESS) {
759                         /* a unique index match means we can
760                          * stop. Note that we don't care if we return
761                          * a few too many objects, due to later
762                          * filtering */
763                         return LDB_SUCCESS;
764                 }
765         }       
766
767         /* now do a full intersection */
768         found = false;
769
770         for (i=0; i<tree->u.list.num_elements; i++) {
771                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
772                 struct dn_list *list2;
773                 int ret;
774
775                 list2 = talloc_zero(list, struct dn_list);
776                 if (list2 == NULL) {
777                         ldb_module_oom(module);
778                         return LDB_ERR_OPERATIONS_ERROR;
779                 }
780                         
781                 ret = ltdb_index_dn(module, subtree, index_list, list2);
782
783                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
784                         /* X && 0 == 0 */
785                         list->dn = NULL;
786                         list->count = 0;
787                         talloc_free(list2);
788                         return LDB_ERR_NO_SUCH_OBJECT;
789                 }
790                 
791                 if (ret != LDB_SUCCESS) {
792                         /* this didn't adding anything */
793                         talloc_free(list2);
794                         continue;
795                 }
796
797                 if (!found) {
798                         talloc_reparent(list2, list, list->dn);
799                         list->dn = list2->dn;
800                         list->count = list2->count;
801                         found = true;
802                 } else if (!list_intersect(ldb, list, list2)) {
803                         talloc_free(list2);
804                         return LDB_ERR_OPERATIONS_ERROR;
805                 }
806                         
807                 if (list->count == 0) {
808                         list->dn = NULL;
809                         return LDB_ERR_NO_SUCH_OBJECT;
810                 }
811                         
812                 if (list->count < 2) {
813                         /* it isn't worth loading the next part of the tree */
814                         return LDB_SUCCESS;
815                 }
816         }       
817
818         if (!found) {
819                 /* none of the attributes were indexed */
820                 return LDB_ERR_OPERATIONS_ERROR;
821         }
822
823         return LDB_SUCCESS;
824 }
825         
826 /*
827   return a list of matching objects using a one-level index
828  */
829 static int ltdb_index_dn_one(struct ldb_module *module,
830                              struct ldb_dn *parent_dn,
831                              struct dn_list *list)
832 {
833         struct ldb_context *ldb;
834         struct ldb_dn *key;
835         struct ldb_val val;
836         int ret;
837
838         ldb = ldb_module_get_ctx(module);
839
840         /* work out the index key from the parent DN */
841         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
842         val.length = strlen((char *)val.data);
843         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
844         if (!key) {
845                 ldb_oom(ldb);
846                 return LDB_ERR_OPERATIONS_ERROR;
847         }
848
849         ret = ltdb_dn_list_load(module, key, list);
850         talloc_free(key);
851         if (ret != LDB_SUCCESS) {
852                 return ret;
853         }
854
855         if (list->count == 0) {
856                 return LDB_ERR_NO_SUCH_OBJECT;
857         }
858
859         return LDB_SUCCESS;
860 }
861
862 /*
863   return a list of dn's that might match a indexed search or
864   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
865  */
866 static int ltdb_index_dn(struct ldb_module *module,
867                          const struct ldb_parse_tree *tree,
868                          const struct ldb_message *index_list,
869                          struct dn_list *list)
870 {
871         int ret = LDB_ERR_OPERATIONS_ERROR;
872
873         switch (tree->operation) {
874         case LDB_OP_AND:
875                 ret = ltdb_index_dn_and(module, tree, index_list, list);
876                 break;
877
878         case LDB_OP_OR:
879                 ret = ltdb_index_dn_or(module, tree, index_list, list);
880                 break;
881
882         case LDB_OP_NOT:
883                 ret = ltdb_index_dn_not(module, tree, index_list, list);
884                 break;
885
886         case LDB_OP_EQUALITY:
887                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
888                 break;
889
890         case LDB_OP_SUBSTRING:
891         case LDB_OP_GREATER:
892         case LDB_OP_LESS:
893         case LDB_OP_PRESENT:
894         case LDB_OP_APPROX:
895         case LDB_OP_EXTENDED:
896                 /* we can't index with fancy bitops yet */
897                 ret = LDB_ERR_OPERATIONS_ERROR;
898                 break;
899         }
900
901         return ret;
902 }
903
904 /*
905   filter a candidate dn_list from an indexed search into a set of results
906   extracting just the given attributes
907 */
908 static int ltdb_index_filter(const struct dn_list *dn_list,
909                              struct ltdb_context *ac, 
910                              uint32_t *match_count)
911 {
912         struct ldb_context *ldb;
913         struct ldb_message *msg;
914         unsigned int i;
915
916         ldb = ldb_module_get_ctx(ac->module);
917
918         for (i = 0; i < dn_list->count; i++) {
919                 struct ldb_dn *dn;
920                 int ret;
921
922                 msg = ldb_msg_new(ac);
923                 if (!msg) {
924                         return LDB_ERR_OPERATIONS_ERROR;
925                 }
926
927                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
928                 if (dn == NULL) {
929                         talloc_free(msg);
930                         return LDB_ERR_OPERATIONS_ERROR;
931                 }
932
933                 ret = ltdb_search_dn1(ac->module, dn, msg);
934                 talloc_free(dn);
935                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
936                         /* the record has disappeared? yes, this can happen */
937                         talloc_free(msg);
938                         continue;
939                 }
940
941                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
942                         /* an internal error */
943                         talloc_free(msg);
944                         return LDB_ERR_OPERATIONS_ERROR;
945                 }
946
947                 if (!ldb_match_msg(ldb, msg,
948                                    ac->tree, ac->base, ac->scope)) {
949                         talloc_free(msg);
950                         continue;
951                 }
952
953                 /* filter the attributes that the user wants */
954                 ret = ltdb_filter_attrs(msg, ac->attrs);
955
956                 if (ret == -1) {
957                         talloc_free(msg);
958                         return LDB_ERR_OPERATIONS_ERROR;
959                 }
960
961                 ret = ldb_module_send_entry(ac->req, msg, NULL);
962                 if (ret != LDB_SUCCESS) {
963                         ac->request_terminated = true;
964                         return ret;
965                 }
966
967                 (*match_count)++;
968         }
969
970         return LDB_SUCCESS;
971 }
972
973 /*
974   remove any duplicated entries in a indexed result
975  */
976 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
977 {
978         unsigned int i, new_count;
979
980         if (list->count < 2) {
981                 return;
982         }
983
984         TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
985
986         new_count = 1;
987         for (i=1; i<list->count; i++) {
988                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
989                         if (new_count != i) {
990                                 list->dn[new_count] = list->dn[i];
991                         }
992                         new_count++;
993                 }
994         }
995         
996         list->count = new_count;
997 }
998
999 /*
1000   search the database with a LDAP-like expression using indexes
1001   returns -1 if an indexed search is not possible, in which
1002   case the caller should call ltdb_search_full()
1003 */
1004 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1005 {
1006         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1007         struct dn_list *dn_list;
1008         int ret;
1009
1010         /* see if indexing is enabled */
1011         if (!ltdb->cache->attribute_indexes && 
1012             !ltdb->cache->one_level_indexes &&
1013             ac->scope != LDB_SCOPE_BASE) {
1014                 /* fallback to a full search */
1015                 return LDB_ERR_OPERATIONS_ERROR;
1016         }
1017
1018         dn_list = talloc_zero(ac, struct dn_list);
1019         if (dn_list == NULL) {
1020                 ldb_module_oom(ac->module);
1021                 return LDB_ERR_OPERATIONS_ERROR;
1022         }
1023
1024         switch (ac->scope) {
1025         case LDB_SCOPE_BASE:
1026                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1027                 if (dn_list->dn == NULL) {
1028                         ldb_module_oom(ac->module);
1029                         talloc_free(dn_list);
1030                         return LDB_ERR_OPERATIONS_ERROR;
1031                 }
1032                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1033                 if (dn_list->dn[0].data == NULL) {
1034                         ldb_module_oom(ac->module);
1035                         talloc_free(dn_list);
1036                         return LDB_ERR_OPERATIONS_ERROR;
1037                 }
1038                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1039                 dn_list->count = 1;
1040                 break;          
1041
1042         case LDB_SCOPE_ONELEVEL:
1043                 if (!ltdb->cache->one_level_indexes) {
1044                         talloc_free(dn_list);
1045                         return LDB_ERR_OPERATIONS_ERROR;
1046                 }
1047                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1048                 if (ret != LDB_SUCCESS) {
1049                         talloc_free(dn_list);
1050                         return ret;
1051                 }
1052                 break;
1053
1054         case LDB_SCOPE_SUBTREE:
1055         case LDB_SCOPE_DEFAULT:
1056                 if (!ltdb->cache->attribute_indexes) {
1057                         talloc_free(dn_list);
1058                         return LDB_ERR_OPERATIONS_ERROR;
1059                 }
1060                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1061                 if (ret != LDB_SUCCESS) {
1062                         talloc_free(dn_list);
1063                         return ret;
1064                 }
1065                 ltdb_dn_list_remove_duplicates(dn_list);
1066                 break;
1067         }
1068
1069         ret = ltdb_index_filter(dn_list, ac, match_count);
1070         talloc_free(dn_list);
1071         return ret;
1072 }
1073
1074 /*
1075   add an index entry for one message element
1076 */
1077 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1078                            struct ldb_message_element *el, int v_idx)
1079 {
1080         struct ldb_context *ldb;
1081         struct ldb_dn *dn_key;
1082         int ret;
1083         const struct ldb_schema_attribute *a;
1084         struct dn_list *list;
1085         unsigned alloc_len;
1086
1087         ldb = ldb_module_get_ctx(module);
1088
1089         list = talloc_zero(module, struct dn_list);
1090         if (list == NULL) {
1091                 return LDB_ERR_OPERATIONS_ERROR;
1092         }
1093
1094         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1095         if (!dn_key) {
1096                 talloc_free(list);
1097                 return LDB_ERR_OPERATIONS_ERROR;
1098         }
1099         talloc_steal(list, dn_key);
1100
1101         ret = ltdb_dn_list_load(module, dn_key, list);
1102         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1103                 talloc_free(list);
1104                 return ret;
1105         }
1106
1107         if (ltdb_dn_list_find_str(list, dn) != -1) {
1108                 talloc_free(list);
1109                 return LDB_SUCCESS;
1110         }
1111
1112         if (list->count > 0 &&
1113             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1114                 talloc_free(list);
1115                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1116                                        el->name, dn);
1117                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1118         }
1119
1120         /* overallocate the list a bit, to reduce the number of
1121          * realloc trigered copies */    
1122         alloc_len = ((list->count+1)+7) & ~7;
1123         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1124         if (list->dn == NULL) {
1125                 talloc_free(list);
1126                 return LDB_ERR_OPERATIONS_ERROR;
1127         }
1128         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1129         list->dn[list->count].length = strlen(dn);
1130         list->count++;
1131
1132         ret = ltdb_dn_list_store(module, dn_key, list);
1133
1134         talloc_free(list);
1135
1136         return ret;
1137 }
1138
1139 /*
1140   add index entries for one elements in a message
1141  */
1142 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1143                              struct ldb_message_element *el)
1144 {
1145         unsigned int i;
1146         for (i = 0; i < el->num_values; i++) {
1147                 int ret = ltdb_index_add1(module, dn, el, i);
1148                 if (ret != LDB_SUCCESS) {
1149                         return ret;
1150                 }
1151         }
1152
1153         return LDB_SUCCESS;
1154 }
1155
1156 /*
1157   add index entries for all elements in a message
1158  */
1159 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1160                               struct ldb_message_element *elements, int num_el)
1161 {
1162         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1163         unsigned int i;
1164
1165         if (dn[0] == '@') {
1166                 return LDB_SUCCESS;
1167         }
1168
1169         if (ltdb->cache->indexlist->num_elements == 0) {
1170                 /* no indexed fields */
1171                 return LDB_SUCCESS;
1172         }
1173
1174         for (i = 0; i < num_el; i++) {
1175                 int ret;
1176                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1177                         continue;
1178                 }
1179                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1180                 if (ret != LDB_SUCCESS) {
1181                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1182                         ldb_asprintf_errstring(ldb,
1183                                                __location__ ": Failed to re-index %s in %s - %s",
1184                                                elements[i].name, dn, ldb_errstring(ldb));
1185                         return ret;
1186                 }
1187         }
1188
1189         return LDB_SUCCESS;
1190 }
1191
1192
1193 /*
1194   insert a one level index for a message
1195 */
1196 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1197 {
1198         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1199         struct ldb_message_element el;
1200         struct ldb_val val;
1201         struct ldb_dn *pdn;
1202         const char *dn;
1203         int ret;
1204
1205         /* We index for ONE Level only if requested */
1206         if (!ltdb->cache->one_level_indexes) {
1207                 return LDB_SUCCESS;
1208         }
1209
1210         pdn = ldb_dn_get_parent(module, msg->dn);
1211         if (pdn == NULL) {
1212                 return LDB_ERR_OPERATIONS_ERROR;
1213         }
1214
1215         dn = ldb_dn_get_linearized(msg->dn);
1216         if (dn == NULL) {
1217                 talloc_free(pdn);
1218                 return LDB_ERR_OPERATIONS_ERROR;
1219         }
1220
1221         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1222         if (val.data == NULL) {
1223                 talloc_free(pdn);
1224                 return LDB_ERR_OPERATIONS_ERROR;
1225         }
1226
1227         val.length = strlen((char *)val.data);
1228         el.name = LTDB_IDXONE;
1229         el.values = &val;
1230         el.num_values = 1;
1231
1232         if (add) {
1233                 ret = ltdb_index_add1(module, dn, &el, 0);
1234         } else { /* delete */
1235                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1236         }
1237
1238         talloc_free(pdn);
1239
1240         return ret;
1241 }
1242
1243 /*
1244   add the index entries for a new element in a record
1245   The caller guarantees that these element values are not yet indexed
1246 */
1247 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1248                            struct ldb_message_element *el)
1249 {
1250         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1251         if (ldb_dn_is_special(dn)) {
1252                 return LDB_SUCCESS;
1253         }
1254         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1255                 return LDB_SUCCESS;
1256         }
1257         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1258 }
1259
1260 /*
1261   add the index entries for a new record
1262 */
1263 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1264 {
1265         const char *dn;
1266         int ret;
1267
1268         if (ldb_dn_is_special(msg->dn)) {
1269                 return LDB_SUCCESS;
1270         }
1271
1272         dn = ldb_dn_get_linearized(msg->dn);
1273         if (dn == NULL) {
1274                 return LDB_ERR_OPERATIONS_ERROR;
1275         }
1276
1277         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1278         if (ret != LDB_SUCCESS) {
1279                 return ret;
1280         }
1281
1282         return ltdb_index_onelevel(module, msg, 1);
1283 }
1284
1285
1286 /*
1287   delete an index entry for one message element
1288 */
1289 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1290                          struct ldb_message_element *el, unsigned int v_idx)
1291 {
1292         struct ldb_context *ldb;
1293         struct ldb_dn *dn_key;
1294         const char *dn_str;
1295         int ret, i;
1296         unsigned int j;
1297         struct dn_list *list;
1298
1299         ldb = ldb_module_get_ctx(module);
1300
1301         dn_str = ldb_dn_get_linearized(dn);
1302         if (dn_str == NULL) {
1303                 return LDB_ERR_OPERATIONS_ERROR;
1304         }
1305
1306         if (dn_str[0] == '@') {
1307                 return LDB_SUCCESS;
1308         }
1309
1310         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1311         if (!dn_key) {
1312                 return LDB_ERR_OPERATIONS_ERROR;
1313         }
1314
1315         list = talloc_zero(dn_key, struct dn_list);
1316         if (list == NULL) {
1317                 talloc_free(dn_key);
1318                 return LDB_ERR_OPERATIONS_ERROR;
1319         }
1320
1321         ret = ltdb_dn_list_load(module, dn_key, list);
1322         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1323                 /* it wasn't indexed. Did we have an earlier error? If we did then
1324                    its gone now */
1325                 talloc_free(dn_key);
1326                 return LDB_SUCCESS;
1327         }
1328
1329         if (ret != LDB_SUCCESS) {
1330                 talloc_free(dn_key);
1331                 return ret;
1332         }
1333
1334         i = ltdb_dn_list_find_str(list, dn_str);
1335         if (i == -1) {
1336                 /* nothing to delete */
1337                 talloc_free(dn_key);
1338                 return LDB_SUCCESS;             
1339         }
1340
1341         j = (unsigned int) i;
1342         if (j != list->count - 1) {
1343                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1344         }
1345         list->count--;
1346         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1347
1348         ret = ltdb_dn_list_store(module, dn_key, list);
1349
1350         talloc_free(dn_key);
1351
1352         return ret;
1353 }
1354
1355 /*
1356   delete the index entries for a element
1357   return -1 on failure
1358 */
1359 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1360                            struct ldb_message_element *el)
1361 {
1362         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1363         const char *dn_str;
1364         int ret;
1365         unsigned int i;
1366
1367         if (!ltdb->cache->attribute_indexes) {
1368                 /* no indexed fields */
1369                 return LDB_SUCCESS;
1370         }
1371
1372         dn_str = ldb_dn_get_linearized(dn);
1373         if (dn_str == NULL) {
1374                 return LDB_ERR_OPERATIONS_ERROR;
1375         }
1376
1377         if (dn_str[0] == '@') {
1378                 return LDB_SUCCESS;
1379         }
1380
1381         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1382                 return LDB_SUCCESS;
1383         }
1384         for (i = 0; i < el->num_values; i++) {
1385                 ret = ltdb_index_del_value(module, dn, el, i);
1386                 if (ret != LDB_SUCCESS) {
1387                         return ret;
1388                 }
1389         }
1390
1391         return LDB_SUCCESS;
1392 }
1393
1394 /*
1395   delete the index entries for a record
1396   return -1 on failure
1397 */
1398 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1399 {
1400         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1401         int ret;
1402         unsigned int i;
1403
1404         if (ldb_dn_is_special(msg->dn)) {
1405                 return LDB_SUCCESS;
1406         }
1407
1408         ret = ltdb_index_onelevel(module, msg, 0);
1409         if (ret != LDB_SUCCESS) {
1410                 return ret;
1411         }
1412
1413         if (!ltdb->cache->attribute_indexes) {
1414                 /* no indexed fields */
1415                 return LDB_SUCCESS;
1416         }
1417
1418         for (i = 0; i < msg->num_elements; i++) {
1419                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1420                 if (ret != LDB_SUCCESS) {
1421                         return ret;
1422                 }
1423         }
1424
1425         return LDB_SUCCESS;
1426 }
1427
1428
1429 /*
1430   traversal function that deletes all @INDEX records
1431 */
1432 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1433 {
1434         struct ldb_module *module = state;
1435         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1436         const char *dnstr = "DN=" LTDB_INDEX ":";
1437         struct dn_list list;
1438         struct ldb_dn *dn;
1439         struct ldb_val v;
1440         int ret;
1441
1442         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1443                 return 0;
1444         }
1445         /* we need to put a empty list in the internal tdb for this
1446          * index entry */
1447         list.dn = NULL;
1448         list.count = 0;
1449         v.data = key.dptr;
1450         v.length = strnlen((char *)key.dptr, key.dsize);
1451
1452         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1453         ret = ltdb_dn_list_store(module, dn, &list);
1454         if (ret != LDB_SUCCESS) {
1455                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1456                                        "Unable to store null index for %s\n",
1457                                                 ldb_dn_get_linearized(dn));
1458                 talloc_free(dn);
1459                 return -1;
1460         }
1461         talloc_free(dn);
1462         return 0;
1463 }
1464
1465 struct ltdb_reindex_context {
1466         struct ldb_module *module;
1467         int error;
1468 };
1469
1470 /*
1471   traversal function that adds @INDEX records during a re index
1472 */
1473 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1474 {
1475         struct ldb_context *ldb;
1476         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1477         struct ldb_module *module = ctx->module;
1478         struct ldb_message *msg;
1479         const char *dn = NULL;
1480         int ret;
1481         TDB_DATA key2;
1482
1483         ldb = ldb_module_get_ctx(module);
1484
1485         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1486             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1487                 return 0;
1488         }
1489
1490         msg = ldb_msg_new(module);
1491         if (msg == NULL) {
1492                 return -1;
1493         }
1494
1495         ret = ltdb_unpack_data(module, &data, msg);
1496         if (ret != 0) {
1497                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1498                                                 ldb_dn_get_linearized(msg->dn));
1499                 talloc_free(msg);
1500                 return -1;
1501         }
1502
1503         /* check if the DN key has changed, perhaps due to the
1504            case insensitivity of an element changing */
1505         key2 = ltdb_key(module, msg->dn);
1506         if (key2.dptr == NULL) {
1507                 /* probably a corrupt record ... darn */
1508                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1509                                                 ldb_dn_get_linearized(msg->dn));
1510                 talloc_free(msg);
1511                 return 0;
1512         }
1513         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1514                 tdb_delete(tdb, key);
1515                 tdb_store(tdb, key2, data, 0);
1516         }
1517         talloc_free(key2.dptr);
1518
1519         if (msg->dn == NULL) {
1520                 dn = (char *)key.dptr + 3;
1521         } else {
1522                 dn = ldb_dn_get_linearized(msg->dn);
1523         }
1524
1525         ret = ltdb_index_onelevel(module, msg, 1);
1526         if (ret != LDB_SUCCESS) {
1527                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1528                           "Adding special ONE LEVEL index failed (%s)!",
1529                                                 ldb_dn_get_linearized(msg->dn));
1530                 talloc_free(msg);
1531                 return -1;
1532         }
1533
1534         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1535
1536         if (ret != LDB_SUCCESS) {
1537                 ctx->error = ret;
1538                 talloc_free(msg);
1539                 return -1;
1540         }
1541
1542         talloc_free(msg);
1543
1544         return 0;
1545 }
1546
1547 /*
1548   force a complete reindex of the database
1549 */
1550 int ltdb_reindex(struct ldb_module *module)
1551 {
1552         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1553         int ret;
1554         struct ltdb_reindex_context ctx;
1555
1556         if (ltdb_cache_reload(module) != 0) {
1557                 return LDB_ERR_OPERATIONS_ERROR;
1558         }
1559
1560         /* first traverse the database deleting any @INDEX records by
1561          * putting NULL entries in the in-memory tdb
1562          */
1563         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1564         if (ret == -1) {
1565                 return LDB_ERR_OPERATIONS_ERROR;
1566         }
1567
1568         /* if we don't have indexes we have nothing todo */
1569         if (ltdb->cache->indexlist->num_elements == 0) {
1570                 return LDB_SUCCESS;
1571         }
1572
1573         ctx.module = module;
1574         ctx.error = 0;
1575
1576         /* now traverse adding any indexes for normal LDB records */
1577         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1578         if (ret == -1) {
1579                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1580                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1581                 return LDB_ERR_OPERATIONS_ERROR;
1582         }
1583
1584         if (ctx.error != LDB_SUCCESS) {
1585                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1586                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1587                 return ctx.error;
1588         }
1589
1590         return LDB_SUCCESS;
1591 }