ldb_tdb: Add an index shortcut for a <GUID= DN
[samba.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "ldb_private.h"
36
37 struct dn_list {
38         unsigned int count;
39         struct ldb_val *dn;
40 };
41
42 struct ltdb_idxptr {
43         struct tdb_context *itdb;
44         int error;
45 };
46
47 static int ltdb_write_index_dn_guid(struct ldb_module *module,
48                                     const struct ldb_message *msg,
49                                     int add);
50 static int ltdb_index_dn_base_dn(struct ldb_module *module,
51                                  struct ltdb_private *ltdb,
52                                  struct ldb_dn *base_dn,
53                                  struct dn_list *dn_list);
54
55 /* we put a @IDXVERSION attribute on index entries. This
56    allows us to tell if it was written by an older version
57 */
58 #define LTDB_INDEXING_VERSION 2
59
60 #define LTDB_GUID_INDEXING_VERSION 3
61
62 /* enable the idxptr mode when transactions start */
63 int ltdb_index_transaction_start(struct ldb_module *module)
64 {
65         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
66         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
67         if (ltdb->idxptr == NULL) {
68                 return ldb_oom(ldb_module_get_ctx(module));
69         }
70
71         return LDB_SUCCESS;
72 }
73
74 /*
75   see if two ldb_val structures contain exactly the same data
76   return -1 or 1 for a mismatch, 0 for match
77 */
78 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
79                                          const struct ldb_val *v2)
80 {
81         if (v1->length > v2->length) {
82                 return -1;
83         }
84         if (v1->length < v2->length) {
85                 return 1;
86         }
87         return memcmp(v1->data, v2->data, v1->length);
88 }
89
90
91 /*
92   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
93   binary-safe comparison for the 'dn' returns -1 if not found
94
95   This is therefore safe when the value is a GUID in the future
96  */
97 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
98                                  const struct dn_list *list,
99                                  const struct ldb_val *v)
100 {
101         unsigned int i;
102         for (i=0; i<list->count; i++) {
103                 if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
104                         return i;
105                 }
106         }
107         return -1;
108 }
109
110 /*
111   find a entry in a dn_list. Uses a case sensitive comparison with the dn
112   returns -1 if not found
113  */
114 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
115                                  struct dn_list *list,
116                                  const struct ldb_message *msg)
117 {
118         struct ldb_val v;
119         const struct ldb_val *key_val;
120         if (ltdb->cache->GUID_index_attribute == NULL) {
121                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
122                 v.data = discard_const_p(unsigned char, dn_str);
123                 v.length = strlen(dn_str);
124         } else {
125                 key_val = ldb_msg_find_ldb_val(msg,
126                                                ltdb->cache->GUID_index_attribute);
127                 if (key_val == NULL) {
128                         return -1;
129                 }
130                 v = *key_val;
131         }
132         return ltdb_dn_list_find_val(ltdb, list, &v);
133 }
134
135 /*
136   this is effectively a cast function, but with lots of paranoia
137   checks and also copes with CPUs that are fussy about pointer
138   alignment
139  */
140 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
141 {
142         struct dn_list *list;
143         if (rec.dsize != sizeof(void *)) {
144                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
145                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
146                 return NULL;
147         }
148         /* note that we can't just use a cast here, as rec.dptr may
149            not be aligned sufficiently for a pointer. A cast would cause
150            platforms like some ARM CPUs to crash */
151         memcpy(&list, rec.dptr, sizeof(void *));
152         list = talloc_get_type(list, struct dn_list);
153         if (list == NULL) {
154                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
155                                        "Bad type '%s' for idxptr",
156                                        talloc_get_name(list));
157                 return NULL;
158         }
159         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
160                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
161                                        "Bad parent '%s' for idxptr",
162                                        talloc_get_name(talloc_parent(list->dn)));
163                 return NULL;
164         }
165         return list;
166 }
167
168 /*
169   return the @IDX list in an index entry for a dn as a
170   struct dn_list
171  */
172 static int ltdb_dn_list_load(struct ldb_module *module,
173                              struct ltdb_private *ltdb,
174                              struct ldb_dn *dn, struct dn_list *list)
175 {
176         struct ldb_message *msg;
177         int ret;
178         struct ldb_message_element *el;
179         TDB_DATA rec;
180         struct dn_list *list2;
181         TDB_DATA key;
182
183         list->dn = NULL;
184         list->count = 0;
185
186         /* see if we have any in-memory index entries */
187         if (ltdb->idxptr == NULL ||
188             ltdb->idxptr->itdb == NULL) {
189                 goto normal_index;
190         }
191
192         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
193         key.dsize = strlen((char *)key.dptr);
194
195         rec = tdb_fetch(ltdb->idxptr->itdb, key);
196         if (rec.dptr == NULL) {
197                 goto normal_index;
198         }
199
200         /* we've found an in-memory index entry */
201         list2 = ltdb_index_idxptr(module, rec, true);
202         if (list2 == NULL) {
203                 free(rec.dptr);
204                 return LDB_ERR_OPERATIONS_ERROR;
205         }
206         free(rec.dptr);
207
208         *list = *list2;
209         return LDB_SUCCESS;
210
211 normal_index:
212         msg = ldb_msg_new(list);
213         if (msg == NULL) {
214                 return LDB_ERR_OPERATIONS_ERROR;
215         }
216
217         ret = ltdb_search_dn1(module, dn, msg,
218                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
219                               |LDB_UNPACK_DATA_FLAG_NO_DN);
220         if (ret != LDB_SUCCESS) {
221                 talloc_free(msg);
222                 return ret;
223         }
224
225         /* TODO: check indexing version number */
226
227         el = ldb_msg_find_element(msg, LTDB_IDX);
228         if (!el) {
229                 talloc_free(msg);
230                 return LDB_SUCCESS;
231         }
232
233         /*
234          * we avoid copying the strings by stealing the list.  We have
235          * to steal msg onto el->values (which looks odd) because we
236          * asked for the memory to be allocated on msg, not on each
237          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
238          */
239         if (ltdb->cache->GUID_index_attribute == NULL) {
240                 talloc_steal(el->values, msg);
241                 list->dn = talloc_steal(list, el->values);
242                 list->count = el->num_values;
243         } else {
244                 unsigned int i;
245                 static const size_t GUID_val_size = 16;
246                 if (el->num_values != 1) {
247                         return LDB_ERR_OPERATIONS_ERROR;
248                 }
249
250                 if ((el->values[0].length % GUID_val_size) != 0) {
251                         return LDB_ERR_OPERATIONS_ERROR;
252                 }
253
254                 list->count = el->values[0].length / GUID_val_size;
255                 list->dn = talloc_array(list, struct ldb_val, list->count);
256
257                 /*
258                  * The actual data is on msg, due to
259                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
260                  */
261                 talloc_steal(list->dn, msg);
262                 for (i = 0; i < list->count; i++) {
263                         list->dn[i].data = &el->values[0].data[i * GUID_val_size];
264                         list->dn[i].length = GUID_val_size;
265                 }
266         }
267
268         /* We don't need msg->elements any more */
269         talloc_free(msg->elements);
270         return LDB_SUCCESS;
271 }
272
273 int ltdb_key_dn_from_idx(struct ldb_module *module,
274                          struct ltdb_private *ltdb,
275                          TALLOC_CTX *mem_ctx,
276                          struct ldb_dn *dn,
277                          TDB_DATA *tdb_key)
278 {
279         struct ldb_context *ldb = ldb_module_get_ctx(module);
280         int ret;
281         struct dn_list *list = talloc(mem_ctx, struct dn_list);
282         if (list == NULL) {
283                 ldb_oom(ldb);
284                 return LDB_ERR_OPERATIONS_ERROR;
285         }
286
287         ret = ltdb_index_dn_base_dn(module, ltdb, dn, list);
288         if (ret != LDB_SUCCESS) {
289                 return ret;
290         }
291
292         if (list->count == 0) {
293                 return LDB_ERR_NO_SUCH_OBJECT;
294         }
295         if (list->count > 1) {
296                 return LDB_ERR_CONSTRAINT_VIOLATION;
297         }
298
299         *tdb_key = ltdb_guid_to_key(module, ltdb,
300                                     mem_ctx, &list->dn[0]);
301         if (tdb_key->dptr == NULL) {
302                 return LDB_ERR_OPERATIONS_ERROR;
303         }
304
305         return LDB_SUCCESS;
306 }
307
308
309
310 /*
311   save a dn_list into a full @IDX style record
312  */
313 static int ltdb_dn_list_store_full(struct ldb_module *module,
314                                    struct ltdb_private *ltdb,
315                                    struct ldb_dn *dn,
316                                    struct dn_list *list)
317 {
318         struct ldb_message *msg;
319         int ret;
320
321         msg = ldb_msg_new(module);
322         if (!msg) {
323                 return ldb_module_oom(module);
324         }
325
326         msg->dn = dn;
327
328         if (list->count == 0) {
329                 ret = ltdb_delete_noindex(module, msg);
330                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
331                         talloc_free(msg);
332                         return LDB_SUCCESS;
333                 }
334                 return ret;
335         }
336
337         if (ltdb->cache->GUID_index_attribute == NULL) {
338                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
339                                       LTDB_INDEXING_VERSION);
340                 if (ret != LDB_SUCCESS) {
341                         talloc_free(msg);
342                         return ldb_module_oom(module);
343                 }
344         } else {
345                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
346                                       LTDB_GUID_INDEXING_VERSION);
347                 if (ret != LDB_SUCCESS) {
348                         talloc_free(msg);
349                         return ldb_module_oom(module);
350                 }
351         }
352
353         if (list->count > 0) {
354                 struct ldb_message_element *el;
355
356                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
357                 if (ret != LDB_SUCCESS) {
358                         talloc_free(msg);
359                         return ldb_module_oom(module);
360                 }
361
362                 if (ltdb->cache->GUID_index_attribute == NULL) {
363                         el->values = list->dn;
364                         el->num_values = list->count;
365                 } else {
366                         struct ldb_val v;
367                         unsigned int i;
368                         el->values = talloc_array(msg,
369                                                   struct ldb_val, 1);
370                         if (el->values == NULL) {
371                                 talloc_free(msg);
372                                 return ldb_module_oom(module);
373                         }
374
375                         v.data = talloc_array_size(el->values,
376                                                    list->count,
377                                                    LTDB_GUID_SIZE);
378                         if (v.data == NULL) {
379                                 talloc_free(msg);
380                                 return ldb_module_oom(module);
381                         }
382
383                         v.length = talloc_get_size(v.data);
384
385                         for (i = 0; i < list->count; i++) {
386                                 if (list->dn[i].length !=
387                                     LTDB_GUID_SIZE) {
388                                         talloc_free(msg);
389                                         return ldb_module_operr(module);
390                                 }
391                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
392                                        list->dn[i].data,
393                                        LTDB_GUID_SIZE);
394                         }
395                         el->values[0] = v;
396                         el->num_values = 1;
397                 }
398         }
399
400         ret = ltdb_store(module, msg, TDB_REPLACE);
401         talloc_free(msg);
402         return ret;
403 }
404
405 /*
406   save a dn_list into the database, in either @IDX or internal format
407  */
408 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
409                               struct dn_list *list)
410 {
411         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
412         TDB_DATA rec, key;
413         int ret;
414         struct dn_list *list2;
415
416         if (ltdb->idxptr == NULL) {
417                 return ltdb_dn_list_store_full(module, ltdb,
418                                                dn, list);
419         }
420
421         if (ltdb->idxptr->itdb == NULL) {
422                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
423                 if (ltdb->idxptr->itdb == NULL) {
424                         return LDB_ERR_OPERATIONS_ERROR;
425                 }
426         }
427
428         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
429         key.dsize = strlen((char *)key.dptr);
430
431         rec = tdb_fetch(ltdb->idxptr->itdb, key);
432         if (rec.dptr != NULL) {
433                 list2 = ltdb_index_idxptr(module, rec, false);
434                 if (list2 == NULL) {
435                         free(rec.dptr);
436                         return LDB_ERR_OPERATIONS_ERROR;
437                 }
438                 free(rec.dptr);
439                 list2->dn = talloc_steal(list2, list->dn);
440                 list2->count = list->count;
441                 return LDB_SUCCESS;
442         }
443
444         list2 = talloc(ltdb->idxptr, struct dn_list);
445         if (list2 == NULL) {
446                 return LDB_ERR_OPERATIONS_ERROR;
447         }
448         list2->dn = talloc_steal(list2, list->dn);
449         list2->count = list->count;
450
451         rec.dptr = (uint8_t *)&list2;
452         rec.dsize = sizeof(void *);
453
454
455         /*
456          * This is not a store into the main DB, but into an in-memory
457          * TDB, so we don't need a guard on ltdb->read_only
458          */
459         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
460         if (ret != 0) {
461                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
462         }
463         return LDB_SUCCESS;
464 }
465
466 /*
467   traverse function for storing the in-memory index entries on disk
468  */
469 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
470 {
471         struct ldb_module *module = state;
472         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
473         struct ldb_dn *dn;
474         struct ldb_context *ldb = ldb_module_get_ctx(module);
475         struct ldb_val v;
476         struct dn_list *list;
477
478         list = ltdb_index_idxptr(module, data, true);
479         if (list == NULL) {
480                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
481                 return -1;
482         }
483
484         v.data = key.dptr;
485         v.length = strnlen((char *)key.dptr, key.dsize);
486
487         dn = ldb_dn_from_ldb_val(module, ldb, &v);
488         if (dn == NULL) {
489                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
490                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
491                 return -1;
492         }
493
494         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
495                                                       dn, list);
496         talloc_free(dn);
497         if (ltdb->idxptr->error != 0) {
498                 return -1;
499         }
500         return 0;
501 }
502
503 /* cleanup the idxptr mode when transaction commits */
504 int ltdb_index_transaction_commit(struct ldb_module *module)
505 {
506         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
507         int ret;
508
509         struct ldb_context *ldb = ldb_module_get_ctx(module);
510
511         ldb_reset_err_string(ldb);
512
513         if (ltdb->idxptr->itdb) {
514                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
515                 tdb_close(ltdb->idxptr->itdb);
516         }
517
518         ret = ltdb->idxptr->error;
519         if (ret != LDB_SUCCESS) {
520                 if (!ldb_errstring(ldb)) {
521                         ldb_set_errstring(ldb, ldb_strerror(ret));
522                 }
523                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
524         }
525
526         talloc_free(ltdb->idxptr);
527         ltdb->idxptr = NULL;
528         return ret;
529 }
530
531 /* cleanup the idxptr mode when transaction cancels */
532 int ltdb_index_transaction_cancel(struct ldb_module *module)
533 {
534         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
535         if (ltdb->idxptr && ltdb->idxptr->itdb) {
536                 tdb_close(ltdb->idxptr->itdb);
537         }
538         talloc_free(ltdb->idxptr);
539         ltdb->idxptr = NULL;
540         return LDB_SUCCESS;
541 }
542
543
544 /*
545   return the dn key to be used for an index
546   the caller is responsible for freeing
547 */
548 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
549                                      const char *attr, const struct ldb_val *value,
550                                      const struct ldb_schema_attribute **ap)
551 {
552         struct ldb_dn *ret;
553         struct ldb_val v;
554         const struct ldb_schema_attribute *a;
555         char *attr_folded;
556         int r;
557
558         attr_folded = ldb_attr_casefold(ldb, attr);
559         if (!attr_folded) {
560                 return NULL;
561         }
562
563         a = ldb_schema_attribute_by_name(ldb, attr);
564         if (ap) {
565                 *ap = a;
566         }
567         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
568         if (r != LDB_SUCCESS) {
569                 const char *errstr = ldb_errstring(ldb);
570                 /* canonicalisation can be refused. For example,
571                    a attribute that takes wildcards will refuse to canonicalise
572                    if the value contains a wildcard */
573                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
574                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
575                 talloc_free(attr_folded);
576                 return NULL;
577         }
578         if (ldb_should_b64_encode(ldb, &v)) {
579                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
580                 if (!vstr) {
581                         talloc_free(attr_folded);
582                         return NULL;
583                 }
584                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
585                 talloc_free(vstr);
586         } else {
587                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
588         }
589
590         if (v.data != value->data) {
591                 talloc_free(v.data);
592         }
593         talloc_free(attr_folded);
594
595         return ret;
596 }
597
598 /*
599   see if a attribute value is in the list of indexed attributes
600 */
601 static bool ltdb_is_indexed(struct ldb_module *module,
602                             struct ltdb_private *ltdb,
603                             const char *attr)
604 {
605         struct ldb_context *ldb = ldb_module_get_ctx(module);
606         unsigned int i;
607         struct ldb_message_element *el;
608
609         if (ldb->schema.index_handler_override) {
610                 const struct ldb_schema_attribute *a
611                         = ldb_schema_attribute_by_name(ldb, attr);
612
613                 if (a == NULL) {
614                         return false;
615                 }
616
617                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
618                         return true;
619                 } else {
620                         return false;
621                 }
622         }
623
624         if (!ltdb->cache->attribute_indexes) {
625                 return false;
626         }
627
628         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
629         if (el == NULL) {
630                 return false;
631         }
632
633         /* TODO: this is too expensive! At least use a binary search */
634         for (i=0; i<el->num_values; i++) {
635                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
636                         return true;
637                 }
638         }
639         return false;
640 }
641
642 /*
643   in the following logic functions, the return value is treated as
644   follows:
645
646      LDB_SUCCESS: we found some matching index values
647
648      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
649
650      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
651                                we'll need a full search
652  */
653
654 /*
655   return a list of dn's that might match a simple indexed search (an
656   equality search only)
657  */
658 static int ltdb_index_dn_simple(struct ldb_module *module,
659                                 struct ltdb_private *ltdb,
660                                 const struct ldb_parse_tree *tree,
661                                 struct dn_list *list)
662 {
663         struct ldb_context *ldb;
664         struct ldb_dn *dn;
665         int ret;
666
667         ldb = ldb_module_get_ctx(module);
668
669         list->count = 0;
670         list->dn = NULL;
671
672         /* if the attribute isn't in the list of indexed attributes then
673            this node needs a full search */
674         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
675                 return LDB_ERR_OPERATIONS_ERROR;
676         }
677
678         /* the attribute is indexed. Pull the list of DNs that match the
679            search criterion */
680         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
681         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
682
683         ret = ltdb_dn_list_load(module, ltdb, dn, list);
684         talloc_free(dn);
685         return ret;
686 }
687
688
689 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
690
691 /*
692   return a list of dn's that might match a leaf indexed search
693  */
694 static int ltdb_index_dn_leaf(struct ldb_module *module,
695                               struct ltdb_private *ltdb,
696                               const struct ldb_parse_tree *tree,
697                               struct dn_list *list)
698 {
699         if (ltdb->disallow_dn_filter &&
700             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
701                 /* in AD mode we do not support "(dn=...)" search filters */
702                 list->dn = NULL;
703                 list->count = 0;
704                 return LDB_SUCCESS;
705         }
706         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
707                 struct ldb_dn *dn
708                         = ldb_dn_from_ldb_val(list,
709                                               ldb_module_get_ctx(module),
710                                               &tree->u.equality.value);
711                 if (dn == NULL) {
712                         /* If we can't parse it, no match */
713                         list->dn = NULL;
714                         list->count = 0;
715                         return LDB_SUCCESS;
716                 }
717
718                 /*
719                  * Re-use the same code we use for a SCOPE_BASE
720                  * search
721                  *
722                  * We can't call TALLOC_FREE(dn) as this must belong
723                  * to list for the memory to remain valid.
724                  */
725                 return ltdb_index_dn_base_dn(module, ltdb, dn, list);
726         }
727         return ltdb_index_dn_simple(module, ltdb, tree, list);
728 }
729
730
731 /*
732   list intersection
733   list = list & list2
734 */
735 static bool list_intersect(struct ldb_context *ldb,
736                            struct ltdb_private *ltdb,
737                            struct dn_list *list, const struct dn_list *list2)
738 {
739         struct dn_list *list3;
740         unsigned int i;
741
742         if (list->count == 0) {
743                 /* 0 & X == 0 */
744                 return true;
745         }
746         if (list2->count == 0) {
747                 /* X & 0 == 0 */
748                 list->count = 0;
749                 list->dn = NULL;
750                 return true;
751         }
752
753         /* the indexing code is allowed to return a longer list than
754            what really matches, as all results are filtered by the
755            full expression at the end - this shortcut avoids a lot of
756            work in some cases */
757         if (list->count < 2 && list2->count > 10) {
758                 return true;
759         }
760         if (list2->count < 2 && list->count > 10) {
761                 list->count = list2->count;
762                 list->dn = list2->dn;
763                 /* note that list2 may not be the parent of list2->dn,
764                    as list2->dn may be owned by ltdb->idxptr. In that
765                    case we expect this reparent call to fail, which is
766                    OK */
767                 talloc_reparent(list2, list, list2->dn);
768                 return true;
769         }
770
771         list3 = talloc_zero(list, struct dn_list);
772         if (list3 == NULL) {
773                 return false;
774         }
775
776         list3->dn = talloc_array(list3, struct ldb_val, list->count);
777         if (!list3->dn) {
778                 talloc_free(list3);
779                 return false;
780         }
781         list3->count = 0;
782
783         for (i=0;i<list->count;i++) {
784                 if (ltdb_dn_list_find_val(ltdb, list2,
785                                           &list->dn[i]) != -1) {
786                         list3->dn[list3->count] = list->dn[i];
787                         list3->count++;
788                 }
789         }
790
791         list->dn = talloc_steal(list, list3->dn);
792         list->count = list3->count;
793         talloc_free(list3);
794
795         return true;
796 }
797
798
799 /*
800   list union
801   list = list | list2
802 */
803 static bool list_union(struct ldb_context *ldb,
804                        struct dn_list *list, const struct dn_list *list2)
805 {
806         struct ldb_val *dn3;
807
808         if (list2->count == 0) {
809                 /* X | 0 == X */
810                 return true;
811         }
812
813         if (list->count == 0) {
814                 /* 0 | X == X */
815                 list->count = list2->count;
816                 list->dn = list2->dn;
817                 /* note that list2 may not be the parent of list2->dn,
818                    as list2->dn may be owned by ltdb->idxptr. In that
819                    case we expect this reparent call to fail, which is
820                    OK */
821                 talloc_reparent(list2, list, list2->dn);
822                 return true;
823         }
824
825         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
826         if (!dn3) {
827                 ldb_oom(ldb);
828                 return false;
829         }
830
831         /* we allow for duplicates here, and get rid of them later */
832         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
833         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
834
835         list->dn = dn3;
836         list->count += list2->count;
837
838         return true;
839 }
840
841 static int ltdb_index_dn(struct ldb_module *module,
842                          struct ltdb_private *ltdb,
843                          const struct ldb_parse_tree *tree,
844                          struct dn_list *list);
845
846
847 /*
848   process an OR list (a union)
849  */
850 static int ltdb_index_dn_or(struct ldb_module *module,
851                             struct ltdb_private *ltdb,
852                             const struct ldb_parse_tree *tree,
853                             struct dn_list *list)
854 {
855         struct ldb_context *ldb;
856         unsigned int i;
857
858         ldb = ldb_module_get_ctx(module);
859
860         list->dn = NULL;
861         list->count = 0;
862
863         for (i=0; i<tree->u.list.num_elements; i++) {
864                 struct dn_list *list2;
865                 int ret;
866
867                 list2 = talloc_zero(list, struct dn_list);
868                 if (list2 == NULL) {
869                         return LDB_ERR_OPERATIONS_ERROR;
870                 }
871
872                 ret = ltdb_index_dn(module, ltdb,
873                                     tree->u.list.elements[i], list2);
874
875                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
876                         /* X || 0 == X */
877                         talloc_free(list2);
878                         continue;
879                 }
880
881                 if (ret != LDB_SUCCESS) {
882                         /* X || * == * */
883                         talloc_free(list2);
884                         return ret;
885                 }
886
887                 if (!list_union(ldb, list, list2)) {
888                         talloc_free(list2);
889                         return LDB_ERR_OPERATIONS_ERROR;
890                 }
891         }
892
893         if (list->count == 0) {
894                 return LDB_ERR_NO_SUCH_OBJECT;
895         }
896
897         return LDB_SUCCESS;
898 }
899
900
901 /*
902   NOT an index results
903  */
904 static int ltdb_index_dn_not(struct ldb_module *module,
905                              struct ltdb_private *ltdb,
906                              const struct ldb_parse_tree *tree,
907                              struct dn_list *list)
908 {
909         /* the only way to do an indexed not would be if we could
910            negate the not via another not or if we knew the total
911            number of database elements so we could know that the
912            existing expression covered the whole database.
913
914            instead, we just give up, and rely on a full index scan
915            (unless an outer & manages to reduce the list)
916         */
917         return LDB_ERR_OPERATIONS_ERROR;
918 }
919
920
921 static bool ltdb_index_unique(struct ldb_context *ldb,
922                               const char *attr)
923 {
924         const struct ldb_schema_attribute *a;
925         a = ldb_schema_attribute_by_name(ldb, attr);
926         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
927                 return true;
928         }
929         return false;
930 }
931
932 /*
933   process an AND expression (intersection)
934  */
935 static int ltdb_index_dn_and(struct ldb_module *module,
936                              struct ltdb_private *ltdb,
937                              const struct ldb_parse_tree *tree,
938                              struct dn_list *list)
939 {
940         struct ldb_context *ldb;
941         unsigned int i;
942         bool found;
943
944         ldb = ldb_module_get_ctx(module);
945
946         list->dn = NULL;
947         list->count = 0;
948
949         /* in the first pass we only look for unique simple
950            equality tests, in the hope of avoiding having to look
951            at any others */
952         for (i=0; i<tree->u.list.num_elements; i++) {
953                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
954                 int ret;
955
956                 if (subtree->operation != LDB_OP_EQUALITY ||
957                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
958                         continue;
959                 }
960
961                 ret = ltdb_index_dn(module, ltdb, subtree, list);
962                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
963                         /* 0 && X == 0 */
964                         return LDB_ERR_NO_SUCH_OBJECT;
965                 }
966                 if (ret == LDB_SUCCESS) {
967                         /* a unique index match means we can
968                          * stop. Note that we don't care if we return
969                          * a few too many objects, due to later
970                          * filtering */
971                         return LDB_SUCCESS;
972                 }
973         }
974
975         /* now do a full intersection */
976         found = false;
977
978         for (i=0; i<tree->u.list.num_elements; i++) {
979                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
980                 struct dn_list *list2;
981                 int ret;
982
983                 list2 = talloc_zero(list, struct dn_list);
984                 if (list2 == NULL) {
985                         return ldb_module_oom(module);
986                 }
987
988                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
989
990                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
991                         /* X && 0 == 0 */
992                         list->dn = NULL;
993                         list->count = 0;
994                         talloc_free(list2);
995                         return LDB_ERR_NO_SUCH_OBJECT;
996                 }
997
998                 if (ret != LDB_SUCCESS) {
999                         /* this didn't adding anything */
1000                         talloc_free(list2);
1001                         continue;
1002                 }
1003
1004                 if (!found) {
1005                         talloc_reparent(list2, list, list->dn);
1006                         list->dn = list2->dn;
1007                         list->count = list2->count;
1008                         found = true;
1009                 } else if (!list_intersect(ldb, ltdb,
1010                                            list, list2)) {
1011                         talloc_free(list2);
1012                         return LDB_ERR_OPERATIONS_ERROR;
1013                 }
1014
1015                 if (list->count == 0) {
1016                         list->dn = NULL;
1017                         return LDB_ERR_NO_SUCH_OBJECT;
1018                 }
1019
1020                 if (list->count < 2) {
1021                         /* it isn't worth loading the next part of the tree */
1022                         return LDB_SUCCESS;
1023                 }
1024         }
1025
1026         if (!found) {
1027                 /* none of the attributes were indexed */
1028                 return LDB_ERR_OPERATIONS_ERROR;
1029         }
1030
1031         return LDB_SUCCESS;
1032 }
1033
1034 /*
1035   return a list of matching objects using a one-level index
1036  */
1037 static int ltdb_index_dn_attr(struct ldb_module *module,
1038                               struct ltdb_private *ltdb,
1039                               const char *attr,
1040                               struct ldb_dn *dn,
1041                               struct dn_list *list)
1042 {
1043         struct ldb_context *ldb;
1044         struct ldb_dn *key;
1045         struct ldb_val val;
1046         int ret;
1047
1048         ldb = ldb_module_get_ctx(module);
1049
1050         /* work out the index key from the parent DN */
1051         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1052         val.length = strlen((char *)val.data);
1053         key = ltdb_index_key(ldb, attr, &val, NULL);
1054         if (!key) {
1055                 ldb_oom(ldb);
1056                 return LDB_ERR_OPERATIONS_ERROR;
1057         }
1058
1059         ret = ltdb_dn_list_load(module, ltdb, key, list);
1060         talloc_free(key);
1061         if (ret != LDB_SUCCESS) {
1062                 return ret;
1063         }
1064
1065         if (list->count == 0) {
1066                 return LDB_ERR_NO_SUCH_OBJECT;
1067         }
1068
1069         return LDB_SUCCESS;
1070 }
1071
1072 /*
1073   return a list of matching objects using a one-level index
1074  */
1075 static int ltdb_index_dn_one(struct ldb_module *module,
1076                              struct ltdb_private *ltdb,
1077                              struct ldb_dn *parent_dn,
1078                              struct dn_list *list)
1079 {
1080         return ltdb_index_dn_attr(module, ltdb,
1081                                   LTDB_IDXONE, parent_dn, list);
1082 }
1083
1084 /*
1085   return a list of matching objects using the DN index
1086  */
1087 static int ltdb_index_dn_base_dn(struct ldb_module *module,
1088                                  struct ltdb_private *ltdb,
1089                                  struct ldb_dn *base_dn,
1090                                  struct dn_list *dn_list)
1091 {
1092         const struct ldb_val *guid_val = NULL;
1093         if (ltdb->cache->GUID_index_attribute == NULL) {
1094                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1095                 if (dn_list->dn == NULL) {
1096                         return ldb_module_oom(module);
1097                 }
1098                 dn_list->dn[0].data = discard_const_p(unsigned char,
1099                                                       ldb_dn_get_linearized(base_dn));
1100                 if (dn_list->dn[0].data == NULL) {
1101                         return ldb_module_oom(module);
1102                 }
1103                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1104                 dn_list->count = 1;
1105
1106                 return LDB_SUCCESS;
1107         }
1108
1109         if (ltdb->cache->GUID_index_dn_component != NULL) {
1110                 guid_val = ldb_dn_get_extended_component(base_dn,
1111                                                          ltdb->cache->GUID_index_dn_component);
1112         }
1113
1114         if (guid_val != NULL) {
1115                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1116                 if (dn_list->dn == NULL) {
1117                         return ldb_module_oom(module);
1118                 }
1119                 dn_list->dn[0].data = guid_val->data;
1120                 dn_list->dn[0].length = guid_val->length;
1121                 dn_list->count = 1;
1122
1123                 return LDB_SUCCESS;
1124         }
1125
1126         return ltdb_index_dn_attr(module, ltdb,
1127                                   LTDB_IDXDN, base_dn, dn_list);
1128 }
1129
1130 /*
1131   return a list of dn's that might match a indexed search or
1132   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1133  */
1134 static int ltdb_index_dn(struct ldb_module *module,
1135                          struct ltdb_private *ltdb,
1136                          const struct ldb_parse_tree *tree,
1137                          struct dn_list *list)
1138 {
1139         int ret = LDB_ERR_OPERATIONS_ERROR;
1140
1141         switch (tree->operation) {
1142         case LDB_OP_AND:
1143                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1144                 break;
1145
1146         case LDB_OP_OR:
1147                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1148                 break;
1149
1150         case LDB_OP_NOT:
1151                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1152                 break;
1153
1154         case LDB_OP_EQUALITY:
1155                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1156                 break;
1157
1158         case LDB_OP_SUBSTRING:
1159         case LDB_OP_GREATER:
1160         case LDB_OP_LESS:
1161         case LDB_OP_PRESENT:
1162         case LDB_OP_APPROX:
1163         case LDB_OP_EXTENDED:
1164                 /* we can't index with fancy bitops yet */
1165                 ret = LDB_ERR_OPERATIONS_ERROR;
1166                 break;
1167         }
1168
1169         return ret;
1170 }
1171
1172 /*
1173   filter a candidate dn_list from an indexed search into a set of results
1174   extracting just the given attributes
1175 */
1176 static int ltdb_index_filter(struct ltdb_private *ltdb,
1177                              const struct dn_list *dn_list,
1178                              struct ltdb_context *ac,
1179                              uint32_t *match_count)
1180 {
1181         struct ldb_context *ldb;
1182         struct ldb_message *msg;
1183         struct ldb_message *filtered_msg;
1184         unsigned int i;
1185
1186         ldb = ldb_module_get_ctx(ac->module);
1187
1188         for (i = 0; i < dn_list->count; i++) {
1189                 struct ldb_dn *dn;
1190                 int ret;
1191                 bool matched;
1192
1193                 msg = ldb_msg_new(ac);
1194                 if (!msg) {
1195                         return LDB_ERR_OPERATIONS_ERROR;
1196                 }
1197
1198                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
1199                 if (dn == NULL) {
1200                         talloc_free(msg);
1201                         return LDB_ERR_OPERATIONS_ERROR;
1202                 }
1203
1204                 ret = ltdb_search_dn1(ac->module, dn, msg,
1205                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1206                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1207                 talloc_free(dn);
1208                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1209                         /* the record has disappeared? yes, this can happen */
1210                         talloc_free(msg);
1211                         continue;
1212                 }
1213
1214                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1215                         /* an internal error */
1216                         talloc_free(msg);
1217                         return LDB_ERR_OPERATIONS_ERROR;
1218                 }
1219
1220                 ret = ldb_match_msg_error(ldb, msg,
1221                                           ac->tree, ac->base, ac->scope, &matched);
1222                 if (ret != LDB_SUCCESS) {
1223                         talloc_free(msg);
1224                         return ret;
1225                 }
1226                 if (!matched) {
1227                         talloc_free(msg);
1228                         continue;
1229                 }
1230
1231                 /* filter the attributes that the user wants */
1232                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1233
1234                 talloc_free(msg);
1235
1236                 if (ret == -1) {
1237                         return LDB_ERR_OPERATIONS_ERROR;
1238                 }
1239
1240                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1241                 if (ret != LDB_SUCCESS) {
1242                         /* Regardless of success or failure, the msg
1243                          * is the callbacks responsiblity, and should
1244                          * not be talloc_free()'ed */
1245                         ac->request_terminated = true;
1246                         return ret;
1247                 }
1248
1249                 (*match_count)++;
1250         }
1251
1252         return LDB_SUCCESS;
1253 }
1254
1255 /*
1256   remove any duplicated entries in a indexed result
1257  */
1258 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
1259 {
1260         unsigned int i, new_count;
1261
1262         if (list->count < 2) {
1263                 return;
1264         }
1265
1266         TYPESAFE_QSORT(list->dn, list->count,
1267                        ldb_val_equal_exact_for_qsort);
1268
1269         new_count = 1;
1270         for (i=1; i<list->count; i++) {
1271                 if (ldb_val_equal_exact(&list->dn[i],
1272                                         &list->dn[new_count-1]) == 0) {
1273                         if (new_count != i) {
1274                                 list->dn[new_count] = list->dn[i];
1275                         }
1276                         new_count++;
1277                 }
1278         }
1279
1280         list->count = new_count;
1281 }
1282
1283 /*
1284   search the database with a LDAP-like expression using indexes
1285   returns -1 if an indexed search is not possible, in which
1286   case the caller should call ltdb_search_full()
1287 */
1288 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1289 {
1290         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1291         struct dn_list *dn_list;
1292         int ret;
1293
1294         /* see if indexing is enabled */
1295         if (!ltdb->cache->attribute_indexes &&
1296             !ltdb->cache->one_level_indexes &&
1297             ac->scope != LDB_SCOPE_BASE) {
1298                 /* fallback to a full search */
1299                 return LDB_ERR_OPERATIONS_ERROR;
1300         }
1301
1302         dn_list = talloc_zero(ac, struct dn_list);
1303         if (dn_list == NULL) {
1304                 return ldb_module_oom(ac->module);
1305         }
1306
1307         switch (ac->scope) {
1308         case LDB_SCOPE_BASE:
1309                 ret = ltdb_index_dn_base_dn(ac->module, ltdb,
1310                                             ac->base, dn_list);
1311                 if (ret != LDB_SUCCESS) {
1312                         talloc_free(dn_list);
1313                         return ret;
1314                 }
1315                 break;
1316
1317         case LDB_SCOPE_ONELEVEL:
1318                 if (!ltdb->cache->one_level_indexes) {
1319                         talloc_free(dn_list);
1320                         return LDB_ERR_OPERATIONS_ERROR;
1321                 }
1322                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list);
1323                 if (ret != LDB_SUCCESS) {
1324                         talloc_free(dn_list);
1325                         return ret;
1326                 }
1327                 break;
1328
1329         case LDB_SCOPE_SUBTREE:
1330         case LDB_SCOPE_DEFAULT:
1331                 if (!ltdb->cache->attribute_indexes) {
1332                         talloc_free(dn_list);
1333                         return LDB_ERR_OPERATIONS_ERROR;
1334                 }
1335                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
1336                 if (ret != LDB_SUCCESS) {
1337                         talloc_free(dn_list);
1338                         return ret;
1339                 }
1340                 ltdb_dn_list_remove_duplicates(dn_list);
1341                 break;
1342         }
1343
1344         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count);
1345         talloc_free(dn_list);
1346         return ret;
1347 }
1348
1349 /**
1350  * @brief Add a DN in the index list of a given attribute name/value pair
1351  *
1352  * This function will add the DN in the index list for the index for
1353  * the given attribute name and value.
1354  *
1355  * @param[in]  module       A ldb_module structure
1356  *
1357  * @param[in]  dn           The string representation of the DN as it
1358  *                          will be stored in the index entry
1359  *
1360  * @param[in]  el           A ldb_message_element array, one of the entry
1361  *                          referred by the v_idx is the attribute name and
1362  *                          value pair which will be used to construct the
1363  *                          index name
1364  *
1365  * @param[in]  v_idx        The index of element in the el array to use
1366  *
1367  * @return                  An ldb error code
1368  */
1369 static int ltdb_index_add1(struct ldb_module *module,
1370                            struct ltdb_private *ltdb,
1371                            const struct ldb_message *msg,
1372                            struct ldb_message_element *el, int v_idx)
1373 {
1374         struct ldb_context *ldb;
1375         struct ldb_dn *dn_key;
1376         int ret;
1377         const struct ldb_schema_attribute *a;
1378         struct dn_list *list;
1379         unsigned alloc_len;
1380
1381         ldb = ldb_module_get_ctx(module);
1382
1383         list = talloc_zero(module, struct dn_list);
1384         if (list == NULL) {
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1389         if (!dn_key) {
1390                 talloc_free(list);
1391                 return LDB_ERR_OPERATIONS_ERROR;
1392         }
1393         talloc_steal(list, dn_key);
1394
1395         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
1396         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1397                 talloc_free(list);
1398                 return ret;
1399         }
1400
1401         if (list->count > 0 &&
1402             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1403                 /*
1404                  * We do not want to print info about a possibly
1405                  * confidential DN that the conflict was with in the
1406                  * user-visible error string
1407                  */
1408                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1409                           __location__ ": unique index violation on %s in %s, "
1410                           "conficts with %*.*s in %s",
1411                           el->name, ldb_dn_get_linearized(msg->dn),
1412                           (int)list->dn[0].length,
1413                           (int)list->dn[0].length,
1414                           list->dn[0].data,
1415                           ldb_dn_get_linearized(dn_key));
1416                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1417                                        el->name,
1418                                        ldb_dn_get_linearized(msg->dn));
1419                 talloc_free(list);
1420                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1421         }
1422
1423         /* overallocate the list a bit, to reduce the number of
1424          * realloc trigered copies */
1425         alloc_len = ((list->count+1)+7) & ~7;
1426         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1427         if (list->dn == NULL) {
1428                 talloc_free(list);
1429                 return LDB_ERR_OPERATIONS_ERROR;
1430         }
1431
1432         if (ltdb->cache->GUID_index_attribute == NULL) {
1433                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
1434                 list->dn[list->count].data
1435                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
1436                 if (list->dn[list->count].data == NULL) {
1437                         talloc_free(list);
1438                         return LDB_ERR_OPERATIONS_ERROR;
1439                 }
1440                 list->dn[list->count].length = strlen(dn_str);
1441         } else {
1442                 const struct ldb_val *key_val;
1443                 key_val = ldb_msg_find_ldb_val(msg,
1444                                                ltdb->cache->GUID_index_attribute);
1445                 if (key_val == NULL) {
1446                         talloc_free(list);
1447                         return ldb_module_operr(module);
1448                 }
1449
1450                 if (key_val->length != LTDB_GUID_SIZE) {
1451                         talloc_free(list);
1452                         return ldb_module_operr(module);
1453                 }
1454                 list->dn[list->count] = ldb_val_dup(list->dn, key_val);
1455                 if (list->dn[list->count].data == NULL) {
1456                         talloc_free(list);
1457                         return ldb_module_operr(module);
1458                 }
1459         }
1460         list->count++;
1461
1462         ret = ltdb_dn_list_store(module, dn_key, list);
1463
1464         talloc_free(list);
1465
1466         return ret;
1467 }
1468
1469 /*
1470   add index entries for one elements in a message
1471  */
1472 static int ltdb_index_add_el(struct ldb_module *module,
1473                              struct ltdb_private *ltdb,
1474                              const struct ldb_message *msg,
1475                              struct ldb_message_element *el)
1476 {
1477         unsigned int i;
1478         for (i = 0; i < el->num_values; i++) {
1479                 int ret = ltdb_index_add1(module, ltdb,
1480                                           msg, el, i);
1481                 if (ret != LDB_SUCCESS) {
1482                         return ret;
1483                 }
1484         }
1485
1486         return LDB_SUCCESS;
1487 }
1488
1489 /*
1490   add index entries for all elements in a message
1491  */
1492 static int ltdb_index_add_all(struct ldb_module *module,
1493                               struct ltdb_private *ltdb,
1494                               const struct ldb_message *msg)
1495 {
1496         struct ldb_message_element *elements = msg->elements;
1497         unsigned int i;
1498         const char *dn_str;
1499         int ret;
1500
1501         if (ldb_dn_is_special(msg->dn)) {
1502                 return LDB_SUCCESS;
1503         }
1504
1505         dn_str = ldb_dn_get_linearized(msg->dn);
1506         if (dn_str == NULL) {
1507                 return LDB_ERR_OPERATIONS_ERROR;
1508         }
1509
1510         ret = ltdb_write_index_dn_guid(module, msg, 1);
1511         if (ret != LDB_SUCCESS) {
1512                 return ret;
1513         }
1514
1515         if (!ltdb->cache->attribute_indexes) {
1516                 /* no indexed fields */
1517                 return LDB_SUCCESS;
1518         }
1519
1520         for (i = 0; i < msg->num_elements; i++) {
1521                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
1522                         continue;
1523                 }
1524                 ret = ltdb_index_add_el(module, ltdb,
1525                                         msg, &elements[i]);
1526                 if (ret != LDB_SUCCESS) {
1527                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1528                         ldb_asprintf_errstring(ldb,
1529                                                __location__ ": Failed to re-index %s in %s - %s",
1530                                                elements[i].name, dn_str,
1531                                                ldb_errstring(ldb));
1532                         return ret;
1533                 }
1534         }
1535
1536         return LDB_SUCCESS;
1537 }
1538
1539
1540 /*
1541   insert a DN index for a message
1542 */
1543 static int ltdb_modify_index_dn(struct ldb_module *module,
1544                                 struct ltdb_private *ltdb,
1545                                 const struct ldb_message *msg,
1546                                 struct ldb_dn *dn,
1547                                 const char *index, int add)
1548 {
1549         struct ldb_message_element el;
1550         struct ldb_val val;
1551         int ret;
1552
1553         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1554         if (val.data == NULL) {
1555                 return LDB_ERR_OPERATIONS_ERROR;
1556         }
1557
1558         val.length = strlen((char *)val.data);
1559         el.name = index;
1560         el.values = &val;
1561         el.num_values = 1;
1562
1563         if (add) {
1564                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
1565         } else { /* delete */
1566                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
1567         }
1568
1569         if (ret != LDB_SUCCESS) {
1570                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1571                 const char *dn_str = ldb_dn_get_linearized(dn);
1572                 ldb_asprintf_errstring(ldb,
1573                                        __location__
1574                                        ": Failed to modify %s "
1575                                        "against %s in %s - %s",
1576                                        index,
1577                                        ltdb->cache->GUID_index_attribute,
1578                                        dn_str, ldb_errstring(ldb));
1579                 return ret;
1580         }
1581         return ret;
1582 }
1583
1584 /*
1585   insert a one level index for a message
1586 */
1587 static int ltdb_index_onelevel(struct ldb_module *module,
1588                                const struct ldb_message *msg, int add)
1589 {
1590         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1591                                                     struct ltdb_private);
1592         struct ldb_dn *pdn;
1593         int ret;
1594
1595         /* We index for ONE Level only if requested */
1596         if (!ltdb->cache->one_level_indexes) {
1597                 return LDB_SUCCESS;
1598         }
1599
1600         pdn = ldb_dn_get_parent(module, msg->dn);
1601         if (pdn == NULL) {
1602                 return LDB_ERR_OPERATIONS_ERROR;
1603         }
1604         ret = ltdb_modify_index_dn(module, ltdb,
1605                                    msg, pdn, LTDB_IDXONE, add);
1606
1607         talloc_free(pdn);
1608
1609         return ret;
1610 }
1611
1612 /*
1613   insert a one level index for a message
1614 */
1615 static int ltdb_write_index_dn_guid(struct ldb_module *module,
1616                                     const struct ldb_message *msg,
1617                                     int add)
1618 {
1619         int ret;
1620         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1621                                                     struct ltdb_private);
1622
1623         /* We index for DN only if using a GUID index */
1624         if (ltdb->cache->GUID_index_attribute == NULL) {
1625                 return LDB_SUCCESS;
1626         }
1627
1628         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
1629                                    LTDB_IDXDN, add);
1630
1631         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
1632                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1633                                        "Entry %s already exists",
1634                                        ldb_dn_get_linearized(msg->dn));
1635                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1636         }
1637         return ret;
1638 }
1639
1640 /*
1641   add the index entries for a new element in a record
1642   The caller guarantees that these element values are not yet indexed
1643 */
1644 int ltdb_index_add_element(struct ldb_module *module,
1645                            struct ltdb_private *ltdb,
1646                            const struct ldb_message *msg,
1647                            struct ldb_message_element *el)
1648 {
1649         if (ldb_dn_is_special(msg->dn)) {
1650                 return LDB_SUCCESS;
1651         }
1652         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1653                 return LDB_SUCCESS;
1654         }
1655         return ltdb_index_add_el(module, ltdb, msg, el);
1656 }
1657
1658 /*
1659   add the index entries for a new record
1660 */
1661 int ltdb_index_add_new(struct ldb_module *module,
1662                        struct ltdb_private *ltdb,
1663                        const struct ldb_message *msg)
1664 {
1665         int ret;
1666
1667         if (ldb_dn_is_special(msg->dn)) {
1668                 return LDB_SUCCESS;
1669         }
1670
1671         ret = ltdb_index_add_all(module, ltdb, msg);
1672         if (ret != LDB_SUCCESS) {
1673                 return ret;
1674         }
1675
1676         return ltdb_index_onelevel(module, msg, 1);
1677 }
1678
1679
1680 /*
1681   delete an index entry for one message element
1682 */
1683 int ltdb_index_del_value(struct ldb_module *module,
1684                          struct ltdb_private *ltdb,
1685                          const struct ldb_message *msg,
1686                          struct ldb_message_element *el, unsigned int v_idx)
1687 {
1688         struct ldb_context *ldb;
1689         struct ldb_dn *dn_key;
1690         const char *dn_str;
1691         int ret, i;
1692         unsigned int j;
1693         struct dn_list *list;
1694         struct ldb_dn *dn = msg->dn;
1695
1696         ldb = ldb_module_get_ctx(module);
1697
1698         dn_str = ldb_dn_get_linearized(dn);
1699         if (dn_str == NULL) {
1700                 return LDB_ERR_OPERATIONS_ERROR;
1701         }
1702
1703         if (dn_str[0] == '@') {
1704                 return LDB_SUCCESS;
1705         }
1706
1707         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1708         if (!dn_key) {
1709                 return LDB_ERR_OPERATIONS_ERROR;
1710         }
1711
1712         list = talloc_zero(dn_key, struct dn_list);
1713         if (list == NULL) {
1714                 talloc_free(dn_key);
1715                 return LDB_ERR_OPERATIONS_ERROR;
1716         }
1717
1718         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
1719         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1720                 /* it wasn't indexed. Did we have an earlier error? If we did then
1721                    its gone now */
1722                 talloc_free(dn_key);
1723                 return LDB_SUCCESS;
1724         }
1725
1726         if (ret != LDB_SUCCESS) {
1727                 talloc_free(dn_key);
1728                 return ret;
1729         }
1730
1731         i = ltdb_dn_list_find_msg(ltdb, list, msg);
1732         if (i == -1) {
1733                 /* nothing to delete */
1734                 talloc_free(dn_key);
1735                 return LDB_SUCCESS;
1736         }
1737
1738         j = (unsigned int) i;
1739         if (j != list->count - 1) {
1740                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1741         }
1742         list->count--;
1743         if (list->count == 0) {
1744                 talloc_free(list->dn);
1745                 list->dn = NULL;
1746         } else {
1747                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1748         }
1749
1750         ret = ltdb_dn_list_store(module, dn_key, list);
1751
1752         talloc_free(dn_key);
1753
1754         return ret;
1755 }
1756
1757 /*
1758   delete the index entries for a element
1759   return -1 on failure
1760 */
1761 int ltdb_index_del_element(struct ldb_module *module,
1762                            struct ltdb_private *ltdb,
1763                            const struct ldb_message *msg,
1764                            struct ldb_message_element *el)
1765 {
1766         const char *dn_str;
1767         int ret;
1768         unsigned int i;
1769
1770         if (!ltdb->cache->attribute_indexes) {
1771                 /* no indexed fields */
1772                 return LDB_SUCCESS;
1773         }
1774
1775         dn_str = ldb_dn_get_linearized(msg->dn);
1776         if (dn_str == NULL) {
1777                 return LDB_ERR_OPERATIONS_ERROR;
1778         }
1779
1780         if (dn_str[0] == '@') {
1781                 return LDB_SUCCESS;
1782         }
1783
1784         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1785                 return LDB_SUCCESS;
1786         }
1787         for (i = 0; i < el->num_values; i++) {
1788                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
1789                 if (ret != LDB_SUCCESS) {
1790                         return ret;
1791                 }
1792         }
1793
1794         return LDB_SUCCESS;
1795 }
1796
1797 /*
1798   delete the index entries for a record
1799   return -1 on failure
1800 */
1801 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1802 {
1803         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1804         int ret;
1805         unsigned int i;
1806
1807         if (ldb_dn_is_special(msg->dn)) {
1808                 return LDB_SUCCESS;
1809         }
1810
1811         ret = ltdb_index_onelevel(module, msg, 0);
1812         if (ret != LDB_SUCCESS) {
1813                 return ret;
1814         }
1815
1816         ret = ltdb_write_index_dn_guid(module, msg, 0);
1817         if (ret != LDB_SUCCESS) {
1818                 return ret;
1819         }
1820
1821         if (!ltdb->cache->attribute_indexes) {
1822                 /* no indexed fields */
1823                 return LDB_SUCCESS;
1824         }
1825
1826         for (i = 0; i < msg->num_elements; i++) {
1827                 ret = ltdb_index_del_element(module, ltdb,
1828                                              msg, &msg->elements[i]);
1829                 if (ret != LDB_SUCCESS) {
1830                         return ret;
1831                 }
1832         }
1833
1834         return LDB_SUCCESS;
1835 }
1836
1837
1838 /*
1839   traversal function that deletes all @INDEX records
1840 */
1841 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1842 {
1843         struct ldb_module *module = state;
1844         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1845         const char *dnstr = "DN=" LTDB_INDEX ":";
1846         struct dn_list list;
1847         struct ldb_dn *dn;
1848         struct ldb_val v;
1849         int ret;
1850
1851         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1852                 return 0;
1853         }
1854         /* we need to put a empty list in the internal tdb for this
1855          * index entry */
1856         list.dn = NULL;
1857         list.count = 0;
1858
1859         /* the offset of 3 is to remove the DN= prefix. */
1860         v.data = key.dptr + 3;
1861         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1862
1863         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1864         ret = ltdb_dn_list_store(module, dn, &list);
1865         if (ret != LDB_SUCCESS) {
1866                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1867                                        "Unable to store null index for %s\n",
1868                                                 ldb_dn_get_linearized(dn));
1869                 talloc_free(dn);
1870                 return -1;
1871         }
1872         talloc_free(dn);
1873         return 0;
1874 }
1875
1876 struct ltdb_reindex_context {
1877         struct ldb_module *module;
1878         int error;
1879 };
1880
1881 /*
1882   traversal function that adds @INDEX records during a re index
1883 */
1884 static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1885 {
1886         struct ldb_context *ldb;
1887         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1888         struct ldb_module *module = ctx->module;
1889         struct ldb_message *msg;
1890         unsigned int nb_elements_in_db;
1891         const struct ldb_val val = {
1892                 .data = data.dptr,
1893                 .length = data.dsize,
1894         };
1895         int ret;
1896         TDB_DATA key2;
1897         bool is_record;
1898         
1899         ldb = ldb_module_get_ctx(module);
1900
1901         if (key.dsize > 4 &&
1902             memcmp(key.dptr, "DN=@", 4) == 0) {
1903                 return 0;
1904         }
1905
1906         is_record = ltdb_key_is_record(key);
1907         if (is_record == false) {
1908                 return 0;
1909         }
1910         
1911         msg = ldb_msg_new(module);
1912         if (msg == NULL) {
1913                 return -1;
1914         }
1915
1916         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
1917                                                    msg,
1918                                                    NULL, 0,
1919                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
1920                                                    &nb_elements_in_db);
1921         if (ret != 0) {
1922                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1923                                                 ldb_dn_get_linearized(msg->dn));
1924                 ctx->error = ret;
1925                 talloc_free(msg);
1926                 return -1;
1927         }
1928
1929         if (msg->dn == NULL) {
1930                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1931                           "Refusing to re-index as GUID "
1932                           "key %*.*s with no DN\n",
1933                           (int)key.dsize, (int)key.dsize,
1934                           (char *)key.dptr);
1935                 talloc_free(msg);
1936                 return -1;
1937         }
1938         
1939         /* check if the DN key has changed, perhaps due to the case
1940            insensitivity of an element changing, or a change from DN
1941            to GUID keys */
1942         key2 = ltdb_key_msg(module, msg);
1943         if (key2.dptr == NULL) {
1944                 /* probably a corrupt record ... darn */
1945                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1946                                                 ldb_dn_get_linearized(msg->dn));
1947                 talloc_free(msg);
1948                 return 0;
1949         }
1950         if (key.dsize != key2.dsize ||
1951             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
1952                 int tdb_ret;
1953                 tdb_ret = tdb_delete(tdb, key);
1954                 if (tdb_ret != 0) {
1955                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1956                                   "Failed to delete %*.*s "
1957                                   "for rekey as %*.*s: %s",
1958                                   (int)key.dsize, (int)key.dsize,
1959                                   (const char *)key.dptr,
1960                                   (int)key2.dsize, (int)key2.dsize,
1961                                   (const char *)key.dptr,
1962                                   tdb_errorstr(tdb));
1963                         ctx->error = ltdb_err_map(tdb_error(tdb));
1964                         return -1;
1965                 }
1966                 tdb_ret = tdb_store(tdb, key2, data, 0);
1967                 if (tdb_ret != 0) {
1968                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1969                                   "Failed to rekey %*.*s as %*.*s: %s",
1970                                   (int)key.dsize, (int)key.dsize,
1971                                   (const char *)key.dptr,
1972                                   (int)key2.dsize, (int)key2.dsize,
1973                                   (const char *)key.dptr,
1974                                   tdb_errorstr(tdb));
1975                         ctx->error = ltdb_err_map(tdb_error(tdb));
1976                         return -1;
1977                 }
1978         }
1979         talloc_free(key2.dptr);
1980
1981         talloc_free(msg);
1982
1983         return 0;
1984 }
1985
1986 /*
1987   traversal function that adds @INDEX records during a re index
1988 */
1989 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1990 {
1991         struct ldb_context *ldb;
1992         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1993         struct ldb_module *module = ctx->module;
1994         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1995                                                     struct ltdb_private);
1996         struct ldb_message *msg;
1997         unsigned int nb_elements_in_db;
1998         const struct ldb_val val = {
1999                 .data = data.dptr,
2000                 .length = data.dsize,
2001         };
2002         int ret;
2003         bool is_record;
2004         
2005         ldb = ldb_module_get_ctx(module);
2006
2007         if (key.dsize > 4 &&
2008             memcmp(key.dptr, "DN=@", 4) == 0) {
2009                 return 0;
2010         }
2011
2012         is_record = ltdb_key_is_record(key);
2013         if (is_record == false) {
2014                 return 0;
2015         }
2016         
2017         msg = ldb_msg_new(module);
2018         if (msg == NULL) {
2019                 return -1;
2020         }
2021
2022         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2023                                                    msg,
2024                                                    NULL, 0,
2025                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2026                                                    &nb_elements_in_db);
2027         if (ret != 0) {
2028                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2029                                                 ldb_dn_get_linearized(msg->dn));
2030                 ctx->error = ret;
2031                 talloc_free(msg);
2032                 return -1;
2033         }
2034
2035         if (msg->dn == NULL) {
2036                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2037                           "Refusing to re-index as GUID "
2038                           "key %*.*s with no DN\n",
2039                           (int)key.dsize, (int)key.dsize,
2040                           (char *)key.dptr);
2041                 talloc_free(msg);
2042                 return -1;
2043         }
2044
2045         ret = ltdb_index_onelevel(module, msg, 1);
2046         if (ret != LDB_SUCCESS) {
2047                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2048                           "Adding special ONE LEVEL index failed (%s)!",
2049                                                 ldb_dn_get_linearized(msg->dn));
2050                 talloc_free(msg);
2051                 return -1;
2052         }
2053
2054         ret = ltdb_index_add_all(module, ltdb, msg);
2055
2056         if (ret != LDB_SUCCESS) {
2057                 ctx->error = ret;
2058                 talloc_free(msg);
2059                 return -1;
2060         }
2061
2062         talloc_free(msg);
2063
2064         return 0;
2065 }
2066
2067 /*
2068   force a complete reindex of the database
2069 */
2070 int ltdb_reindex(struct ldb_module *module)
2071 {
2072         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2073         int ret;
2074         struct ltdb_reindex_context ctx;
2075
2076         /*
2077          * Only triggered after a modification, but make clear we do
2078          * not re-index a read-only DB
2079          */
2080         if (ltdb->read_only) {
2081                 return LDB_ERR_UNWILLING_TO_PERFORM;
2082         }
2083
2084         if (ltdb_cache_reload(module) != 0) {
2085                 return LDB_ERR_OPERATIONS_ERROR;
2086         }
2087
2088         /*
2089          * Ensure we read (and so remove) the entries from the real
2090          * DB, no values stored so far are any use as we want to do a
2091          * re-index
2092          */
2093         ltdb_index_transaction_cancel(module);
2094
2095         ret = ltdb_index_transaction_start(module);
2096         if (ret != LDB_SUCCESS) {
2097                 return ret;
2098         }
2099
2100         /* first traverse the database deleting any @INDEX records by
2101          * putting NULL entries in the in-memory tdb
2102          */
2103         ret = tdb_traverse(ltdb->tdb, delete_index, module);
2104         if (ret < 0) {
2105                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2106                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
2107                                        ldb_errstring(ldb));
2108                 return LDB_ERR_OPERATIONS_ERROR;
2109         }
2110
2111         /* if we don't have indexes we have nothing todo */
2112         if (!ltdb->cache->attribute_indexes) {
2113                 return LDB_SUCCESS;
2114         }
2115
2116         ctx.module = module;
2117         ctx.error = 0;
2118
2119         /* now traverse adding any indexes for normal LDB records */
2120         ret = tdb_traverse(ltdb->tdb, re_key, &ctx);
2121         if (ret < 0) {
2122                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2123                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
2124                                        ldb_errstring(ldb));
2125                 return LDB_ERR_OPERATIONS_ERROR;
2126         }
2127
2128         if (ctx.error != LDB_SUCCESS) {
2129                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2130                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
2131                 return ctx.error;
2132         }
2133
2134         ctx.error = 0;
2135
2136         /* now traverse adding any indexes for normal LDB records */
2137         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
2138         if (ret < 0) {
2139                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2140                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
2141                                        ldb_errstring(ldb));
2142                 return LDB_ERR_OPERATIONS_ERROR;
2143         }
2144
2145         if (ctx.error != LDB_SUCCESS) {
2146                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2147                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
2148                 return ctx.error;
2149         }
2150
2151         return LDB_SUCCESS;
2152 }