ldb_tdb: Improve logging on unique index violation
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "ldb_private.h"
36
37 struct dn_list {
38         unsigned int count;
39         struct ldb_val *dn;
40 };
41
42 struct ltdb_idxptr {
43         struct tdb_context *itdb;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         if (ltdb->idxptr == NULL) {
58                 return ldb_oom(ldb_module_get_ctx(module));
59         }
60
61         return LDB_SUCCESS;
62 }
63
64 /* compare two DN entries in a dn_list. Take account of possible
65  * differences in string termination */
66 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
67 {
68         if (v1->length > v2->length && v1->data[v2->length] != 0) {
69                 return -1;
70         }
71         if (v1->length < v2->length && v2->data[v1->length] != 0) {
72                 return 1;
73         }
74         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
75 }
76
77
78 /*
79   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
80   comparison with the dn returns -1 if not found
81  */
82 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
83 {
84         unsigned int i;
85         for (i=0; i<list->count; i++) {
86                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
87         }
88         return -1;
89 }
90
91 /*
92   find a entry in a dn_list. Uses a case sensitive comparison with the dn
93   returns -1 if not found
94  */
95 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
96 {
97         struct ldb_val v;
98         v.data = discard_const_p(unsigned char, dn);
99         v.length = strlen(dn);
100         return ltdb_dn_list_find_val(list, &v);
101 }
102
103 /*
104   this is effectively a cast function, but with lots of paranoia
105   checks and also copes with CPUs that are fussy about pointer
106   alignment
107  */
108 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
109 {
110         struct dn_list *list;
111         if (rec.dsize != sizeof(void *)) {
112                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
113                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
114                 return NULL;
115         }
116         /* note that we can't just use a cast here, as rec.dptr may
117            not be aligned sufficiently for a pointer. A cast would cause
118            platforms like some ARM CPUs to crash */
119         memcpy(&list, rec.dptr, sizeof(void *));
120         list = talloc_get_type(list, struct dn_list);
121         if (list == NULL) {
122                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
123                                        "Bad type '%s' for idxptr",
124                                        talloc_get_name(list));
125                 return NULL;
126         }
127         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
128                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
129                                        "Bad parent '%s' for idxptr",
130                                        talloc_get_name(talloc_parent(list->dn)));
131                 return NULL;
132         }
133         return list;
134 }
135
136 /*
137   return the @IDX list in an index entry for a dn as a
138   struct dn_list
139  */
140 static int ltdb_dn_list_load(struct ldb_module *module,
141                              struct ldb_dn *dn, struct dn_list *list)
142 {
143         struct ldb_message *msg;
144         int ret;
145         struct ldb_message_element *el;
146         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
147         TDB_DATA rec;
148         struct dn_list *list2;
149         TDB_DATA key;
150
151         list->dn = NULL;
152         list->count = 0;
153
154         /* see if we have any in-memory index entries */
155         if (ltdb->idxptr == NULL ||
156             ltdb->idxptr->itdb == NULL) {
157                 goto normal_index;
158         }
159
160         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
161         key.dsize = strlen((char *)key.dptr);
162
163         rec = tdb_fetch(ltdb->idxptr->itdb, key);
164         if (rec.dptr == NULL) {
165                 goto normal_index;
166         }
167
168         /* we've found an in-memory index entry */
169         list2 = ltdb_index_idxptr(module, rec, true);
170         if (list2 == NULL) {
171                 free(rec.dptr);
172                 return LDB_ERR_OPERATIONS_ERROR;
173         }
174         free(rec.dptr);
175
176         *list = *list2;
177         return LDB_SUCCESS;
178
179 normal_index:
180         msg = ldb_msg_new(list);
181         if (msg == NULL) {
182                 return LDB_ERR_OPERATIONS_ERROR;
183         }
184
185         ret = ltdb_search_dn1(module, dn, msg,
186                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
187                               |LDB_UNPACK_DATA_FLAG_NO_DN);
188         if (ret != LDB_SUCCESS) {
189                 talloc_free(msg);
190                 return ret;
191         }
192
193         /* TODO: check indexing version number */
194
195         el = ldb_msg_find_element(msg, LTDB_IDX);
196         if (!el) {
197                 talloc_free(msg);
198                 return LDB_SUCCESS;
199         }
200
201         /*
202          * we avoid copying the strings by stealing the list.  We have
203          * to steal msg onto el->values (which looks odd) because we
204          * asked for the memory to be allocated on msg, not on each
205          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
206          */
207         talloc_steal(el->values, msg);
208         list->dn = talloc_steal(list, el->values);
209         list->count = el->num_values;
210
211         /* We don't need msg->elements any more */
212         talloc_free(msg->elements);
213         return LDB_SUCCESS;
214 }
215
216
217 /*
218   save a dn_list into a full @IDX style record
219  */
220 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
221                                    struct dn_list *list)
222 {
223         struct ldb_message *msg;
224         int ret;
225
226         if (list->count == 0) {
227                 ret = ltdb_delete_noindex(module, dn);
228                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
229                         return LDB_SUCCESS;
230                 }
231                 return ret;
232         }
233
234         msg = ldb_msg_new(module);
235         if (!msg) {
236                 return ldb_module_oom(module);
237         }
238
239         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
240         if (ret != LDB_SUCCESS) {
241                 talloc_free(msg);
242                 return ldb_module_oom(module);
243         }
244
245         msg->dn = dn;
246         if (list->count > 0) {
247                 struct ldb_message_element *el;
248
249                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
250                 if (ret != LDB_SUCCESS) {
251                         talloc_free(msg);
252                         return ldb_module_oom(module);
253                 }
254                 el->values = list->dn;
255                 el->num_values = list->count;
256         }
257
258         ret = ltdb_store(module, msg, TDB_REPLACE);
259         talloc_free(msg);
260         return ret;
261 }
262
263 /*
264   save a dn_list into the database, in either @IDX or internal format
265  */
266 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
267                               struct dn_list *list)
268 {
269         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
270         TDB_DATA rec, key;
271         int ret;
272         struct dn_list *list2;
273
274         if (ltdb->idxptr == NULL) {
275                 return ltdb_dn_list_store_full(module, dn, list);
276         }
277
278         if (ltdb->idxptr->itdb == NULL) {
279                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
280                 if (ltdb->idxptr->itdb == NULL) {
281                         return LDB_ERR_OPERATIONS_ERROR;
282                 }
283         }
284
285         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
286         key.dsize = strlen((char *)key.dptr);
287
288         rec = tdb_fetch(ltdb->idxptr->itdb, key);
289         if (rec.dptr != NULL) {
290                 list2 = ltdb_index_idxptr(module, rec, false);
291                 if (list2 == NULL) {
292                         free(rec.dptr);
293                         return LDB_ERR_OPERATIONS_ERROR;
294                 }
295                 free(rec.dptr);
296                 list2->dn = talloc_steal(list2, list->dn);
297                 list2->count = list->count;
298                 return LDB_SUCCESS;
299         }
300
301         list2 = talloc(ltdb->idxptr, struct dn_list);
302         if (list2 == NULL) {
303                 return LDB_ERR_OPERATIONS_ERROR;
304         }
305         list2->dn = talloc_steal(list2, list->dn);
306         list2->count = list->count;
307
308         rec.dptr = (uint8_t *)&list2;
309         rec.dsize = sizeof(void *);
310
311         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
312         if (ret != 0) {
313                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
314         }
315         return LDB_SUCCESS;
316 }
317
318 /*
319   traverse function for storing the in-memory index entries on disk
320  */
321 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
322 {
323         struct ldb_module *module = state;
324         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
325         struct ldb_dn *dn;
326         struct ldb_context *ldb = ldb_module_get_ctx(module);
327         struct ldb_val v;
328         struct dn_list *list;
329
330         list = ltdb_index_idxptr(module, data, true);
331         if (list == NULL) {
332                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
333                 return -1;
334         }
335
336         v.data = key.dptr;
337         v.length = strnlen((char *)key.dptr, key.dsize);
338
339         dn = ldb_dn_from_ldb_val(module, ldb, &v);
340         if (dn == NULL) {
341                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
342                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
343                 return -1;
344         }
345
346         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
347         talloc_free(dn);
348         if (ltdb->idxptr->error != 0) {
349                 return -1;
350         }
351         return 0;
352 }
353
354 /* cleanup the idxptr mode when transaction commits */
355 int ltdb_index_transaction_commit(struct ldb_module *module)
356 {
357         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
358         int ret;
359
360         struct ldb_context *ldb = ldb_module_get_ctx(module);
361
362         ldb_reset_err_string(ldb);
363
364         if (ltdb->idxptr->itdb) {
365                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
366                 tdb_close(ltdb->idxptr->itdb);
367         }
368
369         ret = ltdb->idxptr->error;
370         if (ret != LDB_SUCCESS) {
371                 if (!ldb_errstring(ldb)) {
372                         ldb_set_errstring(ldb, ldb_strerror(ret));
373                 }
374                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
375         }
376
377         talloc_free(ltdb->idxptr);
378         ltdb->idxptr = NULL;
379         return ret;
380 }
381
382 /* cleanup the idxptr mode when transaction cancels */
383 int ltdb_index_transaction_cancel(struct ldb_module *module)
384 {
385         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
386         if (ltdb->idxptr && ltdb->idxptr->itdb) {
387                 tdb_close(ltdb->idxptr->itdb);
388         }
389         talloc_free(ltdb->idxptr);
390         ltdb->idxptr = NULL;
391         return LDB_SUCCESS;
392 }
393
394
395 /*
396   return the dn key to be used for an index
397   the caller is responsible for freeing
398 */
399 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
400                                      const char *attr, const struct ldb_val *value,
401                                      const struct ldb_schema_attribute **ap)
402 {
403         struct ldb_dn *ret;
404         struct ldb_val v;
405         const struct ldb_schema_attribute *a;
406         char *attr_folded;
407         int r;
408
409         attr_folded = ldb_attr_casefold(ldb, attr);
410         if (!attr_folded) {
411                 return NULL;
412         }
413
414         a = ldb_schema_attribute_by_name(ldb, attr);
415         if (ap) {
416                 *ap = a;
417         }
418         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
419         if (r != LDB_SUCCESS) {
420                 const char *errstr = ldb_errstring(ldb);
421                 /* canonicalisation can be refused. For example,
422                    a attribute that takes wildcards will refuse to canonicalise
423                    if the value contains a wildcard */
424                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
425                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
426                 talloc_free(attr_folded);
427                 return NULL;
428         }
429         if (ldb_should_b64_encode(ldb, &v)) {
430                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
431                 if (!vstr) {
432                         talloc_free(attr_folded);
433                         return NULL;
434                 }
435                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
436                 talloc_free(vstr);
437         } else {
438                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
439         }
440
441         if (v.data != value->data) {
442                 talloc_free(v.data);
443         }
444         talloc_free(attr_folded);
445
446         return ret;
447 }
448
449 /*
450   see if a attribute value is in the list of indexed attributes
451 */
452 static bool ltdb_is_indexed(struct ldb_module *module,
453                             struct ltdb_private *ltdb,
454                             const char *attr)
455 {
456         struct ldb_context *ldb = ldb_module_get_ctx(module);
457         unsigned int i;
458         struct ldb_message_element *el;
459
460         if (ldb->schema.index_handler_override) {
461                 const struct ldb_schema_attribute *a
462                         = ldb_schema_attribute_by_name(ldb, attr);
463
464                 if (a == NULL) {
465                         return false;
466                 }
467
468                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
469                         return true;
470                 } else {
471                         return false;
472                 }
473         }
474
475         if (!ltdb->cache->attribute_indexes) {
476                 return false;
477         }
478
479         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
480         if (el == NULL) {
481                 return false;
482         }
483
484         /* TODO: this is too expensive! At least use a binary search */
485         for (i=0; i<el->num_values; i++) {
486                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
487                         return true;
488                 }
489         }
490         return false;
491 }
492
493 /*
494   in the following logic functions, the return value is treated as
495   follows:
496
497      LDB_SUCCESS: we found some matching index values
498
499      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
500
501      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
502                                we'll need a full search
503  */
504
505 /*
506   return a list of dn's that might match a simple indexed search (an
507   equality search only)
508  */
509 static int ltdb_index_dn_simple(struct ldb_module *module,
510                                 struct ltdb_private *ltdb,
511                                 const struct ldb_parse_tree *tree,
512                                 struct dn_list *list)
513 {
514         struct ldb_context *ldb;
515         struct ldb_dn *dn;
516         int ret;
517
518         ldb = ldb_module_get_ctx(module);
519
520         list->count = 0;
521         list->dn = NULL;
522
523         /* if the attribute isn't in the list of indexed attributes then
524            this node needs a full search */
525         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
526                 return LDB_ERR_OPERATIONS_ERROR;
527         }
528
529         /* the attribute is indexed. Pull the list of DNs that match the
530            search criterion */
531         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
532         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
533
534         ret = ltdb_dn_list_load(module, dn, list);
535         talloc_free(dn);
536         return ret;
537 }
538
539
540 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
541
542 /*
543   return a list of dn's that might match a leaf indexed search
544  */
545 static int ltdb_index_dn_leaf(struct ldb_module *module,
546                               struct ltdb_private *ltdb,
547                               const struct ldb_parse_tree *tree,
548                               struct dn_list *list)
549 {
550         if (ltdb->disallow_dn_filter &&
551             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
552                 /* in AD mode we do not support "(dn=...)" search filters */
553                 list->dn = NULL;
554                 list->count = 0;
555                 return LDB_SUCCESS;
556         }
557         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
558                 list->dn = talloc_array(list, struct ldb_val, 1);
559                 if (list->dn == NULL) {
560                         ldb_module_oom(module);
561                         return LDB_ERR_OPERATIONS_ERROR;
562                 }
563                 list->dn[0] = tree->u.equality.value;
564                 list->count = 1;
565                 return LDB_SUCCESS;
566         }
567         return ltdb_index_dn_simple(module, ltdb, tree, list);
568 }
569
570
571 /*
572   list intersection
573   list = list & list2
574 */
575 static bool list_intersect(struct ldb_context *ldb,
576                            struct dn_list *list, const struct dn_list *list2)
577 {
578         struct dn_list *list3;
579         unsigned int i;
580
581         if (list->count == 0) {
582                 /* 0 & X == 0 */
583                 return true;
584         }
585         if (list2->count == 0) {
586                 /* X & 0 == 0 */
587                 list->count = 0;
588                 list->dn = NULL;
589                 return true;
590         }
591
592         /* the indexing code is allowed to return a longer list than
593            what really matches, as all results are filtered by the
594            full expression at the end - this shortcut avoids a lot of
595            work in some cases */
596         if (list->count < 2 && list2->count > 10) {
597                 return true;
598         }
599         if (list2->count < 2 && list->count > 10) {
600                 list->count = list2->count;
601                 list->dn = list2->dn;
602                 /* note that list2 may not be the parent of list2->dn,
603                    as list2->dn may be owned by ltdb->idxptr. In that
604                    case we expect this reparent call to fail, which is
605                    OK */
606                 talloc_reparent(list2, list, list2->dn);
607                 return true;
608         }
609
610         list3 = talloc_zero(list, struct dn_list);
611         if (list3 == NULL) {
612                 return false;
613         }
614
615         list3->dn = talloc_array(list3, struct ldb_val, list->count);
616         if (!list3->dn) {
617                 talloc_free(list3);
618                 return false;
619         }
620         list3->count = 0;
621
622         for (i=0;i<list->count;i++) {
623                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
624                         list3->dn[list3->count] = list->dn[i];
625                         list3->count++;
626                 }
627         }
628
629         list->dn = talloc_steal(list, list3->dn);
630         list->count = list3->count;
631         talloc_free(list3);
632
633         return true;
634 }
635
636
637 /*
638   list union
639   list = list | list2
640 */
641 static bool list_union(struct ldb_context *ldb,
642                        struct dn_list *list, const struct dn_list *list2)
643 {
644         struct ldb_val *dn3;
645
646         if (list2->count == 0) {
647                 /* X | 0 == X */
648                 return true;
649         }
650
651         if (list->count == 0) {
652                 /* 0 | X == X */
653                 list->count = list2->count;
654                 list->dn = list2->dn;
655                 /* note that list2 may not be the parent of list2->dn,
656                    as list2->dn may be owned by ltdb->idxptr. In that
657                    case we expect this reparent call to fail, which is
658                    OK */
659                 talloc_reparent(list2, list, list2->dn);
660                 return true;
661         }
662
663         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
664         if (!dn3) {
665                 ldb_oom(ldb);
666                 return false;
667         }
668
669         /* we allow for duplicates here, and get rid of them later */
670         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
671         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
672
673         list->dn = dn3;
674         list->count += list2->count;
675
676         return true;
677 }
678
679 static int ltdb_index_dn(struct ldb_module *module,
680                          struct ltdb_private *ltdb,
681                          const struct ldb_parse_tree *tree,
682                          struct dn_list *list);
683
684
685 /*
686   process an OR list (a union)
687  */
688 static int ltdb_index_dn_or(struct ldb_module *module,
689                             struct ltdb_private *ltdb,
690                             const struct ldb_parse_tree *tree,
691                             struct dn_list *list)
692 {
693         struct ldb_context *ldb;
694         unsigned int i;
695
696         ldb = ldb_module_get_ctx(module);
697
698         list->dn = NULL;
699         list->count = 0;
700
701         for (i=0; i<tree->u.list.num_elements; i++) {
702                 struct dn_list *list2;
703                 int ret;
704
705                 list2 = talloc_zero(list, struct dn_list);
706                 if (list2 == NULL) {
707                         return LDB_ERR_OPERATIONS_ERROR;
708                 }
709
710                 ret = ltdb_index_dn(module, ltdb,
711                                     tree->u.list.elements[i], list2);
712
713                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
714                         /* X || 0 == X */
715                         talloc_free(list2);
716                         continue;
717                 }
718
719                 if (ret != LDB_SUCCESS) {
720                         /* X || * == * */
721                         talloc_free(list2);
722                         return ret;
723                 }
724
725                 if (!list_union(ldb, list, list2)) {
726                         talloc_free(list2);
727                         return LDB_ERR_OPERATIONS_ERROR;
728                 }
729         }
730
731         if (list->count == 0) {
732                 return LDB_ERR_NO_SUCH_OBJECT;
733         }
734
735         return LDB_SUCCESS;
736 }
737
738
739 /*
740   NOT an index results
741  */
742 static int ltdb_index_dn_not(struct ldb_module *module,
743                              struct ltdb_private *ltdb,
744                              const struct ldb_parse_tree *tree,
745                              struct dn_list *list)
746 {
747         /* the only way to do an indexed not would be if we could
748            negate the not via another not or if we knew the total
749            number of database elements so we could know that the
750            existing expression covered the whole database.
751
752            instead, we just give up, and rely on a full index scan
753            (unless an outer & manages to reduce the list)
754         */
755         return LDB_ERR_OPERATIONS_ERROR;
756 }
757
758
759 static bool ltdb_index_unique(struct ldb_context *ldb,
760                               const char *attr)
761 {
762         const struct ldb_schema_attribute *a;
763         a = ldb_schema_attribute_by_name(ldb, attr);
764         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
765                 return true;
766         }
767         return false;
768 }
769
770 /*
771   process an AND expression (intersection)
772  */
773 static int ltdb_index_dn_and(struct ldb_module *module,
774                              struct ltdb_private *ltdb,
775                              const struct ldb_parse_tree *tree,
776                              struct dn_list *list)
777 {
778         struct ldb_context *ldb;
779         unsigned int i;
780         bool found;
781
782         ldb = ldb_module_get_ctx(module);
783
784         list->dn = NULL;
785         list->count = 0;
786
787         /* in the first pass we only look for unique simple
788            equality tests, in the hope of avoiding having to look
789            at any others */
790         for (i=0; i<tree->u.list.num_elements; i++) {
791                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
792                 int ret;
793
794                 if (subtree->operation != LDB_OP_EQUALITY ||
795                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
796                         continue;
797                 }
798
799                 ret = ltdb_index_dn(module, ltdb, subtree, list);
800                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
801                         /* 0 && X == 0 */
802                         return LDB_ERR_NO_SUCH_OBJECT;
803                 }
804                 if (ret == LDB_SUCCESS) {
805                         /* a unique index match means we can
806                          * stop. Note that we don't care if we return
807                          * a few too many objects, due to later
808                          * filtering */
809                         return LDB_SUCCESS;
810                 }
811         }
812
813         /* now do a full intersection */
814         found = false;
815
816         for (i=0; i<tree->u.list.num_elements; i++) {
817                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
818                 struct dn_list *list2;
819                 int ret;
820
821                 list2 = talloc_zero(list, struct dn_list);
822                 if (list2 == NULL) {
823                         return ldb_module_oom(module);
824                 }
825
826                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
827
828                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
829                         /* X && 0 == 0 */
830                         list->dn = NULL;
831                         list->count = 0;
832                         talloc_free(list2);
833                         return LDB_ERR_NO_SUCH_OBJECT;
834                 }
835
836                 if (ret != LDB_SUCCESS) {
837                         /* this didn't adding anything */
838                         talloc_free(list2);
839                         continue;
840                 }
841
842                 if (!found) {
843                         talloc_reparent(list2, list, list->dn);
844                         list->dn = list2->dn;
845                         list->count = list2->count;
846                         found = true;
847                 } else if (!list_intersect(ldb, list, list2)) {
848                         talloc_free(list2);
849                         return LDB_ERR_OPERATIONS_ERROR;
850                 }
851
852                 if (list->count == 0) {
853                         list->dn = NULL;
854                         return LDB_ERR_NO_SUCH_OBJECT;
855                 }
856
857                 if (list->count < 2) {
858                         /* it isn't worth loading the next part of the tree */
859                         return LDB_SUCCESS;
860                 }
861         }
862
863         if (!found) {
864                 /* none of the attributes were indexed */
865                 return LDB_ERR_OPERATIONS_ERROR;
866         }
867
868         return LDB_SUCCESS;
869 }
870
871 /*
872   return a list of matching objects using a one-level index
873  */
874 static int ltdb_index_dn_one(struct ldb_module *module,
875                              struct ldb_dn *parent_dn,
876                              struct dn_list *list)
877 {
878         struct ldb_context *ldb;
879         struct ldb_dn *key;
880         struct ldb_val val;
881         int ret;
882
883         ldb = ldb_module_get_ctx(module);
884
885         /* work out the index key from the parent DN */
886         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
887         val.length = strlen((char *)val.data);
888         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
889         if (!key) {
890                 ldb_oom(ldb);
891                 return LDB_ERR_OPERATIONS_ERROR;
892         }
893
894         ret = ltdb_dn_list_load(module, key, list);
895         talloc_free(key);
896         if (ret != LDB_SUCCESS) {
897                 return ret;
898         }
899
900         if (list->count == 0) {
901                 return LDB_ERR_NO_SUCH_OBJECT;
902         }
903
904         return LDB_SUCCESS;
905 }
906
907 /*
908   return a list of dn's that might match a indexed search or
909   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
910  */
911 static int ltdb_index_dn(struct ldb_module *module,
912                          struct ltdb_private *ltdb,
913                          const struct ldb_parse_tree *tree,
914                          struct dn_list *list)
915 {
916         int ret = LDB_ERR_OPERATIONS_ERROR;
917
918         switch (tree->operation) {
919         case LDB_OP_AND:
920                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
921                 break;
922
923         case LDB_OP_OR:
924                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
925                 break;
926
927         case LDB_OP_NOT:
928                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
929                 break;
930
931         case LDB_OP_EQUALITY:
932                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
933                 break;
934
935         case LDB_OP_SUBSTRING:
936         case LDB_OP_GREATER:
937         case LDB_OP_LESS:
938         case LDB_OP_PRESENT:
939         case LDB_OP_APPROX:
940         case LDB_OP_EXTENDED:
941                 /* we can't index with fancy bitops yet */
942                 ret = LDB_ERR_OPERATIONS_ERROR;
943                 break;
944         }
945
946         return ret;
947 }
948
949 /*
950   filter a candidate dn_list from an indexed search into a set of results
951   extracting just the given attributes
952 */
953 static int ltdb_index_filter(const struct dn_list *dn_list,
954                              struct ltdb_context *ac,
955                              uint32_t *match_count)
956 {
957         struct ldb_context *ldb;
958         struct ldb_message *msg;
959         struct ldb_message *filtered_msg;
960         unsigned int i;
961
962         ldb = ldb_module_get_ctx(ac->module);
963
964         for (i = 0; i < dn_list->count; i++) {
965                 struct ldb_dn *dn;
966                 int ret;
967                 bool matched;
968
969                 msg = ldb_msg_new(ac);
970                 if (!msg) {
971                         return LDB_ERR_OPERATIONS_ERROR;
972                 }
973
974                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
975                 if (dn == NULL) {
976                         talloc_free(msg);
977                         return LDB_ERR_OPERATIONS_ERROR;
978                 }
979
980                 ret = ltdb_search_dn1(ac->module, dn, msg,
981                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
982                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
983                 talloc_free(dn);
984                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
985                         /* the record has disappeared? yes, this can happen */
986                         talloc_free(msg);
987                         continue;
988                 }
989
990                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
991                         /* an internal error */
992                         talloc_free(msg);
993                         return LDB_ERR_OPERATIONS_ERROR;
994                 }
995
996                 ret = ldb_match_msg_error(ldb, msg,
997                                           ac->tree, ac->base, ac->scope, &matched);
998                 if (ret != LDB_SUCCESS) {
999                         talloc_free(msg);
1000                         return ret;
1001                 }
1002                 if (!matched) {
1003                         talloc_free(msg);
1004                         continue;
1005                 }
1006
1007                 /* filter the attributes that the user wants */
1008                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1009
1010                 talloc_free(msg);
1011
1012                 if (ret == -1) {
1013                         return LDB_ERR_OPERATIONS_ERROR;
1014                 }
1015
1016                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1017                 if (ret != LDB_SUCCESS) {
1018                         /* Regardless of success or failure, the msg
1019                          * is the callbacks responsiblity, and should
1020                          * not be talloc_free()'ed */
1021                         ac->request_terminated = true;
1022                         return ret;
1023                 }
1024
1025                 (*match_count)++;
1026         }
1027
1028         return LDB_SUCCESS;
1029 }
1030
1031 /*
1032   remove any duplicated entries in a indexed result
1033  */
1034 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
1035 {
1036         unsigned int i, new_count;
1037
1038         if (list->count < 2) {
1039                 return;
1040         }
1041
1042         TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
1043
1044         new_count = 1;
1045         for (i=1; i<list->count; i++) {
1046                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
1047                         if (new_count != i) {
1048                                 list->dn[new_count] = list->dn[i];
1049                         }
1050                         new_count++;
1051                 }
1052         }
1053
1054         list->count = new_count;
1055 }
1056
1057 /*
1058   search the database with a LDAP-like expression using indexes
1059   returns -1 if an indexed search is not possible, in which
1060   case the caller should call ltdb_search_full()
1061 */
1062 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1063 {
1064         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1065         struct dn_list *dn_list;
1066         int ret;
1067
1068         /* see if indexing is enabled */
1069         if (!ltdb->cache->attribute_indexes &&
1070             !ltdb->cache->one_level_indexes &&
1071             ac->scope != LDB_SCOPE_BASE) {
1072                 /* fallback to a full search */
1073                 return LDB_ERR_OPERATIONS_ERROR;
1074         }
1075
1076         dn_list = talloc_zero(ac, struct dn_list);
1077         if (dn_list == NULL) {
1078                 return ldb_module_oom(ac->module);
1079         }
1080
1081         switch (ac->scope) {
1082         case LDB_SCOPE_BASE:
1083                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1084                 if (dn_list->dn == NULL) {
1085                         talloc_free(dn_list);
1086                         return ldb_module_oom(ac->module);
1087                 }
1088                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1089                 if (dn_list->dn[0].data == NULL) {
1090                         talloc_free(dn_list);
1091                         return ldb_module_oom(ac->module);
1092                 }
1093                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1094                 dn_list->count = 1;
1095                 break;
1096
1097         case LDB_SCOPE_ONELEVEL:
1098                 if (!ltdb->cache->one_level_indexes) {
1099                         talloc_free(dn_list);
1100                         return LDB_ERR_OPERATIONS_ERROR;
1101                 }
1102                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1103                 if (ret != LDB_SUCCESS) {
1104                         talloc_free(dn_list);
1105                         return ret;
1106                 }
1107                 break;
1108
1109         case LDB_SCOPE_SUBTREE:
1110         case LDB_SCOPE_DEFAULT:
1111                 if (!ltdb->cache->attribute_indexes) {
1112                         talloc_free(dn_list);
1113                         return LDB_ERR_OPERATIONS_ERROR;
1114                 }
1115                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
1116                 if (ret != LDB_SUCCESS) {
1117                         talloc_free(dn_list);
1118                         return ret;
1119                 }
1120                 ltdb_dn_list_remove_duplicates(dn_list);
1121                 break;
1122         }
1123
1124         ret = ltdb_index_filter(dn_list, ac, match_count);
1125         talloc_free(dn_list);
1126         return ret;
1127 }
1128
1129 /**
1130  * @brief Add a DN in the index list of a given attribute name/value pair
1131  *
1132  * This function will add the DN in the index list for the index for
1133  * the given attribute name and value.
1134  *
1135  * @param[in]  module       A ldb_module structure
1136  *
1137  * @param[in]  dn           The string representation of the DN as it
1138  *                          will be stored in the index entry
1139  *
1140  * @param[in]  el           A ldb_message_element array, one of the entry
1141  *                          referred by the v_idx is the attribute name and
1142  *                          value pair which will be used to construct the
1143  *                          index name
1144  *
1145  * @param[in]  v_idx        The index of element in the el array to use
1146  *
1147  * @return                  An ldb error code
1148  */
1149 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1150                            struct ldb_message_element *el, int v_idx,
1151                            bool is_new)
1152 {
1153         struct ldb_context *ldb;
1154         struct ldb_dn *dn_key;
1155         int ret;
1156         const struct ldb_schema_attribute *a;
1157         struct dn_list *list;
1158         unsigned alloc_len;
1159
1160         ldb = ldb_module_get_ctx(module);
1161
1162         list = talloc_zero(module, struct dn_list);
1163         if (list == NULL) {
1164                 return LDB_ERR_OPERATIONS_ERROR;
1165         }
1166
1167         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1168         if (!dn_key) {
1169                 talloc_free(list);
1170                 return LDB_ERR_OPERATIONS_ERROR;
1171         }
1172         talloc_steal(list, dn_key);
1173
1174         ret = ltdb_dn_list_load(module, dn_key, list);
1175         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1176                 talloc_free(list);
1177                 return ret;
1178         }
1179
1180         if (list->count > 0 &&
1181             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1182                 /*
1183                  * We do not want to print info about a possibly
1184                  * confidential DN that the conflict was with in the
1185                  * user-visible error string
1186                  */
1187                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1188                           __location__ ": unique index violation on %s in %s, "
1189                           "conficts with %*.*s in %s",
1190                           el->name, dn,
1191                           (int)list->dn[0].length,
1192                           (int)list->dn[0].length,
1193                           list->dn[0].data,
1194                           ldb_dn_get_linearized(dn_key));
1195                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1196                                        el->name, dn);
1197                 talloc_free(list);
1198                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1199         }
1200
1201         /* If we are doing an ADD, then this can not already be in the index,
1202            as it was not already in the database, and this has already been
1203            checked because the store succeeded */
1204         if (! is_new) {
1205                 if (ltdb_dn_list_find_str(list, dn) != -1) {
1206                         talloc_free(list);
1207                         return LDB_SUCCESS;
1208                 }
1209         }
1210
1211         /* overallocate the list a bit, to reduce the number of
1212          * realloc trigered copies */
1213         alloc_len = ((list->count+1)+7) & ~7;
1214         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1215         if (list->dn == NULL) {
1216                 talloc_free(list);
1217                 return LDB_ERR_OPERATIONS_ERROR;
1218         }
1219         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1220         list->dn[list->count].length = strlen(dn);
1221         list->count++;
1222
1223         ret = ltdb_dn_list_store(module, dn_key, list);
1224
1225         talloc_free(list);
1226
1227         return ret;
1228 }
1229
1230 /*
1231   add index entries for one elements in a message
1232  */
1233 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1234                              struct ldb_message_element *el, bool is_new)
1235 {
1236         unsigned int i;
1237         for (i = 0; i < el->num_values; i++) {
1238                 int ret = ltdb_index_add1(module, dn, el, i, is_new);
1239                 if (ret != LDB_SUCCESS) {
1240                         return ret;
1241                 }
1242         }
1243
1244         return LDB_SUCCESS;
1245 }
1246
1247 /*
1248   add index entries for all elements in a message
1249  */
1250 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1251                               struct ldb_message_element *elements,
1252                               unsigned int num_el,
1253                               bool is_new)
1254 {
1255         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1256         unsigned int i;
1257
1258         if (dn[0] == '@') {
1259                 return LDB_SUCCESS;
1260         }
1261
1262         if (!ltdb->cache->attribute_indexes) {
1263                 /* no indexed fields */
1264                 return LDB_SUCCESS;
1265         }
1266
1267         for (i = 0; i < num_el; i++) {
1268                 int ret;
1269                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
1270                         continue;
1271                 }
1272                 ret = ltdb_index_add_el(module, dn, &elements[i], is_new);
1273                 if (ret != LDB_SUCCESS) {
1274                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1275                         ldb_asprintf_errstring(ldb,
1276                                                __location__ ": Failed to re-index %s in %s - %s",
1277                                                elements[i].name, dn, ldb_errstring(ldb));
1278                         return ret;
1279                 }
1280         }
1281
1282         return LDB_SUCCESS;
1283 }
1284
1285
1286 /*
1287   insert a one level index for a message
1288 */
1289 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1290 {
1291         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1292         struct ldb_message_element el;
1293         struct ldb_val val;
1294         struct ldb_dn *pdn;
1295         const char *dn;
1296         int ret;
1297
1298         /* We index for ONE Level only if requested */
1299         if (!ltdb->cache->one_level_indexes) {
1300                 return LDB_SUCCESS;
1301         }
1302
1303         pdn = ldb_dn_get_parent(module, msg->dn);
1304         if (pdn == NULL) {
1305                 return LDB_ERR_OPERATIONS_ERROR;
1306         }
1307
1308         dn = ldb_dn_get_linearized(msg->dn);
1309         if (dn == NULL) {
1310                 talloc_free(pdn);
1311                 return LDB_ERR_OPERATIONS_ERROR;
1312         }
1313
1314         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1315         if (val.data == NULL) {
1316                 talloc_free(pdn);
1317                 return LDB_ERR_OPERATIONS_ERROR;
1318         }
1319
1320         val.length = strlen((char *)val.data);
1321         el.name = LTDB_IDXONE;
1322         el.values = &val;
1323         el.num_values = 1;
1324
1325         if (add) {
1326                 ret = ltdb_index_add1(module, dn, &el, 0, add);
1327         } else { /* delete */
1328                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1329         }
1330
1331         talloc_free(pdn);
1332
1333         return ret;
1334 }
1335
1336 /*
1337   add the index entries for a new element in a record
1338   The caller guarantees that these element values are not yet indexed
1339 */
1340 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1341                            struct ldb_message_element *el)
1342 {
1343         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1344         if (ldb_dn_is_special(dn)) {
1345                 return LDB_SUCCESS;
1346         }
1347         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1348                 return LDB_SUCCESS;
1349         }
1350         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el, true);
1351 }
1352
1353 /*
1354   add the index entries for a new record
1355 */
1356 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1357 {
1358         const char *dn;
1359         int ret;
1360
1361         if (ldb_dn_is_special(msg->dn)) {
1362                 return LDB_SUCCESS;
1363         }
1364
1365         dn = ldb_dn_get_linearized(msg->dn);
1366         if (dn == NULL) {
1367                 return LDB_ERR_OPERATIONS_ERROR;
1368         }
1369
1370         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
1371                                  true);
1372         if (ret != LDB_SUCCESS) {
1373                 return ret;
1374         }
1375
1376         return ltdb_index_onelevel(module, msg, 1);
1377 }
1378
1379
1380 /*
1381   delete an index entry for one message element
1382 */
1383 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1384                          struct ldb_message_element *el, unsigned int v_idx)
1385 {
1386         struct ldb_context *ldb;
1387         struct ldb_dn *dn_key;
1388         const char *dn_str;
1389         int ret, i;
1390         unsigned int j;
1391         struct dn_list *list;
1392
1393         ldb = ldb_module_get_ctx(module);
1394
1395         dn_str = ldb_dn_get_linearized(dn);
1396         if (dn_str == NULL) {
1397                 return LDB_ERR_OPERATIONS_ERROR;
1398         }
1399
1400         if (dn_str[0] == '@') {
1401                 return LDB_SUCCESS;
1402         }
1403
1404         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1405         if (!dn_key) {
1406                 return LDB_ERR_OPERATIONS_ERROR;
1407         }
1408
1409         list = talloc_zero(dn_key, struct dn_list);
1410         if (list == NULL) {
1411                 talloc_free(dn_key);
1412                 return LDB_ERR_OPERATIONS_ERROR;
1413         }
1414
1415         ret = ltdb_dn_list_load(module, dn_key, list);
1416         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1417                 /* it wasn't indexed. Did we have an earlier error? If we did then
1418                    its gone now */
1419                 talloc_free(dn_key);
1420                 return LDB_SUCCESS;
1421         }
1422
1423         if (ret != LDB_SUCCESS) {
1424                 talloc_free(dn_key);
1425                 return ret;
1426         }
1427
1428         i = ltdb_dn_list_find_str(list, dn_str);
1429         if (i == -1) {
1430                 /* nothing to delete */
1431                 talloc_free(dn_key);
1432                 return LDB_SUCCESS;
1433         }
1434
1435         j = (unsigned int) i;
1436         if (j != list->count - 1) {
1437                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1438         }
1439         list->count--;
1440         if (list->count == 0) {
1441                 talloc_free(list->dn);
1442                 list->dn = NULL;
1443         } else {
1444                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1445         }
1446
1447         ret = ltdb_dn_list_store(module, dn_key, list);
1448
1449         talloc_free(dn_key);
1450
1451         return ret;
1452 }
1453
1454 /*
1455   delete the index entries for a element
1456   return -1 on failure
1457 */
1458 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1459                            struct ldb_message_element *el)
1460 {
1461         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1462         const char *dn_str;
1463         int ret;
1464         unsigned int i;
1465
1466         if (!ltdb->cache->attribute_indexes) {
1467                 /* no indexed fields */
1468                 return LDB_SUCCESS;
1469         }
1470
1471         dn_str = ldb_dn_get_linearized(dn);
1472         if (dn_str == NULL) {
1473                 return LDB_ERR_OPERATIONS_ERROR;
1474         }
1475
1476         if (dn_str[0] == '@') {
1477                 return LDB_SUCCESS;
1478         }
1479
1480         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1481                 return LDB_SUCCESS;
1482         }
1483         for (i = 0; i < el->num_values; i++) {
1484                 ret = ltdb_index_del_value(module, dn, el, i);
1485                 if (ret != LDB_SUCCESS) {
1486                         return ret;
1487                 }
1488         }
1489
1490         return LDB_SUCCESS;
1491 }
1492
1493 /*
1494   delete the index entries for a record
1495   return -1 on failure
1496 */
1497 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1498 {
1499         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1500         int ret;
1501         unsigned int i;
1502
1503         if (ldb_dn_is_special(msg->dn)) {
1504                 return LDB_SUCCESS;
1505         }
1506
1507         ret = ltdb_index_onelevel(module, msg, 0);
1508         if (ret != LDB_SUCCESS) {
1509                 return ret;
1510         }
1511
1512         if (!ltdb->cache->attribute_indexes) {
1513                 /* no indexed fields */
1514                 return LDB_SUCCESS;
1515         }
1516
1517         for (i = 0; i < msg->num_elements; i++) {
1518                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1519                 if (ret != LDB_SUCCESS) {
1520                         return ret;
1521                 }
1522         }
1523
1524         return LDB_SUCCESS;
1525 }
1526
1527
1528 /*
1529   traversal function that deletes all @INDEX records
1530 */
1531 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1532 {
1533         struct ldb_module *module = state;
1534         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1535         const char *dnstr = "DN=" LTDB_INDEX ":";
1536         struct dn_list list;
1537         struct ldb_dn *dn;
1538         struct ldb_val v;
1539         int ret;
1540
1541         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1542                 return 0;
1543         }
1544         /* we need to put a empty list in the internal tdb for this
1545          * index entry */
1546         list.dn = NULL;
1547         list.count = 0;
1548
1549         /* the offset of 3 is to remove the DN= prefix. */
1550         v.data = key.dptr + 3;
1551         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1552
1553         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1554         ret = ltdb_dn_list_store(module, dn, &list);
1555         if (ret != LDB_SUCCESS) {
1556                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1557                                        "Unable to store null index for %s\n",
1558                                                 ldb_dn_get_linearized(dn));
1559                 talloc_free(dn);
1560                 return -1;
1561         }
1562         talloc_free(dn);
1563         return 0;
1564 }
1565
1566 struct ltdb_reindex_context {
1567         struct ldb_module *module;
1568         int error;
1569 };
1570
1571 /*
1572   traversal function that adds @INDEX records during a re index
1573 */
1574 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1575 {
1576         struct ldb_context *ldb;
1577         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1578         struct ldb_module *module = ctx->module;
1579         struct ldb_message *msg;
1580         unsigned int nb_elements_in_db;
1581         const struct ldb_val val = {
1582                 .data = data.dptr,
1583                 .length = data.dsize,
1584         };
1585         const char *dn = NULL;
1586         int ret;
1587         TDB_DATA key2;
1588
1589         ldb = ldb_module_get_ctx(module);
1590
1591         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1592             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1593                 return 0;
1594         }
1595
1596         msg = ldb_msg_new(module);
1597         if (msg == NULL) {
1598                 return -1;
1599         }
1600
1601         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
1602                                                    msg,
1603                                                    NULL, 0,
1604                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
1605                                                    &nb_elements_in_db);
1606         if (ret != 0) {
1607                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1608                                                 ldb_dn_get_linearized(msg->dn));
1609                 talloc_free(msg);
1610                 return -1;
1611         }
1612
1613         /* check if the DN key has changed, perhaps due to the
1614            case insensitivity of an element changing */
1615         key2 = ltdb_key(module, msg->dn);
1616         if (key2.dptr == NULL) {
1617                 /* probably a corrupt record ... darn */
1618                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1619                                                 ldb_dn_get_linearized(msg->dn));
1620                 talloc_free(msg);
1621                 return 0;
1622         }
1623         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1624                 tdb_delete(tdb, key);
1625                 tdb_store(tdb, key2, data, 0);
1626         }
1627         talloc_free(key2.dptr);
1628
1629         if (msg->dn == NULL) {
1630                 dn = (char *)key.dptr + 3;
1631         } else {
1632                 dn = ldb_dn_get_linearized(msg->dn);
1633         }
1634
1635         ret = ltdb_index_onelevel(module, msg, 1);
1636         if (ret != LDB_SUCCESS) {
1637                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1638                           "Adding special ONE LEVEL index failed (%s)!",
1639                                                 ldb_dn_get_linearized(msg->dn));
1640                 talloc_free(msg);
1641                 return -1;
1642         }
1643
1644         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
1645                                  false);
1646
1647         if (ret != LDB_SUCCESS) {
1648                 ctx->error = ret;
1649                 talloc_free(msg);
1650                 return -1;
1651         }
1652
1653         talloc_free(msg);
1654
1655         return 0;
1656 }
1657
1658 /*
1659   force a complete reindex of the database
1660 */
1661 int ltdb_reindex(struct ldb_module *module)
1662 {
1663         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1664         int ret;
1665         struct ltdb_reindex_context ctx;
1666
1667         if (ltdb_cache_reload(module) != 0) {
1668                 return LDB_ERR_OPERATIONS_ERROR;
1669         }
1670
1671         /*
1672          * Ensure we read (and so remove) the entries from the real
1673          * DB, no values stored so far are any use as we want to do a
1674          * re-index
1675          */
1676         ltdb_index_transaction_cancel(module);
1677
1678         ret = ltdb_index_transaction_start(module);
1679         if (ret != LDB_SUCCESS) {
1680                 return ret;
1681         }
1682
1683         /* first traverse the database deleting any @INDEX records by
1684          * putting NULL entries in the in-memory tdb
1685          */
1686         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1687         if (ret < 0) {
1688                 return LDB_ERR_OPERATIONS_ERROR;
1689         }
1690
1691         /* if we don't have indexes we have nothing todo */
1692         if (!ltdb->cache->attribute_indexes) {
1693                 return LDB_SUCCESS;
1694         }
1695
1696         ctx.module = module;
1697         ctx.error = 0;
1698
1699         /* now traverse adding any indexes for normal LDB records */
1700         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1701         if (ret < 0) {
1702                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1703                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1704                 return LDB_ERR_OPERATIONS_ERROR;
1705         }
1706
1707         if (ctx.error != LDB_SUCCESS) {
1708                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1709                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1710                 return ctx.error;
1711         }
1712
1713         return LDB_SUCCESS;
1714 }