9327d095b284aebb7415cc08575cec9e9058418c
[sfrench/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "ldb_private.h"
36
37 struct dn_list {
38         unsigned int count;
39         struct ldb_val *dn;
40 };
41
42 struct ltdb_idxptr {
43         struct tdb_context *itdb;
44         int error;
45 };
46
47 static int ltdb_write_index_dn_guid(struct ldb_module *module,
48                                     const struct ldb_message *msg,
49                                     int add);
50
51 /* we put a @IDXVERSION attribute on index entries. This
52    allows us to tell if it was written by an older version
53 */
54 #define LTDB_INDEXING_VERSION 2
55
56 #define LTDB_GUID_INDEXING_VERSION 3
57
58 /* enable the idxptr mode when transactions start */
59 int ltdb_index_transaction_start(struct ldb_module *module)
60 {
61         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
62         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
63         if (ltdb->idxptr == NULL) {
64                 return ldb_oom(ldb_module_get_ctx(module));
65         }
66
67         return LDB_SUCCESS;
68 }
69
70 /*
71   see if two ldb_val structures contain exactly the same data
72   return -1 or 1 for a mismatch, 0 for match
73 */
74 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
75                                          const struct ldb_val *v2)
76 {
77         if (v1->length > v2->length) {
78                 return -1;
79         }
80         if (v1->length < v2->length) {
81                 return 1;
82         }
83         return memcmp(v1->data, v2->data, v1->length);
84 }
85
86
87 /*
88   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
89   binary-safe comparison for the 'dn' returns -1 if not found
90
91   This is therefore safe when the value is a GUID in the future
92  */
93 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
94                                  const struct dn_list *list,
95                                  const struct ldb_val *v)
96 {
97         unsigned int i;
98         for (i=0; i<list->count; i++) {
99                 if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
100                         return i;
101                 }
102         }
103         return -1;
104 }
105
106 /*
107   find a entry in a dn_list. Uses a case sensitive comparison with the dn
108   returns -1 if not found
109  */
110 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
111                                  struct dn_list *list,
112                                  const struct ldb_message *msg)
113 {
114         struct ldb_val v;
115         const struct ldb_val *key_val;
116         if (ltdb->cache->GUID_index_attribute == NULL) {
117                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
118                 v.data = discard_const_p(unsigned char, dn_str);
119                 v.length = strlen(dn_str);
120         } else {
121                 key_val = ldb_msg_find_ldb_val(msg,
122                                                ltdb->cache->GUID_index_attribute);
123                 if (key_val == NULL) {
124                         return -1;
125                 }
126                 v = *key_val;
127         }
128         return ltdb_dn_list_find_val(ltdb, list, &v);
129 }
130
131 /*
132   this is effectively a cast function, but with lots of paranoia
133   checks and also copes with CPUs that are fussy about pointer
134   alignment
135  */
136 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
137 {
138         struct dn_list *list;
139         if (rec.dsize != sizeof(void *)) {
140                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
141                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
142                 return NULL;
143         }
144         /* note that we can't just use a cast here, as rec.dptr may
145            not be aligned sufficiently for a pointer. A cast would cause
146            platforms like some ARM CPUs to crash */
147         memcpy(&list, rec.dptr, sizeof(void *));
148         list = talloc_get_type(list, struct dn_list);
149         if (list == NULL) {
150                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
151                                        "Bad type '%s' for idxptr",
152                                        talloc_get_name(list));
153                 return NULL;
154         }
155         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
156                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
157                                        "Bad parent '%s' for idxptr",
158                                        talloc_get_name(talloc_parent(list->dn)));
159                 return NULL;
160         }
161         return list;
162 }
163
164 /*
165   return the @IDX list in an index entry for a dn as a
166   struct dn_list
167  */
168 static int ltdb_dn_list_load(struct ldb_module *module,
169                              struct ltdb_private *ltdb,
170                              struct ldb_dn *dn, struct dn_list *list)
171 {
172         struct ldb_message *msg;
173         int ret;
174         struct ldb_message_element *el;
175         TDB_DATA rec;
176         struct dn_list *list2;
177         TDB_DATA key;
178
179         list->dn = NULL;
180         list->count = 0;
181
182         /* see if we have any in-memory index entries */
183         if (ltdb->idxptr == NULL ||
184             ltdb->idxptr->itdb == NULL) {
185                 goto normal_index;
186         }
187
188         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
189         key.dsize = strlen((char *)key.dptr);
190
191         rec = tdb_fetch(ltdb->idxptr->itdb, key);
192         if (rec.dptr == NULL) {
193                 goto normal_index;
194         }
195
196         /* we've found an in-memory index entry */
197         list2 = ltdb_index_idxptr(module, rec, true);
198         if (list2 == NULL) {
199                 free(rec.dptr);
200                 return LDB_ERR_OPERATIONS_ERROR;
201         }
202         free(rec.dptr);
203
204         *list = *list2;
205         return LDB_SUCCESS;
206
207 normal_index:
208         msg = ldb_msg_new(list);
209         if (msg == NULL) {
210                 return LDB_ERR_OPERATIONS_ERROR;
211         }
212
213         ret = ltdb_search_dn1(module, dn, msg,
214                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
215                               |LDB_UNPACK_DATA_FLAG_NO_DN);
216         if (ret != LDB_SUCCESS) {
217                 talloc_free(msg);
218                 return ret;
219         }
220
221         /* TODO: check indexing version number */
222
223         el = ldb_msg_find_element(msg, LTDB_IDX);
224         if (!el) {
225                 talloc_free(msg);
226                 return LDB_SUCCESS;
227         }
228
229         /*
230          * we avoid copying the strings by stealing the list.  We have
231          * to steal msg onto el->values (which looks odd) because we
232          * asked for the memory to be allocated on msg, not on each
233          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
234          */
235         if (ltdb->cache->GUID_index_attribute == NULL) {
236                 talloc_steal(el->values, msg);
237                 list->dn = talloc_steal(list, el->values);
238                 list->count = el->num_values;
239         } else {
240                 unsigned int i;
241                 static const size_t GUID_val_size = 16;
242                 if (el->num_values != 1) {
243                         return LDB_ERR_OPERATIONS_ERROR;
244                 }
245
246                 if ((el->values[0].length % GUID_val_size) != 0) {
247                         return LDB_ERR_OPERATIONS_ERROR;
248                 }
249
250                 list->count = el->values[0].length / GUID_val_size;
251                 list->dn = talloc_array(list, struct ldb_val, list->count);
252
253                 /*
254                  * The actual data is on msg, due to
255                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
256                  */
257                 talloc_steal(list->dn, msg);
258                 for (i = 0; i < list->count; i++) {
259                         list->dn[i].data = &el->values[0].data[i * GUID_val_size];
260                         list->dn[i].length = GUID_val_size;
261                 }
262         }
263
264         /* We don't need msg->elements any more */
265         talloc_free(msg->elements);
266         return LDB_SUCCESS;
267 }
268
269
270 /*
271   save a dn_list into a full @IDX style record
272  */
273 static int ltdb_dn_list_store_full(struct ldb_module *module,
274                                    struct ltdb_private *ltdb,
275                                    struct ldb_dn *dn,
276                                    struct dn_list *list)
277 {
278         struct ldb_message *msg;
279         int ret;
280
281         msg = ldb_msg_new(module);
282         if (!msg) {
283                 return ldb_module_oom(module);
284         }
285
286         msg->dn = dn;
287
288         if (list->count == 0) {
289                 ret = ltdb_delete_noindex(module, msg);
290                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
291                         talloc_free(msg);
292                         return LDB_SUCCESS;
293                 }
294                 return ret;
295         }
296
297         if (ltdb->cache->GUID_index_attribute == NULL) {
298                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
299                                       LTDB_INDEXING_VERSION);
300                 if (ret != LDB_SUCCESS) {
301                         talloc_free(msg);
302                         return ldb_module_oom(module);
303                 }
304         } else {
305                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
306                                       LTDB_GUID_INDEXING_VERSION);
307                 if (ret != LDB_SUCCESS) {
308                         talloc_free(msg);
309                         return ldb_module_oom(module);
310                 }
311         }
312
313         if (list->count > 0) {
314                 struct ldb_message_element *el;
315
316                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
317                 if (ret != LDB_SUCCESS) {
318                         talloc_free(msg);
319                         return ldb_module_oom(module);
320                 }
321
322                 if (ltdb->cache->GUID_index_attribute == NULL) {
323                         el->values = list->dn;
324                         el->num_values = list->count;
325                 } else {
326                         struct ldb_val v;
327                         unsigned int i;
328                         el->values = talloc_array(msg,
329                                                   struct ldb_val, 1);
330                         if (el->values == NULL) {
331                                 talloc_free(msg);
332                                 return ldb_module_oom(module);
333                         }
334
335                         v.data = talloc_array_size(el->values,
336                                                    list->count,
337                                                    LTDB_GUID_SIZE);
338                         if (v.data == NULL) {
339                                 talloc_free(msg);
340                                 return ldb_module_oom(module);
341                         }
342
343                         v.length = talloc_get_size(v.data);
344
345                         for (i = 0; i < list->count; i++) {
346                                 if (list->dn[i].length !=
347                                     LTDB_GUID_SIZE) {
348                                         talloc_free(msg);
349                                         return ldb_module_operr(module);
350                                 }
351                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
352                                        list->dn[i].data,
353                                        LTDB_GUID_SIZE);
354                         }
355                         el->values[0] = v;
356                         el->num_values = 1;
357                 }
358         }
359
360         ret = ltdb_store(module, msg, TDB_REPLACE);
361         talloc_free(msg);
362         return ret;
363 }
364
365 /*
366   save a dn_list into the database, in either @IDX or internal format
367  */
368 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
369                               struct dn_list *list)
370 {
371         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
372         TDB_DATA rec, key;
373         int ret;
374         struct dn_list *list2;
375
376         if (ltdb->idxptr == NULL) {
377                 return ltdb_dn_list_store_full(module, ltdb,
378                                                dn, list);
379         }
380
381         if (ltdb->idxptr->itdb == NULL) {
382                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
383                 if (ltdb->idxptr->itdb == NULL) {
384                         return LDB_ERR_OPERATIONS_ERROR;
385                 }
386         }
387
388         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
389         key.dsize = strlen((char *)key.dptr);
390
391         rec = tdb_fetch(ltdb->idxptr->itdb, key);
392         if (rec.dptr != NULL) {
393                 list2 = ltdb_index_idxptr(module, rec, false);
394                 if (list2 == NULL) {
395                         free(rec.dptr);
396                         return LDB_ERR_OPERATIONS_ERROR;
397                 }
398                 free(rec.dptr);
399                 list2->dn = talloc_steal(list2, list->dn);
400                 list2->count = list->count;
401                 return LDB_SUCCESS;
402         }
403
404         list2 = talloc(ltdb->idxptr, struct dn_list);
405         if (list2 == NULL) {
406                 return LDB_ERR_OPERATIONS_ERROR;
407         }
408         list2->dn = talloc_steal(list2, list->dn);
409         list2->count = list->count;
410
411         rec.dptr = (uint8_t *)&list2;
412         rec.dsize = sizeof(void *);
413
414
415         /*
416          * This is not a store into the main DB, but into an in-memory
417          * TDB, so we don't need a guard on ltdb->read_only
418          */
419         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
420         if (ret != 0) {
421                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
422         }
423         return LDB_SUCCESS;
424 }
425
426 /*
427   traverse function for storing the in-memory index entries on disk
428  */
429 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
430 {
431         struct ldb_module *module = state;
432         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
433         struct ldb_dn *dn;
434         struct ldb_context *ldb = ldb_module_get_ctx(module);
435         struct ldb_val v;
436         struct dn_list *list;
437
438         list = ltdb_index_idxptr(module, data, true);
439         if (list == NULL) {
440                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
441                 return -1;
442         }
443
444         v.data = key.dptr;
445         v.length = strnlen((char *)key.dptr, key.dsize);
446
447         dn = ldb_dn_from_ldb_val(module, ldb, &v);
448         if (dn == NULL) {
449                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
450                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
451                 return -1;
452         }
453
454         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
455                                                       dn, list);
456         talloc_free(dn);
457         if (ltdb->idxptr->error != 0) {
458                 return -1;
459         }
460         return 0;
461 }
462
463 /* cleanup the idxptr mode when transaction commits */
464 int ltdb_index_transaction_commit(struct ldb_module *module)
465 {
466         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
467         int ret;
468
469         struct ldb_context *ldb = ldb_module_get_ctx(module);
470
471         ldb_reset_err_string(ldb);
472
473         if (ltdb->idxptr->itdb) {
474                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
475                 tdb_close(ltdb->idxptr->itdb);
476         }
477
478         ret = ltdb->idxptr->error;
479         if (ret != LDB_SUCCESS) {
480                 if (!ldb_errstring(ldb)) {
481                         ldb_set_errstring(ldb, ldb_strerror(ret));
482                 }
483                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
484         }
485
486         talloc_free(ltdb->idxptr);
487         ltdb->idxptr = NULL;
488         return ret;
489 }
490
491 /* cleanup the idxptr mode when transaction cancels */
492 int ltdb_index_transaction_cancel(struct ldb_module *module)
493 {
494         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
495         if (ltdb->idxptr && ltdb->idxptr->itdb) {
496                 tdb_close(ltdb->idxptr->itdb);
497         }
498         talloc_free(ltdb->idxptr);
499         ltdb->idxptr = NULL;
500         return LDB_SUCCESS;
501 }
502
503
504 /*
505   return the dn key to be used for an index
506   the caller is responsible for freeing
507 */
508 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
509                                      const char *attr, const struct ldb_val *value,
510                                      const struct ldb_schema_attribute **ap)
511 {
512         struct ldb_dn *ret;
513         struct ldb_val v;
514         const struct ldb_schema_attribute *a;
515         char *attr_folded;
516         int r;
517
518         attr_folded = ldb_attr_casefold(ldb, attr);
519         if (!attr_folded) {
520                 return NULL;
521         }
522
523         a = ldb_schema_attribute_by_name(ldb, attr);
524         if (ap) {
525                 *ap = a;
526         }
527         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
528         if (r != LDB_SUCCESS) {
529                 const char *errstr = ldb_errstring(ldb);
530                 /* canonicalisation can be refused. For example,
531                    a attribute that takes wildcards will refuse to canonicalise
532                    if the value contains a wildcard */
533                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
534                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
535                 talloc_free(attr_folded);
536                 return NULL;
537         }
538         if (ldb_should_b64_encode(ldb, &v)) {
539                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
540                 if (!vstr) {
541                         talloc_free(attr_folded);
542                         return NULL;
543                 }
544                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
545                 talloc_free(vstr);
546         } else {
547                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
548         }
549
550         if (v.data != value->data) {
551                 talloc_free(v.data);
552         }
553         talloc_free(attr_folded);
554
555         return ret;
556 }
557
558 /*
559   see if a attribute value is in the list of indexed attributes
560 */
561 static bool ltdb_is_indexed(struct ldb_module *module,
562                             struct ltdb_private *ltdb,
563                             const char *attr)
564 {
565         struct ldb_context *ldb = ldb_module_get_ctx(module);
566         unsigned int i;
567         struct ldb_message_element *el;
568
569         if (ldb->schema.index_handler_override) {
570                 const struct ldb_schema_attribute *a
571                         = ldb_schema_attribute_by_name(ldb, attr);
572
573                 if (a == NULL) {
574                         return false;
575                 }
576
577                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
578                         return true;
579                 } else {
580                         return false;
581                 }
582         }
583
584         if (!ltdb->cache->attribute_indexes) {
585                 return false;
586         }
587
588         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
589         if (el == NULL) {
590                 return false;
591         }
592
593         /* TODO: this is too expensive! At least use a binary search */
594         for (i=0; i<el->num_values; i++) {
595                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
596                         return true;
597                 }
598         }
599         return false;
600 }
601
602 /*
603   in the following logic functions, the return value is treated as
604   follows:
605
606      LDB_SUCCESS: we found some matching index values
607
608      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
609
610      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
611                                we'll need a full search
612  */
613
614 /*
615   return a list of dn's that might match a simple indexed search (an
616   equality search only)
617  */
618 static int ltdb_index_dn_simple(struct ldb_module *module,
619                                 struct ltdb_private *ltdb,
620                                 const struct ldb_parse_tree *tree,
621                                 struct dn_list *list)
622 {
623         struct ldb_context *ldb;
624         struct ldb_dn *dn;
625         int ret;
626
627         ldb = ldb_module_get_ctx(module);
628
629         list->count = 0;
630         list->dn = NULL;
631
632         /* if the attribute isn't in the list of indexed attributes then
633            this node needs a full search */
634         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
635                 return LDB_ERR_OPERATIONS_ERROR;
636         }
637
638         /* the attribute is indexed. Pull the list of DNs that match the
639            search criterion */
640         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
641         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
642
643         ret = ltdb_dn_list_load(module, ltdb, dn, list);
644         talloc_free(dn);
645         return ret;
646 }
647
648
649 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
650
651 /*
652   return a list of dn's that might match a leaf indexed search
653  */
654 static int ltdb_index_dn_leaf(struct ldb_module *module,
655                               struct ltdb_private *ltdb,
656                               const struct ldb_parse_tree *tree,
657                               struct dn_list *list)
658 {
659         if (ltdb->disallow_dn_filter &&
660             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
661                 /* in AD mode we do not support "(dn=...)" search filters */
662                 list->dn = NULL;
663                 list->count = 0;
664                 return LDB_SUCCESS;
665         }
666         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
667                 list->dn = talloc_array(list, struct ldb_val, 1);
668                 if (list->dn == NULL) {
669                         ldb_module_oom(module);
670                         return LDB_ERR_OPERATIONS_ERROR;
671                 }
672                 list->dn[0] = tree->u.equality.value;
673                 list->count = 1;
674                 return LDB_SUCCESS;
675         }
676         return ltdb_index_dn_simple(module, ltdb, tree, list);
677 }
678
679
680 /*
681   list intersection
682   list = list & list2
683 */
684 static bool list_intersect(struct ldb_context *ldb,
685                            struct ltdb_private *ltdb,
686                            struct dn_list *list, const struct dn_list *list2)
687 {
688         struct dn_list *list3;
689         unsigned int i;
690
691         if (list->count == 0) {
692                 /* 0 & X == 0 */
693                 return true;
694         }
695         if (list2->count == 0) {
696                 /* X & 0 == 0 */
697                 list->count = 0;
698                 list->dn = NULL;
699                 return true;
700         }
701
702         /* the indexing code is allowed to return a longer list than
703            what really matches, as all results are filtered by the
704            full expression at the end - this shortcut avoids a lot of
705            work in some cases */
706         if (list->count < 2 && list2->count > 10) {
707                 return true;
708         }
709         if (list2->count < 2 && list->count > 10) {
710                 list->count = list2->count;
711                 list->dn = list2->dn;
712                 /* note that list2 may not be the parent of list2->dn,
713                    as list2->dn may be owned by ltdb->idxptr. In that
714                    case we expect this reparent call to fail, which is
715                    OK */
716                 talloc_reparent(list2, list, list2->dn);
717                 return true;
718         }
719
720         list3 = talloc_zero(list, struct dn_list);
721         if (list3 == NULL) {
722                 return false;
723         }
724
725         list3->dn = talloc_array(list3, struct ldb_val, list->count);
726         if (!list3->dn) {
727                 talloc_free(list3);
728                 return false;
729         }
730         list3->count = 0;
731
732         for (i=0;i<list->count;i++) {
733                 if (ltdb_dn_list_find_val(ltdb, list2,
734                                           &list->dn[i]) != -1) {
735                         list3->dn[list3->count] = list->dn[i];
736                         list3->count++;
737                 }
738         }
739
740         list->dn = talloc_steal(list, list3->dn);
741         list->count = list3->count;
742         talloc_free(list3);
743
744         return true;
745 }
746
747
748 /*
749   list union
750   list = list | list2
751 */
752 static bool list_union(struct ldb_context *ldb,
753                        struct dn_list *list, const struct dn_list *list2)
754 {
755         struct ldb_val *dn3;
756
757         if (list2->count == 0) {
758                 /* X | 0 == X */
759                 return true;
760         }
761
762         if (list->count == 0) {
763                 /* 0 | X == X */
764                 list->count = list2->count;
765                 list->dn = list2->dn;
766                 /* note that list2 may not be the parent of list2->dn,
767                    as list2->dn may be owned by ltdb->idxptr. In that
768                    case we expect this reparent call to fail, which is
769                    OK */
770                 talloc_reparent(list2, list, list2->dn);
771                 return true;
772         }
773
774         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
775         if (!dn3) {
776                 ldb_oom(ldb);
777                 return false;
778         }
779
780         /* we allow for duplicates here, and get rid of them later */
781         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
782         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
783
784         list->dn = dn3;
785         list->count += list2->count;
786
787         return true;
788 }
789
790 static int ltdb_index_dn(struct ldb_module *module,
791                          struct ltdb_private *ltdb,
792                          const struct ldb_parse_tree *tree,
793                          struct dn_list *list);
794
795
796 /*
797   process an OR list (a union)
798  */
799 static int ltdb_index_dn_or(struct ldb_module *module,
800                             struct ltdb_private *ltdb,
801                             const struct ldb_parse_tree *tree,
802                             struct dn_list *list)
803 {
804         struct ldb_context *ldb;
805         unsigned int i;
806
807         ldb = ldb_module_get_ctx(module);
808
809         list->dn = NULL;
810         list->count = 0;
811
812         for (i=0; i<tree->u.list.num_elements; i++) {
813                 struct dn_list *list2;
814                 int ret;
815
816                 list2 = talloc_zero(list, struct dn_list);
817                 if (list2 == NULL) {
818                         return LDB_ERR_OPERATIONS_ERROR;
819                 }
820
821                 ret = ltdb_index_dn(module, ltdb,
822                                     tree->u.list.elements[i], list2);
823
824                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
825                         /* X || 0 == X */
826                         talloc_free(list2);
827                         continue;
828                 }
829
830                 if (ret != LDB_SUCCESS) {
831                         /* X || * == * */
832                         talloc_free(list2);
833                         return ret;
834                 }
835
836                 if (!list_union(ldb, list, list2)) {
837                         talloc_free(list2);
838                         return LDB_ERR_OPERATIONS_ERROR;
839                 }
840         }
841
842         if (list->count == 0) {
843                 return LDB_ERR_NO_SUCH_OBJECT;
844         }
845
846         return LDB_SUCCESS;
847 }
848
849
850 /*
851   NOT an index results
852  */
853 static int ltdb_index_dn_not(struct ldb_module *module,
854                              struct ltdb_private *ltdb,
855                              const struct ldb_parse_tree *tree,
856                              struct dn_list *list)
857 {
858         /* the only way to do an indexed not would be if we could
859            negate the not via another not or if we knew the total
860            number of database elements so we could know that the
861            existing expression covered the whole database.
862
863            instead, we just give up, and rely on a full index scan
864            (unless an outer & manages to reduce the list)
865         */
866         return LDB_ERR_OPERATIONS_ERROR;
867 }
868
869
870 static bool ltdb_index_unique(struct ldb_context *ldb,
871                               const char *attr)
872 {
873         const struct ldb_schema_attribute *a;
874         a = ldb_schema_attribute_by_name(ldb, attr);
875         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
876                 return true;
877         }
878         return false;
879 }
880
881 /*
882   process an AND expression (intersection)
883  */
884 static int ltdb_index_dn_and(struct ldb_module *module,
885                              struct ltdb_private *ltdb,
886                              const struct ldb_parse_tree *tree,
887                              struct dn_list *list)
888 {
889         struct ldb_context *ldb;
890         unsigned int i;
891         bool found;
892
893         ldb = ldb_module_get_ctx(module);
894
895         list->dn = NULL;
896         list->count = 0;
897
898         /* in the first pass we only look for unique simple
899            equality tests, in the hope of avoiding having to look
900            at any others */
901         for (i=0; i<tree->u.list.num_elements; i++) {
902                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
903                 int ret;
904
905                 if (subtree->operation != LDB_OP_EQUALITY ||
906                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
907                         continue;
908                 }
909
910                 ret = ltdb_index_dn(module, ltdb, subtree, list);
911                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
912                         /* 0 && X == 0 */
913                         return LDB_ERR_NO_SUCH_OBJECT;
914                 }
915                 if (ret == LDB_SUCCESS) {
916                         /* a unique index match means we can
917                          * stop. Note that we don't care if we return
918                          * a few too many objects, due to later
919                          * filtering */
920                         return LDB_SUCCESS;
921                 }
922         }
923
924         /* now do a full intersection */
925         found = false;
926
927         for (i=0; i<tree->u.list.num_elements; i++) {
928                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
929                 struct dn_list *list2;
930                 int ret;
931
932                 list2 = talloc_zero(list, struct dn_list);
933                 if (list2 == NULL) {
934                         return ldb_module_oom(module);
935                 }
936
937                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
938
939                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
940                         /* X && 0 == 0 */
941                         list->dn = NULL;
942                         list->count = 0;
943                         talloc_free(list2);
944                         return LDB_ERR_NO_SUCH_OBJECT;
945                 }
946
947                 if (ret != LDB_SUCCESS) {
948                         /* this didn't adding anything */
949                         talloc_free(list2);
950                         continue;
951                 }
952
953                 if (!found) {
954                         talloc_reparent(list2, list, list->dn);
955                         list->dn = list2->dn;
956                         list->count = list2->count;
957                         found = true;
958                 } else if (!list_intersect(ldb, ltdb,
959                                            list, list2)) {
960                         talloc_free(list2);
961                         return LDB_ERR_OPERATIONS_ERROR;
962                 }
963
964                 if (list->count == 0) {
965                         list->dn = NULL;
966                         return LDB_ERR_NO_SUCH_OBJECT;
967                 }
968
969                 if (list->count < 2) {
970                         /* it isn't worth loading the next part of the tree */
971                         return LDB_SUCCESS;
972                 }
973         }
974
975         if (!found) {
976                 /* none of the attributes were indexed */
977                 return LDB_ERR_OPERATIONS_ERROR;
978         }
979
980         return LDB_SUCCESS;
981 }
982
983 /*
984   return a list of matching objects using a one-level index
985  */
986 static int ltdb_index_dn_one(struct ldb_module *module,
987                              struct ltdb_private *ltdb,
988                              struct ldb_dn *parent_dn,
989                              struct dn_list *list)
990 {
991         struct ldb_context *ldb;
992         struct ldb_dn *key;
993         struct ldb_val val;
994         int ret;
995
996         ldb = ldb_module_get_ctx(module);
997
998         /* work out the index key from the parent DN */
999         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
1000         val.length = strlen((char *)val.data);
1001         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
1002         if (!key) {
1003                 ldb_oom(ldb);
1004                 return LDB_ERR_OPERATIONS_ERROR;
1005         }
1006
1007         ret = ltdb_dn_list_load(module, ltdb, key, list);
1008         talloc_free(key);
1009         if (ret != LDB_SUCCESS) {
1010                 return ret;
1011         }
1012
1013         if (list->count == 0) {
1014                 return LDB_ERR_NO_SUCH_OBJECT;
1015         }
1016
1017         return LDB_SUCCESS;
1018 }
1019
1020 /*
1021   return a list of dn's that might match a indexed search or
1022   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1023  */
1024 static int ltdb_index_dn(struct ldb_module *module,
1025                          struct ltdb_private *ltdb,
1026                          const struct ldb_parse_tree *tree,
1027                          struct dn_list *list)
1028 {
1029         int ret = LDB_ERR_OPERATIONS_ERROR;
1030
1031         switch (tree->operation) {
1032         case LDB_OP_AND:
1033                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1034                 break;
1035
1036         case LDB_OP_OR:
1037                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1038                 break;
1039
1040         case LDB_OP_NOT:
1041                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1042                 break;
1043
1044         case LDB_OP_EQUALITY:
1045                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1046                 break;
1047
1048         case LDB_OP_SUBSTRING:
1049         case LDB_OP_GREATER:
1050         case LDB_OP_LESS:
1051         case LDB_OP_PRESENT:
1052         case LDB_OP_APPROX:
1053         case LDB_OP_EXTENDED:
1054                 /* we can't index with fancy bitops yet */
1055                 ret = LDB_ERR_OPERATIONS_ERROR;
1056                 break;
1057         }
1058
1059         return ret;
1060 }
1061
1062 /*
1063   filter a candidate dn_list from an indexed search into a set of results
1064   extracting just the given attributes
1065 */
1066 static int ltdb_index_filter(struct ltdb_private *ltdb,
1067                              const struct dn_list *dn_list,
1068                              struct ltdb_context *ac,
1069                              uint32_t *match_count)
1070 {
1071         struct ldb_context *ldb;
1072         struct ldb_message *msg;
1073         struct ldb_message *filtered_msg;
1074         unsigned int i;
1075
1076         ldb = ldb_module_get_ctx(ac->module);
1077
1078         for (i = 0; i < dn_list->count; i++) {
1079                 struct ldb_dn *dn;
1080                 int ret;
1081                 bool matched;
1082
1083                 msg = ldb_msg_new(ac);
1084                 if (!msg) {
1085                         return LDB_ERR_OPERATIONS_ERROR;
1086                 }
1087
1088                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
1089                 if (dn == NULL) {
1090                         talloc_free(msg);
1091                         return LDB_ERR_OPERATIONS_ERROR;
1092                 }
1093
1094                 ret = ltdb_search_dn1(ac->module, dn, msg,
1095                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1096                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1097                 talloc_free(dn);
1098                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1099                         /* the record has disappeared? yes, this can happen */
1100                         talloc_free(msg);
1101                         continue;
1102                 }
1103
1104                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1105                         /* an internal error */
1106                         talloc_free(msg);
1107                         return LDB_ERR_OPERATIONS_ERROR;
1108                 }
1109
1110                 ret = ldb_match_msg_error(ldb, msg,
1111                                           ac->tree, ac->base, ac->scope, &matched);
1112                 if (ret != LDB_SUCCESS) {
1113                         talloc_free(msg);
1114                         return ret;
1115                 }
1116                 if (!matched) {
1117                         talloc_free(msg);
1118                         continue;
1119                 }
1120
1121                 /* filter the attributes that the user wants */
1122                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1123
1124                 talloc_free(msg);
1125
1126                 if (ret == -1) {
1127                         return LDB_ERR_OPERATIONS_ERROR;
1128                 }
1129
1130                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1131                 if (ret != LDB_SUCCESS) {
1132                         /* Regardless of success or failure, the msg
1133                          * is the callbacks responsiblity, and should
1134                          * not be talloc_free()'ed */
1135                         ac->request_terminated = true;
1136                         return ret;
1137                 }
1138
1139                 (*match_count)++;
1140         }
1141
1142         return LDB_SUCCESS;
1143 }
1144
1145 /*
1146   remove any duplicated entries in a indexed result
1147  */
1148 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
1149 {
1150         unsigned int i, new_count;
1151
1152         if (list->count < 2) {
1153                 return;
1154         }
1155
1156         TYPESAFE_QSORT(list->dn, list->count,
1157                        ldb_val_equal_exact_for_qsort);
1158
1159         new_count = 1;
1160         for (i=1; i<list->count; i++) {
1161                 if (ldb_val_equal_exact(&list->dn[i],
1162                                         &list->dn[new_count-1]) == 0) {
1163                         if (new_count != i) {
1164                                 list->dn[new_count] = list->dn[i];
1165                         }
1166                         new_count++;
1167                 }
1168         }
1169
1170         list->count = new_count;
1171 }
1172
1173 /*
1174   search the database with a LDAP-like expression using indexes
1175   returns -1 if an indexed search is not possible, in which
1176   case the caller should call ltdb_search_full()
1177 */
1178 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1179 {
1180         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1181         struct dn_list *dn_list;
1182         int ret;
1183
1184         /* see if indexing is enabled */
1185         if (!ltdb->cache->attribute_indexes &&
1186             !ltdb->cache->one_level_indexes &&
1187             ac->scope != LDB_SCOPE_BASE) {
1188                 /* fallback to a full search */
1189                 return LDB_ERR_OPERATIONS_ERROR;
1190         }
1191
1192         dn_list = talloc_zero(ac, struct dn_list);
1193         if (dn_list == NULL) {
1194                 return ldb_module_oom(ac->module);
1195         }
1196
1197         switch (ac->scope) {
1198         case LDB_SCOPE_BASE:
1199                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1200                 if (dn_list->dn == NULL) {
1201                         talloc_free(dn_list);
1202                         return ldb_module_oom(ac->module);
1203                 }
1204                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1205                 if (dn_list->dn[0].data == NULL) {
1206                         talloc_free(dn_list);
1207                         return ldb_module_oom(ac->module);
1208                 }
1209                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1210                 dn_list->count = 1;
1211                 break;
1212
1213         case LDB_SCOPE_ONELEVEL:
1214                 if (!ltdb->cache->one_level_indexes) {
1215                         talloc_free(dn_list);
1216                         return LDB_ERR_OPERATIONS_ERROR;
1217                 }
1218                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list);
1219                 if (ret != LDB_SUCCESS) {
1220                         talloc_free(dn_list);
1221                         return ret;
1222                 }
1223                 break;
1224
1225         case LDB_SCOPE_SUBTREE:
1226         case LDB_SCOPE_DEFAULT:
1227                 if (!ltdb->cache->attribute_indexes) {
1228                         talloc_free(dn_list);
1229                         return LDB_ERR_OPERATIONS_ERROR;
1230                 }
1231                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
1232                 if (ret != LDB_SUCCESS) {
1233                         talloc_free(dn_list);
1234                         return ret;
1235                 }
1236                 ltdb_dn_list_remove_duplicates(dn_list);
1237                 break;
1238         }
1239
1240         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count);
1241         talloc_free(dn_list);
1242         return ret;
1243 }
1244
1245 /**
1246  * @brief Add a DN in the index list of a given attribute name/value pair
1247  *
1248  * This function will add the DN in the index list for the index for
1249  * the given attribute name and value.
1250  *
1251  * @param[in]  module       A ldb_module structure
1252  *
1253  * @param[in]  dn           The string representation of the DN as it
1254  *                          will be stored in the index entry
1255  *
1256  * @param[in]  el           A ldb_message_element array, one of the entry
1257  *                          referred by the v_idx is the attribute name and
1258  *                          value pair which will be used to construct the
1259  *                          index name
1260  *
1261  * @param[in]  v_idx        The index of element in the el array to use
1262  *
1263  * @return                  An ldb error code
1264  */
1265 static int ltdb_index_add1(struct ldb_module *module,
1266                            struct ltdb_private *ltdb,
1267                            const struct ldb_message *msg,
1268                            struct ldb_message_element *el, int v_idx)
1269 {
1270         struct ldb_context *ldb;
1271         struct ldb_dn *dn_key;
1272         int ret;
1273         const struct ldb_schema_attribute *a;
1274         struct dn_list *list;
1275         unsigned alloc_len;
1276
1277         ldb = ldb_module_get_ctx(module);
1278
1279         list = talloc_zero(module, struct dn_list);
1280         if (list == NULL) {
1281                 return LDB_ERR_OPERATIONS_ERROR;
1282         }
1283
1284         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1285         if (!dn_key) {
1286                 talloc_free(list);
1287                 return LDB_ERR_OPERATIONS_ERROR;
1288         }
1289         talloc_steal(list, dn_key);
1290
1291         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
1292         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1293                 talloc_free(list);
1294                 return ret;
1295         }
1296
1297         if (list->count > 0 &&
1298             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1299                 /*
1300                  * We do not want to print info about a possibly
1301                  * confidential DN that the conflict was with in the
1302                  * user-visible error string
1303                  */
1304                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1305                           __location__ ": unique index violation on %s in %s, "
1306                           "conficts with %*.*s in %s",
1307                           el->name, ldb_dn_get_linearized(msg->dn),
1308                           (int)list->dn[0].length,
1309                           (int)list->dn[0].length,
1310                           list->dn[0].data,
1311                           ldb_dn_get_linearized(dn_key));
1312                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1313                                        el->name,
1314                                        ldb_dn_get_linearized(msg->dn));
1315                 talloc_free(list);
1316                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1317         }
1318
1319         /* overallocate the list a bit, to reduce the number of
1320          * realloc trigered copies */
1321         alloc_len = ((list->count+1)+7) & ~7;
1322         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1323         if (list->dn == NULL) {
1324                 talloc_free(list);
1325                 return LDB_ERR_OPERATIONS_ERROR;
1326         }
1327
1328         if (ltdb->cache->GUID_index_attribute == NULL) {
1329                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
1330                 list->dn[list->count].data
1331                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
1332                 if (list->dn[list->count].data == NULL) {
1333                         talloc_free(list);
1334                         return LDB_ERR_OPERATIONS_ERROR;
1335                 }
1336                 list->dn[list->count].length = strlen(dn_str);
1337         } else {
1338                 const struct ldb_val *key_val;
1339                 key_val = ldb_msg_find_ldb_val(msg,
1340                                                ltdb->cache->GUID_index_attribute);
1341                 if (key_val == NULL) {
1342                         talloc_free(list);
1343                         return ldb_module_operr(module);
1344                 }
1345
1346                 if (key_val->length != LTDB_GUID_SIZE) {
1347                         talloc_free(list);
1348                         return ldb_module_operr(module);
1349                 }
1350                 list->dn[list->count] = ldb_val_dup(list->dn, key_val);
1351                 if (list->dn[list->count].data == NULL) {
1352                         talloc_free(list);
1353                         return ldb_module_operr(module);
1354                 }
1355         }
1356         list->count++;
1357
1358         ret = ltdb_dn_list_store(module, dn_key, list);
1359
1360         talloc_free(list);
1361
1362         return ret;
1363 }
1364
1365 /*
1366   add index entries for one elements in a message
1367  */
1368 static int ltdb_index_add_el(struct ldb_module *module,
1369                              struct ltdb_private *ltdb,
1370                              const struct ldb_message *msg,
1371                              struct ldb_message_element *el)
1372 {
1373         unsigned int i;
1374         for (i = 0; i < el->num_values; i++) {
1375                 int ret = ltdb_index_add1(module, ltdb,
1376                                           msg, el, i);
1377                 if (ret != LDB_SUCCESS) {
1378                         return ret;
1379                 }
1380         }
1381
1382         return LDB_SUCCESS;
1383 }
1384
1385 /*
1386   add index entries for all elements in a message
1387  */
1388 static int ltdb_index_add_all(struct ldb_module *module,
1389                               struct ltdb_private *ltdb,
1390                               const struct ldb_message *msg)
1391 {
1392         struct ldb_message_element *elements = msg->elements;
1393         unsigned int i;
1394         const char *dn_str;
1395         int ret;
1396
1397         if (ldb_dn_is_special(msg->dn)) {
1398                 return LDB_SUCCESS;
1399         }
1400
1401         dn_str = ldb_dn_get_linearized(msg->dn);
1402         if (dn_str == NULL) {
1403                 return LDB_ERR_OPERATIONS_ERROR;
1404         }
1405
1406         ret = ltdb_write_index_dn_guid(module, msg, 1);
1407         if (ret != LDB_SUCCESS) {
1408                 return ret;
1409         }
1410
1411         if (!ltdb->cache->attribute_indexes) {
1412                 /* no indexed fields */
1413                 return LDB_SUCCESS;
1414         }
1415
1416         for (i = 0; i < msg->num_elements; i++) {
1417                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
1418                         continue;
1419                 }
1420                 ret = ltdb_index_add_el(module, ltdb,
1421                                         msg, &elements[i]);
1422                 if (ret != LDB_SUCCESS) {
1423                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1424                         ldb_asprintf_errstring(ldb,
1425                                                __location__ ": Failed to re-index %s in %s - %s",
1426                                                elements[i].name, dn_str,
1427                                                ldb_errstring(ldb));
1428                         return ret;
1429                 }
1430         }
1431
1432         return LDB_SUCCESS;
1433 }
1434
1435
1436 /*
1437   insert a DN index for a message
1438 */
1439 static int ltdb_modify_index_dn(struct ldb_module *module,
1440                                 struct ltdb_private *ltdb,
1441                                 const struct ldb_message *msg,
1442                                 struct ldb_dn *dn,
1443                                 const char *index, int add)
1444 {
1445         struct ldb_message_element el;
1446         struct ldb_val val;
1447         int ret;
1448
1449         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1450         if (val.data == NULL) {
1451                 return LDB_ERR_OPERATIONS_ERROR;
1452         }
1453
1454         val.length = strlen((char *)val.data);
1455         el.name = index;
1456         el.values = &val;
1457         el.num_values = 1;
1458
1459         if (add) {
1460                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
1461         } else { /* delete */
1462                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
1463         }
1464
1465         if (ret != LDB_SUCCESS) {
1466                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1467                 const char *dn_str = ldb_dn_get_linearized(dn);
1468                 ldb_asprintf_errstring(ldb,
1469                                        __location__
1470                                        ": Failed to modify %s "
1471                                        "against %s in %s - %s",
1472                                        index,
1473                                        ltdb->cache->GUID_index_attribute,
1474                                        dn_str, ldb_errstring(ldb));
1475                 return ret;
1476         }
1477         return ret;
1478 }
1479
1480 /*
1481   insert a one level index for a message
1482 */
1483 static int ltdb_index_onelevel(struct ldb_module *module,
1484                                const struct ldb_message *msg, int add)
1485 {
1486         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1487                                                     struct ltdb_private);
1488         struct ldb_dn *pdn;
1489         int ret;
1490
1491         /* We index for ONE Level only if requested */
1492         if (!ltdb->cache->one_level_indexes) {
1493                 return LDB_SUCCESS;
1494         }
1495
1496         pdn = ldb_dn_get_parent(module, msg->dn);
1497         if (pdn == NULL) {
1498                 return LDB_ERR_OPERATIONS_ERROR;
1499         }
1500         ret = ltdb_modify_index_dn(module, ltdb,
1501                                    msg, pdn, LTDB_IDXONE, add);
1502
1503         talloc_free(pdn);
1504
1505         return ret;
1506 }
1507
1508 /*
1509   insert a one level index for a message
1510 */
1511 static int ltdb_write_index_dn_guid(struct ldb_module *module,
1512                                     const struct ldb_message *msg,
1513                                     int add)
1514 {
1515         int ret;
1516         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1517                                                     struct ltdb_private);
1518
1519         /* We index for DN only if using a GUID index */
1520         if (ltdb->cache->GUID_index_attribute == NULL) {
1521                 return LDB_SUCCESS;
1522         }
1523
1524         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
1525                                    LTDB_IDXDN, add);
1526
1527         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
1528                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1529                                        "Entry %s already exists",
1530                                        ldb_dn_get_linearized(msg->dn));
1531                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1532         }
1533         return ret;
1534 }
1535
1536 /*
1537   add the index entries for a new element in a record
1538   The caller guarantees that these element values are not yet indexed
1539 */
1540 int ltdb_index_add_element(struct ldb_module *module,
1541                            struct ltdb_private *ltdb,
1542                            const struct ldb_message *msg,
1543                            struct ldb_message_element *el)
1544 {
1545         if (ldb_dn_is_special(msg->dn)) {
1546                 return LDB_SUCCESS;
1547         }
1548         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1549                 return LDB_SUCCESS;
1550         }
1551         return ltdb_index_add_el(module, ltdb, msg, el);
1552 }
1553
1554 /*
1555   add the index entries for a new record
1556 */
1557 int ltdb_index_add_new(struct ldb_module *module,
1558                        struct ltdb_private *ltdb,
1559                        const struct ldb_message *msg)
1560 {
1561         int ret;
1562
1563         if (ldb_dn_is_special(msg->dn)) {
1564                 return LDB_SUCCESS;
1565         }
1566
1567         ret = ltdb_index_add_all(module, ltdb, msg);
1568         if (ret != LDB_SUCCESS) {
1569                 return ret;
1570         }
1571
1572         return ltdb_index_onelevel(module, msg, 1);
1573 }
1574
1575
1576 /*
1577   delete an index entry for one message element
1578 */
1579 int ltdb_index_del_value(struct ldb_module *module,
1580                          struct ltdb_private *ltdb,
1581                          const struct ldb_message *msg,
1582                          struct ldb_message_element *el, unsigned int v_idx)
1583 {
1584         struct ldb_context *ldb;
1585         struct ldb_dn *dn_key;
1586         const char *dn_str;
1587         int ret, i;
1588         unsigned int j;
1589         struct dn_list *list;
1590         struct ldb_dn *dn = msg->dn;
1591
1592         ldb = ldb_module_get_ctx(module);
1593
1594         dn_str = ldb_dn_get_linearized(dn);
1595         if (dn_str == NULL) {
1596                 return LDB_ERR_OPERATIONS_ERROR;
1597         }
1598
1599         if (dn_str[0] == '@') {
1600                 return LDB_SUCCESS;
1601         }
1602
1603         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1604         if (!dn_key) {
1605                 return LDB_ERR_OPERATIONS_ERROR;
1606         }
1607
1608         list = talloc_zero(dn_key, struct dn_list);
1609         if (list == NULL) {
1610                 talloc_free(dn_key);
1611                 return LDB_ERR_OPERATIONS_ERROR;
1612         }
1613
1614         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
1615         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1616                 /* it wasn't indexed. Did we have an earlier error? If we did then
1617                    its gone now */
1618                 talloc_free(dn_key);
1619                 return LDB_SUCCESS;
1620         }
1621
1622         if (ret != LDB_SUCCESS) {
1623                 talloc_free(dn_key);
1624                 return ret;
1625         }
1626
1627         i = ltdb_dn_list_find_msg(ltdb, list, msg);
1628         if (i == -1) {
1629                 /* nothing to delete */
1630                 talloc_free(dn_key);
1631                 return LDB_SUCCESS;
1632         }
1633
1634         j = (unsigned int) i;
1635         if (j != list->count - 1) {
1636                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1637         }
1638         list->count--;
1639         if (list->count == 0) {
1640                 talloc_free(list->dn);
1641                 list->dn = NULL;
1642         } else {
1643                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1644         }
1645
1646         ret = ltdb_dn_list_store(module, dn_key, list);
1647
1648         talloc_free(dn_key);
1649
1650         return ret;
1651 }
1652
1653 /*
1654   delete the index entries for a element
1655   return -1 on failure
1656 */
1657 int ltdb_index_del_element(struct ldb_module *module,
1658                            struct ltdb_private *ltdb,
1659                            const struct ldb_message *msg,
1660                            struct ldb_message_element *el)
1661 {
1662         const char *dn_str;
1663         int ret;
1664         unsigned int i;
1665
1666         if (!ltdb->cache->attribute_indexes) {
1667                 /* no indexed fields */
1668                 return LDB_SUCCESS;
1669         }
1670
1671         dn_str = ldb_dn_get_linearized(msg->dn);
1672         if (dn_str == NULL) {
1673                 return LDB_ERR_OPERATIONS_ERROR;
1674         }
1675
1676         if (dn_str[0] == '@') {
1677                 return LDB_SUCCESS;
1678         }
1679
1680         if (!ltdb_is_indexed(module, ltdb, el->name)) {
1681                 return LDB_SUCCESS;
1682         }
1683         for (i = 0; i < el->num_values; i++) {
1684                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
1685                 if (ret != LDB_SUCCESS) {
1686                         return ret;
1687                 }
1688         }
1689
1690         return LDB_SUCCESS;
1691 }
1692
1693 /*
1694   delete the index entries for a record
1695   return -1 on failure
1696 */
1697 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1698 {
1699         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1700         int ret;
1701         unsigned int i;
1702
1703         if (ldb_dn_is_special(msg->dn)) {
1704                 return LDB_SUCCESS;
1705         }
1706
1707         ret = ltdb_index_onelevel(module, msg, 0);
1708         if (ret != LDB_SUCCESS) {
1709                 return ret;
1710         }
1711
1712         ret = ltdb_write_index_dn_guid(module, msg, 0);
1713         if (ret != LDB_SUCCESS) {
1714                 return ret;
1715         }
1716
1717         if (!ltdb->cache->attribute_indexes) {
1718                 /* no indexed fields */
1719                 return LDB_SUCCESS;
1720         }
1721
1722         for (i = 0; i < msg->num_elements; i++) {
1723                 ret = ltdb_index_del_element(module, ltdb,
1724                                              msg, &msg->elements[i]);
1725                 if (ret != LDB_SUCCESS) {
1726                         return ret;
1727                 }
1728         }
1729
1730         return LDB_SUCCESS;
1731 }
1732
1733
1734 /*
1735   traversal function that deletes all @INDEX records
1736 */
1737 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1738 {
1739         struct ldb_module *module = state;
1740         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1741         const char *dnstr = "DN=" LTDB_INDEX ":";
1742         struct dn_list list;
1743         struct ldb_dn *dn;
1744         struct ldb_val v;
1745         int ret;
1746
1747         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1748                 return 0;
1749         }
1750         /* we need to put a empty list in the internal tdb for this
1751          * index entry */
1752         list.dn = NULL;
1753         list.count = 0;
1754
1755         /* the offset of 3 is to remove the DN= prefix. */
1756         v.data = key.dptr + 3;
1757         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1758
1759         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1760         ret = ltdb_dn_list_store(module, dn, &list);
1761         if (ret != LDB_SUCCESS) {
1762                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1763                                        "Unable to store null index for %s\n",
1764                                                 ldb_dn_get_linearized(dn));
1765                 talloc_free(dn);
1766                 return -1;
1767         }
1768         talloc_free(dn);
1769         return 0;
1770 }
1771
1772 struct ltdb_reindex_context {
1773         struct ldb_module *module;
1774         int error;
1775 };
1776
1777 /*
1778   traversal function that adds @INDEX records during a re index
1779 */
1780 static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1781 {
1782         struct ldb_context *ldb;
1783         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1784         struct ldb_module *module = ctx->module;
1785         struct ldb_message *msg;
1786         unsigned int nb_elements_in_db;
1787         const struct ldb_val val = {
1788                 .data = data.dptr,
1789                 .length = data.dsize,
1790         };
1791         int ret;
1792         TDB_DATA key2;
1793         bool is_record;
1794         
1795         ldb = ldb_module_get_ctx(module);
1796
1797         if (key.dsize > 4 &&
1798             memcmp(key.dptr, "DN=@", 4) == 0) {
1799                 return 0;
1800         }
1801
1802         is_record = ltdb_key_is_record(key);
1803         if (is_record == false) {
1804                 return 0;
1805         }
1806         
1807         msg = ldb_msg_new(module);
1808         if (msg == NULL) {
1809                 return -1;
1810         }
1811
1812         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
1813                                                    msg,
1814                                                    NULL, 0,
1815                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
1816                                                    &nb_elements_in_db);
1817         if (ret != 0) {
1818                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1819                                                 ldb_dn_get_linearized(msg->dn));
1820                 ctx->error = ret;
1821                 talloc_free(msg);
1822                 return -1;
1823         }
1824
1825         if (msg->dn == NULL) {
1826                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1827                           "Refusing to re-index as GUID "
1828                           "key %*.*s with no DN\n",
1829                           (int)key.dsize, (int)key.dsize,
1830                           (char *)key.dptr);
1831                 talloc_free(msg);
1832                 return -1;
1833         }
1834         
1835         /* check if the DN key has changed, perhaps due to the case
1836            insensitivity of an element changing, or a change from DN
1837            to GUID keys */
1838         key2 = ltdb_key_msg(module, msg);
1839         if (key2.dptr == NULL) {
1840                 /* probably a corrupt record ... darn */
1841                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1842                                                 ldb_dn_get_linearized(msg->dn));
1843                 talloc_free(msg);
1844                 return 0;
1845         }
1846         if (key.dsize != key2.dsize ||
1847             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
1848                 int tdb_ret;
1849                 tdb_ret = tdb_delete(tdb, key);
1850                 if (tdb_ret != 0) {
1851                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1852                                   "Failed to delete %*.*s "
1853                                   "for rekey as %*.*s: %s",
1854                                   (int)key.dsize, (int)key.dsize,
1855                                   (const char *)key.dptr,
1856                                   (int)key2.dsize, (int)key2.dsize,
1857                                   (const char *)key.dptr,
1858                                   tdb_errorstr(tdb));
1859                         ctx->error = ltdb_err_map(tdb_error(tdb));
1860                         return -1;
1861                 }
1862                 tdb_ret = tdb_store(tdb, key2, data, 0);
1863                 if (tdb_ret != 0) {
1864                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1865                                   "Failed to rekey %*.*s as %*.*s: %s",
1866                                   (int)key.dsize, (int)key.dsize,
1867                                   (const char *)key.dptr,
1868                                   (int)key2.dsize, (int)key2.dsize,
1869                                   (const char *)key.dptr,
1870                                   tdb_errorstr(tdb));
1871                         ctx->error = ltdb_err_map(tdb_error(tdb));
1872                         return -1;
1873                 }
1874         }
1875         talloc_free(key2.dptr);
1876
1877         talloc_free(msg);
1878
1879         return 0;
1880 }
1881
1882 /*
1883   traversal function that adds @INDEX records during a re index
1884 */
1885 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1886 {
1887         struct ldb_context *ldb;
1888         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1889         struct ldb_module *module = ctx->module;
1890         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
1891                                                     struct ltdb_private);
1892         struct ldb_message *msg;
1893         unsigned int nb_elements_in_db;
1894         const struct ldb_val val = {
1895                 .data = data.dptr,
1896                 .length = data.dsize,
1897         };
1898         int ret;
1899         bool is_record;
1900         
1901         ldb = ldb_module_get_ctx(module);
1902
1903         if (key.dsize > 4 &&
1904             memcmp(key.dptr, "DN=@", 4) == 0) {
1905                 return 0;
1906         }
1907
1908         is_record = ltdb_key_is_record(key);
1909         if (is_record == false) {
1910                 return 0;
1911         }
1912         
1913         msg = ldb_msg_new(module);
1914         if (msg == NULL) {
1915                 return -1;
1916         }
1917
1918         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
1919                                                    msg,
1920                                                    NULL, 0,
1921                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
1922                                                    &nb_elements_in_db);
1923         if (ret != 0) {
1924                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1925                                                 ldb_dn_get_linearized(msg->dn));
1926                 ctx->error = ret;
1927                 talloc_free(msg);
1928                 return -1;
1929         }
1930
1931         if (msg->dn == NULL) {
1932                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1933                           "Refusing to re-index as GUID "
1934                           "key %*.*s with no DN\n",
1935                           (int)key.dsize, (int)key.dsize,
1936                           (char *)key.dptr);
1937                 talloc_free(msg);
1938                 return -1;
1939         }
1940
1941         ret = ltdb_index_onelevel(module, msg, 1);
1942         if (ret != LDB_SUCCESS) {
1943                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1944                           "Adding special ONE LEVEL index failed (%s)!",
1945                                                 ldb_dn_get_linearized(msg->dn));
1946                 talloc_free(msg);
1947                 return -1;
1948         }
1949
1950         ret = ltdb_index_add_all(module, ltdb, msg);
1951
1952         if (ret != LDB_SUCCESS) {
1953                 ctx->error = ret;
1954                 talloc_free(msg);
1955                 return -1;
1956         }
1957
1958         talloc_free(msg);
1959
1960         return 0;
1961 }
1962
1963 /*
1964   force a complete reindex of the database
1965 */
1966 int ltdb_reindex(struct ldb_module *module)
1967 {
1968         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1969         int ret;
1970         struct ltdb_reindex_context ctx;
1971
1972         /*
1973          * Only triggered after a modification, but make clear we do
1974          * not re-index a read-only DB
1975          */
1976         if (ltdb->read_only) {
1977                 return LDB_ERR_UNWILLING_TO_PERFORM;
1978         }
1979
1980         if (ltdb_cache_reload(module) != 0) {
1981                 return LDB_ERR_OPERATIONS_ERROR;
1982         }
1983
1984         /*
1985          * Ensure we read (and so remove) the entries from the real
1986          * DB, no values stored so far are any use as we want to do a
1987          * re-index
1988          */
1989         ltdb_index_transaction_cancel(module);
1990
1991         ret = ltdb_index_transaction_start(module);
1992         if (ret != LDB_SUCCESS) {
1993                 return ret;
1994         }
1995
1996         /* first traverse the database deleting any @INDEX records by
1997          * putting NULL entries in the in-memory tdb
1998          */
1999         ret = tdb_traverse(ltdb->tdb, delete_index, module);
2000         if (ret < 0) {
2001                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2002                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
2003                                        ldb_errstring(ldb));
2004                 return LDB_ERR_OPERATIONS_ERROR;
2005         }
2006
2007         /* if we don't have indexes we have nothing todo */
2008         if (!ltdb->cache->attribute_indexes) {
2009                 return LDB_SUCCESS;
2010         }
2011
2012         ctx.module = module;
2013         ctx.error = 0;
2014
2015         /* now traverse adding any indexes for normal LDB records */
2016         ret = tdb_traverse(ltdb->tdb, re_key, &ctx);
2017         if (ret < 0) {
2018                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2019                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
2020                                        ldb_errstring(ldb));
2021                 return LDB_ERR_OPERATIONS_ERROR;
2022         }
2023
2024         if (ctx.error != LDB_SUCCESS) {
2025                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2026                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
2027                 return ctx.error;
2028         }
2029
2030         ctx.error = 0;
2031
2032         /* now traverse adding any indexes for normal LDB records */
2033         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
2034         if (ret < 0) {
2035                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2036                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
2037                                        ldb_errstring(ldb));
2038                 return LDB_ERR_OPERATIONS_ERROR;
2039         }
2040
2041         if (ctx.error != LDB_SUCCESS) {
2042                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2043                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
2044                 return ctx.error;
2045         }
2046
2047         return LDB_SUCCESS;
2048 }