ldb_tdb: Check for memory allocation failure in ltdb_index_transaction_start()
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "ldb_private.h"
36
37 struct dn_list {
38         unsigned int count;
39         struct ldb_val *dn;
40 };
41
42 struct ltdb_idxptr {
43         struct tdb_context *itdb;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         if (ltdb->idxptr == NULL) {
58                 return ldb_oom(ldb_module_get_ctx(module));
59         }
60
61         return LDB_SUCCESS;
62 }
63
64 /* compare two DN entries in a dn_list. Take account of possible
65  * differences in string termination */
66 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
67 {
68         if (v1->length > v2->length && v1->data[v2->length] != 0) {
69                 return -1;
70         }
71         if (v1->length < v2->length && v2->data[v1->length] != 0) {
72                 return 1;
73         }
74         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
75 }
76
77
78 /*
79   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
80   comparison with the dn returns -1 if not found
81  */
82 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
83 {
84         unsigned int i;
85         for (i=0; i<list->count; i++) {
86                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
87         }
88         return -1;
89 }
90
91 /*
92   find a entry in a dn_list. Uses a case sensitive comparison with the dn
93   returns -1 if not found
94  */
95 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
96 {
97         struct ldb_val v;
98         v.data = discard_const_p(unsigned char, dn);
99         v.length = strlen(dn);
100         return ltdb_dn_list_find_val(list, &v);
101 }
102
103 /*
104   this is effectively a cast function, but with lots of paranoia
105   checks and also copes with CPUs that are fussy about pointer
106   alignment
107  */
108 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
109 {
110         struct dn_list *list;
111         if (rec.dsize != sizeof(void *)) {
112                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
113                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
114                 return NULL;
115         }
116         /* note that we can't just use a cast here, as rec.dptr may
117            not be aligned sufficiently for a pointer. A cast would cause
118            platforms like some ARM CPUs to crash */
119         memcpy(&list, rec.dptr, sizeof(void *));
120         list = talloc_get_type(list, struct dn_list);
121         if (list == NULL) {
122                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
123                                        "Bad type '%s' for idxptr",
124                                        talloc_get_name(list));
125                 return NULL;
126         }
127         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
128                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
129                                        "Bad parent '%s' for idxptr",
130                                        talloc_get_name(talloc_parent(list->dn)));
131                 return NULL;
132         }
133         return list;
134 }
135
136 /*
137   return the @IDX list in an index entry for a dn as a
138   struct dn_list
139  */
140 static int ltdb_dn_list_load(struct ldb_module *module,
141                              struct ldb_dn *dn, struct dn_list *list)
142 {
143         struct ldb_message *msg;
144         int ret;
145         struct ldb_message_element *el;
146         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
147         TDB_DATA rec;
148         struct dn_list *list2;
149         TDB_DATA key;
150
151         list->dn = NULL;
152         list->count = 0;
153
154         /* see if we have any in-memory index entries */
155         if (ltdb->idxptr == NULL ||
156             ltdb->idxptr->itdb == NULL) {
157                 goto normal_index;
158         }
159
160         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
161         key.dsize = strlen((char *)key.dptr);
162
163         rec = tdb_fetch(ltdb->idxptr->itdb, key);
164         if (rec.dptr == NULL) {
165                 goto normal_index;
166         }
167
168         /* we've found an in-memory index entry */
169         list2 = ltdb_index_idxptr(module, rec, true);
170         if (list2 == NULL) {
171                 free(rec.dptr);
172                 return LDB_ERR_OPERATIONS_ERROR;
173         }
174         free(rec.dptr);
175
176         *list = *list2;
177         return LDB_SUCCESS;
178
179 normal_index:
180         msg = ldb_msg_new(list);
181         if (msg == NULL) {
182                 return LDB_ERR_OPERATIONS_ERROR;
183         }
184
185         ret = ltdb_search_dn1(module, dn, msg,
186                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
187                               |LDB_UNPACK_DATA_FLAG_NO_DN);
188         if (ret != LDB_SUCCESS) {
189                 talloc_free(msg);
190                 return ret;
191         }
192
193         /* TODO: check indexing version number */
194
195         el = ldb_msg_find_element(msg, LTDB_IDX);
196         if (!el) {
197                 talloc_free(msg);
198                 return LDB_SUCCESS;
199         }
200
201         /*
202          * we avoid copying the strings by stealing the list.  We have
203          * to steal msg onto el->values (which looks odd) because we
204          * asked for the memory to be allocated on msg, not on each
205          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
206          */
207         talloc_steal(el->values, msg);
208         list->dn = talloc_steal(list, el->values);
209         list->count = el->num_values;
210
211         /* We don't need msg->elements any more */
212         talloc_free(msg->elements);
213         return LDB_SUCCESS;
214 }
215
216
217 /*
218   save a dn_list into a full @IDX style record
219  */
220 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
221                                    struct dn_list *list)
222 {
223         struct ldb_message *msg;
224         int ret;
225
226         if (list->count == 0) {
227                 ret = ltdb_delete_noindex(module, dn);
228                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
229                         return LDB_SUCCESS;
230                 }
231                 return ret;
232         }
233
234         msg = ldb_msg_new(module);
235         if (!msg) {
236                 return ldb_module_oom(module);
237         }
238
239         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
240         if (ret != LDB_SUCCESS) {
241                 talloc_free(msg);
242                 return ldb_module_oom(module);
243         }
244
245         msg->dn = dn;
246         if (list->count > 0) {
247                 struct ldb_message_element *el;
248
249                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
250                 if (ret != LDB_SUCCESS) {
251                         talloc_free(msg);
252                         return ldb_module_oom(module);
253                 }
254                 el->values = list->dn;
255                 el->num_values = list->count;
256         }
257
258         ret = ltdb_store(module, msg, TDB_REPLACE);
259         talloc_free(msg);
260         return ret;
261 }
262
263 /*
264   save a dn_list into the database, in either @IDX or internal format
265  */
266 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
267                               struct dn_list *list)
268 {
269         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
270         TDB_DATA rec, key;
271         int ret;
272         struct dn_list *list2;
273
274         if (ltdb->idxptr == NULL) {
275                 return ltdb_dn_list_store_full(module, dn, list);
276         }
277
278         if (ltdb->idxptr->itdb == NULL) {
279                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
280                 if (ltdb->idxptr->itdb == NULL) {
281                         return LDB_ERR_OPERATIONS_ERROR;
282                 }
283         }
284
285         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
286         key.dsize = strlen((char *)key.dptr);
287
288         rec = tdb_fetch(ltdb->idxptr->itdb, key);
289         if (rec.dptr != NULL) {
290                 list2 = ltdb_index_idxptr(module, rec, false);
291                 if (list2 == NULL) {
292                         free(rec.dptr);
293                         return LDB_ERR_OPERATIONS_ERROR;
294                 }
295                 free(rec.dptr);
296                 list2->dn = talloc_steal(list2, list->dn);
297                 list2->count = list->count;
298                 return LDB_SUCCESS;
299         }
300
301         list2 = talloc(ltdb->idxptr, struct dn_list);
302         if (list2 == NULL) {
303                 return LDB_ERR_OPERATIONS_ERROR;
304         }
305         list2->dn = talloc_steal(list2, list->dn);
306         list2->count = list->count;
307
308         rec.dptr = (uint8_t *)&list2;
309         rec.dsize = sizeof(void *);
310
311         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
312         if (ret != 0) {
313                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
314         }
315         return LDB_SUCCESS;
316 }
317
318 /*
319   traverse function for storing the in-memory index entries on disk
320  */
321 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
322 {
323         struct ldb_module *module = state;
324         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
325         struct ldb_dn *dn;
326         struct ldb_context *ldb = ldb_module_get_ctx(module);
327         struct ldb_val v;
328         struct dn_list *list;
329
330         list = ltdb_index_idxptr(module, data, true);
331         if (list == NULL) {
332                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
333                 return -1;
334         }
335
336         v.data = key.dptr;
337         v.length = strnlen((char *)key.dptr, key.dsize);
338
339         dn = ldb_dn_from_ldb_val(module, ldb, &v);
340         if (dn == NULL) {
341                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
342                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
343                 return -1;
344         }
345
346         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
347         talloc_free(dn);
348         if (ltdb->idxptr->error != 0) {
349                 return -1;
350         }
351         return 0;
352 }
353
354 /* cleanup the idxptr mode when transaction commits */
355 int ltdb_index_transaction_commit(struct ldb_module *module)
356 {
357         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
358         int ret;
359
360         struct ldb_context *ldb = ldb_module_get_ctx(module);
361
362         ldb_reset_err_string(ldb);
363
364         if (ltdb->idxptr->itdb) {
365                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
366                 tdb_close(ltdb->idxptr->itdb);
367         }
368
369         ret = ltdb->idxptr->error;
370         if (ret != LDB_SUCCESS) {
371                 if (!ldb_errstring(ldb)) {
372                         ldb_set_errstring(ldb, ldb_strerror(ret));
373                 }
374                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
375         }
376
377         talloc_free(ltdb->idxptr);
378         ltdb->idxptr = NULL;
379         return ret;
380 }
381
382 /* cleanup the idxptr mode when transaction cancels */
383 int ltdb_index_transaction_cancel(struct ldb_module *module)
384 {
385         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
386         if (ltdb->idxptr && ltdb->idxptr->itdb) {
387                 tdb_close(ltdb->idxptr->itdb);
388         }
389         talloc_free(ltdb->idxptr);
390         ltdb->idxptr = NULL;
391         return LDB_SUCCESS;
392 }
393
394
395 /*
396   return the dn key to be used for an index
397   the caller is responsible for freeing
398 */
399 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
400                                      const char *attr, const struct ldb_val *value,
401                                      const struct ldb_schema_attribute **ap)
402 {
403         struct ldb_dn *ret;
404         struct ldb_val v;
405         const struct ldb_schema_attribute *a;
406         char *attr_folded;
407         int r;
408
409         attr_folded = ldb_attr_casefold(ldb, attr);
410         if (!attr_folded) {
411                 return NULL;
412         }
413
414         a = ldb_schema_attribute_by_name(ldb, attr);
415         if (ap) {
416                 *ap = a;
417         }
418         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
419         if (r != LDB_SUCCESS) {
420                 const char *errstr = ldb_errstring(ldb);
421                 /* canonicalisation can be refused. For example,
422                    a attribute that takes wildcards will refuse to canonicalise
423                    if the value contains a wildcard */
424                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
425                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
426                 talloc_free(attr_folded);
427                 return NULL;
428         }
429         if (ldb_should_b64_encode(ldb, &v)) {
430                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
431                 if (!vstr) {
432                         talloc_free(attr_folded);
433                         return NULL;
434                 }
435                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
436                 talloc_free(vstr);
437         } else {
438                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
439         }
440
441         if (v.data != value->data) {
442                 talloc_free(v.data);
443         }
444         talloc_free(attr_folded);
445
446         return ret;
447 }
448
449 /*
450   see if a attribute value is in the list of indexed attributes
451 */
452 static bool ltdb_is_indexed(struct ldb_module *module,
453                             struct ltdb_private *ltdb,
454                             const char *attr)
455 {
456         struct ldb_context *ldb = ldb_module_get_ctx(module);
457         unsigned int i;
458         struct ldb_message_element *el;
459
460         if (ldb->schema.index_handler_override) {
461                 const struct ldb_schema_attribute *a
462                         = ldb_schema_attribute_by_name(ldb, attr);
463
464                 if (a == NULL) {
465                         return false;
466                 }
467
468                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
469                         return true;
470                 } else {
471                         return false;
472                 }
473         }
474
475         if (!ltdb->cache->attribute_indexes) {
476                 return false;
477         }
478
479         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
480         if (el == NULL) {
481                 return false;
482         }
483
484         /* TODO: this is too expensive! At least use a binary search */
485         for (i=0; i<el->num_values; i++) {
486                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
487                         return true;
488                 }
489         }
490         return false;
491 }
492
493 /*
494   in the following logic functions, the return value is treated as
495   follows:
496
497      LDB_SUCCESS: we found some matching index values
498
499      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
500
501      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
502                                we'll need a full search
503  */
504
505 /*
506   return a list of dn's that might match a simple indexed search (an
507   equality search only)
508  */
509 static int ltdb_index_dn_simple(struct ldb_module *module,
510                                 struct ltdb_private *ltdb,
511                                 const struct ldb_parse_tree *tree,
512                                 struct dn_list *list)
513 {
514         struct ldb_context *ldb;
515         struct ldb_dn *dn;
516         int ret;
517
518         ldb = ldb_module_get_ctx(module);
519
520         list->count = 0;
521         list->dn = NULL;
522
523         /* if the attribute isn't in the list of indexed attributes then
524            this node needs a full search */
525         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
526                 return LDB_ERR_OPERATIONS_ERROR;
527         }
528
529         /* the attribute is indexed. Pull the list of DNs that match the
530            search criterion */
531         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
532         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
533
534         ret = ltdb_dn_list_load(module, dn, list);
535         talloc_free(dn);
536         return ret;
537 }
538
539
540 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
541
542 /*
543   return a list of dn's that might match a leaf indexed search
544  */
545 static int ltdb_index_dn_leaf(struct ldb_module *module,
546                               struct ltdb_private *ltdb,
547                               const struct ldb_parse_tree *tree,
548                               struct dn_list *list)
549 {
550         if (ltdb->disallow_dn_filter &&
551             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
552                 /* in AD mode we do not support "(dn=...)" search filters */
553                 list->dn = NULL;
554                 list->count = 0;
555                 return LDB_SUCCESS;
556         }
557         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
558                 list->dn = talloc_array(list, struct ldb_val, 1);
559                 if (list->dn == NULL) {
560                         ldb_module_oom(module);
561                         return LDB_ERR_OPERATIONS_ERROR;
562                 }
563                 list->dn[0] = tree->u.equality.value;
564                 list->count = 1;
565                 return LDB_SUCCESS;
566         }
567         return ltdb_index_dn_simple(module, ltdb, tree, list);
568 }
569
570
571 /*
572   list intersection
573   list = list & list2
574 */
575 static bool list_intersect(struct ldb_context *ldb,
576                            struct dn_list *list, const struct dn_list *list2)
577 {
578         struct dn_list *list3;
579         unsigned int i;
580
581         if (list->count == 0) {
582                 /* 0 & X == 0 */
583                 return true;
584         }
585         if (list2->count == 0) {
586                 /* X & 0 == 0 */
587                 list->count = 0;
588                 list->dn = NULL;
589                 return true;
590         }
591
592         /* the indexing code is allowed to return a longer list than
593            what really matches, as all results are filtered by the
594            full expression at the end - this shortcut avoids a lot of
595            work in some cases */
596         if (list->count < 2 && list2->count > 10) {
597                 return true;
598         }
599         if (list2->count < 2 && list->count > 10) {
600                 list->count = list2->count;
601                 list->dn = list2->dn;
602                 /* note that list2 may not be the parent of list2->dn,
603                    as list2->dn may be owned by ltdb->idxptr. In that
604                    case we expect this reparent call to fail, which is
605                    OK */
606                 talloc_reparent(list2, list, list2->dn);
607                 return true;
608         }
609
610         list3 = talloc_zero(list, struct dn_list);
611         if (list3 == NULL) {
612                 return false;
613         }
614
615         list3->dn = talloc_array(list3, struct ldb_val, list->count);
616         if (!list3->dn) {
617                 talloc_free(list3);
618                 return false;
619         }
620         list3->count = 0;
621
622         for (i=0;i<list->count;i++) {
623                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
624                         list3->dn[list3->count] = list->dn[i];
625                         list3->count++;
626                 }
627         }
628
629         list->dn = talloc_steal(list, list3->dn);
630         list->count = list3->count;
631         talloc_free(list3);
632
633         return true;
634 }
635
636
637 /*
638   list union
639   list = list | list2
640 */
641 static bool list_union(struct ldb_context *ldb,
642                        struct dn_list *list, const struct dn_list *list2)
643 {
644         struct ldb_val *dn3;
645
646         if (list2->count == 0) {
647                 /* X | 0 == X */
648                 return true;
649         }
650
651         if (list->count == 0) {
652                 /* 0 | X == X */
653                 list->count = list2->count;
654                 list->dn = list2->dn;
655                 /* note that list2 may not be the parent of list2->dn,
656                    as list2->dn may be owned by ltdb->idxptr. In that
657                    case we expect this reparent call to fail, which is
658                    OK */
659                 talloc_reparent(list2, list, list2->dn);
660                 return true;
661         }
662
663         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
664         if (!dn3) {
665                 ldb_oom(ldb);
666                 return false;
667         }
668
669         /* we allow for duplicates here, and get rid of them later */
670         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
671         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
672
673         list->dn = dn3;
674         list->count += list2->count;
675
676         return true;
677 }
678
679 static int ltdb_index_dn(struct ldb_module *module,
680                          struct ltdb_private *ltdb,
681                          const struct ldb_parse_tree *tree,
682                          struct dn_list *list);
683
684
685 /*
686   process an OR list (a union)
687  */
688 static int ltdb_index_dn_or(struct ldb_module *module,
689                             struct ltdb_private *ltdb,
690                             const struct ldb_parse_tree *tree,
691                             struct dn_list *list)
692 {
693         struct ldb_context *ldb;
694         unsigned int i;
695
696         ldb = ldb_module_get_ctx(module);
697
698         list->dn = NULL;
699         list->count = 0;
700
701         for (i=0; i<tree->u.list.num_elements; i++) {
702                 struct dn_list *list2;
703                 int ret;
704
705                 list2 = talloc_zero(list, struct dn_list);
706                 if (list2 == NULL) {
707                         return LDB_ERR_OPERATIONS_ERROR;
708                 }
709
710                 ret = ltdb_index_dn(module, ltdb,
711                                     tree->u.list.elements[i], list2);
712
713                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
714                         /* X || 0 == X */
715                         talloc_free(list2);
716                         continue;
717                 }
718
719                 if (ret != LDB_SUCCESS) {
720                         /* X || * == * */
721                         talloc_free(list2);
722                         return ret;
723                 }
724
725                 if (!list_union(ldb, list, list2)) {
726                         talloc_free(list2);
727                         return LDB_ERR_OPERATIONS_ERROR;
728                 }
729         }
730
731         if (list->count == 0) {
732                 return LDB_ERR_NO_SUCH_OBJECT;
733         }
734
735         return LDB_SUCCESS;
736 }
737
738
739 /*
740   NOT an index results
741  */
742 static int ltdb_index_dn_not(struct ldb_module *module,
743                              struct ltdb_private *ltdb,
744                              const struct ldb_parse_tree *tree,
745                              struct dn_list *list)
746 {
747         /* the only way to do an indexed not would be if we could
748            negate the not via another not or if we knew the total
749            number of database elements so we could know that the
750            existing expression covered the whole database.
751
752            instead, we just give up, and rely on a full index scan
753            (unless an outer & manages to reduce the list)
754         */
755         return LDB_ERR_OPERATIONS_ERROR;
756 }
757
758
759 static bool ltdb_index_unique(struct ldb_context *ldb,
760                               const char *attr)
761 {
762         const struct ldb_schema_attribute *a;
763         a = ldb_schema_attribute_by_name(ldb, attr);
764         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
765                 return true;
766         }
767         return false;
768 }
769
770 /*
771   process an AND expression (intersection)
772  */
773 static int ltdb_index_dn_and(struct ldb_module *module,
774                              struct ltdb_private *ltdb,
775                              const struct ldb_parse_tree *tree,
776                              struct dn_list *list)
777 {
778         struct ldb_context *ldb;
779         unsigned int i;
780         bool found;
781
782         ldb = ldb_module_get_ctx(module);
783
784         list->dn = NULL;
785         list->count = 0;
786
787         /* in the first pass we only look for unique simple
788            equality tests, in the hope of avoiding having to look
789            at any others */
790         for (i=0; i<tree->u.list.num_elements; i++) {
791                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
792                 int ret;
793
794                 if (subtree->operation != LDB_OP_EQUALITY ||
795                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
796                         continue;
797                 }
798
799                 ret = ltdb_index_dn(module, ltdb, subtree, list);
800                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
801                         /* 0 && X == 0 */
802                         return LDB_ERR_NO_SUCH_OBJECT;
803                 }
804                 if (ret == LDB_SUCCESS) {
805                         /* a unique index match means we can
806                          * stop. Note that we don't care if we return
807                          * a few too many objects, due to later
808                          * filtering */
809                         return LDB_SUCCESS;
810                 }
811         }
812
813         /* now do a full intersection */
814         found = false;
815
816         for (i=0; i<tree->u.list.num_elements; i++) {
817                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
818                 struct dn_list *list2;
819                 int ret;
820
821                 list2 = talloc_zero(list, struct dn_list);
822                 if (list2 == NULL) {
823                         return ldb_module_oom(module);
824                 }
825
826                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
827
828                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
829                         /* X && 0 == 0 */
830                         list->dn = NULL;
831                         list->count = 0;
832                         talloc_free(list2);
833                         return LDB_ERR_NO_SUCH_OBJECT;
834                 }
835
836                 if (ret != LDB_SUCCESS) {
837                         /* this didn't adding anything */
838                         talloc_free(list2);
839                         continue;
840                 }
841
842                 if (!found) {
843                         talloc_reparent(list2, list, list->dn);
844                         list->dn = list2->dn;
845                         list->count = list2->count;
846                         found = true;
847                 } else if (!list_intersect(ldb, list, list2)) {
848                         talloc_free(list2);
849                         return LDB_ERR_OPERATIONS_ERROR;
850                 }
851
852                 if (list->count == 0) {
853                         list->dn = NULL;
854                         return LDB_ERR_NO_SUCH_OBJECT;
855                 }
856
857                 if (list->count < 2) {
858                         /* it isn't worth loading the next part of the tree */
859                         return LDB_SUCCESS;
860                 }
861         }
862
863         if (!found) {
864                 /* none of the attributes were indexed */
865                 return LDB_ERR_OPERATIONS_ERROR;
866         }
867
868         return LDB_SUCCESS;
869 }
870
871 /*
872   return a list of matching objects using a one-level index
873  */
874 static int ltdb_index_dn_one(struct ldb_module *module,
875                              struct ldb_dn *parent_dn,
876                              struct dn_list *list)
877 {
878         struct ldb_context *ldb;
879         struct ldb_dn *key;
880         struct ldb_val val;
881         int ret;
882
883         ldb = ldb_module_get_ctx(module);
884
885         /* work out the index key from the parent DN */
886         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
887         val.length = strlen((char *)val.data);
888         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
889         if (!key) {
890                 ldb_oom(ldb);
891                 return LDB_ERR_OPERATIONS_ERROR;
892         }
893
894         ret = ltdb_dn_list_load(module, key, list);
895         talloc_free(key);
896         if (ret != LDB_SUCCESS) {
897                 return ret;
898         }
899
900         if (list->count == 0) {
901                 return LDB_ERR_NO_SUCH_OBJECT;
902         }
903
904         return LDB_SUCCESS;
905 }
906
907 /*
908   return a list of dn's that might match a indexed search or
909   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
910  */
911 static int ltdb_index_dn(struct ldb_module *module,
912                          struct ltdb_private *ltdb,
913                          const struct ldb_parse_tree *tree,
914                          struct dn_list *list)
915 {
916         int ret = LDB_ERR_OPERATIONS_ERROR;
917
918         switch (tree->operation) {
919         case LDB_OP_AND:
920                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
921                 break;
922
923         case LDB_OP_OR:
924                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
925                 break;
926
927         case LDB_OP_NOT:
928                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
929                 break;
930
931         case LDB_OP_EQUALITY:
932                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
933                 break;
934
935         case LDB_OP_SUBSTRING:
936         case LDB_OP_GREATER:
937         case LDB_OP_LESS:
938         case LDB_OP_PRESENT:
939         case LDB_OP_APPROX:
940         case LDB_OP_EXTENDED:
941                 /* we can't index with fancy bitops yet */
942                 ret = LDB_ERR_OPERATIONS_ERROR;
943                 break;
944         }
945
946         return ret;
947 }
948
949 /*
950   filter a candidate dn_list from an indexed search into a set of results
951   extracting just the given attributes
952 */
953 static int ltdb_index_filter(const struct dn_list *dn_list,
954                              struct ltdb_context *ac,
955                              uint32_t *match_count)
956 {
957         struct ldb_context *ldb;
958         struct ldb_message *msg;
959         struct ldb_message *filtered_msg;
960         unsigned int i;
961
962         ldb = ldb_module_get_ctx(ac->module);
963
964         for (i = 0; i < dn_list->count; i++) {
965                 struct ldb_dn *dn;
966                 int ret;
967                 bool matched;
968
969                 msg = ldb_msg_new(ac);
970                 if (!msg) {
971                         return LDB_ERR_OPERATIONS_ERROR;
972                 }
973
974                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
975                 if (dn == NULL) {
976                         talloc_free(msg);
977                         return LDB_ERR_OPERATIONS_ERROR;
978                 }
979
980                 ret = ltdb_search_dn1(ac->module, dn, msg,
981                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
982                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
983                 talloc_free(dn);
984                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
985                         /* the record has disappeared? yes, this can happen */
986                         talloc_free(msg);
987                         continue;
988                 }
989
990                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
991                         /* an internal error */
992                         talloc_free(msg);
993                         return LDB_ERR_OPERATIONS_ERROR;
994                 }
995
996                 ret = ldb_match_msg_error(ldb, msg,
997                                           ac->tree, ac->base, ac->scope, &matched);
998                 if (ret != LDB_SUCCESS) {
999                         talloc_free(msg);
1000                         return ret;
1001                 }
1002                 if (!matched) {
1003                         talloc_free(msg);
1004                         continue;
1005                 }
1006
1007                 /* filter the attributes that the user wants */
1008                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1009
1010                 talloc_free(msg);
1011
1012                 if (ret == -1) {
1013                         return LDB_ERR_OPERATIONS_ERROR;
1014                 }
1015
1016                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1017                 if (ret != LDB_SUCCESS) {
1018                         /* Regardless of success or failure, the msg
1019                          * is the callbacks responsiblity, and should
1020                          * not be talloc_free()'ed */
1021                         ac->request_terminated = true;
1022                         return ret;
1023                 }
1024
1025                 (*match_count)++;
1026         }
1027
1028         return LDB_SUCCESS;
1029 }
1030
1031 /*
1032   remove any duplicated entries in a indexed result
1033  */
1034 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
1035 {
1036         unsigned int i, new_count;
1037
1038         if (list->count < 2) {
1039                 return;
1040         }
1041
1042         TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
1043
1044         new_count = 1;
1045         for (i=1; i<list->count; i++) {
1046                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
1047                         if (new_count != i) {
1048                                 list->dn[new_count] = list->dn[i];
1049                         }
1050                         new_count++;
1051                 }
1052         }
1053
1054         list->count = new_count;
1055 }
1056
1057 /*
1058   search the database with a LDAP-like expression using indexes
1059   returns -1 if an indexed search is not possible, in which
1060   case the caller should call ltdb_search_full()
1061 */
1062 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1063 {
1064         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1065         struct dn_list *dn_list;
1066         int ret;
1067
1068         /* see if indexing is enabled */
1069         if (!ltdb->cache->attribute_indexes &&
1070             !ltdb->cache->one_level_indexes &&
1071             ac->scope != LDB_SCOPE_BASE) {
1072                 /* fallback to a full search */
1073                 return LDB_ERR_OPERATIONS_ERROR;
1074         }
1075
1076         dn_list = talloc_zero(ac, struct dn_list);
1077         if (dn_list == NULL) {
1078                 return ldb_module_oom(ac->module);
1079         }
1080
1081         switch (ac->scope) {
1082         case LDB_SCOPE_BASE:
1083                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1084                 if (dn_list->dn == NULL) {
1085                         talloc_free(dn_list);
1086                         return ldb_module_oom(ac->module);
1087                 }
1088                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1089                 if (dn_list->dn[0].data == NULL) {
1090                         talloc_free(dn_list);
1091                         return ldb_module_oom(ac->module);
1092                 }
1093                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1094                 dn_list->count = 1;
1095                 break;
1096
1097         case LDB_SCOPE_ONELEVEL:
1098                 if (!ltdb->cache->one_level_indexes) {
1099                         talloc_free(dn_list);
1100                         return LDB_ERR_OPERATIONS_ERROR;
1101                 }
1102                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1103                 if (ret != LDB_SUCCESS) {
1104                         talloc_free(dn_list);
1105                         return ret;
1106                 }
1107                 break;
1108
1109         case LDB_SCOPE_SUBTREE:
1110         case LDB_SCOPE_DEFAULT:
1111                 if (!ltdb->cache->attribute_indexes) {
1112                         talloc_free(dn_list);
1113                         return LDB_ERR_OPERATIONS_ERROR;
1114                 }
1115                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
1116                 if (ret != LDB_SUCCESS) {
1117                         talloc_free(dn_list);
1118                         return ret;
1119                 }
1120                 ltdb_dn_list_remove_duplicates(dn_list);
1121                 break;
1122         }
1123
1124         ret = ltdb_index_filter(dn_list, ac, match_count);
1125         talloc_free(dn_list);
1126         return ret;
1127 }
1128
1129 /**
1130  * @brief Add a DN in the index list of a given attribute name/value pair
1131  *
1132  * This function will add the DN in the index list for the index for
1133  * the given attribute name and value.
1134  *
1135  * @param[in]  module       A ldb_module structure
1136  *
1137  * @param[in]  dn           The string representation of the DN as it
1138  *                          will be stored in the index entry
1139  *
1140  * @param[in]  el           A ldb_message_element array, one of the entry
1141  *                          referred by the v_idx is the attribute name and
1142  *                          value pair which will be used to construct the
1143  *                          index name
1144  *
1145  * @param[in]  v_idx        The index of element in the el array to use
1146  *
1147  * @return                  An ldb error code
1148  */
1149 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1150                            struct ldb_message_element *el, int v_idx,
1151                            bool is_new)
1152 {
1153         struct ldb_context *ldb;
1154         struct ldb_dn *dn_key;
1155         int ret;
1156         const struct ldb_schema_attribute *a;
1157         struct dn_list *list;
1158         unsigned alloc_len;
1159
1160         ldb = ldb_module_get_ctx(module);
1161
1162         list = talloc_zero(module, struct dn_list);
1163         if (list == NULL) {
1164                 return LDB_ERR_OPERATIONS_ERROR;
1165         }
1166
1167         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1168         if (!dn_key) {
1169                 talloc_free(list);
1170                 return LDB_ERR_OPERATIONS_ERROR;
1171         }
1172         talloc_steal(list, dn_key);
1173
1174         ret = ltdb_dn_list_load(module, dn_key, list);
1175         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1176                 talloc_free(list);
1177                 return ret;
1178         }
1179
1180         if (list->count > 0 &&
1181             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1182                 talloc_free(list);
1183                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1184                                        el->name, dn);
1185                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1186         }
1187
1188         /* If we are doing an ADD, then this can not already be in the index,
1189            as it was not already in the database, and this has already been
1190            checked because the store succeeded */
1191         if (! is_new) {
1192                 if (ltdb_dn_list_find_str(list, dn) != -1) {
1193                         talloc_free(list);
1194                         return LDB_SUCCESS;
1195                 }
1196         }
1197
1198         /* overallocate the list a bit, to reduce the number of
1199          * realloc trigered copies */
1200         alloc_len = ((list->count+1)+7) & ~7;
1201         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1202         if (list->dn == NULL) {
1203                 talloc_free(list);
1204                 return LDB_ERR_OPERATIONS_ERROR;
1205         }
1206         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1207         list->dn[list->count].length = strlen(dn);
1208         list->count++;
1209
1210         ret = ltdb_dn_list_store(module, dn_key, list);
1211
1212         talloc_free(list);
1213
1214         return ret;
1215 }
1216
1217 /*
1218   add index entries for one elements in a message
1219  */
1220 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1221                              struct ldb_message_element *el, bool is_new)
1222 {
1223         unsigned int i;
1224         for (i = 0; i < el->num_values; i++) {
1225                 int ret = ltdb_index_add1(module, dn, el, i, is_new);
1226                 if (ret != LDB_SUCCESS) {
1227                         return ret;
1228                 }
1229         }
1230
1231         return LDB_SUCCESS;
1232 }
1233
1234 /*
1235   add index entries for all elements in a message
1236  */
1237 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1238                               struct ldb_message_element *elements,
1239                               unsigned int num_el,
1240                               bool is_new)
1241 {
1242         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1243         unsigned int i;
1244
1245         if (dn[0] == '@') {
1246                 return LDB_SUCCESS;
1247         }
1248
1249         if (!ltdb->cache->attribute_indexes) {
1250                 /* no indexed fields */
1251                 return LDB_SUCCESS;
1252         }
1253
1254         for (i = 0; i < num_el; i++) {
1255                 int ret;
1256                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
1257                         continue;
1258                 }
1259                 ret = ltdb_index_add_el(module, dn, &elements[i], is_new);
1260                 if (ret != LDB_SUCCESS) {
1261                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1262                         ldb_asprintf_errstring(ldb,
1263                                                __location__ ": Failed to re-index %s in %s - %s",
1264                                                elements[i].name, dn, ldb_errstring(ldb));
1265                         return ret;
1266                 }
1267         }
1268
1269         return LDB_SUCCESS;
1270 }
1271
1272
1273 /*
1274   insert a one level index for a message
1275 */
1276 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1277 {
1278         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1279         struct ldb_message_element el;
1280         struct ldb_val val;
1281         struct ldb_dn *pdn;
1282         const char *dn;
1283         int ret;
1284
1285         /* We index for ONE Level only if requested */
1286         if (!ltdb->cache->one_level_indexes) {
1287                 return LDB_SUCCESS;
1288         }
1289
1290         pdn = ldb_dn_get_parent(module, msg->dn);
1291         if (pdn == NULL) {
1292                 return LDB_ERR_OPERATIONS_ERROR;
1293         }
1294
1295         dn = ldb_dn_get_linearized(msg->dn);
1296         if (dn == NULL) {
1297                 talloc_free(pdn);
1298                 return LDB_ERR_OPERATIONS_ERROR;
1299         }
1300
1301         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1302         if (val.data == NULL) {
1303                 talloc_free(pdn);
1304                 return LDB_ERR_OPERATIONS_ERROR;
1305         }
1306
1307         val.length = strlen((char *)val.data);
1308         el.name = LTDB_IDXONE;
1309         el.values = &val;
1310         el.num_values = 1;
1311
1312         if (add) {
1313                 ret = ltdb_index_add1(module, dn, &el, 0, add);
1314         } else { /* delete */
1315                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1316         }
1317
1318         talloc_free(pdn);
1319
1320         return ret;
1321 }
1322
1323 /*
1324   add the index entries for a new element in a record
1325   The caller guarantees that these element values are not yet indexed
1326 */
1327 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1328                            struct ldb_message_element *el)
1329 {
1330         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1331         if (ldb_dn_is_special(dn)) {
1332                 return LDB_SUCCESS;
1333         }
1334         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1335                 return LDB_SUCCESS;
1336         }
1337         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el, true);
1338 }
1339
1340 /*
1341   add the index entries for a new record
1342 */
1343 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1344 {
1345         const char *dn;
1346         int ret;
1347
1348         if (ldb_dn_is_special(msg->dn)) {
1349                 return LDB_SUCCESS;
1350         }
1351
1352         dn = ldb_dn_get_linearized(msg->dn);
1353         if (dn == NULL) {
1354                 return LDB_ERR_OPERATIONS_ERROR;
1355         }
1356
1357         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
1358                                  true);
1359         if (ret != LDB_SUCCESS) {
1360                 return ret;
1361         }
1362
1363         return ltdb_index_onelevel(module, msg, 1);
1364 }
1365
1366
1367 /*
1368   delete an index entry for one message element
1369 */
1370 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1371                          struct ldb_message_element *el, unsigned int v_idx)
1372 {
1373         struct ldb_context *ldb;
1374         struct ldb_dn *dn_key;
1375         const char *dn_str;
1376         int ret, i;
1377         unsigned int j;
1378         struct dn_list *list;
1379
1380         ldb = ldb_module_get_ctx(module);
1381
1382         dn_str = ldb_dn_get_linearized(dn);
1383         if (dn_str == NULL) {
1384                 return LDB_ERR_OPERATIONS_ERROR;
1385         }
1386
1387         if (dn_str[0] == '@') {
1388                 return LDB_SUCCESS;
1389         }
1390
1391         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1392         if (!dn_key) {
1393                 return LDB_ERR_OPERATIONS_ERROR;
1394         }
1395
1396         list = talloc_zero(dn_key, struct dn_list);
1397         if (list == NULL) {
1398                 talloc_free(dn_key);
1399                 return LDB_ERR_OPERATIONS_ERROR;
1400         }
1401
1402         ret = ltdb_dn_list_load(module, dn_key, list);
1403         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1404                 /* it wasn't indexed. Did we have an earlier error? If we did then
1405                    its gone now */
1406                 talloc_free(dn_key);
1407                 return LDB_SUCCESS;
1408         }
1409
1410         if (ret != LDB_SUCCESS) {
1411                 talloc_free(dn_key);
1412                 return ret;
1413         }
1414
1415         i = ltdb_dn_list_find_str(list, dn_str);
1416         if (i == -1) {
1417                 /* nothing to delete */
1418                 talloc_free(dn_key);
1419                 return LDB_SUCCESS;
1420         }
1421
1422         j = (unsigned int) i;
1423         if (j != list->count - 1) {
1424                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1425         }
1426         list->count--;
1427         if (list->count == 0) {
1428                 talloc_free(list->dn);
1429                 list->dn = NULL;
1430         } else {
1431                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1432         }
1433
1434         ret = ltdb_dn_list_store(module, dn_key, list);
1435
1436         talloc_free(dn_key);
1437
1438         return ret;
1439 }
1440
1441 /*
1442   delete the index entries for a element
1443   return -1 on failure
1444 */
1445 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1446                            struct ldb_message_element *el)
1447 {
1448         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1449         const char *dn_str;
1450         int ret;
1451         unsigned int i;
1452
1453         if (!ltdb->cache->attribute_indexes) {
1454                 /* no indexed fields */
1455                 return LDB_SUCCESS;
1456         }
1457
1458         dn_str = ldb_dn_get_linearized(dn);
1459         if (dn_str == NULL) {
1460                 return LDB_ERR_OPERATIONS_ERROR;
1461         }
1462
1463         if (dn_str[0] == '@') {
1464                 return LDB_SUCCESS;
1465         }
1466
1467         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1468                 return LDB_SUCCESS;
1469         }
1470         for (i = 0; i < el->num_values; i++) {
1471                 ret = ltdb_index_del_value(module, dn, el, i);
1472                 if (ret != LDB_SUCCESS) {
1473                         return ret;
1474                 }
1475         }
1476
1477         return LDB_SUCCESS;
1478 }
1479
1480 /*
1481   delete the index entries for a record
1482   return -1 on failure
1483 */
1484 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1485 {
1486         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1487         int ret;
1488         unsigned int i;
1489
1490         if (ldb_dn_is_special(msg->dn)) {
1491                 return LDB_SUCCESS;
1492         }
1493
1494         ret = ltdb_index_onelevel(module, msg, 0);
1495         if (ret != LDB_SUCCESS) {
1496                 return ret;
1497         }
1498
1499         if (!ltdb->cache->attribute_indexes) {
1500                 /* no indexed fields */
1501                 return LDB_SUCCESS;
1502         }
1503
1504         for (i = 0; i < msg->num_elements; i++) {
1505                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1506                 if (ret != LDB_SUCCESS) {
1507                         return ret;
1508                 }
1509         }
1510
1511         return LDB_SUCCESS;
1512 }
1513
1514
1515 /*
1516   traversal function that deletes all @INDEX records
1517 */
1518 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1519 {
1520         struct ldb_module *module = state;
1521         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1522         const char *dnstr = "DN=" LTDB_INDEX ":";
1523         struct dn_list list;
1524         struct ldb_dn *dn;
1525         struct ldb_val v;
1526         int ret;
1527
1528         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1529                 return 0;
1530         }
1531         /* we need to put a empty list in the internal tdb for this
1532          * index entry */
1533         list.dn = NULL;
1534         list.count = 0;
1535
1536         /* the offset of 3 is to remove the DN= prefix. */
1537         v.data = key.dptr + 3;
1538         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1539
1540         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1541         ret = ltdb_dn_list_store(module, dn, &list);
1542         if (ret != LDB_SUCCESS) {
1543                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1544                                        "Unable to store null index for %s\n",
1545                                                 ldb_dn_get_linearized(dn));
1546                 talloc_free(dn);
1547                 return -1;
1548         }
1549         talloc_free(dn);
1550         return 0;
1551 }
1552
1553 struct ltdb_reindex_context {
1554         struct ldb_module *module;
1555         int error;
1556 };
1557
1558 /*
1559   traversal function that adds @INDEX records during a re index
1560 */
1561 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1562 {
1563         struct ldb_context *ldb;
1564         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1565         struct ldb_module *module = ctx->module;
1566         struct ldb_message *msg;
1567         unsigned int nb_elements_in_db;
1568         const struct ldb_val val = {
1569                 .data = data.dptr,
1570                 .length = data.dsize,
1571         };
1572         const char *dn = NULL;
1573         int ret;
1574         TDB_DATA key2;
1575
1576         ldb = ldb_module_get_ctx(module);
1577
1578         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1579             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1580                 return 0;
1581         }
1582
1583         msg = ldb_msg_new(module);
1584         if (msg == NULL) {
1585                 return -1;
1586         }
1587
1588         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
1589                                                    msg,
1590                                                    NULL, 0,
1591                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
1592                                                    &nb_elements_in_db);
1593         if (ret != 0) {
1594                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1595                                                 ldb_dn_get_linearized(msg->dn));
1596                 talloc_free(msg);
1597                 return -1;
1598         }
1599
1600         /* check if the DN key has changed, perhaps due to the
1601            case insensitivity of an element changing */
1602         key2 = ltdb_key(module, msg->dn);
1603         if (key2.dptr == NULL) {
1604                 /* probably a corrupt record ... darn */
1605                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1606                                                 ldb_dn_get_linearized(msg->dn));
1607                 talloc_free(msg);
1608                 return 0;
1609         }
1610         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1611                 tdb_delete(tdb, key);
1612                 tdb_store(tdb, key2, data, 0);
1613         }
1614         talloc_free(key2.dptr);
1615
1616         if (msg->dn == NULL) {
1617                 dn = (char *)key.dptr + 3;
1618         } else {
1619                 dn = ldb_dn_get_linearized(msg->dn);
1620         }
1621
1622         ret = ltdb_index_onelevel(module, msg, 1);
1623         if (ret != LDB_SUCCESS) {
1624                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1625                           "Adding special ONE LEVEL index failed (%s)!",
1626                                                 ldb_dn_get_linearized(msg->dn));
1627                 talloc_free(msg);
1628                 return -1;
1629         }
1630
1631         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
1632                                  false);
1633
1634         if (ret != LDB_SUCCESS) {
1635                 ctx->error = ret;
1636                 talloc_free(msg);
1637                 return -1;
1638         }
1639
1640         talloc_free(msg);
1641
1642         return 0;
1643 }
1644
1645 /*
1646   force a complete reindex of the database
1647 */
1648 int ltdb_reindex(struct ldb_module *module)
1649 {
1650         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1651         int ret;
1652         struct ltdb_reindex_context ctx;
1653
1654         if (ltdb_cache_reload(module) != 0) {
1655                 return LDB_ERR_OPERATIONS_ERROR;
1656         }
1657
1658         /* first traverse the database deleting any @INDEX records by
1659          * putting NULL entries in the in-memory tdb
1660          */
1661         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1662         if (ret < 0) {
1663                 return LDB_ERR_OPERATIONS_ERROR;
1664         }
1665
1666         /* if we don't have indexes we have nothing todo */
1667         if (!ltdb->cache->attribute_indexes) {
1668                 return LDB_SUCCESS;
1669         }
1670
1671         ctx.module = module;
1672         ctx.error = 0;
1673
1674         /* now traverse adding any indexes for normal LDB records */
1675         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1676         if (ret < 0) {
1677                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1678                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1679                 return LDB_ERR_OPERATIONS_ERROR;
1680         }
1681
1682         if (ctx.error != LDB_SUCCESS) {
1683                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1684                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1685                 return ctx.error;
1686         }
1687
1688         return LDB_SUCCESS;
1689 }