tdb_compat: use tdb_open_compat.
[nivanova/samba-autobuild/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         int error;
44 };
45
46 /* we put a @IDXVERSION attribute on index entries. This
47    allows us to tell if it was written by an older version
48 */
49 #define LTDB_INDEXING_VERSION 2
50
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
53 {
54         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56         return LDB_SUCCESS;
57 }
58
59 /* compare two DN entries in a dn_list. Take account of possible
60  * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
62 {
63         if (v1->length > v2->length && v1->data[v2->length] != 0) {
64                 return -1;
65         }
66         if (v1->length < v2->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         unsigned int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 /*
99   this is effectively a cast function, but with lots of paranoia
100   checks and also copes with CPUs that are fussy about pointer
101   alignment
102  */
103 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
104 {
105         struct dn_list *list;
106         if (rec.dsize != sizeof(void *)) {
107                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
108                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
109                 return NULL;
110         }
111         /* note that we can't just use a cast here, as rec.dptr may
112            not be aligned sufficiently for a pointer. A cast would cause
113            platforms like some ARM CPUs to crash */
114         memcpy(&list, rec.dptr, sizeof(void *));
115         list = talloc_get_type(list, struct dn_list);
116         if (list == NULL) {
117                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
118                                        "Bad type '%s' for idxptr", 
119                                        talloc_get_name(list));
120                 return NULL;
121         }
122         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
123                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
124                                        "Bad parent '%s' for idxptr", 
125                                        talloc_get_name(talloc_parent(list->dn)));
126                 return NULL;
127         }
128         return list;
129 }
130
131 /*
132   return the @IDX list in an index entry for a dn as a 
133   struct dn_list
134  */
135 static int ltdb_dn_list_load(struct ldb_module *module,
136                              struct ldb_dn *dn, struct dn_list *list)
137 {
138         struct ldb_message *msg;
139         int ret;
140         struct ldb_message_element *el;
141         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
142         TDB_DATA rec;
143         struct dn_list *list2;
144         TDB_DATA key;
145
146         list->dn = NULL;
147         list->count = 0;
148
149         /* see if we have any in-memory index entries */
150         if (ltdb->idxptr == NULL ||
151             ltdb->idxptr->itdb == NULL) {
152                 goto normal_index;
153         }
154
155         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
156         key.dsize = strlen((char *)key.dptr);
157
158         rec = tdb_fetch_compat(ltdb->idxptr->itdb, key);
159         if (rec.dptr == NULL) {
160                 goto normal_index;
161         }
162
163         /* we've found an in-memory index entry */
164         list2 = ltdb_index_idxptr(module, rec, true);
165         if (list2 == NULL) {
166                 free(rec.dptr);
167                 return LDB_ERR_OPERATIONS_ERROR;
168         }
169         free(rec.dptr);
170
171         *list = *list2;
172         return LDB_SUCCESS;
173
174 normal_index:
175         msg = ldb_msg_new(list);
176         if (msg == NULL) {
177                 return LDB_ERR_OPERATIONS_ERROR;
178         }
179
180         ret = ltdb_search_dn1(module, dn, msg);
181         if (ret != LDB_SUCCESS) {
182                 talloc_free(msg);
183                 return ret;
184         }
185
186         /* TODO: check indexing version number */
187
188         el = ldb_msg_find_element(msg, LTDB_IDX);
189         if (!el) {
190                 talloc_free(msg);
191                 return LDB_SUCCESS;
192         }
193
194         /* we avoid copying the strings by stealing the list */
195         list->dn = talloc_steal(list, el->values);
196         list->count = el->num_values;
197
198         return LDB_SUCCESS;
199 }
200
201
202 /*
203   save a dn_list into a full @IDX style record
204  */
205 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
206                                    struct dn_list *list)
207 {
208         struct ldb_message *msg;
209         int ret;
210
211         if (list->count == 0) {
212                 ret = ltdb_delete_noindex(module, dn);
213                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
214                         return LDB_SUCCESS;
215                 }
216                 return ret;
217         }
218
219         msg = ldb_msg_new(module);
220         if (!msg) {
221                 return ldb_module_oom(module);
222         }
223
224         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
225         if (ret != LDB_SUCCESS) {
226                 talloc_free(msg);
227                 return ldb_module_oom(module);
228         }
229
230         msg->dn = dn;
231         if (list->count > 0) {
232                 struct ldb_message_element *el;
233
234                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
235                 if (ret != LDB_SUCCESS) {
236                         talloc_free(msg);
237                         return ldb_module_oom(module);
238                 }
239                 el->values = list->dn;
240                 el->num_values = list->count;
241         }
242
243         ret = ltdb_store(module, msg, TDB_REPLACE);
244         talloc_free(msg);
245         return ret;
246 }
247
248 /*
249   save a dn_list into the database, in either @IDX or internal format
250  */
251 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
252                               struct dn_list *list)
253 {
254         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
255         TDB_DATA rec, key;
256         int ret;
257         struct dn_list *list2;
258
259         if (ltdb->idxptr == NULL) {
260                 return ltdb_dn_list_store_full(module, dn, list);
261         }
262
263         if (ltdb->idxptr->itdb == NULL) {
264                 ltdb->idxptr->itdb = tdb_open_compat(NULL, 1000, TDB_INTERNAL, O_RDWR, 0, NULL, NULL);
265                 if (ltdb->idxptr->itdb == NULL) {
266                         return LDB_ERR_OPERATIONS_ERROR;
267                 }
268         }
269
270         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
271         key.dsize = strlen((char *)key.dptr);
272
273         rec = tdb_fetch_compat(ltdb->idxptr->itdb, key);
274         if (rec.dptr != NULL) {
275                 list2 = ltdb_index_idxptr(module, rec, false);
276                 if (list2 == NULL) {
277                         free(rec.dptr);
278                         return LDB_ERR_OPERATIONS_ERROR;
279                 }
280                 free(rec.dptr);
281                 list2->dn = talloc_steal(list2, list->dn);
282                 list2->count = list->count;
283                 return LDB_SUCCESS;
284         }
285
286         list2 = talloc(ltdb->idxptr, struct dn_list);
287         if (list2 == NULL) {
288                 return LDB_ERR_OPERATIONS_ERROR;
289         }
290         list2->dn = talloc_steal(list2, list->dn);
291         list2->count = list->count;
292
293         rec.dptr = (uint8_t *)&list2;
294         rec.dsize = sizeof(void *);
295
296         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
297         if (ret != 0) {
298                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
299         }
300         return LDB_SUCCESS;
301 }
302
303 /*
304   traverse function for storing the in-memory index entries on disk
305  */
306 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
307 {
308         struct ldb_module *module = state;
309         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
310         struct ldb_dn *dn;
311         struct ldb_context *ldb = ldb_module_get_ctx(module);
312         struct ldb_val v;
313         struct dn_list *list;
314
315         list = ltdb_index_idxptr(module, data, true);
316         if (list == NULL) {
317                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
318                 return -1;
319         }
320
321         v.data = key.dptr;
322         v.length = strnlen((char *)key.dptr, key.dsize);
323
324         dn = ldb_dn_from_ldb_val(module, ldb, &v);
325         if (dn == NULL) {
326                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
327                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
328                 return -1;
329         }
330
331         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
332         talloc_free(dn);
333         if (ltdb->idxptr->error != 0) {
334                 return -1;
335         }
336         return 0;
337 }
338
339 /* cleanup the idxptr mode when transaction commits */
340 int ltdb_index_transaction_commit(struct ldb_module *module)
341 {
342         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
343         int ret;
344
345         struct ldb_context *ldb = ldb_module_get_ctx(module);
346
347         ldb_reset_err_string(ldb);
348
349         if (ltdb->idxptr->itdb) {
350                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
351                 tdb_close(ltdb->idxptr->itdb);
352         }
353
354         ret = ltdb->idxptr->error;
355         if (ret != LDB_SUCCESS) {
356                 if (!ldb_errstring(ldb)) {
357                         ldb_set_errstring(ldb, ldb_strerror(ret));
358                 }
359                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
360         }
361
362         talloc_free(ltdb->idxptr);
363         ltdb->idxptr = NULL;
364         return ret;
365 }
366
367 /* cleanup the idxptr mode when transaction cancels */
368 int ltdb_index_transaction_cancel(struct ldb_module *module)
369 {
370         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
371         if (ltdb->idxptr && ltdb->idxptr->itdb) {
372                 tdb_close(ltdb->idxptr->itdb);
373         }
374         talloc_free(ltdb->idxptr);
375         ltdb->idxptr = NULL;
376         return LDB_SUCCESS;
377 }
378
379
380 /*
381   return the dn key to be used for an index
382   the caller is responsible for freeing
383 */
384 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
385                                      const char *attr, const struct ldb_val *value,
386                                      const struct ldb_schema_attribute **ap)
387 {
388         struct ldb_dn *ret;
389         struct ldb_val v;
390         const struct ldb_schema_attribute *a;
391         char *attr_folded;
392         int r;
393
394         attr_folded = ldb_attr_casefold(ldb, attr);
395         if (!attr_folded) {
396                 return NULL;
397         }
398
399         a = ldb_schema_attribute_by_name(ldb, attr);
400         if (ap) {
401                 *ap = a;
402         }
403         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
404         if (r != LDB_SUCCESS) {
405                 const char *errstr = ldb_errstring(ldb);
406                 /* canonicalisation can be refused. For example, 
407                    a attribute that takes wildcards will refuse to canonicalise
408                    if the value contains a wildcard */
409                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
410                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
411                 talloc_free(attr_folded);
412                 return NULL;
413         }
414         if (ldb_should_b64_encode(ldb, &v)) {
415                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
416                 if (!vstr) {
417                         talloc_free(attr_folded);
418                         return NULL;
419                 }
420                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
421                 talloc_free(vstr);
422         } else {
423                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
424         }
425
426         if (v.data != value->data) {
427                 talloc_free(v.data);
428         }
429         talloc_free(attr_folded);
430
431         return ret;
432 }
433
434 /*
435   see if a attribute value is in the list of indexed attributes
436 */
437 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
438 {
439         unsigned int i;
440         struct ldb_message_element *el;
441
442         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
443         if (el == NULL) {
444                 return false;
445         }
446
447         /* TODO: this is too expensive! At least use a binary search */
448         for (i=0; i<el->num_values; i++) {
449                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
450                         return true;
451                 }
452         }
453         return false;
454 }
455
456 /*
457   in the following logic functions, the return value is treated as
458   follows:
459
460      LDB_SUCCESS: we found some matching index values
461
462      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
463
464      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
465                                we'll need a full search
466  */
467
468 /*
469   return a list of dn's that might match a simple indexed search (an
470   equality search only)
471  */
472 static int ltdb_index_dn_simple(struct ldb_module *module,
473                                 const struct ldb_parse_tree *tree,
474                                 const struct ldb_message *index_list,
475                                 struct dn_list *list)
476 {
477         struct ldb_context *ldb;
478         struct ldb_dn *dn;
479         int ret;
480
481         ldb = ldb_module_get_ctx(module);
482
483         list->count = 0;
484         list->dn = NULL;
485
486         /* if the attribute isn't in the list of indexed attributes then
487            this node needs a full search */
488         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
489                 return LDB_ERR_OPERATIONS_ERROR;
490         }
491
492         /* the attribute is indexed. Pull the list of DNs that match the 
493            search criterion */
494         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
495         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
496
497         ret = ltdb_dn_list_load(module, dn, list);
498         talloc_free(dn);
499         return ret;
500 }
501
502
503 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
504
505 /*
506   return a list of dn's that might match a leaf indexed search
507  */
508 static int ltdb_index_dn_leaf(struct ldb_module *module,
509                               const struct ldb_parse_tree *tree,
510                               const struct ldb_message *index_list,
511                               struct dn_list *list)
512 {
513         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
514                 list->dn = talloc_array(list, struct ldb_val, 1);
515                 if (list->dn == NULL) {
516                         ldb_module_oom(module);
517                         return LDB_ERR_OPERATIONS_ERROR;
518                 }
519                 list->dn[0] = tree->u.equality.value;
520                 list->count = 1;
521                 return LDB_SUCCESS;
522         }
523         return ltdb_index_dn_simple(module, tree, index_list, list);
524 }
525
526
527 /*
528   list intersection
529   list = list & list2
530 */
531 static bool list_intersect(struct ldb_context *ldb,
532                            struct dn_list *list, const struct dn_list *list2)
533 {
534         struct dn_list *list3;
535         unsigned int i;
536
537         if (list->count == 0) {
538                 /* 0 & X == 0 */
539                 return true;
540         }
541         if (list2->count == 0) {
542                 /* X & 0 == 0 */
543                 list->count = 0;
544                 list->dn = NULL;
545                 return true;
546         }
547
548         /* the indexing code is allowed to return a longer list than
549            what really matches, as all results are filtered by the
550            full expression at the end - this shortcut avoids a lot of
551            work in some cases */
552         if (list->count < 2 && list2->count > 10) {
553                 return true;
554         }
555         if (list2->count < 2 && list->count > 10) {
556                 list->count = list2->count;
557                 list->dn = list2->dn;
558                 /* note that list2 may not be the parent of list2->dn,
559                    as list2->dn may be owned by ltdb->idxptr. In that
560                    case we expect this reparent call to fail, which is
561                    OK */
562                 talloc_reparent(list2, list, list2->dn);
563                 return true;
564         }
565
566         list3 = talloc_zero(list, struct dn_list);
567         if (list3 == NULL) {
568                 return false;
569         }
570
571         list3->dn = talloc_array(list3, struct ldb_val, list->count);
572         if (!list3->dn) {
573                 talloc_free(list3);
574                 return false;
575         }
576         list3->count = 0;
577
578         for (i=0;i<list->count;i++) {
579                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
580                         list3->dn[list3->count] = list->dn[i];
581                         list3->count++;
582                 }
583         }
584
585         list->dn = talloc_steal(list, list3->dn);
586         list->count = list3->count;
587         talloc_free(list3);
588
589         return true;
590 }
591
592
593 /*
594   list union
595   list = list | list2
596 */
597 static bool list_union(struct ldb_context *ldb,
598                        struct dn_list *list, const struct dn_list *list2)
599 {
600         struct ldb_val *dn3;
601
602         if (list2->count == 0) {
603                 /* X | 0 == X */
604                 return true;
605         }
606
607         if (list->count == 0) {
608                 /* 0 | X == X */
609                 list->count = list2->count;
610                 list->dn = list2->dn;
611                 /* note that list2 may not be the parent of list2->dn,
612                    as list2->dn may be owned by ltdb->idxptr. In that
613                    case we expect this reparent call to fail, which is
614                    OK */
615                 talloc_reparent(list2, list, list2->dn);
616                 return true;
617         }
618
619         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
620         if (!dn3) {
621                 ldb_oom(ldb);
622                 return false;
623         }
624
625         /* we allow for duplicates here, and get rid of them later */
626         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
627         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
628
629         list->dn = dn3;
630         list->count += list2->count;
631
632         return true;
633 }
634
635 static int ltdb_index_dn(struct ldb_module *module,
636                          const struct ldb_parse_tree *tree,
637                          const struct ldb_message *index_list,
638                          struct dn_list *list);
639
640
641 /*
642   process an OR list (a union)
643  */
644 static int ltdb_index_dn_or(struct ldb_module *module,
645                             const struct ldb_parse_tree *tree,
646                             const struct ldb_message *index_list,
647                             struct dn_list *list)
648 {
649         struct ldb_context *ldb;
650         unsigned int i;
651
652         ldb = ldb_module_get_ctx(module);
653
654         list->dn = NULL;
655         list->count = 0;
656
657         for (i=0; i<tree->u.list.num_elements; i++) {
658                 struct dn_list *list2;
659                 int ret;
660
661                 list2 = talloc_zero(list, struct dn_list);
662                 if (list2 == NULL) {
663                         return LDB_ERR_OPERATIONS_ERROR;
664                 }
665
666                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
667
668                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
669                         /* X || 0 == X */
670                         talloc_free(list2);
671                         continue;
672                 }
673
674                 if (ret != LDB_SUCCESS) {
675                         /* X || * == * */
676                         talloc_free(list2);
677                         return ret;
678                 }
679
680                 if (!list_union(ldb, list, list2)) {
681                         talloc_free(list2);
682                         return LDB_ERR_OPERATIONS_ERROR;
683                 }
684         }
685
686         if (list->count == 0) {
687                 return LDB_ERR_NO_SUCH_OBJECT;
688         }
689
690         return LDB_SUCCESS;
691 }
692
693
694 /*
695   NOT an index results
696  */
697 static int ltdb_index_dn_not(struct ldb_module *module,
698                              const struct ldb_parse_tree *tree,
699                              const struct ldb_message *index_list,
700                              struct dn_list *list)
701 {
702         /* the only way to do an indexed not would be if we could
703            negate the not via another not or if we knew the total
704            number of database elements so we could know that the
705            existing expression covered the whole database.
706
707            instead, we just give up, and rely on a full index scan
708            (unless an outer & manages to reduce the list)
709         */
710         return LDB_ERR_OPERATIONS_ERROR;
711 }
712
713
714 static bool ltdb_index_unique(struct ldb_context *ldb,
715                               const char *attr)
716 {
717         const struct ldb_schema_attribute *a;
718         a = ldb_schema_attribute_by_name(ldb, attr);
719         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
720                 return true;
721         }
722         return false;
723 }
724
725 /*
726   process an AND expression (intersection)
727  */
728 static int ltdb_index_dn_and(struct ldb_module *module,
729                              const struct ldb_parse_tree *tree,
730                              const struct ldb_message *index_list,
731                              struct dn_list *list)
732 {
733         struct ldb_context *ldb;
734         unsigned int i;
735         bool found;
736
737         ldb = ldb_module_get_ctx(module);
738
739         list->dn = NULL;
740         list->count = 0;
741
742         /* in the first pass we only look for unique simple
743            equality tests, in the hope of avoiding having to look
744            at any others */
745         for (i=0; i<tree->u.list.num_elements; i++) {
746                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
747                 int ret;
748
749                 if (subtree->operation != LDB_OP_EQUALITY ||
750                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
751                         continue;
752                 }
753                 
754                 ret = ltdb_index_dn(module, subtree, index_list, list);
755                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
756                         /* 0 && X == 0 */
757                         return LDB_ERR_NO_SUCH_OBJECT;
758                 }
759                 if (ret == LDB_SUCCESS) {
760                         /* a unique index match means we can
761                          * stop. Note that we don't care if we return
762                          * a few too many objects, due to later
763                          * filtering */
764                         return LDB_SUCCESS;
765                 }
766         }       
767
768         /* now do a full intersection */
769         found = false;
770
771         for (i=0; i<tree->u.list.num_elements; i++) {
772                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
773                 struct dn_list *list2;
774                 int ret;
775
776                 list2 = talloc_zero(list, struct dn_list);
777                 if (list2 == NULL) {
778                         return ldb_module_oom(module);
779                 }
780                         
781                 ret = ltdb_index_dn(module, subtree, index_list, list2);
782
783                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
784                         /* X && 0 == 0 */
785                         list->dn = NULL;
786                         list->count = 0;
787                         talloc_free(list2);
788                         return LDB_ERR_NO_SUCH_OBJECT;
789                 }
790                 
791                 if (ret != LDB_SUCCESS) {
792                         /* this didn't adding anything */
793                         talloc_free(list2);
794                         continue;
795                 }
796
797                 if (!found) {
798                         talloc_reparent(list2, list, list->dn);
799                         list->dn = list2->dn;
800                         list->count = list2->count;
801                         found = true;
802                 } else if (!list_intersect(ldb, list, list2)) {
803                         talloc_free(list2);
804                         return LDB_ERR_OPERATIONS_ERROR;
805                 }
806                         
807                 if (list->count == 0) {
808                         list->dn = NULL;
809                         return LDB_ERR_NO_SUCH_OBJECT;
810                 }
811                         
812                 if (list->count < 2) {
813                         /* it isn't worth loading the next part of the tree */
814                         return LDB_SUCCESS;
815                 }
816         }       
817
818         if (!found) {
819                 /* none of the attributes were indexed */
820                 return LDB_ERR_OPERATIONS_ERROR;
821         }
822
823         return LDB_SUCCESS;
824 }
825         
826 /*
827   return a list of matching objects using a one-level index
828  */
829 static int ltdb_index_dn_one(struct ldb_module *module,
830                              struct ldb_dn *parent_dn,
831                              struct dn_list *list)
832 {
833         struct ldb_context *ldb;
834         struct ldb_dn *key;
835         struct ldb_val val;
836         int ret;
837
838         ldb = ldb_module_get_ctx(module);
839
840         /* work out the index key from the parent DN */
841         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
842         val.length = strlen((char *)val.data);
843         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
844         if (!key) {
845                 ldb_oom(ldb);
846                 return LDB_ERR_OPERATIONS_ERROR;
847         }
848
849         ret = ltdb_dn_list_load(module, key, list);
850         talloc_free(key);
851         if (ret != LDB_SUCCESS) {
852                 return ret;
853         }
854
855         if (list->count == 0) {
856                 return LDB_ERR_NO_SUCH_OBJECT;
857         }
858
859         return LDB_SUCCESS;
860 }
861
862 /*
863   return a list of dn's that might match a indexed search or
864   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
865  */
866 static int ltdb_index_dn(struct ldb_module *module,
867                          const struct ldb_parse_tree *tree,
868                          const struct ldb_message *index_list,
869                          struct dn_list *list)
870 {
871         int ret = LDB_ERR_OPERATIONS_ERROR;
872
873         switch (tree->operation) {
874         case LDB_OP_AND:
875                 ret = ltdb_index_dn_and(module, tree, index_list, list);
876                 break;
877
878         case LDB_OP_OR:
879                 ret = ltdb_index_dn_or(module, tree, index_list, list);
880                 break;
881
882         case LDB_OP_NOT:
883                 ret = ltdb_index_dn_not(module, tree, index_list, list);
884                 break;
885
886         case LDB_OP_EQUALITY:
887                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
888                 break;
889
890         case LDB_OP_SUBSTRING:
891         case LDB_OP_GREATER:
892         case LDB_OP_LESS:
893         case LDB_OP_PRESENT:
894         case LDB_OP_APPROX:
895         case LDB_OP_EXTENDED:
896                 /* we can't index with fancy bitops yet */
897                 ret = LDB_ERR_OPERATIONS_ERROR;
898                 break;
899         }
900
901         return ret;
902 }
903
904 /*
905   filter a candidate dn_list from an indexed search into a set of results
906   extracting just the given attributes
907 */
908 static int ltdb_index_filter(const struct dn_list *dn_list,
909                              struct ltdb_context *ac, 
910                              uint32_t *match_count)
911 {
912         struct ldb_context *ldb;
913         struct ldb_message *msg;
914         unsigned int i;
915
916         ldb = ldb_module_get_ctx(ac->module);
917
918         for (i = 0; i < dn_list->count; i++) {
919                 struct ldb_dn *dn;
920                 int ret;
921                 bool matched;
922
923                 msg = ldb_msg_new(ac);
924                 if (!msg) {
925                         return LDB_ERR_OPERATIONS_ERROR;
926                 }
927
928                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
929                 if (dn == NULL) {
930                         talloc_free(msg);
931                         return LDB_ERR_OPERATIONS_ERROR;
932                 }
933
934                 ret = ltdb_search_dn1(ac->module, dn, msg);
935                 talloc_free(dn);
936                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
937                         /* the record has disappeared? yes, this can happen */
938                         talloc_free(msg);
939                         continue;
940                 }
941
942                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
943                         /* an internal error */
944                         talloc_free(msg);
945                         return LDB_ERR_OPERATIONS_ERROR;
946                 }
947
948                 ret = ldb_match_msg_error(ldb, msg,
949                                           ac->tree, ac->base, ac->scope, &matched);
950                 if (ret != LDB_SUCCESS) {
951                         talloc_free(msg);
952                         return ret;
953                 }
954                 if (!matched) {
955                         talloc_free(msg);
956                         continue;
957                 }
958
959                 /* filter the attributes that the user wants */
960                 ret = ltdb_filter_attrs(msg, ac->attrs);
961
962                 if (ret == -1) {
963                         talloc_free(msg);
964                         return LDB_ERR_OPERATIONS_ERROR;
965                 }
966
967                 ret = ldb_module_send_entry(ac->req, msg, NULL);
968                 if (ret != LDB_SUCCESS) {
969                         /* Regardless of success or failure, the msg
970                          * is the callbacks responsiblity, and should
971                          * not be talloc_free()'ed */
972                         ac->request_terminated = true;
973                         return ret;
974                 }
975
976                 (*match_count)++;
977         }
978
979         return LDB_SUCCESS;
980 }
981
982 /*
983   remove any duplicated entries in a indexed result
984  */
985 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
986 {
987         unsigned int i, new_count;
988
989         if (list->count < 2) {
990                 return;
991         }
992
993         TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
994
995         new_count = 1;
996         for (i=1; i<list->count; i++) {
997                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
998                         if (new_count != i) {
999                                 list->dn[new_count] = list->dn[i];
1000                         }
1001                         new_count++;
1002                 }
1003         }
1004         
1005         list->count = new_count;
1006 }
1007
1008 /*
1009   search the database with a LDAP-like expression using indexes
1010   returns -1 if an indexed search is not possible, in which
1011   case the caller should call ltdb_search_full()
1012 */
1013 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1014 {
1015         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1016         struct dn_list *dn_list;
1017         int ret;
1018
1019         /* see if indexing is enabled */
1020         if (!ltdb->cache->attribute_indexes && 
1021             !ltdb->cache->one_level_indexes &&
1022             ac->scope != LDB_SCOPE_BASE) {
1023                 /* fallback to a full search */
1024                 return LDB_ERR_OPERATIONS_ERROR;
1025         }
1026
1027         dn_list = talloc_zero(ac, struct dn_list);
1028         if (dn_list == NULL) {
1029                 return ldb_module_oom(ac->module);
1030         }
1031
1032         switch (ac->scope) {
1033         case LDB_SCOPE_BASE:
1034                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1035                 if (dn_list->dn == NULL) {
1036                         talloc_free(dn_list);
1037                         return ldb_module_oom(ac->module);
1038                 }
1039                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1040                 if (dn_list->dn[0].data == NULL) {
1041                         talloc_free(dn_list);
1042                         return ldb_module_oom(ac->module);
1043                 }
1044                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1045                 dn_list->count = 1;
1046                 break;          
1047
1048         case LDB_SCOPE_ONELEVEL:
1049                 if (!ltdb->cache->one_level_indexes) {
1050                         talloc_free(dn_list);
1051                         return LDB_ERR_OPERATIONS_ERROR;
1052                 }
1053                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1054                 if (ret != LDB_SUCCESS) {
1055                         talloc_free(dn_list);
1056                         return ret;
1057                 }
1058                 break;
1059
1060         case LDB_SCOPE_SUBTREE:
1061         case LDB_SCOPE_DEFAULT:
1062                 if (!ltdb->cache->attribute_indexes) {
1063                         talloc_free(dn_list);
1064                         return LDB_ERR_OPERATIONS_ERROR;
1065                 }
1066                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1067                 if (ret != LDB_SUCCESS) {
1068                         talloc_free(dn_list);
1069                         return ret;
1070                 }
1071                 ltdb_dn_list_remove_duplicates(dn_list);
1072                 break;
1073         }
1074
1075         ret = ltdb_index_filter(dn_list, ac, match_count);
1076         talloc_free(dn_list);
1077         return ret;
1078 }
1079
1080 /*
1081   add an index entry for one message element
1082 */
1083 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1084                            struct ldb_message_element *el, int v_idx)
1085 {
1086         struct ldb_context *ldb;
1087         struct ldb_dn *dn_key;
1088         int ret;
1089         const struct ldb_schema_attribute *a;
1090         struct dn_list *list;
1091         unsigned alloc_len;
1092
1093         ldb = ldb_module_get_ctx(module);
1094
1095         list = talloc_zero(module, struct dn_list);
1096         if (list == NULL) {
1097                 return LDB_ERR_OPERATIONS_ERROR;
1098         }
1099
1100         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1101         if (!dn_key) {
1102                 talloc_free(list);
1103                 return LDB_ERR_OPERATIONS_ERROR;
1104         }
1105         talloc_steal(list, dn_key);
1106
1107         ret = ltdb_dn_list_load(module, dn_key, list);
1108         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1109                 talloc_free(list);
1110                 return ret;
1111         }
1112
1113         if (ltdb_dn_list_find_str(list, dn) != -1) {
1114                 talloc_free(list);
1115                 return LDB_SUCCESS;
1116         }
1117
1118         if (list->count > 0 &&
1119             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1120                 talloc_free(list);
1121                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1122                                        el->name, dn);
1123                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1124         }
1125
1126         /* overallocate the list a bit, to reduce the number of
1127          * realloc trigered copies */    
1128         alloc_len = ((list->count+1)+7) & ~7;
1129         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1130         if (list->dn == NULL) {
1131                 talloc_free(list);
1132                 return LDB_ERR_OPERATIONS_ERROR;
1133         }
1134         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1135         list->dn[list->count].length = strlen(dn);
1136         list->count++;
1137
1138         ret = ltdb_dn_list_store(module, dn_key, list);
1139
1140         talloc_free(list);
1141
1142         return ret;
1143 }
1144
1145 /*
1146   add index entries for one elements in a message
1147  */
1148 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1149                              struct ldb_message_element *el)
1150 {
1151         unsigned int i;
1152         for (i = 0; i < el->num_values; i++) {
1153                 int ret = ltdb_index_add1(module, dn, el, i);
1154                 if (ret != LDB_SUCCESS) {
1155                         return ret;
1156                 }
1157         }
1158
1159         return LDB_SUCCESS;
1160 }
1161
1162 /*
1163   add index entries for all elements in a message
1164  */
1165 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1166                               struct ldb_message_element *elements, int num_el)
1167 {
1168         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1169         unsigned int i;
1170
1171         if (dn[0] == '@') {
1172                 return LDB_SUCCESS;
1173         }
1174
1175         if (ltdb->cache->indexlist->num_elements == 0) {
1176                 /* no indexed fields */
1177                 return LDB_SUCCESS;
1178         }
1179
1180         for (i = 0; i < num_el; i++) {
1181                 int ret;
1182                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1183                         continue;
1184                 }
1185                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1186                 if (ret != LDB_SUCCESS) {
1187                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1188                         ldb_asprintf_errstring(ldb,
1189                                                __location__ ": Failed to re-index %s in %s - %s",
1190                                                elements[i].name, dn, ldb_errstring(ldb));
1191                         return ret;
1192                 }
1193         }
1194
1195         return LDB_SUCCESS;
1196 }
1197
1198
1199 /*
1200   insert a one level index for a message
1201 */
1202 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1203 {
1204         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1205         struct ldb_message_element el;
1206         struct ldb_val val;
1207         struct ldb_dn *pdn;
1208         const char *dn;
1209         int ret;
1210
1211         /* We index for ONE Level only if requested */
1212         if (!ltdb->cache->one_level_indexes) {
1213                 return LDB_SUCCESS;
1214         }
1215
1216         pdn = ldb_dn_get_parent(module, msg->dn);
1217         if (pdn == NULL) {
1218                 return LDB_ERR_OPERATIONS_ERROR;
1219         }
1220
1221         dn = ldb_dn_get_linearized(msg->dn);
1222         if (dn == NULL) {
1223                 talloc_free(pdn);
1224                 return LDB_ERR_OPERATIONS_ERROR;
1225         }
1226
1227         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1228         if (val.data == NULL) {
1229                 talloc_free(pdn);
1230                 return LDB_ERR_OPERATIONS_ERROR;
1231         }
1232
1233         val.length = strlen((char *)val.data);
1234         el.name = LTDB_IDXONE;
1235         el.values = &val;
1236         el.num_values = 1;
1237
1238         if (add) {
1239                 ret = ltdb_index_add1(module, dn, &el, 0);
1240         } else { /* delete */
1241                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1242         }
1243
1244         talloc_free(pdn);
1245
1246         return ret;
1247 }
1248
1249 /*
1250   add the index entries for a new element in a record
1251   The caller guarantees that these element values are not yet indexed
1252 */
1253 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1254                            struct ldb_message_element *el)
1255 {
1256         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1257         if (ldb_dn_is_special(dn)) {
1258                 return LDB_SUCCESS;
1259         }
1260         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1261                 return LDB_SUCCESS;
1262         }
1263         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1264 }
1265
1266 /*
1267   add the index entries for a new record
1268 */
1269 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1270 {
1271         const char *dn;
1272         int ret;
1273
1274         if (ldb_dn_is_special(msg->dn)) {
1275                 return LDB_SUCCESS;
1276         }
1277
1278         dn = ldb_dn_get_linearized(msg->dn);
1279         if (dn == NULL) {
1280                 return LDB_ERR_OPERATIONS_ERROR;
1281         }
1282
1283         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1284         if (ret != LDB_SUCCESS) {
1285                 return ret;
1286         }
1287
1288         return ltdb_index_onelevel(module, msg, 1);
1289 }
1290
1291
1292 /*
1293   delete an index entry for one message element
1294 */
1295 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1296                          struct ldb_message_element *el, unsigned int v_idx)
1297 {
1298         struct ldb_context *ldb;
1299         struct ldb_dn *dn_key;
1300         const char *dn_str;
1301         int ret, i;
1302         unsigned int j;
1303         struct dn_list *list;
1304
1305         ldb = ldb_module_get_ctx(module);
1306
1307         dn_str = ldb_dn_get_linearized(dn);
1308         if (dn_str == NULL) {
1309                 return LDB_ERR_OPERATIONS_ERROR;
1310         }
1311
1312         if (dn_str[0] == '@') {
1313                 return LDB_SUCCESS;
1314         }
1315
1316         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1317         if (!dn_key) {
1318                 return LDB_ERR_OPERATIONS_ERROR;
1319         }
1320
1321         list = talloc_zero(dn_key, struct dn_list);
1322         if (list == NULL) {
1323                 talloc_free(dn_key);
1324                 return LDB_ERR_OPERATIONS_ERROR;
1325         }
1326
1327         ret = ltdb_dn_list_load(module, dn_key, list);
1328         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1329                 /* it wasn't indexed. Did we have an earlier error? If we did then
1330                    its gone now */
1331                 talloc_free(dn_key);
1332                 return LDB_SUCCESS;
1333         }
1334
1335         if (ret != LDB_SUCCESS) {
1336                 talloc_free(dn_key);
1337                 return ret;
1338         }
1339
1340         i = ltdb_dn_list_find_str(list, dn_str);
1341         if (i == -1) {
1342                 /* nothing to delete */
1343                 talloc_free(dn_key);
1344                 return LDB_SUCCESS;             
1345         }
1346
1347         j = (unsigned int) i;
1348         if (j != list->count - 1) {
1349                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1350         }
1351         list->count--;
1352         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1353
1354         ret = ltdb_dn_list_store(module, dn_key, list);
1355
1356         talloc_free(dn_key);
1357
1358         return ret;
1359 }
1360
1361 /*
1362   delete the index entries for a element
1363   return -1 on failure
1364 */
1365 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1366                            struct ldb_message_element *el)
1367 {
1368         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1369         const char *dn_str;
1370         int ret;
1371         unsigned int i;
1372
1373         if (!ltdb->cache->attribute_indexes) {
1374                 /* no indexed fields */
1375                 return LDB_SUCCESS;
1376         }
1377
1378         dn_str = ldb_dn_get_linearized(dn);
1379         if (dn_str == NULL) {
1380                 return LDB_ERR_OPERATIONS_ERROR;
1381         }
1382
1383         if (dn_str[0] == '@') {
1384                 return LDB_SUCCESS;
1385         }
1386
1387         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1388                 return LDB_SUCCESS;
1389         }
1390         for (i = 0; i < el->num_values; i++) {
1391                 ret = ltdb_index_del_value(module, dn, el, i);
1392                 if (ret != LDB_SUCCESS) {
1393                         return ret;
1394                 }
1395         }
1396
1397         return LDB_SUCCESS;
1398 }
1399
1400 /*
1401   delete the index entries for a record
1402   return -1 on failure
1403 */
1404 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1405 {
1406         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1407         int ret;
1408         unsigned int i;
1409
1410         if (ldb_dn_is_special(msg->dn)) {
1411                 return LDB_SUCCESS;
1412         }
1413
1414         ret = ltdb_index_onelevel(module, msg, 0);
1415         if (ret != LDB_SUCCESS) {
1416                 return ret;
1417         }
1418
1419         if (!ltdb->cache->attribute_indexes) {
1420                 /* no indexed fields */
1421                 return LDB_SUCCESS;
1422         }
1423
1424         for (i = 0; i < msg->num_elements; i++) {
1425                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1426                 if (ret != LDB_SUCCESS) {
1427                         return ret;
1428                 }
1429         }
1430
1431         return LDB_SUCCESS;
1432 }
1433
1434
1435 /*
1436   traversal function that deletes all @INDEX records
1437 */
1438 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1439 {
1440         struct ldb_module *module = state;
1441         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1442         const char *dnstr = "DN=" LTDB_INDEX ":";
1443         struct dn_list list;
1444         struct ldb_dn *dn;
1445         struct ldb_val v;
1446         int ret;
1447
1448         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1449                 return 0;
1450         }
1451         /* we need to put a empty list in the internal tdb for this
1452          * index entry */
1453         list.dn = NULL;
1454         list.count = 0;
1455
1456         /* the offset of 3 is to remove the DN= prefix. */
1457         v.data = key.dptr + 3;
1458         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1459
1460         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1461         ret = ltdb_dn_list_store(module, dn, &list);
1462         if (ret != LDB_SUCCESS) {
1463                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1464                                        "Unable to store null index for %s\n",
1465                                                 ldb_dn_get_linearized(dn));
1466                 talloc_free(dn);
1467                 return -1;
1468         }
1469         talloc_free(dn);
1470         return 0;
1471 }
1472
1473 struct ltdb_reindex_context {
1474         struct ldb_module *module;
1475         int error;
1476 };
1477
1478 /*
1479   traversal function that adds @INDEX records during a re index
1480 */
1481 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1482 {
1483         struct ldb_context *ldb;
1484         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1485         struct ldb_module *module = ctx->module;
1486         struct ldb_message *msg;
1487         const char *dn = NULL;
1488         int ret;
1489         TDB_DATA key2;
1490
1491         ldb = ldb_module_get_ctx(module);
1492
1493         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1494             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1495                 return 0;
1496         }
1497
1498         msg = ldb_msg_new(module);
1499         if (msg == NULL) {
1500                 return -1;
1501         }
1502
1503         ret = ltdb_unpack_data(module, &data, msg);
1504         if (ret != 0) {
1505                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1506                                                 ldb_dn_get_linearized(msg->dn));
1507                 talloc_free(msg);
1508                 return -1;
1509         }
1510
1511         /* check if the DN key has changed, perhaps due to the
1512            case insensitivity of an element changing */
1513         key2 = ltdb_key(module, msg->dn);
1514         if (key2.dptr == NULL) {
1515                 /* probably a corrupt record ... darn */
1516                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1517                                                 ldb_dn_get_linearized(msg->dn));
1518                 talloc_free(msg);
1519                 return 0;
1520         }
1521         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1522                 tdb_delete(tdb, key);
1523                 tdb_store(tdb, key2, data, 0);
1524         }
1525         talloc_free(key2.dptr);
1526
1527         if (msg->dn == NULL) {
1528                 dn = (char *)key.dptr + 3;
1529         } else {
1530                 dn = ldb_dn_get_linearized(msg->dn);
1531         }
1532
1533         ret = ltdb_index_onelevel(module, msg, 1);
1534         if (ret != LDB_SUCCESS) {
1535                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1536                           "Adding special ONE LEVEL index failed (%s)!",
1537                                                 ldb_dn_get_linearized(msg->dn));
1538                 talloc_free(msg);
1539                 return -1;
1540         }
1541
1542         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1543
1544         if (ret != LDB_SUCCESS) {
1545                 ctx->error = ret;
1546                 talloc_free(msg);
1547                 return -1;
1548         }
1549
1550         talloc_free(msg);
1551
1552         return 0;
1553 }
1554
1555 /*
1556   force a complete reindex of the database
1557 */
1558 int ltdb_reindex(struct ldb_module *module)
1559 {
1560         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1561         int ret;
1562         struct ltdb_reindex_context ctx;
1563
1564         if (ltdb_cache_reload(module) != 0) {
1565                 return LDB_ERR_OPERATIONS_ERROR;
1566         }
1567
1568         /* first traverse the database deleting any @INDEX records by
1569          * putting NULL entries in the in-memory tdb
1570          */
1571         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1572         if (ret < 0) {
1573                 return LDB_ERR_OPERATIONS_ERROR;
1574         }
1575
1576         /* if we don't have indexes we have nothing todo */
1577         if (ltdb->cache->indexlist->num_elements == 0) {
1578                 return LDB_SUCCESS;
1579         }
1580
1581         ctx.module = module;
1582         ctx.error = 0;
1583
1584         /* now traverse adding any indexes for normal LDB records */
1585         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1586         if (ret < 0) {
1587                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1588                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1589                 return LDB_ERR_OPERATIONS_ERROR;
1590         }
1591
1592         if (ctx.error != LDB_SUCCESS) {
1593                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1594                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1595                 return ctx.error;
1596         }
1597
1598         return LDB_SUCCESS;
1599 }