ldb: Prepare for adding flags to ltdb_search_dn1() to control memory allocation
[garming/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "ldb_private.h"
36
37 struct dn_list {
38         unsigned int count;
39         struct ldb_val *dn;
40 };
41
42 struct ltdb_idxptr {
43         struct tdb_context *itdb;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         return LDB_SUCCESS;
58 }
59
60 /* compare two DN entries in a dn_list. Take account of possible
61  * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 {
64         if (v1->length > v2->length && v1->data[v2->length] != 0) {
65                 return -1;
66         }
67         if (v1->length < v2->length && v2->data[v1->length] != 0) {
68                 return 1;
69         }
70         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
71 }
72
73
74 /*
75   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
76   comparison with the dn returns -1 if not found
77  */
78 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
79 {
80         unsigned int i;
81         for (i=0; i<list->count; i++) {
82                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
83         }
84         return -1;
85 }
86
87 /*
88   find a entry in a dn_list. Uses a case sensitive comparison with the dn
89   returns -1 if not found
90  */
91 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
92 {
93         struct ldb_val v;
94         v.data = discard_const_p(unsigned char, dn);
95         v.length = strlen(dn);
96         return ltdb_dn_list_find_val(list, &v);
97 }
98
99 /*
100   this is effectively a cast function, but with lots of paranoia
101   checks and also copes with CPUs that are fussy about pointer
102   alignment
103  */
104 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
105 {
106         struct dn_list *list;
107         if (rec.dsize != sizeof(void *)) {
108                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
109                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
110                 return NULL;
111         }
112         /* note that we can't just use a cast here, as rec.dptr may
113            not be aligned sufficiently for a pointer. A cast would cause
114            platforms like some ARM CPUs to crash */
115         memcpy(&list, rec.dptr, sizeof(void *));
116         list = talloc_get_type(list, struct dn_list);
117         if (list == NULL) {
118                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
119                                        "Bad type '%s' for idxptr",
120                                        talloc_get_name(list));
121                 return NULL;
122         }
123         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
124                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
125                                        "Bad parent '%s' for idxptr",
126                                        talloc_get_name(talloc_parent(list->dn)));
127                 return NULL;
128         }
129         return list;
130 }
131
132 /*
133   return the @IDX list in an index entry for a dn as a
134   struct dn_list
135  */
136 static int ltdb_dn_list_load(struct ldb_module *module,
137                              struct ldb_dn *dn, struct dn_list *list)
138 {
139         struct ldb_message *msg;
140         int ret;
141         struct ldb_message_element *el;
142         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
143         TDB_DATA rec;
144         struct dn_list *list2;
145         TDB_DATA key;
146
147         list->dn = NULL;
148         list->count = 0;
149
150         /* see if we have any in-memory index entries */
151         if (ltdb->idxptr == NULL ||
152             ltdb->idxptr->itdb == NULL) {
153                 goto normal_index;
154         }
155
156         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
157         key.dsize = strlen((char *)key.dptr);
158
159         rec = tdb_fetch(ltdb->idxptr->itdb, key);
160         if (rec.dptr == NULL) {
161                 goto normal_index;
162         }
163
164         /* we've found an in-memory index entry */
165         list2 = ltdb_index_idxptr(module, rec, true);
166         if (list2 == NULL) {
167                 free(rec.dptr);
168                 return LDB_ERR_OPERATIONS_ERROR;
169         }
170         free(rec.dptr);
171
172         *list = *list2;
173         return LDB_SUCCESS;
174
175 normal_index:
176         msg = ldb_msg_new(list);
177         if (msg == NULL) {
178                 return LDB_ERR_OPERATIONS_ERROR;
179         }
180
181         ret = ltdb_search_dn1(module, dn, msg);
182         if (ret != LDB_SUCCESS) {
183                 talloc_free(msg);
184                 return ret;
185         }
186
187         /* TODO: check indexing version number */
188
189         el = ldb_msg_find_element(msg, LTDB_IDX);
190         if (!el) {
191                 talloc_free(msg);
192                 return LDB_SUCCESS;
193         }
194
195         /*
196          * we avoid copying the strings by stealing the list.  We have
197          * to steal msg onto el->values (which looks odd) because we
198          * asked for the memory to be allocated on msg, not on each
199          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
200          */
201         talloc_steal(el->values, msg);
202         list->dn = talloc_steal(list, el->values);
203         list->count = el->num_values;
204
205         /* We don't need msg->elements any more */
206         talloc_free(msg->elements);
207         return LDB_SUCCESS;
208 }
209
210
211 /*
212   save a dn_list into a full @IDX style record
213  */
214 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
215                                    struct dn_list *list)
216 {
217         struct ldb_message *msg;
218         int ret;
219
220         if (list->count == 0) {
221                 ret = ltdb_delete_noindex(module, dn);
222                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
223                         return LDB_SUCCESS;
224                 }
225                 return ret;
226         }
227
228         msg = ldb_msg_new(module);
229         if (!msg) {
230                 return ldb_module_oom(module);
231         }
232
233         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
234         if (ret != LDB_SUCCESS) {
235                 talloc_free(msg);
236                 return ldb_module_oom(module);
237         }
238
239         msg->dn = dn;
240         if (list->count > 0) {
241                 struct ldb_message_element *el;
242
243                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
244                 if (ret != LDB_SUCCESS) {
245                         talloc_free(msg);
246                         return ldb_module_oom(module);
247                 }
248                 el->values = list->dn;
249                 el->num_values = list->count;
250         }
251
252         ret = ltdb_store(module, msg, TDB_REPLACE);
253         talloc_free(msg);
254         return ret;
255 }
256
257 /*
258   save a dn_list into the database, in either @IDX or internal format
259  */
260 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
261                               struct dn_list *list)
262 {
263         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
264         TDB_DATA rec, key;
265         int ret;
266         struct dn_list *list2;
267
268         if (ltdb->idxptr == NULL) {
269                 return ltdb_dn_list_store_full(module, dn, list);
270         }
271
272         if (ltdb->idxptr->itdb == NULL) {
273                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
274                 if (ltdb->idxptr->itdb == NULL) {
275                         return LDB_ERR_OPERATIONS_ERROR;
276                 }
277         }
278
279         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
280         key.dsize = strlen((char *)key.dptr);
281
282         rec = tdb_fetch(ltdb->idxptr->itdb, key);
283         if (rec.dptr != NULL) {
284                 list2 = ltdb_index_idxptr(module, rec, false);
285                 if (list2 == NULL) {
286                         free(rec.dptr);
287                         return LDB_ERR_OPERATIONS_ERROR;
288                 }
289                 free(rec.dptr);
290                 list2->dn = talloc_steal(list2, list->dn);
291                 list2->count = list->count;
292                 return LDB_SUCCESS;
293         }
294
295         list2 = talloc(ltdb->idxptr, struct dn_list);
296         if (list2 == NULL) {
297                 return LDB_ERR_OPERATIONS_ERROR;
298         }
299         list2->dn = talloc_steal(list2, list->dn);
300         list2->count = list->count;
301
302         rec.dptr = (uint8_t *)&list2;
303         rec.dsize = sizeof(void *);
304
305         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
306         if (ret != 0) {
307                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
308         }
309         return LDB_SUCCESS;
310 }
311
312 /*
313   traverse function for storing the in-memory index entries on disk
314  */
315 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
316 {
317         struct ldb_module *module = state;
318         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
319         struct ldb_dn *dn;
320         struct ldb_context *ldb = ldb_module_get_ctx(module);
321         struct ldb_val v;
322         struct dn_list *list;
323
324         list = ltdb_index_idxptr(module, data, true);
325         if (list == NULL) {
326                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
327                 return -1;
328         }
329
330         v.data = key.dptr;
331         v.length = strnlen((char *)key.dptr, key.dsize);
332
333         dn = ldb_dn_from_ldb_val(module, ldb, &v);
334         if (dn == NULL) {
335                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
336                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
337                 return -1;
338         }
339
340         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
341         talloc_free(dn);
342         if (ltdb->idxptr->error != 0) {
343                 return -1;
344         }
345         return 0;
346 }
347
348 /* cleanup the idxptr mode when transaction commits */
349 int ltdb_index_transaction_commit(struct ldb_module *module)
350 {
351         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
352         int ret;
353
354         struct ldb_context *ldb = ldb_module_get_ctx(module);
355
356         ldb_reset_err_string(ldb);
357
358         if (ltdb->idxptr->itdb) {
359                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
360                 tdb_close(ltdb->idxptr->itdb);
361         }
362
363         ret = ltdb->idxptr->error;
364         if (ret != LDB_SUCCESS) {
365                 if (!ldb_errstring(ldb)) {
366                         ldb_set_errstring(ldb, ldb_strerror(ret));
367                 }
368                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
369         }
370
371         talloc_free(ltdb->idxptr);
372         ltdb->idxptr = NULL;
373         return ret;
374 }
375
376 /* cleanup the idxptr mode when transaction cancels */
377 int ltdb_index_transaction_cancel(struct ldb_module *module)
378 {
379         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
380         if (ltdb->idxptr && ltdb->idxptr->itdb) {
381                 tdb_close(ltdb->idxptr->itdb);
382         }
383         talloc_free(ltdb->idxptr);
384         ltdb->idxptr = NULL;
385         return LDB_SUCCESS;
386 }
387
388
389 /*
390   return the dn key to be used for an index
391   the caller is responsible for freeing
392 */
393 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
394                                      const char *attr, const struct ldb_val *value,
395                                      const struct ldb_schema_attribute **ap)
396 {
397         struct ldb_dn *ret;
398         struct ldb_val v;
399         const struct ldb_schema_attribute *a;
400         char *attr_folded;
401         int r;
402
403         attr_folded = ldb_attr_casefold(ldb, attr);
404         if (!attr_folded) {
405                 return NULL;
406         }
407
408         a = ldb_schema_attribute_by_name(ldb, attr);
409         if (ap) {
410                 *ap = a;
411         }
412         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
413         if (r != LDB_SUCCESS) {
414                 const char *errstr = ldb_errstring(ldb);
415                 /* canonicalisation can be refused. For example,
416                    a attribute that takes wildcards will refuse to canonicalise
417                    if the value contains a wildcard */
418                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
419                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
420                 talloc_free(attr_folded);
421                 return NULL;
422         }
423         if (ldb_should_b64_encode(ldb, &v)) {
424                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
425                 if (!vstr) {
426                         talloc_free(attr_folded);
427                         return NULL;
428                 }
429                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
430                 talloc_free(vstr);
431         } else {
432                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
433         }
434
435         if (v.data != value->data) {
436                 talloc_free(v.data);
437         }
438         talloc_free(attr_folded);
439
440         return ret;
441 }
442
443 /*
444   see if a attribute value is in the list of indexed attributes
445 */
446 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
447 {
448         unsigned int i;
449         struct ldb_message_element *el;
450
451         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
452         if (el == NULL) {
453                 return false;
454         }
455
456         /* TODO: this is too expensive! At least use a binary search */
457         for (i=0; i<el->num_values; i++) {
458                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
459                         return true;
460                 }
461         }
462         return false;
463 }
464
465 /*
466   in the following logic functions, the return value is treated as
467   follows:
468
469      LDB_SUCCESS: we found some matching index values
470
471      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
472
473      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
474                                we'll need a full search
475  */
476
477 /*
478   return a list of dn's that might match a simple indexed search (an
479   equality search only)
480  */
481 static int ltdb_index_dn_simple(struct ldb_module *module,
482                                 const struct ldb_parse_tree *tree,
483                                 const struct ldb_message *index_list,
484                                 struct dn_list *list)
485 {
486         struct ldb_context *ldb;
487         struct ldb_dn *dn;
488         int ret;
489
490         ldb = ldb_module_get_ctx(module);
491
492         list->count = 0;
493         list->dn = NULL;
494
495         /* if the attribute isn't in the list of indexed attributes then
496            this node needs a full search */
497         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
498                 return LDB_ERR_OPERATIONS_ERROR;
499         }
500
501         /* the attribute is indexed. Pull the list of DNs that match the
502            search criterion */
503         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
504         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
505
506         ret = ltdb_dn_list_load(module, dn, list);
507         talloc_free(dn);
508         return ret;
509 }
510
511
512 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
513
514 /*
515   return a list of dn's that might match a leaf indexed search
516  */
517 static int ltdb_index_dn_leaf(struct ldb_module *module,
518                               const struct ldb_parse_tree *tree,
519                               const struct ldb_message *index_list,
520                               struct dn_list *list)
521 {
522         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
523                                                     struct ltdb_private);
524         if (ltdb->disallow_dn_filter &&
525             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
526                 /* in AD mode we do not support "(dn=...)" search filters */
527                 list->dn = NULL;
528                 list->count = 0;
529                 return LDB_SUCCESS;
530         }
531         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
532                 list->dn = talloc_array(list, struct ldb_val, 1);
533                 if (list->dn == NULL) {
534                         ldb_module_oom(module);
535                         return LDB_ERR_OPERATIONS_ERROR;
536                 }
537                 list->dn[0] = tree->u.equality.value;
538                 list->count = 1;
539                 return LDB_SUCCESS;
540         }
541         return ltdb_index_dn_simple(module, tree, index_list, list);
542 }
543
544
545 /*
546   list intersection
547   list = list & list2
548 */
549 static bool list_intersect(struct ldb_context *ldb,
550                            struct dn_list *list, const struct dn_list *list2)
551 {
552         struct dn_list *list3;
553         unsigned int i;
554
555         if (list->count == 0) {
556                 /* 0 & X == 0 */
557                 return true;
558         }
559         if (list2->count == 0) {
560                 /* X & 0 == 0 */
561                 list->count = 0;
562                 list->dn = NULL;
563                 return true;
564         }
565
566         /* the indexing code is allowed to return a longer list than
567            what really matches, as all results are filtered by the
568            full expression at the end - this shortcut avoids a lot of
569            work in some cases */
570         if (list->count < 2 && list2->count > 10) {
571                 return true;
572         }
573         if (list2->count < 2 && list->count > 10) {
574                 list->count = list2->count;
575                 list->dn = list2->dn;
576                 /* note that list2 may not be the parent of list2->dn,
577                    as list2->dn may be owned by ltdb->idxptr. In that
578                    case we expect this reparent call to fail, which is
579                    OK */
580                 talloc_reparent(list2, list, list2->dn);
581                 return true;
582         }
583
584         list3 = talloc_zero(list, struct dn_list);
585         if (list3 == NULL) {
586                 return false;
587         }
588
589         list3->dn = talloc_array(list3, struct ldb_val, list->count);
590         if (!list3->dn) {
591                 talloc_free(list3);
592                 return false;
593         }
594         list3->count = 0;
595
596         for (i=0;i<list->count;i++) {
597                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
598                         list3->dn[list3->count] = list->dn[i];
599                         list3->count++;
600                 }
601         }
602
603         list->dn = talloc_steal(list, list3->dn);
604         list->count = list3->count;
605         talloc_free(list3);
606
607         return true;
608 }
609
610
611 /*
612   list union
613   list = list | list2
614 */
615 static bool list_union(struct ldb_context *ldb,
616                        struct dn_list *list, const struct dn_list *list2)
617 {
618         struct ldb_val *dn3;
619
620         if (list2->count == 0) {
621                 /* X | 0 == X */
622                 return true;
623         }
624
625         if (list->count == 0) {
626                 /* 0 | X == X */
627                 list->count = list2->count;
628                 list->dn = list2->dn;
629                 /* note that list2 may not be the parent of list2->dn,
630                    as list2->dn may be owned by ltdb->idxptr. In that
631                    case we expect this reparent call to fail, which is
632                    OK */
633                 talloc_reparent(list2, list, list2->dn);
634                 return true;
635         }
636
637         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
638         if (!dn3) {
639                 ldb_oom(ldb);
640                 return false;
641         }
642
643         /* we allow for duplicates here, and get rid of them later */
644         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
645         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
646
647         list->dn = dn3;
648         list->count += list2->count;
649
650         return true;
651 }
652
653 static int ltdb_index_dn(struct ldb_module *module,
654                          const struct ldb_parse_tree *tree,
655                          const struct ldb_message *index_list,
656                          struct dn_list *list);
657
658
659 /*
660   process an OR list (a union)
661  */
662 static int ltdb_index_dn_or(struct ldb_module *module,
663                             const struct ldb_parse_tree *tree,
664                             const struct ldb_message *index_list,
665                             struct dn_list *list)
666 {
667         struct ldb_context *ldb;
668         unsigned int i;
669
670         ldb = ldb_module_get_ctx(module);
671
672         list->dn = NULL;
673         list->count = 0;
674
675         for (i=0; i<tree->u.list.num_elements; i++) {
676                 struct dn_list *list2;
677                 int ret;
678
679                 list2 = talloc_zero(list, struct dn_list);
680                 if (list2 == NULL) {
681                         return LDB_ERR_OPERATIONS_ERROR;
682                 }
683
684                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
685
686                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
687                         /* X || 0 == X */
688                         talloc_free(list2);
689                         continue;
690                 }
691
692                 if (ret != LDB_SUCCESS) {
693                         /* X || * == * */
694                         talloc_free(list2);
695                         return ret;
696                 }
697
698                 if (!list_union(ldb, list, list2)) {
699                         talloc_free(list2);
700                         return LDB_ERR_OPERATIONS_ERROR;
701                 }
702         }
703
704         if (list->count == 0) {
705                 return LDB_ERR_NO_SUCH_OBJECT;
706         }
707
708         return LDB_SUCCESS;
709 }
710
711
712 /*
713   NOT an index results
714  */
715 static int ltdb_index_dn_not(struct ldb_module *module,
716                              const struct ldb_parse_tree *tree,
717                              const struct ldb_message *index_list,
718                              struct dn_list *list)
719 {
720         /* the only way to do an indexed not would be if we could
721            negate the not via another not or if we knew the total
722            number of database elements so we could know that the
723            existing expression covered the whole database.
724
725            instead, we just give up, and rely on a full index scan
726            (unless an outer & manages to reduce the list)
727         */
728         return LDB_ERR_OPERATIONS_ERROR;
729 }
730
731
732 static bool ltdb_index_unique(struct ldb_context *ldb,
733                               const char *attr)
734 {
735         const struct ldb_schema_attribute *a;
736         a = ldb_schema_attribute_by_name(ldb, attr);
737         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
738                 return true;
739         }
740         return false;
741 }
742
743 /*
744   process an AND expression (intersection)
745  */
746 static int ltdb_index_dn_and(struct ldb_module *module,
747                              const struct ldb_parse_tree *tree,
748                              const struct ldb_message *index_list,
749                              struct dn_list *list)
750 {
751         struct ldb_context *ldb;
752         unsigned int i;
753         bool found;
754
755         ldb = ldb_module_get_ctx(module);
756
757         list->dn = NULL;
758         list->count = 0;
759
760         /* in the first pass we only look for unique simple
761            equality tests, in the hope of avoiding having to look
762            at any others */
763         for (i=0; i<tree->u.list.num_elements; i++) {
764                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
765                 int ret;
766
767                 if (subtree->operation != LDB_OP_EQUALITY ||
768                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
769                         continue;
770                 }
771
772                 ret = ltdb_index_dn(module, subtree, index_list, list);
773                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
774                         /* 0 && X == 0 */
775                         return LDB_ERR_NO_SUCH_OBJECT;
776                 }
777                 if (ret == LDB_SUCCESS) {
778                         /* a unique index match means we can
779                          * stop. Note that we don't care if we return
780                          * a few too many objects, due to later
781                          * filtering */
782                         return LDB_SUCCESS;
783                 }
784         }
785
786         /* now do a full intersection */
787         found = false;
788
789         for (i=0; i<tree->u.list.num_elements; i++) {
790                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
791                 struct dn_list *list2;
792                 int ret;
793
794                 list2 = talloc_zero(list, struct dn_list);
795                 if (list2 == NULL) {
796                         return ldb_module_oom(module);
797                 }
798
799                 ret = ltdb_index_dn(module, subtree, index_list, list2);
800
801                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
802                         /* X && 0 == 0 */
803                         list->dn = NULL;
804                         list->count = 0;
805                         talloc_free(list2);
806                         return LDB_ERR_NO_SUCH_OBJECT;
807                 }
808
809                 if (ret != LDB_SUCCESS) {
810                         /* this didn't adding anything */
811                         talloc_free(list2);
812                         continue;
813                 }
814
815                 if (!found) {
816                         talloc_reparent(list2, list, list->dn);
817                         list->dn = list2->dn;
818                         list->count = list2->count;
819                         found = true;
820                 } else if (!list_intersect(ldb, list, list2)) {
821                         talloc_free(list2);
822                         return LDB_ERR_OPERATIONS_ERROR;
823                 }
824
825                 if (list->count == 0) {
826                         list->dn = NULL;
827                         return LDB_ERR_NO_SUCH_OBJECT;
828                 }
829
830                 if (list->count < 2) {
831                         /* it isn't worth loading the next part of the tree */
832                         return LDB_SUCCESS;
833                 }
834         }
835
836         if (!found) {
837                 /* none of the attributes were indexed */
838                 return LDB_ERR_OPERATIONS_ERROR;
839         }
840
841         return LDB_SUCCESS;
842 }
843
844 /*
845   return a list of matching objects using a one-level index
846  */
847 static int ltdb_index_dn_one(struct ldb_module *module,
848                              struct ldb_dn *parent_dn,
849                              struct dn_list *list)
850 {
851         struct ldb_context *ldb;
852         struct ldb_dn *key;
853         struct ldb_val val;
854         int ret;
855
856         ldb = ldb_module_get_ctx(module);
857
858         /* work out the index key from the parent DN */
859         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
860         val.length = strlen((char *)val.data);
861         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
862         if (!key) {
863                 ldb_oom(ldb);
864                 return LDB_ERR_OPERATIONS_ERROR;
865         }
866
867         ret = ltdb_dn_list_load(module, key, list);
868         talloc_free(key);
869         if (ret != LDB_SUCCESS) {
870                 return ret;
871         }
872
873         if (list->count == 0) {
874                 return LDB_ERR_NO_SUCH_OBJECT;
875         }
876
877         return LDB_SUCCESS;
878 }
879
880 /*
881   return a list of dn's that might match a indexed search or
882   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
883  */
884 static int ltdb_index_dn(struct ldb_module *module,
885                          const struct ldb_parse_tree *tree,
886                          const struct ldb_message *index_list,
887                          struct dn_list *list)
888 {
889         int ret = LDB_ERR_OPERATIONS_ERROR;
890
891         switch (tree->operation) {
892         case LDB_OP_AND:
893                 ret = ltdb_index_dn_and(module, tree, index_list, list);
894                 break;
895
896         case LDB_OP_OR:
897                 ret = ltdb_index_dn_or(module, tree, index_list, list);
898                 break;
899
900         case LDB_OP_NOT:
901                 ret = ltdb_index_dn_not(module, tree, index_list, list);
902                 break;
903
904         case LDB_OP_EQUALITY:
905                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
906                 break;
907
908         case LDB_OP_SUBSTRING:
909         case LDB_OP_GREATER:
910         case LDB_OP_LESS:
911         case LDB_OP_PRESENT:
912         case LDB_OP_APPROX:
913         case LDB_OP_EXTENDED:
914                 /* we can't index with fancy bitops yet */
915                 ret = LDB_ERR_OPERATIONS_ERROR;
916                 break;
917         }
918
919         return ret;
920 }
921
922 /*
923   filter a candidate dn_list from an indexed search into a set of results
924   extracting just the given attributes
925 */
926 static int ltdb_index_filter(const struct dn_list *dn_list,
927                              struct ltdb_context *ac,
928                              uint32_t *match_count)
929 {
930         struct ldb_context *ldb;
931         struct ldb_message *msg;
932         unsigned int i;
933
934         ldb = ldb_module_get_ctx(ac->module);
935
936         for (i = 0; i < dn_list->count; i++) {
937                 struct ldb_dn *dn;
938                 int ret;
939                 bool matched;
940
941                 msg = ldb_msg_new(ac);
942                 if (!msg) {
943                         return LDB_ERR_OPERATIONS_ERROR;
944                 }
945
946                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
947                 if (dn == NULL) {
948                         talloc_free(msg);
949                         return LDB_ERR_OPERATIONS_ERROR;
950                 }
951
952                 ret = ltdb_search_dn1(ac->module, dn, msg);
953                 talloc_free(dn);
954                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
955                         /* the record has disappeared? yes, this can happen */
956                         talloc_free(msg);
957                         continue;
958                 }
959
960                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
961                         /* an internal error */
962                         talloc_free(msg);
963                         return LDB_ERR_OPERATIONS_ERROR;
964                 }
965
966                 ret = ldb_match_msg_error(ldb, msg,
967                                           ac->tree, ac->base, ac->scope, &matched);
968                 if (ret != LDB_SUCCESS) {
969                         talloc_free(msg);
970                         return ret;
971                 }
972                 if (!matched) {
973                         talloc_free(msg);
974                         continue;
975                 }
976
977                 /* filter the attributes that the user wants */
978                 ret = ltdb_filter_attrs(msg, ac->attrs);
979
980                 if (ret == -1) {
981                         talloc_free(msg);
982                         return LDB_ERR_OPERATIONS_ERROR;
983                 }
984
985                 ret = ldb_module_send_entry(ac->req, msg, NULL);
986                 if (ret != LDB_SUCCESS) {
987                         /* Regardless of success or failure, the msg
988                          * is the callbacks responsiblity, and should
989                          * not be talloc_free()'ed */
990                         ac->request_terminated = true;
991                         return ret;
992                 }
993
994                 (*match_count)++;
995         }
996
997         return LDB_SUCCESS;
998 }
999
1000 /*
1001   remove any duplicated entries in a indexed result
1002  */
1003 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
1004 {
1005         unsigned int i, new_count;
1006
1007         if (list->count < 2) {
1008                 return;
1009         }
1010
1011         TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
1012
1013         new_count = 1;
1014         for (i=1; i<list->count; i++) {
1015                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
1016                         if (new_count != i) {
1017                                 list->dn[new_count] = list->dn[i];
1018                         }
1019                         new_count++;
1020                 }
1021         }
1022
1023         list->count = new_count;
1024 }
1025
1026 /*
1027   search the database with a LDAP-like expression using indexes
1028   returns -1 if an indexed search is not possible, in which
1029   case the caller should call ltdb_search_full()
1030 */
1031 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1032 {
1033         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1034         struct dn_list *dn_list;
1035         int ret;
1036
1037         /* see if indexing is enabled */
1038         if (!ltdb->cache->attribute_indexes &&
1039             !ltdb->cache->one_level_indexes &&
1040             ac->scope != LDB_SCOPE_BASE) {
1041                 /* fallback to a full search */
1042                 return LDB_ERR_OPERATIONS_ERROR;
1043         }
1044
1045         dn_list = talloc_zero(ac, struct dn_list);
1046         if (dn_list == NULL) {
1047                 return ldb_module_oom(ac->module);
1048         }
1049
1050         switch (ac->scope) {
1051         case LDB_SCOPE_BASE:
1052                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1053                 if (dn_list->dn == NULL) {
1054                         talloc_free(dn_list);
1055                         return ldb_module_oom(ac->module);
1056                 }
1057                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1058                 if (dn_list->dn[0].data == NULL) {
1059                         talloc_free(dn_list);
1060                         return ldb_module_oom(ac->module);
1061                 }
1062                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1063                 dn_list->count = 1;
1064                 break;
1065
1066         case LDB_SCOPE_ONELEVEL:
1067                 if (!ltdb->cache->one_level_indexes) {
1068                         talloc_free(dn_list);
1069                         return LDB_ERR_OPERATIONS_ERROR;
1070                 }
1071                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1072                 if (ret != LDB_SUCCESS) {
1073                         talloc_free(dn_list);
1074                         return ret;
1075                 }
1076                 break;
1077
1078         case LDB_SCOPE_SUBTREE:
1079         case LDB_SCOPE_DEFAULT:
1080                 if (!ltdb->cache->attribute_indexes) {
1081                         talloc_free(dn_list);
1082                         return LDB_ERR_OPERATIONS_ERROR;
1083                 }
1084                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1085                 if (ret != LDB_SUCCESS) {
1086                         talloc_free(dn_list);
1087                         return ret;
1088                 }
1089                 ltdb_dn_list_remove_duplicates(dn_list);
1090                 break;
1091         }
1092
1093         ret = ltdb_index_filter(dn_list, ac, match_count);
1094         talloc_free(dn_list);
1095         return ret;
1096 }
1097
1098 /**
1099  * @brief Add a DN in the index list of a given attribute name/value pair
1100  *
1101  * This function will add the DN in the index list for the index for
1102  * the given attribute name and value.
1103  *
1104  * @param[in]  module       A ldb_module structure
1105  *
1106  * @param[in]  dn           The string representation of the DN as it
1107  *                          will be stored in the index entry
1108  *
1109  * @param[in]  el           A ldb_message_element array, one of the entry
1110  *                          referred by the v_idx is the attribute name and
1111  *                          value pair which will be used to construct the
1112  *                          index name
1113  *
1114  * @param[in]  v_idx        The index of element in the el array to use
1115  *
1116  * @return                  An ldb error code
1117  */
1118 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1119                            struct ldb_message_element *el, int v_idx,
1120                            bool is_new)
1121 {
1122         struct ldb_context *ldb;
1123         struct ldb_dn *dn_key;
1124         int ret;
1125         const struct ldb_schema_attribute *a;
1126         struct dn_list *list;
1127         unsigned alloc_len;
1128
1129         ldb = ldb_module_get_ctx(module);
1130
1131         list = talloc_zero(module, struct dn_list);
1132         if (list == NULL) {
1133                 return LDB_ERR_OPERATIONS_ERROR;
1134         }
1135
1136         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1137         if (!dn_key) {
1138                 talloc_free(list);
1139                 return LDB_ERR_OPERATIONS_ERROR;
1140         }
1141         talloc_steal(list, dn_key);
1142
1143         ret = ltdb_dn_list_load(module, dn_key, list);
1144         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1145                 talloc_free(list);
1146                 return ret;
1147         }
1148
1149         if (list->count > 0 &&
1150             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1151                 talloc_free(list);
1152                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1153                                        el->name, dn);
1154                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1155         }
1156
1157         /* If we are doing an ADD, then this can not already be in the index,
1158            as it was not already in the database, and this has already been
1159            checked because the store succeeded */
1160         if (! is_new) {
1161                 if (ltdb_dn_list_find_str(list, dn) != -1) {
1162                         talloc_free(list);
1163                         return LDB_SUCCESS;
1164                 }
1165         }
1166
1167         /* overallocate the list a bit, to reduce the number of
1168          * realloc trigered copies */
1169         alloc_len = ((list->count+1)+7) & ~7;
1170         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1171         if (list->dn == NULL) {
1172                 talloc_free(list);
1173                 return LDB_ERR_OPERATIONS_ERROR;
1174         }
1175         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1176         list->dn[list->count].length = strlen(dn);
1177         list->count++;
1178
1179         ret = ltdb_dn_list_store(module, dn_key, list);
1180
1181         talloc_free(list);
1182
1183         return ret;
1184 }
1185
1186 /*
1187   add index entries for one elements in a message
1188  */
1189 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1190                              struct ldb_message_element *el, bool is_new)
1191 {
1192         unsigned int i;
1193         for (i = 0; i < el->num_values; i++) {
1194                 int ret = ltdb_index_add1(module, dn, el, i, is_new);
1195                 if (ret != LDB_SUCCESS) {
1196                         return ret;
1197                 }
1198         }
1199
1200         return LDB_SUCCESS;
1201 }
1202
1203 /*
1204   add index entries for all elements in a message
1205  */
1206 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1207                               struct ldb_message_element *elements, int num_el,
1208                               bool is_new)
1209 {
1210         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1211         unsigned int i;
1212
1213         if (dn[0] == '@') {
1214                 return LDB_SUCCESS;
1215         }
1216
1217         if (ltdb->cache->indexlist->num_elements == 0) {
1218                 /* no indexed fields */
1219                 return LDB_SUCCESS;
1220         }
1221
1222         for (i = 0; i < num_el; i++) {
1223                 int ret;
1224                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1225                         continue;
1226                 }
1227                 ret = ltdb_index_add_el(module, dn, &elements[i], is_new);
1228                 if (ret != LDB_SUCCESS) {
1229                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1230                         ldb_asprintf_errstring(ldb,
1231                                                __location__ ": Failed to re-index %s in %s - %s",
1232                                                elements[i].name, dn, ldb_errstring(ldb));
1233                         return ret;
1234                 }
1235         }
1236
1237         return LDB_SUCCESS;
1238 }
1239
1240
1241 /*
1242   insert a one level index for a message
1243 */
1244 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1245 {
1246         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1247         struct ldb_message_element el;
1248         struct ldb_val val;
1249         struct ldb_dn *pdn;
1250         const char *dn;
1251         int ret;
1252
1253         /* We index for ONE Level only if requested */
1254         if (!ltdb->cache->one_level_indexes) {
1255                 return LDB_SUCCESS;
1256         }
1257
1258         pdn = ldb_dn_get_parent(module, msg->dn);
1259         if (pdn == NULL) {
1260                 return LDB_ERR_OPERATIONS_ERROR;
1261         }
1262
1263         dn = ldb_dn_get_linearized(msg->dn);
1264         if (dn == NULL) {
1265                 talloc_free(pdn);
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268
1269         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1270         if (val.data == NULL) {
1271                 talloc_free(pdn);
1272                 return LDB_ERR_OPERATIONS_ERROR;
1273         }
1274
1275         val.length = strlen((char *)val.data);
1276         el.name = LTDB_IDXONE;
1277         el.values = &val;
1278         el.num_values = 1;
1279
1280         if (add) {
1281                 ret = ltdb_index_add1(module, dn, &el, 0, add);
1282         } else { /* delete */
1283                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1284         }
1285
1286         talloc_free(pdn);
1287
1288         return ret;
1289 }
1290
1291 /*
1292   add the index entries for a new element in a record
1293   The caller guarantees that these element values are not yet indexed
1294 */
1295 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1296                            struct ldb_message_element *el)
1297 {
1298         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1299         if (ldb_dn_is_special(dn)) {
1300                 return LDB_SUCCESS;
1301         }
1302         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1303                 return LDB_SUCCESS;
1304         }
1305         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el, true);
1306 }
1307
1308 /*
1309   add the index entries for a new record
1310 */
1311 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1312 {
1313         const char *dn;
1314         int ret;
1315
1316         if (ldb_dn_is_special(msg->dn)) {
1317                 return LDB_SUCCESS;
1318         }
1319
1320         dn = ldb_dn_get_linearized(msg->dn);
1321         if (dn == NULL) {
1322                 return LDB_ERR_OPERATIONS_ERROR;
1323         }
1324
1325         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
1326                                  true);
1327         if (ret != LDB_SUCCESS) {
1328                 return ret;
1329         }
1330
1331         return ltdb_index_onelevel(module, msg, 1);
1332 }
1333
1334
1335 /*
1336   delete an index entry for one message element
1337 */
1338 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1339                          struct ldb_message_element *el, unsigned int v_idx)
1340 {
1341         struct ldb_context *ldb;
1342         struct ldb_dn *dn_key;
1343         const char *dn_str;
1344         int ret, i;
1345         unsigned int j;
1346         struct dn_list *list;
1347
1348         ldb = ldb_module_get_ctx(module);
1349
1350         dn_str = ldb_dn_get_linearized(dn);
1351         if (dn_str == NULL) {
1352                 return LDB_ERR_OPERATIONS_ERROR;
1353         }
1354
1355         if (dn_str[0] == '@') {
1356                 return LDB_SUCCESS;
1357         }
1358
1359         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1360         if (!dn_key) {
1361                 return LDB_ERR_OPERATIONS_ERROR;
1362         }
1363
1364         list = talloc_zero(dn_key, struct dn_list);
1365         if (list == NULL) {
1366                 talloc_free(dn_key);
1367                 return LDB_ERR_OPERATIONS_ERROR;
1368         }
1369
1370         ret = ltdb_dn_list_load(module, dn_key, list);
1371         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1372                 /* it wasn't indexed. Did we have an earlier error? If we did then
1373                    its gone now */
1374                 talloc_free(dn_key);
1375                 return LDB_SUCCESS;
1376         }
1377
1378         if (ret != LDB_SUCCESS) {
1379                 talloc_free(dn_key);
1380                 return ret;
1381         }
1382
1383         i = ltdb_dn_list_find_str(list, dn_str);
1384         if (i == -1) {
1385                 /* nothing to delete */
1386                 talloc_free(dn_key);
1387                 return LDB_SUCCESS;
1388         }
1389
1390         j = (unsigned int) i;
1391         if (j != list->count - 1) {
1392                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1393         }
1394         list->count--;
1395         if (list->count == 0) {
1396                 talloc_free(list->dn);
1397                 list->dn = NULL;
1398         } else {
1399                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1400         }
1401
1402         ret = ltdb_dn_list_store(module, dn_key, list);
1403
1404         talloc_free(dn_key);
1405
1406         return ret;
1407 }
1408
1409 /*
1410   delete the index entries for a element
1411   return -1 on failure
1412 */
1413 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1414                            struct ldb_message_element *el)
1415 {
1416         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1417         const char *dn_str;
1418         int ret;
1419         unsigned int i;
1420
1421         if (!ltdb->cache->attribute_indexes) {
1422                 /* no indexed fields */
1423                 return LDB_SUCCESS;
1424         }
1425
1426         dn_str = ldb_dn_get_linearized(dn);
1427         if (dn_str == NULL) {
1428                 return LDB_ERR_OPERATIONS_ERROR;
1429         }
1430
1431         if (dn_str[0] == '@') {
1432                 return LDB_SUCCESS;
1433         }
1434
1435         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1436                 return LDB_SUCCESS;
1437         }
1438         for (i = 0; i < el->num_values; i++) {
1439                 ret = ltdb_index_del_value(module, dn, el, i);
1440                 if (ret != LDB_SUCCESS) {
1441                         return ret;
1442                 }
1443         }
1444
1445         return LDB_SUCCESS;
1446 }
1447
1448 /*
1449   delete the index entries for a record
1450   return -1 on failure
1451 */
1452 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1453 {
1454         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1455         int ret;
1456         unsigned int i;
1457
1458         if (ldb_dn_is_special(msg->dn)) {
1459                 return LDB_SUCCESS;
1460         }
1461
1462         ret = ltdb_index_onelevel(module, msg, 0);
1463         if (ret != LDB_SUCCESS) {
1464                 return ret;
1465         }
1466
1467         if (!ltdb->cache->attribute_indexes) {
1468                 /* no indexed fields */
1469                 return LDB_SUCCESS;
1470         }
1471
1472         for (i = 0; i < msg->num_elements; i++) {
1473                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1474                 if (ret != LDB_SUCCESS) {
1475                         return ret;
1476                 }
1477         }
1478
1479         return LDB_SUCCESS;
1480 }
1481
1482
1483 /*
1484   traversal function that deletes all @INDEX records
1485 */
1486 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1487 {
1488         struct ldb_module *module = state;
1489         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1490         const char *dnstr = "DN=" LTDB_INDEX ":";
1491         struct dn_list list;
1492         struct ldb_dn *dn;
1493         struct ldb_val v;
1494         int ret;
1495
1496         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1497                 return 0;
1498         }
1499         /* we need to put a empty list in the internal tdb for this
1500          * index entry */
1501         list.dn = NULL;
1502         list.count = 0;
1503
1504         /* the offset of 3 is to remove the DN= prefix. */
1505         v.data = key.dptr + 3;
1506         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1507
1508         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1509         ret = ltdb_dn_list_store(module, dn, &list);
1510         if (ret != LDB_SUCCESS) {
1511                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1512                                        "Unable to store null index for %s\n",
1513                                                 ldb_dn_get_linearized(dn));
1514                 talloc_free(dn);
1515                 return -1;
1516         }
1517         talloc_free(dn);
1518         return 0;
1519 }
1520
1521 struct ltdb_reindex_context {
1522         struct ldb_module *module;
1523         int error;
1524 };
1525
1526 /*
1527   traversal function that adds @INDEX records during a re index
1528 */
1529 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1530 {
1531         struct ldb_context *ldb;
1532         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1533         struct ldb_module *module = ctx->module;
1534         struct ldb_message *msg;
1535         const char *dn = NULL;
1536         int ret;
1537         TDB_DATA key2;
1538
1539         ldb = ldb_module_get_ctx(module);
1540
1541         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1542             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1543                 return 0;
1544         }
1545
1546         msg = ldb_msg_new(module);
1547         if (msg == NULL) {
1548                 return -1;
1549         }
1550
1551         ret = ldb_unpack_data(ldb, (struct ldb_val *)&data, msg);
1552         if (ret != 0) {
1553                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1554                                                 ldb_dn_get_linearized(msg->dn));
1555                 talloc_free(msg);
1556                 return -1;
1557         }
1558
1559         /* check if the DN key has changed, perhaps due to the
1560            case insensitivity of an element changing */
1561         key2 = ltdb_key(module, msg->dn);
1562         if (key2.dptr == NULL) {
1563                 /* probably a corrupt record ... darn */
1564                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1565                                                 ldb_dn_get_linearized(msg->dn));
1566                 talloc_free(msg);
1567                 return 0;
1568         }
1569         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1570                 tdb_delete(tdb, key);
1571                 tdb_store(tdb, key2, data, 0);
1572         }
1573         talloc_free(key2.dptr);
1574
1575         if (msg->dn == NULL) {
1576                 dn = (char *)key.dptr + 3;
1577         } else {
1578                 dn = ldb_dn_get_linearized(msg->dn);
1579         }
1580
1581         ret = ltdb_index_onelevel(module, msg, 1);
1582         if (ret != LDB_SUCCESS) {
1583                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1584                           "Adding special ONE LEVEL index failed (%s)!",
1585                                                 ldb_dn_get_linearized(msg->dn));
1586                 talloc_free(msg);
1587                 return -1;
1588         }
1589
1590         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements,
1591                                  false);
1592
1593         if (ret != LDB_SUCCESS) {
1594                 ctx->error = ret;
1595                 talloc_free(msg);
1596                 return -1;
1597         }
1598
1599         talloc_free(msg);
1600
1601         return 0;
1602 }
1603
1604 /*
1605   force a complete reindex of the database
1606 */
1607 int ltdb_reindex(struct ldb_module *module)
1608 {
1609         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1610         int ret;
1611         struct ltdb_reindex_context ctx;
1612
1613         if (ltdb_cache_reload(module) != 0) {
1614                 return LDB_ERR_OPERATIONS_ERROR;
1615         }
1616
1617         /* first traverse the database deleting any @INDEX records by
1618          * putting NULL entries in the in-memory tdb
1619          */
1620         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1621         if (ret < 0) {
1622                 return LDB_ERR_OPERATIONS_ERROR;
1623         }
1624
1625         /* if we don't have indexes we have nothing todo */
1626         if (ltdb->cache->indexlist->num_elements == 0) {
1627                 return LDB_SUCCESS;
1628         }
1629
1630         ctx.module = module;
1631         ctx.error = 0;
1632
1633         /* now traverse adding any indexes for normal LDB records */
1634         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1635         if (ret < 0) {
1636                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1637                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1638                 return LDB_ERR_OPERATIONS_ERROR;
1639         }
1640
1641         if (ctx.error != LDB_SUCCESS) {
1642                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1643                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1644                 return ctx.error;
1645         }
1646
1647         return LDB_SUCCESS;
1648 }