dc33217aea9f0f15643103dd7596b3ff3c095316
[sfrench/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "ldb_private.h"
36
37 struct dn_list {
38         unsigned int count;
39         struct ldb_val *dn;
40 };
41
42 struct ltdb_idxptr {
43         struct tdb_context *itdb;
44         int error;
45 };
46
47 /* we put a @IDXVERSION attribute on index entries. This
48    allows us to tell if it was written by an older version
49 */
50 #define LTDB_INDEXING_VERSION 2
51
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
54 {
55         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57         if (ltdb->idxptr == NULL) {
58                 return ldb_oom(ldb_module_get_ctx(module));
59         }
60
61         return LDB_SUCCESS;
62 }
63
64 /*
65   see if two ldb_val structures contain exactly the same data
66   return -1 or 1 for a mismatch, 0 for match
67 */
68 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
69                                          const struct ldb_val *v2)
70 {
71         if (v1->length > v2->length) {
72                 return -1;
73         }
74         if (v1->length < v2->length) {
75                 return 1;
76         }
77         return memcmp(v1->data, v2->data, v1->length);
78 }
79
80
81 /*
82   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
83   binary-safe comparison for the 'dn' returns -1 if not found
84
85   This is therefore safe when the value is a GUID in the future
86  */
87 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
88                                  const struct dn_list *list,
89                                  const struct ldb_val *v)
90 {
91         unsigned int i;
92         for (i=0; i<list->count; i++) {
93                 if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
94                         return i;
95                 }
96         }
97         return -1;
98 }
99
100 /*
101   find a entry in a dn_list. Uses a case sensitive comparison with the dn
102   returns -1 if not found
103  */
104 static int ltdb_dn_list_find_str(struct ltdb_private *ltdb,
105                                  struct dn_list *list,
106                                  const char *dn)
107 {
108         struct ldb_val v;
109         v.data = discard_const_p(unsigned char, dn);
110         v.length = strlen(dn);
111         return ltdb_dn_list_find_val(ltdb, list, &v);
112 }
113
114 /*
115   this is effectively a cast function, but with lots of paranoia
116   checks and also copes with CPUs that are fussy about pointer
117   alignment
118  */
119 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
120 {
121         struct dn_list *list;
122         if (rec.dsize != sizeof(void *)) {
123                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
124                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
125                 return NULL;
126         }
127         /* note that we can't just use a cast here, as rec.dptr may
128            not be aligned sufficiently for a pointer. A cast would cause
129            platforms like some ARM CPUs to crash */
130         memcpy(&list, rec.dptr, sizeof(void *));
131         list = talloc_get_type(list, struct dn_list);
132         if (list == NULL) {
133                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
134                                        "Bad type '%s' for idxptr",
135                                        talloc_get_name(list));
136                 return NULL;
137         }
138         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
139                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
140                                        "Bad parent '%s' for idxptr",
141                                        talloc_get_name(talloc_parent(list->dn)));
142                 return NULL;
143         }
144         return list;
145 }
146
147 /*
148   return the @IDX list in an index entry for a dn as a
149   struct dn_list
150  */
151 static int ltdb_dn_list_load(struct ldb_module *module,
152                              struct ldb_dn *dn, struct dn_list *list)
153 {
154         struct ldb_message *msg;
155         int ret;
156         struct ldb_message_element *el;
157         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
158         TDB_DATA rec;
159         struct dn_list *list2;
160         TDB_DATA key;
161
162         list->dn = NULL;
163         list->count = 0;
164
165         /* see if we have any in-memory index entries */
166         if (ltdb->idxptr == NULL ||
167             ltdb->idxptr->itdb == NULL) {
168                 goto normal_index;
169         }
170
171         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
172         key.dsize = strlen((char *)key.dptr);
173
174         rec = tdb_fetch(ltdb->idxptr->itdb, key);
175         if (rec.dptr == NULL) {
176                 goto normal_index;
177         }
178
179         /* we've found an in-memory index entry */
180         list2 = ltdb_index_idxptr(module, rec, true);
181         if (list2 == NULL) {
182                 free(rec.dptr);
183                 return LDB_ERR_OPERATIONS_ERROR;
184         }
185         free(rec.dptr);
186
187         *list = *list2;
188         return LDB_SUCCESS;
189
190 normal_index:
191         msg = ldb_msg_new(list);
192         if (msg == NULL) {
193                 return LDB_ERR_OPERATIONS_ERROR;
194         }
195
196         ret = ltdb_search_dn1(module, dn, msg,
197                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
198                               |LDB_UNPACK_DATA_FLAG_NO_DN);
199         if (ret != LDB_SUCCESS) {
200                 talloc_free(msg);
201                 return ret;
202         }
203
204         /* TODO: check indexing version number */
205
206         el = ldb_msg_find_element(msg, LTDB_IDX);
207         if (!el) {
208                 talloc_free(msg);
209                 return LDB_SUCCESS;
210         }
211
212         /*
213          * we avoid copying the strings by stealing the list.  We have
214          * to steal msg onto el->values (which looks odd) because we
215          * asked for the memory to be allocated on msg, not on each
216          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
217          */
218         if (ltdb->cache->GUID_index_attribute == NULL) {
219                 talloc_steal(el->values, msg);
220                 list->dn = talloc_steal(list, el->values);
221                 list->count = el->num_values;
222         } else {
223                 unsigned int i;
224                 static const size_t GUID_val_size = 16;
225                 if (el->num_values != 1) {
226                         return LDB_ERR_OPERATIONS_ERROR;
227                 }
228
229                 if ((el->values[0].length % GUID_val_size) != 0) {
230                         return LDB_ERR_OPERATIONS_ERROR;
231                 }
232
233                 list->count = el->values[0].length / GUID_val_size;
234                 list->dn = talloc_array(list, struct ldb_val, list->count);
235
236                 /*
237                  * The actual data is on msg, due to
238                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
239                  */
240                 talloc_steal(list->dn, msg);
241                 for (i = 0; i < list->count; i++) {
242                         list->dn[i].data = &el->values[0].data[i * GUID_val_size];
243                         list->dn[i].length = GUID_val_size;
244                 }
245         }
246
247         /* We don't need msg->elements any more */
248         talloc_free(msg->elements);
249         return LDB_SUCCESS;
250 }
251
252
253 /*
254   save a dn_list into a full @IDX style record
255  */
256 static int ltdb_dn_list_store_full(struct ldb_module *module,
257                                    struct ltdb_private *ltdb,
258                                    struct ldb_dn *dn,
259                                    struct dn_list *list)
260 {
261         struct ldb_message *msg;
262         int ret;
263
264         if (list->count == 0) {
265                 ret = ltdb_delete_noindex(module, dn);
266                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
267                         return LDB_SUCCESS;
268                 }
269                 return ret;
270         }
271
272         msg = ldb_msg_new(module);
273         if (!msg) {
274                 return ldb_module_oom(module);
275         }
276
277         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
278         if (ret != LDB_SUCCESS) {
279                 talloc_free(msg);
280                 return ldb_module_oom(module);
281         }
282
283         msg->dn = dn;
284         if (list->count > 0) {
285                 struct ldb_message_element *el;
286
287                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
288                 if (ret != LDB_SUCCESS) {
289                         talloc_free(msg);
290                         return ldb_module_oom(module);
291                 }
292
293                 if (ltdb->cache->GUID_index_attribute == NULL) {
294                         el->values = list->dn;
295                         el->num_values = list->count;
296                 } else {
297                         struct ldb_val v;
298                         unsigned int i;
299                         el->values = talloc_array(msg,
300                                                   struct ldb_val, 1);
301                         if (el->values == NULL) {
302                                 talloc_free(msg);
303                                 return ldb_module_oom(module);
304                         }
305
306                         v.data = talloc_array_size(el->values,
307                                                    list->count,
308                                                    LTDB_GUID_SIZE);
309                         if (v.data == NULL) {
310                                 talloc_free(msg);
311                                 return ldb_module_oom(module);
312                         }
313
314                         v.length = talloc_get_size(v.data);
315
316                         for (i = 0; i < list->count; i++) {
317                                 if (list->dn[i].length !=
318                                     LTDB_GUID_SIZE) {
319                                         talloc_free(msg);
320                                         return ldb_module_operr(module);
321                                 }
322                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
323                                        list->dn[i].data,
324                                        LTDB_GUID_SIZE);
325                         }
326                         el->values[0] = v;
327                         el->num_values = 1;
328                 }
329         }
330
331         ret = ltdb_store(module, msg, TDB_REPLACE);
332         talloc_free(msg);
333         return ret;
334 }
335
336 /*
337   save a dn_list into the database, in either @IDX or internal format
338  */
339 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
340                               struct dn_list *list)
341 {
342         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
343         TDB_DATA rec, key;
344         int ret;
345         struct dn_list *list2;
346
347         if (ltdb->idxptr == NULL) {
348                 return ltdb_dn_list_store_full(module, ltdb,
349                                                dn, list);
350         }
351
352         if (ltdb->idxptr->itdb == NULL) {
353                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
354                 if (ltdb->idxptr->itdb == NULL) {
355                         return LDB_ERR_OPERATIONS_ERROR;
356                 }
357         }
358
359         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
360         key.dsize = strlen((char *)key.dptr);
361
362         rec = tdb_fetch(ltdb->idxptr->itdb, key);
363         if (rec.dptr != NULL) {
364                 list2 = ltdb_index_idxptr(module, rec, false);
365                 if (list2 == NULL) {
366                         free(rec.dptr);
367                         return LDB_ERR_OPERATIONS_ERROR;
368                 }
369                 free(rec.dptr);
370                 list2->dn = talloc_steal(list2, list->dn);
371                 list2->count = list->count;
372                 return LDB_SUCCESS;
373         }
374
375         list2 = talloc(ltdb->idxptr, struct dn_list);
376         if (list2 == NULL) {
377                 return LDB_ERR_OPERATIONS_ERROR;
378         }
379         list2->dn = talloc_steal(list2, list->dn);
380         list2->count = list->count;
381
382         rec.dptr = (uint8_t *)&list2;
383         rec.dsize = sizeof(void *);
384
385
386         /*
387          * This is not a store into the main DB, but into an in-memory
388          * TDB, so we don't need a guard on ltdb->read_only
389          */
390         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
391         if (ret != 0) {
392                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
393         }
394         return LDB_SUCCESS;
395 }
396
397 /*
398   traverse function for storing the in-memory index entries on disk
399  */
400 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
401 {
402         struct ldb_module *module = state;
403         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
404         struct ldb_dn *dn;
405         struct ldb_context *ldb = ldb_module_get_ctx(module);
406         struct ldb_val v;
407         struct dn_list *list;
408
409         list = ltdb_index_idxptr(module, data, true);
410         if (list == NULL) {
411                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
412                 return -1;
413         }
414
415         v.data = key.dptr;
416         v.length = strnlen((char *)key.dptr, key.dsize);
417
418         dn = ldb_dn_from_ldb_val(module, ldb, &v);
419         if (dn == NULL) {
420                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
421                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
422                 return -1;
423         }
424
425         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
426                                                       dn, list);
427         talloc_free(dn);
428         if (ltdb->idxptr->error != 0) {
429                 return -1;
430         }
431         return 0;
432 }
433
434 /* cleanup the idxptr mode when transaction commits */
435 int ltdb_index_transaction_commit(struct ldb_module *module)
436 {
437         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
438         int ret;
439
440         struct ldb_context *ldb = ldb_module_get_ctx(module);
441
442         ldb_reset_err_string(ldb);
443
444         if (ltdb->idxptr->itdb) {
445                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
446                 tdb_close(ltdb->idxptr->itdb);
447         }
448
449         ret = ltdb->idxptr->error;
450         if (ret != LDB_SUCCESS) {
451                 if (!ldb_errstring(ldb)) {
452                         ldb_set_errstring(ldb, ldb_strerror(ret));
453                 }
454                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
455         }
456
457         talloc_free(ltdb->idxptr);
458         ltdb->idxptr = NULL;
459         return ret;
460 }
461
462 /* cleanup the idxptr mode when transaction cancels */
463 int ltdb_index_transaction_cancel(struct ldb_module *module)
464 {
465         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
466         if (ltdb->idxptr && ltdb->idxptr->itdb) {
467                 tdb_close(ltdb->idxptr->itdb);
468         }
469         talloc_free(ltdb->idxptr);
470         ltdb->idxptr = NULL;
471         return LDB_SUCCESS;
472 }
473
474
475 /*
476   return the dn key to be used for an index
477   the caller is responsible for freeing
478 */
479 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
480                                      const char *attr, const struct ldb_val *value,
481                                      const struct ldb_schema_attribute **ap)
482 {
483         struct ldb_dn *ret;
484         struct ldb_val v;
485         const struct ldb_schema_attribute *a;
486         char *attr_folded;
487         int r;
488
489         attr_folded = ldb_attr_casefold(ldb, attr);
490         if (!attr_folded) {
491                 return NULL;
492         }
493
494         a = ldb_schema_attribute_by_name(ldb, attr);
495         if (ap) {
496                 *ap = a;
497         }
498         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
499         if (r != LDB_SUCCESS) {
500                 const char *errstr = ldb_errstring(ldb);
501                 /* canonicalisation can be refused. For example,
502                    a attribute that takes wildcards will refuse to canonicalise
503                    if the value contains a wildcard */
504                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
505                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
506                 talloc_free(attr_folded);
507                 return NULL;
508         }
509         if (ldb_should_b64_encode(ldb, &v)) {
510                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
511                 if (!vstr) {
512                         talloc_free(attr_folded);
513                         return NULL;
514                 }
515                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
516                 talloc_free(vstr);
517         } else {
518                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
519         }
520
521         if (v.data != value->data) {
522                 talloc_free(v.data);
523         }
524         talloc_free(attr_folded);
525
526         return ret;
527 }
528
529 /*
530   see if a attribute value is in the list of indexed attributes
531 */
532 static bool ltdb_is_indexed(struct ldb_module *module,
533                             struct ltdb_private *ltdb,
534                             const char *attr)
535 {
536         struct ldb_context *ldb = ldb_module_get_ctx(module);
537         unsigned int i;
538         struct ldb_message_element *el;
539
540         if (ldb->schema.index_handler_override) {
541                 const struct ldb_schema_attribute *a
542                         = ldb_schema_attribute_by_name(ldb, attr);
543
544                 if (a == NULL) {
545                         return false;
546                 }
547
548                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
549                         return true;
550                 } else {
551                         return false;
552                 }
553         }
554
555         if (!ltdb->cache->attribute_indexes) {
556                 return false;
557         }
558
559         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
560         if (el == NULL) {
561                 return false;
562         }
563
564         /* TODO: this is too expensive! At least use a binary search */
565         for (i=0; i<el->num_values; i++) {
566                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
567                         return true;
568                 }
569         }
570         return false;
571 }
572
573 /*
574   in the following logic functions, the return value is treated as
575   follows:
576
577      LDB_SUCCESS: we found some matching index values
578
579      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
580
581      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
582                                we'll need a full search
583  */
584
585 /*
586   return a list of dn's that might match a simple indexed search (an
587   equality search only)
588  */
589 static int ltdb_index_dn_simple(struct ldb_module *module,
590                                 struct ltdb_private *ltdb,
591                                 const struct ldb_parse_tree *tree,
592                                 struct dn_list *list)
593 {
594         struct ldb_context *ldb;
595         struct ldb_dn *dn;
596         int ret;
597
598         ldb = ldb_module_get_ctx(module);
599
600         list->count = 0;
601         list->dn = NULL;
602
603         /* if the attribute isn't in the list of indexed attributes then
604            this node needs a full search */
605         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
606                 return LDB_ERR_OPERATIONS_ERROR;
607         }
608
609         /* the attribute is indexed. Pull the list of DNs that match the
610            search criterion */
611         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
612         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
613
614         ret = ltdb_dn_list_load(module, dn, list);
615         talloc_free(dn);
616         return ret;
617 }
618
619
620 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
621
622 /*
623   return a list of dn's that might match a leaf indexed search
624  */
625 static int ltdb_index_dn_leaf(struct ldb_module *module,
626                               struct ltdb_private *ltdb,
627                               const struct ldb_parse_tree *tree,
628                               struct dn_list *list)
629 {
630         if (ltdb->disallow_dn_filter &&
631             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
632                 /* in AD mode we do not support "(dn=...)" search filters */
633                 list->dn = NULL;
634                 list->count = 0;
635                 return LDB_SUCCESS;
636         }
637         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
638                 list->dn = talloc_array(list, struct ldb_val, 1);
639                 if (list->dn == NULL) {
640                         ldb_module_oom(module);
641                         return LDB_ERR_OPERATIONS_ERROR;
642                 }
643                 list->dn[0] = tree->u.equality.value;
644                 list->count = 1;
645                 return LDB_SUCCESS;
646         }
647         return ltdb_index_dn_simple(module, ltdb, tree, list);
648 }
649
650
651 /*
652   list intersection
653   list = list & list2
654 */
655 static bool list_intersect(struct ldb_context *ldb,
656                            struct ltdb_private *ltdb,
657                            struct dn_list *list, const struct dn_list *list2)
658 {
659         struct dn_list *list3;
660         unsigned int i;
661
662         if (list->count == 0) {
663                 /* 0 & X == 0 */
664                 return true;
665         }
666         if (list2->count == 0) {
667                 /* X & 0 == 0 */
668                 list->count = 0;
669                 list->dn = NULL;
670                 return true;
671         }
672
673         /* the indexing code is allowed to return a longer list than
674            what really matches, as all results are filtered by the
675            full expression at the end - this shortcut avoids a lot of
676            work in some cases */
677         if (list->count < 2 && list2->count > 10) {
678                 return true;
679         }
680         if (list2->count < 2 && list->count > 10) {
681                 list->count = list2->count;
682                 list->dn = list2->dn;
683                 /* note that list2 may not be the parent of list2->dn,
684                    as list2->dn may be owned by ltdb->idxptr. In that
685                    case we expect this reparent call to fail, which is
686                    OK */
687                 talloc_reparent(list2, list, list2->dn);
688                 return true;
689         }
690
691         list3 = talloc_zero(list, struct dn_list);
692         if (list3 == NULL) {
693                 return false;
694         }
695
696         list3->dn = talloc_array(list3, struct ldb_val, list->count);
697         if (!list3->dn) {
698                 talloc_free(list3);
699                 return false;
700         }
701         list3->count = 0;
702
703         for (i=0;i<list->count;i++) {
704                 if (ltdb_dn_list_find_val(ltdb, list2,
705                                           &list->dn[i]) != -1) {
706                         list3->dn[list3->count] = list->dn[i];
707                         list3->count++;
708                 }
709         }
710
711         list->dn = talloc_steal(list, list3->dn);
712         list->count = list3->count;
713         talloc_free(list3);
714
715         return true;
716 }
717
718
719 /*
720   list union
721   list = list | list2
722 */
723 static bool list_union(struct ldb_context *ldb,
724                        struct dn_list *list, const struct dn_list *list2)
725 {
726         struct ldb_val *dn3;
727
728         if (list2->count == 0) {
729                 /* X | 0 == X */
730                 return true;
731         }
732
733         if (list->count == 0) {
734                 /* 0 | X == X */
735                 list->count = list2->count;
736                 list->dn = list2->dn;
737                 /* note that list2 may not be the parent of list2->dn,
738                    as list2->dn may be owned by ltdb->idxptr. In that
739                    case we expect this reparent call to fail, which is
740                    OK */
741                 talloc_reparent(list2, list, list2->dn);
742                 return true;
743         }
744
745         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
746         if (!dn3) {
747                 ldb_oom(ldb);
748                 return false;
749         }
750
751         /* we allow for duplicates here, and get rid of them later */
752         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
753         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
754
755         list->dn = dn3;
756         list->count += list2->count;
757
758         return true;
759 }
760
761 static int ltdb_index_dn(struct ldb_module *module,
762                          struct ltdb_private *ltdb,
763                          const struct ldb_parse_tree *tree,
764                          struct dn_list *list);
765
766
767 /*
768   process an OR list (a union)
769  */
770 static int ltdb_index_dn_or(struct ldb_module *module,
771                             struct ltdb_private *ltdb,
772                             const struct ldb_parse_tree *tree,
773                             struct dn_list *list)
774 {
775         struct ldb_context *ldb;
776         unsigned int i;
777
778         ldb = ldb_module_get_ctx(module);
779
780         list->dn = NULL;
781         list->count = 0;
782
783         for (i=0; i<tree->u.list.num_elements; i++) {
784                 struct dn_list *list2;
785                 int ret;
786
787                 list2 = talloc_zero(list, struct dn_list);
788                 if (list2 == NULL) {
789                         return LDB_ERR_OPERATIONS_ERROR;
790                 }
791
792                 ret = ltdb_index_dn(module, ltdb,
793                                     tree->u.list.elements[i], list2);
794
795                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
796                         /* X || 0 == X */
797                         talloc_free(list2);
798                         continue;
799                 }
800
801                 if (ret != LDB_SUCCESS) {
802                         /* X || * == * */
803                         talloc_free(list2);
804                         return ret;
805                 }
806
807                 if (!list_union(ldb, list, list2)) {
808                         talloc_free(list2);
809                         return LDB_ERR_OPERATIONS_ERROR;
810                 }
811         }
812
813         if (list->count == 0) {
814                 return LDB_ERR_NO_SUCH_OBJECT;
815         }
816
817         return LDB_SUCCESS;
818 }
819
820
821 /*
822   NOT an index results
823  */
824 static int ltdb_index_dn_not(struct ldb_module *module,
825                              struct ltdb_private *ltdb,
826                              const struct ldb_parse_tree *tree,
827                              struct dn_list *list)
828 {
829         /* the only way to do an indexed not would be if we could
830            negate the not via another not or if we knew the total
831            number of database elements so we could know that the
832            existing expression covered the whole database.
833
834            instead, we just give up, and rely on a full index scan
835            (unless an outer & manages to reduce the list)
836         */
837         return LDB_ERR_OPERATIONS_ERROR;
838 }
839
840
841 static bool ltdb_index_unique(struct ldb_context *ldb,
842                               const char *attr)
843 {
844         const struct ldb_schema_attribute *a;
845         a = ldb_schema_attribute_by_name(ldb, attr);
846         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
847                 return true;
848         }
849         return false;
850 }
851
852 /*
853   process an AND expression (intersection)
854  */
855 static int ltdb_index_dn_and(struct ldb_module *module,
856                              struct ltdb_private *ltdb,
857                              const struct ldb_parse_tree *tree,
858                              struct dn_list *list)
859 {
860         struct ldb_context *ldb;
861         unsigned int i;
862         bool found;
863
864         ldb = ldb_module_get_ctx(module);
865
866         list->dn = NULL;
867         list->count = 0;
868
869         /* in the first pass we only look for unique simple
870            equality tests, in the hope of avoiding having to look
871            at any others */
872         for (i=0; i<tree->u.list.num_elements; i++) {
873                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
874                 int ret;
875
876                 if (subtree->operation != LDB_OP_EQUALITY ||
877                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
878                         continue;
879                 }
880
881                 ret = ltdb_index_dn(module, ltdb, subtree, list);
882                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
883                         /* 0 && X == 0 */
884                         return LDB_ERR_NO_SUCH_OBJECT;
885                 }
886                 if (ret == LDB_SUCCESS) {
887                         /* a unique index match means we can
888                          * stop. Note that we don't care if we return
889                          * a few too many objects, due to later
890                          * filtering */
891                         return LDB_SUCCESS;
892                 }
893         }
894
895         /* now do a full intersection */
896         found = false;
897
898         for (i=0; i<tree->u.list.num_elements; i++) {
899                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
900                 struct dn_list *list2;
901                 int ret;
902
903                 list2 = talloc_zero(list, struct dn_list);
904                 if (list2 == NULL) {
905                         return ldb_module_oom(module);
906                 }
907
908                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
909
910                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
911                         /* X && 0 == 0 */
912                         list->dn = NULL;
913                         list->count = 0;
914                         talloc_free(list2);
915                         return LDB_ERR_NO_SUCH_OBJECT;
916                 }
917
918                 if (ret != LDB_SUCCESS) {
919                         /* this didn't adding anything */
920                         talloc_free(list2);
921                         continue;
922                 }
923
924                 if (!found) {
925                         talloc_reparent(list2, list, list->dn);
926                         list->dn = list2->dn;
927                         list->count = list2->count;
928                         found = true;
929                 } else if (!list_intersect(ldb, ltdb,
930                                            list, list2)) {
931                         talloc_free(list2);
932                         return LDB_ERR_OPERATIONS_ERROR;
933                 }
934
935                 if (list->count == 0) {
936                         list->dn = NULL;
937                         return LDB_ERR_NO_SUCH_OBJECT;
938                 }
939
940                 if (list->count < 2) {
941                         /* it isn't worth loading the next part of the tree */
942                         return LDB_SUCCESS;
943                 }
944         }
945
946         if (!found) {
947                 /* none of the attributes were indexed */
948                 return LDB_ERR_OPERATIONS_ERROR;
949         }
950
951         return LDB_SUCCESS;
952 }
953
954 /*
955   return a list of matching objects using a one-level index
956  */
957 static int ltdb_index_dn_one(struct ldb_module *module,
958                              struct ldb_dn *parent_dn,
959                              struct dn_list *list)
960 {
961         struct ldb_context *ldb;
962         struct ldb_dn *key;
963         struct ldb_val val;
964         int ret;
965
966         ldb = ldb_module_get_ctx(module);
967
968         /* work out the index key from the parent DN */
969         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
970         val.length = strlen((char *)val.data);
971         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
972         if (!key) {
973                 ldb_oom(ldb);
974                 return LDB_ERR_OPERATIONS_ERROR;
975         }
976
977         ret = ltdb_dn_list_load(module, key, list);
978         talloc_free(key);
979         if (ret != LDB_SUCCESS) {
980                 return ret;
981         }
982
983         if (list->count == 0) {
984                 return LDB_ERR_NO_SUCH_OBJECT;
985         }
986
987         return LDB_SUCCESS;
988 }
989
990 /*
991   return a list of dn's that might match a indexed search or
992   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
993  */
994 static int ltdb_index_dn(struct ldb_module *module,
995                          struct ltdb_private *ltdb,
996                          const struct ldb_parse_tree *tree,
997                          struct dn_list *list)
998 {
999         int ret = LDB_ERR_OPERATIONS_ERROR;
1000
1001         switch (tree->operation) {
1002         case LDB_OP_AND:
1003                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1004                 break;
1005
1006         case LDB_OP_OR:
1007                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1008                 break;
1009
1010         case LDB_OP_NOT:
1011                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1012                 break;
1013
1014         case LDB_OP_EQUALITY:
1015                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1016                 break;
1017
1018         case LDB_OP_SUBSTRING:
1019         case LDB_OP_GREATER:
1020         case LDB_OP_LESS:
1021         case LDB_OP_PRESENT:
1022         case LDB_OP_APPROX:
1023         case LDB_OP_EXTENDED:
1024                 /* we can't index with fancy bitops yet */
1025                 ret = LDB_ERR_OPERATIONS_ERROR;
1026                 break;
1027         }
1028
1029         return ret;
1030 }
1031
1032 /*
1033   filter a candidate dn_list from an indexed search into a set of results
1034   extracting just the given attributes
1035 */
1036 static int ltdb_index_filter(struct ltdb_private *ltdb,
1037                              const struct dn_list *dn_list,
1038                              struct ltdb_context *ac,
1039                              uint32_t *match_count)
1040 {
1041         struct ldb_context *ldb;
1042         struct ldb_message *msg;
1043         struct ldb_message *filtered_msg;
1044         unsigned int i;
1045
1046         ldb = ldb_module_get_ctx(ac->module);
1047
1048         for (i = 0; i < dn_list->count; i++) {
1049                 struct ldb_dn *dn;
1050                 int ret;
1051                 bool matched;
1052
1053                 msg = ldb_msg_new(ac);
1054                 if (!msg) {
1055                         return LDB_ERR_OPERATIONS_ERROR;
1056                 }
1057
1058                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
1059                 if (dn == NULL) {
1060                         talloc_free(msg);
1061                         return LDB_ERR_OPERATIONS_ERROR;
1062                 }
1063
1064                 ret = ltdb_search_dn1(ac->module, dn, msg,
1065                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1066                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1067                 talloc_free(dn);
1068                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1069                         /* the record has disappeared? yes, this can happen */
1070                         talloc_free(msg);
1071                         continue;
1072                 }
1073
1074                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1075                         /* an internal error */
1076                         talloc_free(msg);
1077                         return LDB_ERR_OPERATIONS_ERROR;
1078                 }
1079
1080                 ret = ldb_match_msg_error(ldb, msg,
1081                                           ac->tree, ac->base, ac->scope, &matched);
1082                 if (ret != LDB_SUCCESS) {
1083                         talloc_free(msg);
1084                         return ret;
1085                 }
1086                 if (!matched) {
1087                         talloc_free(msg);
1088                         continue;
1089                 }
1090
1091                 /* filter the attributes that the user wants */
1092                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1093
1094                 talloc_free(msg);
1095
1096                 if (ret == -1) {
1097                         return LDB_ERR_OPERATIONS_ERROR;
1098                 }
1099
1100                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1101                 if (ret != LDB_SUCCESS) {
1102                         /* Regardless of success or failure, the msg
1103                          * is the callbacks responsiblity, and should
1104                          * not be talloc_free()'ed */
1105                         ac->request_terminated = true;
1106                         return ret;
1107                 }
1108
1109                 (*match_count)++;
1110         }
1111
1112         return LDB_SUCCESS;
1113 }
1114
1115 /*
1116   remove any duplicated entries in a indexed result
1117  */
1118 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
1119 {
1120         unsigned int i, new_count;
1121
1122         if (list->count < 2) {
1123                 return;
1124         }
1125
1126         TYPESAFE_QSORT(list->dn, list->count,
1127                        ldb_val_equal_exact_for_qsort);
1128
1129         new_count = 1;
1130         for (i=1; i<list->count; i++) {
1131                 if (ldb_val_equal_exact(&list->dn[i],
1132                                         &list->dn[new_count-1]) == 0) {
1133                         if (new_count != i) {
1134                                 list->dn[new_count] = list->dn[i];
1135                         }
1136                         new_count++;
1137                 }
1138         }
1139
1140         list->count = new_count;
1141 }
1142
1143 /*
1144   search the database with a LDAP-like expression using indexes
1145   returns -1 if an indexed search is not possible, in which
1146   case the caller should call ltdb_search_full()
1147 */
1148 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1149 {
1150         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1151         struct dn_list *dn_list;
1152         int ret;
1153
1154         /* see if indexing is enabled */
1155         if (!ltdb->cache->attribute_indexes &&
1156             !ltdb->cache->one_level_indexes &&
1157             ac->scope != LDB_SCOPE_BASE) {
1158                 /* fallback to a full search */
1159                 return LDB_ERR_OPERATIONS_ERROR;
1160         }
1161
1162         dn_list = talloc_zero(ac, struct dn_list);
1163         if (dn_list == NULL) {
1164                 return ldb_module_oom(ac->module);
1165         }
1166
1167         switch (ac->scope) {
1168         case LDB_SCOPE_BASE:
1169                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1170                 if (dn_list->dn == NULL) {
1171                         talloc_free(dn_list);
1172                         return ldb_module_oom(ac->module);
1173                 }
1174                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1175                 if (dn_list->dn[0].data == NULL) {
1176                         talloc_free(dn_list);
1177                         return ldb_module_oom(ac->module);
1178                 }
1179                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1180                 dn_list->count = 1;
1181                 break;
1182
1183         case LDB_SCOPE_ONELEVEL:
1184                 if (!ltdb->cache->one_level_indexes) {
1185                         talloc_free(dn_list);
1186                         return LDB_ERR_OPERATIONS_ERROR;
1187                 }
1188                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1189                 if (ret != LDB_SUCCESS) {
1190                         talloc_free(dn_list);
1191                         return ret;
1192                 }
1193                 break;
1194
1195         case LDB_SCOPE_SUBTREE:
1196         case LDB_SCOPE_DEFAULT:
1197                 if (!ltdb->cache->attribute_indexes) {
1198                         talloc_free(dn_list);
1199                         return LDB_ERR_OPERATIONS_ERROR;
1200                 }
1201                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
1202                 if (ret != LDB_SUCCESS) {
1203                         talloc_free(dn_list);
1204                         return ret;
1205                 }
1206                 ltdb_dn_list_remove_duplicates(dn_list);
1207                 break;
1208         }
1209
1210         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count);
1211         talloc_free(dn_list);
1212         return ret;
1213 }
1214
1215 /**
1216  * @brief Add a DN in the index list of a given attribute name/value pair
1217  *
1218  * This function will add the DN in the index list for the index for
1219  * the given attribute name and value.
1220  *
1221  * @param[in]  module       A ldb_module structure
1222  *
1223  * @param[in]  dn           The string representation of the DN as it
1224  *                          will be stored in the index entry
1225  *
1226  * @param[in]  el           A ldb_message_element array, one of the entry
1227  *                          referred by the v_idx is the attribute name and
1228  *                          value pair which will be used to construct the
1229  *                          index name
1230  *
1231  * @param[in]  v_idx        The index of element in the el array to use
1232  *
1233  * @return                  An ldb error code
1234  */
1235 static int ltdb_index_add1(struct ldb_module *module,
1236                            struct ltdb_private *ltdb,
1237                            const char *dn,
1238                            struct ldb_message_element *el, int v_idx)
1239 {
1240         struct ldb_context *ldb;
1241         struct ldb_dn *dn_key;
1242         int ret;
1243         const struct ldb_schema_attribute *a;
1244         struct dn_list *list;
1245         unsigned alloc_len;
1246
1247         ldb = ldb_module_get_ctx(module);
1248
1249         list = talloc_zero(module, struct dn_list);
1250         if (list == NULL) {
1251                 return LDB_ERR_OPERATIONS_ERROR;
1252         }
1253
1254         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1255         if (!dn_key) {
1256                 talloc_free(list);
1257                 return LDB_ERR_OPERATIONS_ERROR;
1258         }
1259         talloc_steal(list, dn_key);
1260
1261         ret = ltdb_dn_list_load(module, dn_key, list);
1262         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1263                 talloc_free(list);
1264                 return ret;
1265         }
1266
1267         if (list->count > 0 &&
1268             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1269                 /*
1270                  * We do not want to print info about a possibly
1271                  * confidential DN that the conflict was with in the
1272                  * user-visible error string
1273                  */
1274                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1275                           __location__ ": unique index violation on %s in %s, "
1276                           "conficts with %*.*s in %s",
1277                           el->name, dn,
1278                           (int)list->dn[0].length,
1279                           (int)list->dn[0].length,
1280                           list->dn[0].data,
1281                           ldb_dn_get_linearized(dn_key));
1282                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1283                                        el->name, dn);
1284                 talloc_free(list);
1285                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1286         }
1287
1288         /* overallocate the list a bit, to reduce the number of
1289          * realloc trigered copies */
1290         alloc_len = ((list->count+1)+7) & ~7;
1291         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1292         if (list->dn == NULL) {
1293                 talloc_free(list);
1294                 return LDB_ERR_OPERATIONS_ERROR;
1295         }
1296
1297         list->dn[list->count].data
1298                 = (uint8_t *)talloc_strdup(list->dn, dn);
1299         if (list->dn[list->count].data == NULL) {
1300                 talloc_free(list);
1301                 return LDB_ERR_OPERATIONS_ERROR;
1302         }
1303         list->dn[list->count].length = strlen(dn);
1304         list->count++;
1305
1306         ret = ltdb_dn_list_store(module, dn_key, list);
1307
1308         talloc_free(list);
1309
1310         return ret;
1311 }
1312
1313 /*
1314   add index entries for one elements in a message
1315  */
1316 static int ltdb_index_add_el(struct ldb_module *module,
1317                              struct ltdb_private *ltdb,
1318                              const char *dn,
1319                              struct ldb_message_element *el)
1320 {
1321         unsigned int i;
1322         for (i = 0; i < el->num_values; i++) {
1323                 int ret = ltdb_index_add1(module, ltdb,
1324                                           dn, el, i);
1325                 if (ret != LDB_SUCCESS) {
1326                         return ret;
1327                 }
1328         }
1329
1330         return LDB_SUCCESS;
1331 }
1332
1333 /*
1334   add index entries for all elements in a message
1335  */
1336 static int ltdb_index_add_all(struct ldb_module *module,
1337                               struct ltdb_private *ltdb,
1338                               const char *dn,
1339                               struct ldb_message_element *elements,
1340                               unsigned int num_el)
1341 {
1342         unsigned int i;
1343
1344         if (dn[0] == '@') {
1345                 return LDB_SUCCESS;
1346         }
1347
1348         if (!ltdb->cache->attribute_indexes) {
1349                 /* no indexed fields */
1350                 return LDB_SUCCESS;
1351         }
1352
1353         for (i = 0; i < num_el; i++) {
1354                 int ret;
1355                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
1356                         continue;
1357                 }
1358                 ret = ltdb_index_add_el(module, ltdb, dn, &elements[i]);
1359                 if (ret != LDB_SUCCESS) {
1360                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1361                         ldb_asprintf_errstring(ldb,
1362                                                __location__ ": Failed to re-index %s in %s - %s",
1363                                                elements[i].name, dn, ldb_errstring(ldb));
1364                         return ret;
1365                 }
1366         }
1367
1368         return LDB_SUCCESS;
1369 }
1370
1371
1372 /*
1373   insert a one level index for a message
1374 */
1375 static int ltdb_index_onelevel(struct ldb_module *module,
1376                                const struct ldb_message *msg, int add)
1377 {       
1378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1379                                                     struct ltdb_private);
1380         struct ldb_message_element el;
1381         struct ldb_val val;
1382         struct ldb_dn *pdn;
1383         const char *dn;
1384         int ret;
1385
1386         /* We index for ONE Level only if requested */
1387         if (!ltdb->cache->one_level_indexes) {
1388                 return LDB_SUCCESS;
1389         }
1390
1391         pdn = ldb_dn_get_parent(module, msg->dn);
1392         if (pdn == NULL) {
1393                 return LDB_ERR_OPERATIONS_ERROR;
1394         }
1395
1396         dn = ldb_dn_get_linearized(msg->dn);
1397         if (dn == NULL) {
1398                 talloc_free(pdn);
1399                 return LDB_ERR_OPERATIONS_ERROR;
1400         }
1401
1402         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1403         if (val.data == NULL) {
1404                 talloc_free(pdn);
1405                 return LDB_ERR_OPERATIONS_ERROR;
1406         }
1407
1408         val.length = strlen((char *)val.data);
1409         el.name = LTDB_IDXONE;
1410         el.values = &val;
1411         el.num_values = 1;
1412
1413         if (add) {
1414                 ret = ltdb_index_add1(module, ltdb, dn, &el, 0);
1415         } else { /* delete */
1416                 ret = ltdb_index_del_value(module, ltdb, msg->dn, &el, 0);
1417         }
1418
1419         talloc_free(pdn);
1420
1421         return ret;
1422 }
1423
1424 /*
1425   add the index entries for a new element in a record
1426   The caller guarantees that these element values are not yet indexed
1427 */
1428 int ltdb_index_add_element(struct ldb_module *module,
1429                            struct ltdb_private *ltdb,
1430                            struct ldb_dn *dn,
1431                            struct ldb_message_element *el)
1432 {
1433         if (ldb_dn_is_special(dn)) {
1434                 return LDB_SUCCESS;
1435         }
1436         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1437                 return LDB_SUCCESS;
1438         }
1439         return ltdb_index_add_el(module, ltdb,
1440                                  ldb_dn_get_linearized(dn), el);
1441 }
1442
1443 /*
1444   add the index entries for a new record
1445 */
1446 int ltdb_index_add_new(struct ldb_module *module,
1447                        struct ltdb_private *ltdb,
1448                        const struct ldb_message *msg)
1449 {
1450         const char *dn;
1451         int ret;
1452
1453         if (ldb_dn_is_special(msg->dn)) {
1454                 return LDB_SUCCESS;
1455         }
1456
1457         dn = ldb_dn_get_linearized(msg->dn);
1458         if (dn == NULL) {
1459                 return LDB_ERR_OPERATIONS_ERROR;
1460         }
1461
1462         ret = ltdb_index_add_all(module, ltdb, dn, msg->elements,
1463                                  msg->num_elements);
1464         if (ret != LDB_SUCCESS) {
1465                 return ret;
1466         }
1467
1468         return ltdb_index_onelevel(module, msg, 1);
1469 }
1470
1471
1472 /*
1473   delete an index entry for one message element
1474 */
1475 int ltdb_index_del_value(struct ldb_module *module,
1476                          struct ltdb_private *ltdb,
1477                          struct ldb_dn *dn,
1478                          struct ldb_message_element *el, unsigned int v_idx)
1479 {
1480         struct ldb_context *ldb;
1481         struct ldb_dn *dn_key;
1482         const char *dn_str;
1483         int ret, i;
1484         unsigned int j;
1485         struct dn_list *list;
1486
1487         ldb = ldb_module_get_ctx(module);
1488
1489         dn_str = ldb_dn_get_linearized(dn);
1490         if (dn_str == NULL) {
1491                 return LDB_ERR_OPERATIONS_ERROR;
1492         }
1493
1494         if (dn_str[0] == '@') {
1495                 return LDB_SUCCESS;
1496         }
1497
1498         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1499         if (!dn_key) {
1500                 return LDB_ERR_OPERATIONS_ERROR;
1501         }
1502
1503         list = talloc_zero(dn_key, struct dn_list);
1504         if (list == NULL) {
1505                 talloc_free(dn_key);
1506                 return LDB_ERR_OPERATIONS_ERROR;
1507         }
1508
1509         ret = ltdb_dn_list_load(module, dn_key, list);
1510         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1511                 /* it wasn't indexed. Did we have an earlier error? If we did then
1512                    its gone now */
1513                 talloc_free(dn_key);
1514                 return LDB_SUCCESS;
1515         }
1516
1517         if (ret != LDB_SUCCESS) {
1518                 talloc_free(dn_key);
1519                 return ret;
1520         }
1521
1522         i = ltdb_dn_list_find_str(ltdb, list, dn_str);
1523         if (i == -1) {
1524                 /* nothing to delete */
1525                 talloc_free(dn_key);
1526                 return LDB_SUCCESS;
1527         }
1528
1529         j = (unsigned int) i;
1530         if (j != list->count - 1) {
1531                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1532         }
1533         list->count--;
1534         if (list->count == 0) {
1535                 talloc_free(list->dn);
1536                 list->dn = NULL;
1537         } else {
1538                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1539         }
1540
1541         ret = ltdb_dn_list_store(module, dn_key, list);
1542
1543         talloc_free(dn_key);
1544
1545         return ret;
1546 }
1547
1548 /*
1549   delete the index entries for a element
1550   return -1 on failure
1551 */
1552 int ltdb_index_del_element(struct ldb_module *module,
1553                            struct ltdb_private *ltdb,
1554                            struct ldb_dn *dn,
1555                            struct ldb_message_element *el)
1556 {
1557         const char *dn_str;
1558         int ret;
1559         unsigned int i;
1560
1561         if (!ltdb->cache->attribute_indexes) {
1562                 /* no indexed fields */
1563                 return LDB_SUCCESS;
1564         }
1565
1566         dn_str = ldb_dn_get_linearized(dn);
1567         if (dn_str == NULL) {
1568                 return LDB_ERR_OPERATIONS_ERROR;
1569         }
1570
1571         if (dn_str[0] == '@') {
1572                 return LDB_SUCCESS;
1573         }
1574
1575         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1576                 return LDB_SUCCESS;
1577         }
1578         for (i = 0; i < el->num_values; i++) {
1579                 ret = ltdb_index_del_value(module, ltdb, dn, el, i);
1580                 if (ret != LDB_SUCCESS) {
1581                         return ret;
1582                 }
1583         }
1584
1585         return LDB_SUCCESS;
1586 }
1587
1588 /*
1589   delete the index entries for a record
1590   return -1 on failure
1591 */
1592 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1593 {
1594         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1595         int ret;
1596         unsigned int i;
1597
1598         if (ldb_dn_is_special(msg->dn)) {
1599                 return LDB_SUCCESS;
1600         }
1601
1602         ret = ltdb_index_onelevel(module, msg, 0);
1603         if (ret != LDB_SUCCESS) {
1604                 return ret;
1605         }
1606
1607         if (!ltdb->cache->attribute_indexes) {
1608                 /* no indexed fields */
1609                 return LDB_SUCCESS;
1610         }
1611
1612         for (i = 0; i < msg->num_elements; i++) {
1613                 ret = ltdb_index_del_element(module, ltdb,
1614                                              msg->dn, &msg->elements[i]);
1615                 if (ret != LDB_SUCCESS) {
1616                         return ret;
1617                 }
1618         }
1619
1620         return LDB_SUCCESS;
1621 }
1622
1623
1624 /*
1625   traversal function that deletes all @INDEX records
1626 */
1627 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1628 {
1629         struct ldb_module *module = state;
1630         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1631         const char *dnstr = "DN=" LTDB_INDEX ":";
1632         struct dn_list list;
1633         struct ldb_dn *dn;
1634         struct ldb_val v;
1635         int ret;
1636
1637         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1638                 return 0;
1639         }
1640         /* we need to put a empty list in the internal tdb for this
1641          * index entry */
1642         list.dn = NULL;
1643         list.count = 0;
1644
1645         /* the offset of 3 is to remove the DN= prefix. */
1646         v.data = key.dptr + 3;
1647         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1648
1649         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1650         ret = ltdb_dn_list_store(module, dn, &list);
1651         if (ret != LDB_SUCCESS) {
1652                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1653                                        "Unable to store null index for %s\n",
1654                                                 ldb_dn_get_linearized(dn));
1655                 talloc_free(dn);
1656                 return -1;
1657         }
1658         talloc_free(dn);
1659         return 0;
1660 }
1661
1662 struct ltdb_reindex_context {
1663         struct ldb_module *module;
1664         int error;
1665 };
1666
1667 /*
1668   traversal function that adds @INDEX records during a re index
1669 */
1670 static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1671 {
1672         struct ldb_context *ldb;
1673         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1674         struct ldb_module *module = ctx->module;
1675         struct ldb_message *msg;
1676         unsigned int nb_elements_in_db;
1677         const struct ldb_val val = {
1678                 .data = data.dptr,
1679                 .length = data.dsize,
1680         };
1681         int ret;
1682         TDB_DATA key2;
1683         bool is_record;
1684         
1685         ldb = ldb_module_get_ctx(module);
1686
1687         if (key.dsize > 4 &&
1688             memcmp(key.dptr, "DN=@", 4) == 0) {
1689                 return 0;
1690         }
1691
1692         is_record = ltdb_key_is_record(key);
1693         if (is_record == false) {
1694                 return 0;
1695         }
1696         
1697         msg = ldb_msg_new(module);
1698         if (msg == NULL) {
1699                 return -1;
1700         }
1701
1702         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
1703                                                    msg,
1704                                                    NULL, 0,
1705                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
1706                                                    &nb_elements_in_db);
1707         if (ret != 0) {
1708                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1709                                                 ldb_dn_get_linearized(msg->dn));
1710                 ctx->error = ret;
1711                 talloc_free(msg);
1712                 return -1;
1713         }
1714
1715         if (msg->dn == NULL) {
1716                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1717                           "Refusing to re-index as GUID "
1718                           "key %*.*s with no DN\n",
1719                           (int)key.dsize, (int)key.dsize,
1720                           (char *)key.dptr);
1721                 talloc_free(msg);
1722                 return -1;
1723         }
1724         
1725         /* check if the DN key has changed, perhaps due to the case
1726            insensitivity of an element changing, or a change from DN
1727            to GUID keys */
1728         key2 = ltdb_key_msg(module, msg);
1729         if (key2.dptr == NULL) {
1730                 /* probably a corrupt record ... darn */
1731                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1732                                                 ldb_dn_get_linearized(msg->dn));
1733                 talloc_free(msg);
1734                 return 0;
1735         }
1736         if (key.dsize != key2.dsize ||
1737             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
1738                 int tdb_ret;
1739                 tdb_ret = tdb_delete(tdb, key);
1740                 if (tdb_ret != 0) {
1741                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1742                                   "Failed to delete %*.*s "
1743                                   "for rekey as %*.*s: %s",
1744                                   (int)key.dsize, (int)key.dsize,
1745                                   (const char *)key.dptr,
1746                                   (int)key2.dsize, (int)key2.dsize,
1747                                   (const char *)key.dptr,
1748                                   tdb_errorstr(tdb));
1749                         ctx->error = ltdb_err_map(tdb_error(tdb));
1750                         return -1;
1751                 }
1752                 tdb_ret = tdb_store(tdb, key2, data, 0);
1753                 if (tdb_ret != 0) {
1754                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1755                                   "Failed to rekey %*.*s as %*.*s: %s",
1756                                   (int)key.dsize, (int)key.dsize,
1757                                   (const char *)key.dptr,
1758                                   (int)key2.dsize, (int)key2.dsize,
1759                                   (const char *)key.dptr,
1760                                   tdb_errorstr(tdb));
1761                         ctx->error = ltdb_err_map(tdb_error(tdb));
1762                         return -1;
1763                 }
1764         }
1765         talloc_free(key2.dptr);
1766
1767         talloc_free(msg);
1768
1769         return 0;
1770 }
1771
1772 /*
1773   traversal function that adds @INDEX records during a re index
1774 */
1775 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1776 {
1777         struct ldb_context *ldb;
1778         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1779         struct ldb_module *module = ctx->module;
1780         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1781                                                     struct ltdb_private);
1782         struct ldb_message *msg;
1783         const char *dn = NULL;
1784         unsigned int nb_elements_in_db;
1785         const struct ldb_val val = {
1786                 .data = data.dptr,
1787                 .length = data.dsize,
1788         };
1789         int ret;
1790         bool is_record;
1791         
1792         ldb = ldb_module_get_ctx(module);
1793
1794         if (key.dsize > 4 &&
1795             memcmp(key.dptr, "DN=@", 4) == 0) {
1796                 return 0;
1797         }
1798
1799         is_record = ltdb_key_is_record(key);
1800         if (is_record == false) {
1801                 return 0;
1802         }
1803         
1804         msg = ldb_msg_new(module);
1805         if (msg == NULL) {
1806                 return -1;
1807         }
1808
1809         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
1810                                                    msg,
1811                                                    NULL, 0,
1812                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
1813                                                    &nb_elements_in_db);
1814         if (ret != 0) {
1815                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1816                                                 ldb_dn_get_linearized(msg->dn));
1817                 ctx->error = ret;
1818                 talloc_free(msg);
1819                 return -1;
1820         }
1821
1822         if (msg->dn == NULL) {
1823                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1824                           "Refusing to re-index as GUID "
1825                           "key %*.*s with no DN\n",
1826                           (int)key.dsize, (int)key.dsize,
1827                           (char *)key.dptr);
1828                 talloc_free(msg);
1829                 return -1;
1830         } else {
1831                 dn = ldb_dn_get_linearized(msg->dn);
1832         }
1833
1834         ret = ltdb_index_onelevel(module, msg, 1);
1835         if (ret != LDB_SUCCESS) {
1836                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1837                           "Adding special ONE LEVEL index failed (%s)!",
1838                                                 ldb_dn_get_linearized(msg->dn));
1839                 talloc_free(msg);
1840                 return -1;
1841         }
1842
1843         ret = ltdb_index_add_all(module, ltdb, dn,
1844                                  msg->elements, msg->num_elements);
1845
1846         if (ret != LDB_SUCCESS) {
1847                 ctx->error = ret;
1848                 talloc_free(msg);
1849                 return -1;
1850         }
1851
1852         talloc_free(msg);
1853
1854         return 0;
1855 }
1856
1857 /*
1858   force a complete reindex of the database
1859 */
1860 int ltdb_reindex(struct ldb_module *module)
1861 {
1862         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1863         int ret;
1864         struct ltdb_reindex_context ctx;
1865
1866         /*
1867          * Only triggered after a modification, but make clear we do
1868          * not re-index a read-only DB
1869          */
1870         if (ltdb->read_only) {
1871                 return LDB_ERR_UNWILLING_TO_PERFORM;
1872         }
1873
1874         if (ltdb_cache_reload(module) != 0) {
1875                 return LDB_ERR_OPERATIONS_ERROR;
1876         }
1877
1878         /*
1879          * Ensure we read (and so remove) the entries from the real
1880          * DB, no values stored so far are any use as we want to do a
1881          * re-index
1882          */
1883         ltdb_index_transaction_cancel(module);
1884
1885         ret = ltdb_index_transaction_start(module);
1886         if (ret != LDB_SUCCESS) {
1887                 return ret;
1888         }
1889
1890         /* first traverse the database deleting any @INDEX records by
1891          * putting NULL entries in the in-memory tdb
1892          */
1893         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1894         if (ret < 0) {
1895                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1896                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
1897                                        ldb_errstring(ldb));
1898                 return LDB_ERR_OPERATIONS_ERROR;
1899         }
1900
1901         /* if we don't have indexes we have nothing todo */
1902         if (!ltdb->cache->attribute_indexes) {
1903                 return LDB_SUCCESS;
1904         }
1905
1906         ctx.module = module;
1907         ctx.error = 0;
1908
1909         /* now traverse adding any indexes for normal LDB records */
1910         ret = tdb_traverse(ltdb->tdb, re_key, &ctx);
1911         if (ret < 0) {
1912                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1913                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
1914                                        ldb_errstring(ldb));
1915                 return LDB_ERR_OPERATIONS_ERROR;
1916         }
1917
1918         if (ctx.error != LDB_SUCCESS) {
1919                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1920                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1921                 return ctx.error;
1922         }
1923
1924         ctx.error = 0;
1925
1926         /* now traverse adding any indexes for normal LDB records */
1927         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1928         if (ret < 0) {
1929                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1930                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
1931                                        ldb_errstring(ldb));
1932                 return LDB_ERR_OPERATIONS_ERROR;
1933         }
1934
1935         if (ctx.error != LDB_SUCCESS) {
1936                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1937                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1938                 return ctx.error;
1939         }
1940
1941         return LDB_SUCCESS;
1942 }