ldb: One-level search was incorrectly falling back to full DB scan
[kai/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of TDB key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_tdb.h"
148 #include "ldb_private.h"
149 #include "lib/util/binsearch.h"
150
151 struct dn_list {
152         unsigned int count;
153         struct ldb_val *dn;
154         /*
155          * Do not optimise the intersection of this list,
156          * we must never return an entry not in this
157          * list.  This allows the index for
158          * SCOPE_ONELEVEL to be trusted.
159          */
160         bool strict;
161 };
162
163 struct ltdb_idxptr {
164         struct tdb_context *itdb;
165         int error;
166 };
167
168 enum key_truncation {
169         KEY_NOT_TRUNCATED,
170         KEY_TRUNCATED,
171 };
172
173 static int ltdb_write_index_dn_guid(struct ldb_module *module,
174                                     const struct ldb_message *msg,
175                                     int add);
176 static int ltdb_index_dn_base_dn(struct ldb_module *module,
177                                  struct ltdb_private *ltdb,
178                                  struct ldb_dn *base_dn,
179                                  struct dn_list *dn_list,
180                                  enum key_truncation *truncation);
181
182 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
183                               struct dn_list *list);
184
185 /* we put a @IDXVERSION attribute on index entries. This
186    allows us to tell if it was written by an older version
187 */
188 #define LTDB_INDEXING_VERSION 2
189
190 #define LTDB_GUID_INDEXING_VERSION 3
191
192 static unsigned ltdb_max_key_length(struct ltdb_private *ltdb) {
193         if (ltdb->max_key_length == 0){
194                 return UINT_MAX;
195         }
196         return ltdb->max_key_length;
197 }
198
199 /* enable the idxptr mode when transactions start */
200 int ltdb_index_transaction_start(struct ldb_module *module)
201 {
202         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
203         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
204         if (ltdb->idxptr == NULL) {
205                 return ldb_oom(ldb_module_get_ctx(module));
206         }
207
208         return LDB_SUCCESS;
209 }
210
211 /*
212   see if two ldb_val structures contain exactly the same data
213   return -1 or 1 for a mismatch, 0 for match
214 */
215 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
216                                          const struct ldb_val *v2)
217 {
218         if (v1->length > v2->length) {
219                 return -1;
220         }
221         if (v1->length < v2->length) {
222                 return 1;
223         }
224         return memcmp(v1->data, v2->data, v1->length);
225 }
226
227 /*
228   see if two ldb_val structures contain exactly the same data
229   return -1 or 1 for a mismatch, 0 for match
230 */
231 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
232                                        const struct ldb_val *v2)
233 {
234         if (v1.length > v2->length) {
235                 return -1;
236         }
237         if (v1.length < v2->length) {
238                 return 1;
239         }
240         return memcmp(v1.data, v2->data, v1.length);
241 }
242
243
244 /*
245   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
246   binary-safe comparison for the 'dn' returns -1 if not found
247
248   This is therefore safe when the value is a GUID in the future
249  */
250 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
251                                  const struct dn_list *list,
252                                  const struct ldb_val *v)
253 {
254         unsigned int i;
255         struct ldb_val *exact = NULL, *next = NULL;
256
257         if (ltdb->cache->GUID_index_attribute == NULL) {
258                 for (i=0; i<list->count; i++) {
259                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
260                                 return i;
261                         }
262                 }
263                 return -1;
264         }
265
266         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
267                                 *v, ldb_val_equal_exact_ordered,
268                                 exact, next);
269         if (exact == NULL) {
270                 return -1;
271         }
272         /* Not required, but keeps the compiler quiet */
273         if (next != NULL) {
274                 return -1;
275         }
276
277         i = exact - list->dn;
278         return i;
279 }
280
281 /*
282   find a entry in a dn_list. Uses a case sensitive comparison with the dn
283   returns -1 if not found
284  */
285 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
286                                  struct dn_list *list,
287                                  const struct ldb_message *msg)
288 {
289         struct ldb_val v;
290         const struct ldb_val *key_val;
291         if (ltdb->cache->GUID_index_attribute == NULL) {
292                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
293                 v.data = discard_const_p(unsigned char, dn_str);
294                 v.length = strlen(dn_str);
295         } else {
296                 key_val = ldb_msg_find_ldb_val(msg,
297                                                ltdb->cache->GUID_index_attribute);
298                 if (key_val == NULL) {
299                         return -1;
300                 }
301                 v = *key_val;
302         }
303         return ltdb_dn_list_find_val(ltdb, list, &v);
304 }
305
306 /*
307   this is effectively a cast function, but with lots of paranoia
308   checks and also copes with CPUs that are fussy about pointer
309   alignment
310  */
311 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
312 {
313         struct dn_list *list;
314         if (rec.dsize != sizeof(void *)) {
315                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
316                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
317                 return NULL;
318         }
319         /* note that we can't just use a cast here, as rec.dptr may
320            not be aligned sufficiently for a pointer. A cast would cause
321            platforms like some ARM CPUs to crash */
322         memcpy(&list, rec.dptr, sizeof(void *));
323         list = talloc_get_type(list, struct dn_list);
324         if (list == NULL) {
325                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
326                                        "Bad type '%s' for idxptr",
327                                        talloc_get_name(list));
328                 return NULL;
329         }
330         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
331                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
332                                        "Bad parent '%s' for idxptr",
333                                        talloc_get_name(talloc_parent(list->dn)));
334                 return NULL;
335         }
336         return list;
337 }
338
339 /*
340   return the @IDX list in an index entry for a dn as a
341   struct dn_list
342  */
343 static int ltdb_dn_list_load(struct ldb_module *module,
344                              struct ltdb_private *ltdb,
345                              struct ldb_dn *dn, struct dn_list *list)
346 {
347         struct ldb_message *msg;
348         int ret, version;
349         struct ldb_message_element *el;
350         TDB_DATA rec;
351         struct dn_list *list2;
352         TDB_DATA key;
353
354         list->dn = NULL;
355         list->count = 0;
356
357         /* see if we have any in-memory index entries */
358         if (ltdb->idxptr == NULL ||
359             ltdb->idxptr->itdb == NULL) {
360                 goto normal_index;
361         }
362
363         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
364         key.dsize = strlen((char *)key.dptr);
365
366         rec = tdb_fetch(ltdb->idxptr->itdb, key);
367         if (rec.dptr == NULL) {
368                 goto normal_index;
369         }
370
371         /* we've found an in-memory index entry */
372         list2 = ltdb_index_idxptr(module, rec, true);
373         if (list2 == NULL) {
374                 free(rec.dptr);
375                 return LDB_ERR_OPERATIONS_ERROR;
376         }
377         free(rec.dptr);
378
379         *list = *list2;
380         return LDB_SUCCESS;
381
382 normal_index:
383         msg = ldb_msg_new(list);
384         if (msg == NULL) {
385                 return LDB_ERR_OPERATIONS_ERROR;
386         }
387
388         ret = ltdb_search_dn1(module, dn, msg,
389                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
390                               |LDB_UNPACK_DATA_FLAG_NO_DN);
391         if (ret != LDB_SUCCESS) {
392                 talloc_free(msg);
393                 return ret;
394         }
395
396         el = ldb_msg_find_element(msg, LTDB_IDX);
397         if (!el) {
398                 talloc_free(msg);
399                 return LDB_SUCCESS;
400         }
401
402         version = ldb_msg_find_attr_as_int(msg, LTDB_IDXVERSION, 0);
403
404         /*
405          * we avoid copying the strings by stealing the list.  We have
406          * to steal msg onto el->values (which looks odd) because we
407          * asked for the memory to be allocated on msg, not on each
408          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
409          */
410         if (ltdb->cache->GUID_index_attribute == NULL) {
411                 /* check indexing version number */
412                 if (version != LTDB_INDEXING_VERSION) {
413                         ldb_debug_set(ldb_module_get_ctx(module),
414                                       LDB_DEBUG_ERROR,
415                                       "Wrong DN index version %d "
416                                       "expected %d for %s",
417                                       version, LTDB_INDEXING_VERSION,
418                                       ldb_dn_get_linearized(dn));
419                         return LDB_ERR_OPERATIONS_ERROR;
420                 }
421
422                 talloc_steal(el->values, msg);
423                 list->dn = talloc_steal(list, el->values);
424                 list->count = el->num_values;
425         } else {
426                 unsigned int i;
427                 if (version != LTDB_GUID_INDEXING_VERSION) {
428                         /* This is quite likely during the DB startup
429                            on first upgrade to using a GUID index */
430                         ldb_debug_set(ldb_module_get_ctx(module),
431                                       LDB_DEBUG_ERROR,
432                                       "Wrong GUID index version %d "
433                                       "expected %d for %s",
434                                       version, LTDB_GUID_INDEXING_VERSION,
435                                       ldb_dn_get_linearized(dn));
436                         return LDB_ERR_OPERATIONS_ERROR;
437                 }
438
439                 if (el->num_values == 0) {
440                         return LDB_ERR_OPERATIONS_ERROR;
441                 }
442
443                 if ((el->values[0].length % LTDB_GUID_SIZE) != 0) {
444                         return LDB_ERR_OPERATIONS_ERROR;
445                 }
446
447                 list->count = el->values[0].length / LTDB_GUID_SIZE;
448                 list->dn = talloc_array(list, struct ldb_val, list->count);
449
450                 /*
451                  * The actual data is on msg, due to
452                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
453                  */
454                 talloc_steal(list->dn, msg);
455                 for (i = 0; i < list->count; i++) {
456                         list->dn[i].data
457                                 = &el->values[0].data[i * LTDB_GUID_SIZE];
458                         list->dn[i].length = LTDB_GUID_SIZE;
459                 }
460         }
461
462         /* We don't need msg->elements any more */
463         talloc_free(msg->elements);
464         return LDB_SUCCESS;
465 }
466
467 int ltdb_key_dn_from_idx(struct ldb_module *module,
468                          struct ltdb_private *ltdb,
469                          TALLOC_CTX *mem_ctx,
470                          struct ldb_dn *dn,
471                          TDB_DATA *tdb_key)
472 {
473         struct ldb_context *ldb = ldb_module_get_ctx(module);
474         int ret;
475         int index = 0;
476         enum key_truncation truncation = KEY_NOT_TRUNCATED;
477         struct dn_list *list = talloc(mem_ctx, struct dn_list);
478         if (list == NULL) {
479                 ldb_oom(ldb);
480                 return LDB_ERR_OPERATIONS_ERROR;
481         }
482
483
484         ret = ltdb_index_dn_base_dn(module, ltdb, dn, list, &truncation);
485         if (ret != LDB_SUCCESS) {
486                 TALLOC_FREE(list);
487                 return ret;
488         }
489
490         if (list->count == 0) {
491                 TALLOC_FREE(list);
492                 return LDB_ERR_NO_SUCH_OBJECT;
493         }
494
495         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
496                 const char *dn_str = ldb_dn_get_linearized(dn);
497                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
498                                        __location__
499                                        ": Failed to read DN index "
500                                        "against %s for %s: too many "
501                                        "values (%u > 1)",
502                                        ltdb->cache->GUID_index_attribute,
503                                        dn_str, list->count);
504                 TALLOC_FREE(list);
505                 return LDB_ERR_CONSTRAINT_VIOLATION;
506         }
507
508         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
509                 /*
510                  * DN key has been truncated, need to inspect the actual
511                  * records to locate the actual DN
512                  */
513                 int i;
514                 index = -1;
515                 for (i=0; i < list->count; i++) {
516                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
517                         TDB_DATA key = {
518                                 .dptr = guid_key,
519                                 .dsize = sizeof(guid_key)
520                         };
521                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
522                         struct ldb_message *rec = ldb_msg_new(ldb);
523                         if (rec == NULL) {
524                                 return LDB_ERR_OPERATIONS_ERROR;
525                         }
526
527                         ret = ltdb_idx_to_key(module, ltdb,
528                                               ldb, &list->dn[i],
529                                               &key);
530                         if (ret != LDB_SUCCESS) {
531                                 TALLOC_FREE(list);
532                                 TALLOC_FREE(rec);
533                                 return ret;
534                         }
535
536                         ret = ltdb_search_key(module, ltdb, key,
537                                               rec, flags);
538                         if (key.dptr != guid_key) {
539                                 TALLOC_FREE(key.dptr);
540                         }
541                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
542                                 /*
543                                  * the record has disappeared?
544                                  * yes, this can happen
545                                  */
546                                 TALLOC_FREE(rec);
547                                 continue;
548                         }
549
550                         if (ret != LDB_SUCCESS) {
551                                 /* an internal error */
552                                 TALLOC_FREE(rec);
553                                 TALLOC_FREE(list);
554                                 return LDB_ERR_OPERATIONS_ERROR;
555                         }
556
557                         /*
558                          * We found the actual DN that we wanted from in the
559                          * multiple values that matched the index
560                          * (due to truncation), so return that.
561                          *
562                          */
563                         if (ldb_dn_compare(dn, rec->dn) == 0) {
564                                 index = i;
565                                 TALLOC_FREE(rec);
566                                 break;
567                         }
568                 }
569
570                 /*
571                  * We matched the index but the actual DN we wanted
572                  * was not here.
573                  */
574                 if (index == -1) {
575                         TALLOC_FREE(list);
576                         return LDB_ERR_NO_SUCH_OBJECT;
577                 }
578         }
579
580         /* The tdb_key memory is allocated by the caller */
581         ret = ltdb_guid_to_key(module, ltdb,
582                                &list->dn[index], tdb_key);
583         TALLOC_FREE(list);
584
585         if (ret != LDB_SUCCESS) {
586                 return LDB_ERR_OPERATIONS_ERROR;
587         }
588
589         return LDB_SUCCESS;
590 }
591
592
593
594 /*
595   save a dn_list into a full @IDX style record
596  */
597 static int ltdb_dn_list_store_full(struct ldb_module *module,
598                                    struct ltdb_private *ltdb,
599                                    struct ldb_dn *dn,
600                                    struct dn_list *list)
601 {
602         struct ldb_message *msg;
603         int ret;
604
605         msg = ldb_msg_new(module);
606         if (!msg) {
607                 return ldb_module_oom(module);
608         }
609
610         msg->dn = dn;
611
612         if (list->count == 0) {
613                 ret = ltdb_delete_noindex(module, msg);
614                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
615                         talloc_free(msg);
616                         return LDB_SUCCESS;
617                 }
618                 return ret;
619         }
620
621         if (ltdb->cache->GUID_index_attribute == NULL) {
622                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
623                                       LTDB_INDEXING_VERSION);
624                 if (ret != LDB_SUCCESS) {
625                         talloc_free(msg);
626                         return ldb_module_oom(module);
627                 }
628         } else {
629                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
630                                       LTDB_GUID_INDEXING_VERSION);
631                 if (ret != LDB_SUCCESS) {
632                         talloc_free(msg);
633                         return ldb_module_oom(module);
634                 }
635         }
636
637         if (list->count > 0) {
638                 struct ldb_message_element *el;
639
640                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
641                 if (ret != LDB_SUCCESS) {
642                         talloc_free(msg);
643                         return ldb_module_oom(module);
644                 }
645
646                 if (ltdb->cache->GUID_index_attribute == NULL) {
647                         el->values = list->dn;
648                         el->num_values = list->count;
649                 } else {
650                         struct ldb_val v;
651                         unsigned int i;
652                         el->values = talloc_array(msg,
653                                                   struct ldb_val, 1);
654                         if (el->values == NULL) {
655                                 talloc_free(msg);
656                                 return ldb_module_oom(module);
657                         }
658
659                         v.data = talloc_array_size(el->values,
660                                                    list->count,
661                                                    LTDB_GUID_SIZE);
662                         if (v.data == NULL) {
663                                 talloc_free(msg);
664                                 return ldb_module_oom(module);
665                         }
666
667                         v.length = talloc_get_size(v.data);
668
669                         for (i = 0; i < list->count; i++) {
670                                 if (list->dn[i].length !=
671                                     LTDB_GUID_SIZE) {
672                                         talloc_free(msg);
673                                         return ldb_module_operr(module);
674                                 }
675                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
676                                        list->dn[i].data,
677                                        LTDB_GUID_SIZE);
678                         }
679                         el->values[0] = v;
680                         el->num_values = 1;
681                 }
682         }
683
684         ret = ltdb_store(module, msg, TDB_REPLACE);
685         talloc_free(msg);
686         return ret;
687 }
688
689 /*
690   save a dn_list into the database, in either @IDX or internal format
691  */
692 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
693                               struct dn_list *list)
694 {
695         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
696         TDB_DATA rec, key;
697         int ret;
698         struct dn_list *list2;
699
700         if (ltdb->idxptr == NULL) {
701                 return ltdb_dn_list_store_full(module, ltdb,
702                                                dn, list);
703         }
704
705         if (ltdb->idxptr->itdb == NULL) {
706                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
707                 if (ltdb->idxptr->itdb == NULL) {
708                         return LDB_ERR_OPERATIONS_ERROR;
709                 }
710         }
711
712         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
713         key.dsize = strlen((char *)key.dptr);
714
715         rec = tdb_fetch(ltdb->idxptr->itdb, key);
716         if (rec.dptr != NULL) {
717                 list2 = ltdb_index_idxptr(module, rec, false);
718                 if (list2 == NULL) {
719                         free(rec.dptr);
720                         return LDB_ERR_OPERATIONS_ERROR;
721                 }
722                 free(rec.dptr);
723                 list2->dn = talloc_steal(list2, list->dn);
724                 list2->count = list->count;
725                 return LDB_SUCCESS;
726         }
727
728         list2 = talloc(ltdb->idxptr, struct dn_list);
729         if (list2 == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         list2->dn = talloc_steal(list2, list->dn);
733         list2->count = list->count;
734
735         rec.dptr = (uint8_t *)&list2;
736         rec.dsize = sizeof(void *);
737
738
739         /*
740          * This is not a store into the main DB, but into an in-memory
741          * TDB, so we don't need a guard on ltdb->read_only
742          */
743         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
744         if (ret != 0) {
745                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
746         }
747         return LDB_SUCCESS;
748 }
749
750 /*
751   traverse function for storing the in-memory index entries on disk
752  */
753 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
754 {
755         struct ldb_module *module = state;
756         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
757         struct ldb_dn *dn;
758         struct ldb_context *ldb = ldb_module_get_ctx(module);
759         struct ldb_val v;
760         struct dn_list *list;
761
762         list = ltdb_index_idxptr(module, data, true);
763         if (list == NULL) {
764                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
765                 return -1;
766         }
767
768         v.data = key.dptr;
769         v.length = strnlen((char *)key.dptr, key.dsize);
770
771         dn = ldb_dn_from_ldb_val(module, ldb, &v);
772         if (dn == NULL) {
773                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
774                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
775                 return -1;
776         }
777
778         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
779                                                       dn, list);
780         talloc_free(dn);
781         if (ltdb->idxptr->error != 0) {
782                 return -1;
783         }
784         return 0;
785 }
786
787 /* cleanup the idxptr mode when transaction commits */
788 int ltdb_index_transaction_commit(struct ldb_module *module)
789 {
790         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
791         int ret;
792
793         struct ldb_context *ldb = ldb_module_get_ctx(module);
794
795         ldb_reset_err_string(ldb);
796
797         if (ltdb->idxptr->itdb) {
798                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
799                 tdb_close(ltdb->idxptr->itdb);
800         }
801
802         ret = ltdb->idxptr->error;
803         if (ret != LDB_SUCCESS) {
804                 if (!ldb_errstring(ldb)) {
805                         ldb_set_errstring(ldb, ldb_strerror(ret));
806                 }
807                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
808         }
809
810         talloc_free(ltdb->idxptr);
811         ltdb->idxptr = NULL;
812         return ret;
813 }
814
815 /* cleanup the idxptr mode when transaction cancels */
816 int ltdb_index_transaction_cancel(struct ldb_module *module)
817 {
818         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
819         if (ltdb->idxptr && ltdb->idxptr->itdb) {
820                 tdb_close(ltdb->idxptr->itdb);
821         }
822         talloc_free(ltdb->idxptr);
823         ltdb->idxptr = NULL;
824         return LDB_SUCCESS;
825 }
826
827
828 /*
829   return the dn key to be used for an index
830   the caller is responsible for freeing
831 */
832 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
833                                      struct ltdb_private *ltdb,
834                                      const char *attr, const struct ldb_val *value,
835                                      const struct ldb_schema_attribute **ap,
836                                      enum key_truncation *truncation)
837 {
838         struct ldb_dn *ret;
839         struct ldb_val v;
840         const struct ldb_schema_attribute *a = NULL;
841         char *attr_folded = NULL;
842         const char *attr_for_dn = NULL;
843         int r;
844         bool should_b64_encode;
845
846         unsigned int max_key_length = ltdb_max_key_length(ltdb);
847         size_t key_len = 0;
848         size_t attr_len = 0;
849         const size_t indx_len = sizeof(LTDB_INDEX) - 1;
850         unsigned frmt_len = 0;
851         const size_t additional_key_length = 4;
852         unsigned int num_separators = 3; /* Estimate for overflow check */
853         const size_t min_data = 1;
854         const size_t min_key_length = additional_key_length
855                 + indx_len + num_separators + min_data;
856
857         if (attr[0] == '@') {
858                 attr_for_dn = attr;
859                 v = *value;
860                 if (ap != NULL) {
861                         *ap = NULL;
862                 }
863         } else {
864                 attr_folded = ldb_attr_casefold(ldb, attr);
865                 if (!attr_folded) {
866                         return NULL;
867                 }
868
869                 attr_for_dn = attr_folded;
870
871                 a = ldb_schema_attribute_by_name(ldb, attr);
872                 if (ap) {
873                         *ap = a;
874                 }
875                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
876                 if (r != LDB_SUCCESS) {
877                         const char *errstr = ldb_errstring(ldb);
878                         /* canonicalisation can be refused. For
879                            example, a attribute that takes wildcards
880                            will refuse to canonicalise if the value
881                            contains a wildcard */
882                         ldb_asprintf_errstring(ldb,
883                                                "Failed to create index "
884                                                "key for attribute '%s':%s%s%s",
885                                                attr, ldb_strerror(r),
886                                                (errstr?":":""),
887                                                (errstr?errstr:""));
888                         talloc_free(attr_folded);
889                         return NULL;
890                 }
891         }
892         attr_len = strlen(attr_for_dn);
893
894         /*
895          * Check if there is any hope this will fit into the DB.
896          * Overflow here is not actually critical the code below
897          * checks again to make the printf and the DB does another
898          * check for too long keys
899          */
900         if (max_key_length - attr_len < min_key_length) {
901                 ldb_asprintf_errstring(
902                         ldb,
903                         __location__ ": max_key_length "
904                         "is too small (%u) < (%u)",
905                         max_key_length,
906                         (unsigned)(min_key_length + attr_len));
907                 talloc_free(attr_folded);
908                 return NULL;
909         }
910
911         /*
912          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
913          * "DN=" and a trailing string terminator
914          */
915         max_key_length -= additional_key_length;
916
917         /*
918          * We do not base 64 encode a DN in a key, it has already been
919          * casefold and lineraized, that is good enough.  That already
920          * avoids embedded NUL etc.
921          */
922         if (ltdb->cache->GUID_index_attribute != NULL) {
923                 if (strcmp(attr, LTDB_IDXDN) == 0) {
924                         should_b64_encode = false;
925                 } else if (strcmp(attr, LTDB_IDXONE) == 0) {
926                         /*
927                          * We can only change the behaviour for IDXONE
928                          * when the GUID index is enabled
929                          */
930                         should_b64_encode = false;
931                 } else {
932                         should_b64_encode
933                                 = ldb_should_b64_encode(ldb, &v);
934                 }
935         } else {
936                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
937         }
938
939         if (should_b64_encode) {
940                 size_t vstr_len = 0;
941                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
942                 if (!vstr) {
943                         talloc_free(attr_folded);
944                         return NULL;
945                 }
946                 vstr_len = strlen(vstr);
947                 /* 
948                  * Overflow here is not critical as we only use this
949                  * to choose the printf truncation
950                  */
951                 key_len = num_separators + indx_len + attr_len + vstr_len;
952                 if (key_len > max_key_length) {
953                         size_t excess = key_len - max_key_length;
954                         frmt_len = vstr_len - excess;
955                         *truncation = KEY_TRUNCATED;
956                         /*
957                         * Truncated keys are placed in a separate key space
958                         * from the non truncated keys
959                         * Note: the double hash "##" is not a typo and
960                         * indicates that the following value is base64 encoded
961                         */
962                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
963                                              LTDB_INDEX, attr_for_dn,
964                                              frmt_len, vstr);
965                 } else {
966                         frmt_len = vstr_len;
967                         *truncation = KEY_NOT_TRUNCATED;
968                         /*
969                          * Note: the double colon "::" is not a typo and
970                          * indicates that the following value is base64 encoded
971                          */
972                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
973                                              LTDB_INDEX, attr_for_dn,
974                                              frmt_len, vstr);
975                 }
976                 talloc_free(vstr);
977         } else {
978                 /* Only need two seperators */
979                 num_separators = 2;
980
981                 /*
982                  * Overflow here is not critical as we only use this
983                  * to choose the printf truncation
984                  */
985                 key_len = num_separators + indx_len + attr_len + (int)v.length;
986                 if (key_len > max_key_length) {
987                         size_t excess = key_len - max_key_length;
988                         frmt_len = v.length - excess;
989                         *truncation = KEY_TRUNCATED;
990                         /*
991                          * Truncated keys are placed in a separate key space
992                          * from the non truncated keys
993                          */
994                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
995                                              LTDB_INDEX, attr_for_dn,
996                                              frmt_len, (char *)v.data);
997                 } else {
998                         frmt_len = v.length;
999                         *truncation = KEY_NOT_TRUNCATED;
1000                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1001                                              LTDB_INDEX, attr_for_dn,
1002                                              frmt_len, (char *)v.data);
1003                 }
1004         }
1005
1006         if (v.data != value->data) {
1007                 talloc_free(v.data);
1008         }
1009         talloc_free(attr_folded);
1010
1011         return ret;
1012 }
1013
1014 /*
1015   see if a attribute value is in the list of indexed attributes
1016 */
1017 static bool ltdb_is_indexed(struct ldb_module *module,
1018                             struct ltdb_private *ltdb,
1019                             const char *attr)
1020 {
1021         struct ldb_context *ldb = ldb_module_get_ctx(module);
1022         unsigned int i;
1023         struct ldb_message_element *el;
1024
1025         if ((ltdb->cache->GUID_index_attribute != NULL) &&
1026             (ldb_attr_cmp(attr,
1027                           ltdb->cache->GUID_index_attribute) == 0)) {
1028                 /* Implicity covered, this is the index key */
1029                 return false;
1030         }
1031         if (ldb->schema.index_handler_override) {
1032                 const struct ldb_schema_attribute *a
1033                         = ldb_schema_attribute_by_name(ldb, attr);
1034
1035                 if (a == NULL) {
1036                         return false;
1037                 }
1038
1039                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1040                         return true;
1041                 } else {
1042                         return false;
1043                 }
1044         }
1045
1046         if (!ltdb->cache->attribute_indexes) {
1047                 return false;
1048         }
1049
1050         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
1051         if (el == NULL) {
1052                 return false;
1053         }
1054
1055         /* TODO: this is too expensive! At least use a binary search */
1056         for (i=0; i<el->num_values; i++) {
1057                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1058                         return true;
1059                 }
1060         }
1061         return false;
1062 }
1063
1064 /*
1065   in the following logic functions, the return value is treated as
1066   follows:
1067
1068      LDB_SUCCESS: we found some matching index values
1069
1070      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1071
1072      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1073                                we'll need a full search
1074  */
1075
1076 /*
1077   return a list of dn's that might match a simple indexed search (an
1078   equality search only)
1079  */
1080 static int ltdb_index_dn_simple(struct ldb_module *module,
1081                                 struct ltdb_private *ltdb,
1082                                 const struct ldb_parse_tree *tree,
1083                                 struct dn_list *list)
1084 {
1085         struct ldb_context *ldb;
1086         struct ldb_dn *dn;
1087         int ret;
1088         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1089
1090         ldb = ldb_module_get_ctx(module);
1091
1092         list->count = 0;
1093         list->dn = NULL;
1094
1095         /* if the attribute isn't in the list of indexed attributes then
1096            this node needs a full search */
1097         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
1098                 return LDB_ERR_OPERATIONS_ERROR;
1099         }
1100
1101         /* the attribute is indexed. Pull the list of DNs that match the
1102            search criterion */
1103         dn = ltdb_index_key(ldb, ltdb,
1104                             tree->u.equality.attr,
1105                             &tree->u.equality.value, NULL, &truncation);
1106         /*
1107          * We ignore truncation here and allow multi-valued matches
1108          * as ltdb_search_indexed will filter out the wrong one in
1109          * ltdb_index_filter() which calls ldb_match_message().
1110          */
1111         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1112
1113         ret = ltdb_dn_list_load(module, ltdb, dn, list);
1114         talloc_free(dn);
1115         return ret;
1116 }
1117
1118
1119 static bool list_union(struct ldb_context *ldb,
1120                        struct ltdb_private *ltdb,
1121                        struct dn_list *list, struct dn_list *list2);
1122
1123 /*
1124   return a list of dn's that might match a leaf indexed search
1125  */
1126 static int ltdb_index_dn_leaf(struct ldb_module *module,
1127                               struct ltdb_private *ltdb,
1128                               const struct ldb_parse_tree *tree,
1129                               struct dn_list *list)
1130 {
1131         if (ltdb->disallow_dn_filter &&
1132             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1133                 /* in AD mode we do not support "(dn=...)" search filters */
1134                 list->dn = NULL;
1135                 list->count = 0;
1136                 return LDB_SUCCESS;
1137         }
1138         if (tree->u.equality.attr[0] == '@') {
1139                 /* Do not allow a indexed search against an @ */
1140                 list->dn = NULL;
1141                 list->count = 0;
1142                 return LDB_SUCCESS;
1143         }
1144         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1145                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1146                 struct ldb_dn *dn
1147                         = ldb_dn_from_ldb_val(list,
1148                                               ldb_module_get_ctx(module),
1149                                               &tree->u.equality.value);
1150                 if (dn == NULL) {
1151                         /* If we can't parse it, no match */
1152                         list->dn = NULL;
1153                         list->count = 0;
1154                         return LDB_SUCCESS;
1155                 }
1156
1157                 /*
1158                  * Re-use the same code we use for a SCOPE_BASE
1159                  * search
1160                  *
1161                  * We can't call TALLOC_FREE(dn) as this must belong
1162                  * to list for the memory to remain valid.
1163                  */
1164                 return ltdb_index_dn_base_dn(module, ltdb, dn, list,
1165                                              &truncation);
1166                 /*
1167                  * We ignore truncation here and allow multi-valued matches
1168                  * as ltdb_search_indexed will filter out the wrong one in
1169                  * ltdb_index_filter() which calls ldb_match_message().
1170                  */
1171
1172         } else if ((ltdb->cache->GUID_index_attribute != NULL) &&
1173                    (ldb_attr_cmp(tree->u.equality.attr,
1174                                  ltdb->cache->GUID_index_attribute) == 0)) {
1175                 int ret;
1176                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1177                 list->dn = talloc_array(list, struct ldb_val, 1);
1178                 if (list->dn == NULL) {
1179                         ldb_module_oom(module);
1180                         return LDB_ERR_OPERATIONS_ERROR;
1181                 }
1182                 /*
1183                  * We need to go via the canonicalise_fn() to
1184                  * ensure we get the index in binary, rather
1185                  * than a string
1186                  */
1187                 ret = ltdb->GUID_index_syntax->canonicalise_fn(ldb,
1188                                                                list->dn,
1189                                                                &tree->u.equality.value,
1190                                                                &list->dn[0]);
1191                 if (ret != LDB_SUCCESS) {
1192                         return LDB_ERR_OPERATIONS_ERROR;
1193                 }
1194                 list->count = 1;
1195                 return LDB_SUCCESS;
1196         }
1197
1198         return ltdb_index_dn_simple(module, ltdb, tree, list);
1199 }
1200
1201
1202 /*
1203   list intersection
1204   list = list & list2
1205 */
1206 static bool list_intersect(struct ldb_context *ldb,
1207                            struct ltdb_private *ltdb,
1208                            struct dn_list *list, const struct dn_list *list2)
1209 {
1210         const struct dn_list *short_list, *long_list;
1211         struct dn_list *list3;
1212         unsigned int i;
1213
1214         if (list->count == 0) {
1215                 /* 0 & X == 0 */
1216                 return true;
1217         }
1218         if (list2->count == 0) {
1219                 /* X & 0 == 0 */
1220                 list->count = 0;
1221                 list->dn = NULL;
1222                 return true;
1223         }
1224
1225         /* the indexing code is allowed to return a longer list than
1226            what really matches, as all results are filtered by the
1227            full expression at the end - this shortcut avoids a lot of
1228            work in some cases */
1229         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1230                 return true;
1231         }
1232         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1233                 list->count = list2->count;
1234                 list->dn = list2->dn;
1235                 /* note that list2 may not be the parent of list2->dn,
1236                    as list2->dn may be owned by ltdb->idxptr. In that
1237                    case we expect this reparent call to fail, which is
1238                    OK */
1239                 talloc_reparent(list2, list, list2->dn);
1240                 return true;
1241         }
1242
1243         if (list->count > list2->count) {
1244                 short_list = list2;
1245                 long_list = list;
1246         } else {
1247                 short_list = list;
1248                 long_list = list2;
1249         }
1250
1251         list3 = talloc_zero(list, struct dn_list);
1252         if (list3 == NULL) {
1253                 return false;
1254         }
1255
1256         list3->dn = talloc_array(list3, struct ldb_val,
1257                                  MIN(list->count, list2->count));
1258         if (!list3->dn) {
1259                 talloc_free(list3);
1260                 return false;
1261         }
1262         list3->count = 0;
1263
1264         for (i=0;i<short_list->count;i++) {
1265                 /* For the GUID index case, this is a binary search */
1266                 if (ltdb_dn_list_find_val(ltdb, long_list,
1267                                           &short_list->dn[i]) != -1) {
1268                         list3->dn[list3->count] = short_list->dn[i];
1269                         list3->count++;
1270                 }
1271         }
1272
1273         list->strict |= list2->strict;
1274         list->dn = talloc_steal(list, list3->dn);
1275         list->count = list3->count;
1276         talloc_free(list3);
1277
1278         return true;
1279 }
1280
1281
1282 /*
1283   list union
1284   list = list | list2
1285 */
1286 static bool list_union(struct ldb_context *ldb,
1287                        struct ltdb_private *ltdb,
1288                        struct dn_list *list, struct dn_list *list2)
1289 {
1290         struct ldb_val *dn3;
1291         unsigned int i = 0, j = 0, k = 0;
1292
1293         if (list2->count == 0) {
1294                 /* X | 0 == X */
1295                 return true;
1296         }
1297
1298         if (list->count == 0) {
1299                 /* 0 | X == X */
1300                 list->count = list2->count;
1301                 list->dn = list2->dn;
1302                 /* note that list2 may not be the parent of list2->dn,
1303                    as list2->dn may be owned by ltdb->idxptr. In that
1304                    case we expect this reparent call to fail, which is
1305                    OK */
1306                 talloc_reparent(list2, list, list2->dn);
1307                 return true;
1308         }
1309
1310         /*
1311          * Sort the lists (if not in GUID DN mode) so we can do
1312          * the de-duplication during the merge
1313          *
1314          * NOTE: This can sort the in-memory index values, as list or
1315          * list2 might not be a copy!
1316          */
1317         ltdb_dn_list_sort(ltdb, list);
1318         ltdb_dn_list_sort(ltdb, list2);
1319
1320         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1321         if (!dn3) {
1322                 ldb_oom(ldb);
1323                 return false;
1324         }
1325
1326         while (i < list->count || j < list2->count) {
1327                 int cmp;
1328                 if (i >= list->count) {
1329                         cmp = 1;
1330                 } else if (j >= list2->count) {
1331                         cmp = -1;
1332                 } else {
1333                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1334                                                           &list2->dn[j]);
1335                 }
1336
1337                 if (cmp < 0) {
1338                         /* Take list */
1339                         dn3[k] = list->dn[i];
1340                         i++;
1341                         k++;
1342                 } else if (cmp > 0) {
1343                         /* Take list2 */
1344                         dn3[k] = list2->dn[j];
1345                         j++;
1346                         k++;
1347                 } else {
1348                         /* Equal, take list */
1349                         dn3[k] = list->dn[i];
1350                         i++;
1351                         j++;
1352                         k++;
1353                 }
1354         }
1355
1356         list->dn = dn3;
1357         list->count = k;
1358
1359         return true;
1360 }
1361
1362 static int ltdb_index_dn(struct ldb_module *module,
1363                          struct ltdb_private *ltdb,
1364                          const struct ldb_parse_tree *tree,
1365                          struct dn_list *list);
1366
1367
1368 /*
1369   process an OR list (a union)
1370  */
1371 static int ltdb_index_dn_or(struct ldb_module *module,
1372                             struct ltdb_private *ltdb,
1373                             const struct ldb_parse_tree *tree,
1374                             struct dn_list *list)
1375 {
1376         struct ldb_context *ldb;
1377         unsigned int i;
1378
1379         ldb = ldb_module_get_ctx(module);
1380
1381         list->dn = NULL;
1382         list->count = 0;
1383
1384         for (i=0; i<tree->u.list.num_elements; i++) {
1385                 struct dn_list *list2;
1386                 int ret;
1387
1388                 list2 = talloc_zero(list, struct dn_list);
1389                 if (list2 == NULL) {
1390                         return LDB_ERR_OPERATIONS_ERROR;
1391                 }
1392
1393                 ret = ltdb_index_dn(module, ltdb,
1394                                     tree->u.list.elements[i], list2);
1395
1396                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1397                         /* X || 0 == X */
1398                         talloc_free(list2);
1399                         continue;
1400                 }
1401
1402                 if (ret != LDB_SUCCESS) {
1403                         /* X || * == * */
1404                         talloc_free(list2);
1405                         return ret;
1406                 }
1407
1408                 if (!list_union(ldb, ltdb, list, list2)) {
1409                         talloc_free(list2);
1410                         return LDB_ERR_OPERATIONS_ERROR;
1411                 }
1412         }
1413
1414         if (list->count == 0) {
1415                 return LDB_ERR_NO_SUCH_OBJECT;
1416         }
1417
1418         return LDB_SUCCESS;
1419 }
1420
1421
1422 /*
1423   NOT an index results
1424  */
1425 static int ltdb_index_dn_not(struct ldb_module *module,
1426                              struct ltdb_private *ltdb,
1427                              const struct ldb_parse_tree *tree,
1428                              struct dn_list *list)
1429 {
1430         /* the only way to do an indexed not would be if we could
1431            negate the not via another not or if we knew the total
1432            number of database elements so we could know that the
1433            existing expression covered the whole database.
1434
1435            instead, we just give up, and rely on a full index scan
1436            (unless an outer & manages to reduce the list)
1437         */
1438         return LDB_ERR_OPERATIONS_ERROR;
1439 }
1440
1441 /*
1442  * These things are unique, so avoid a full scan if this is a search
1443  * by GUID, DN or a unique attribute
1444  */
1445 static bool ltdb_index_unique(struct ldb_context *ldb,
1446                               struct ltdb_private *ltdb,
1447                               const char *attr)
1448 {
1449         const struct ldb_schema_attribute *a;
1450         if (ltdb->cache->GUID_index_attribute != NULL) {
1451                 if (ldb_attr_cmp(attr, ltdb->cache->GUID_index_attribute) == 0) {
1452                         return true;
1453                 }
1454         }
1455         if (ldb_attr_dn(attr) == 0) {
1456                 return true;
1457         }
1458
1459         a = ldb_schema_attribute_by_name(ldb, attr);
1460         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1461                 return true;
1462         }
1463         return false;
1464 }
1465
1466 /*
1467   process an AND expression (intersection)
1468  */
1469 static int ltdb_index_dn_and(struct ldb_module *module,
1470                              struct ltdb_private *ltdb,
1471                              const struct ldb_parse_tree *tree,
1472                              struct dn_list *list)
1473 {
1474         struct ldb_context *ldb;
1475         unsigned int i;
1476         bool found;
1477
1478         ldb = ldb_module_get_ctx(module);
1479
1480         list->dn = NULL;
1481         list->count = 0;
1482
1483         /* in the first pass we only look for unique simple
1484            equality tests, in the hope of avoiding having to look
1485            at any others */
1486         for (i=0; i<tree->u.list.num_elements; i++) {
1487                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1488                 int ret;
1489
1490                 if (subtree->operation != LDB_OP_EQUALITY ||
1491                     !ltdb_index_unique(ldb, ltdb,
1492                                        subtree->u.equality.attr)) {
1493                         continue;
1494                 }
1495
1496                 ret = ltdb_index_dn(module, ltdb, subtree, list);
1497                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1498                         /* 0 && X == 0 */
1499                         return LDB_ERR_NO_SUCH_OBJECT;
1500                 }
1501                 if (ret == LDB_SUCCESS) {
1502                         /* a unique index match means we can
1503                          * stop. Note that we don't care if we return
1504                          * a few too many objects, due to later
1505                          * filtering */
1506                         return LDB_SUCCESS;
1507                 }
1508         }
1509
1510         /* now do a full intersection */
1511         found = false;
1512
1513         for (i=0; i<tree->u.list.num_elements; i++) {
1514                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1515                 struct dn_list *list2;
1516                 int ret;
1517
1518                 list2 = talloc_zero(list, struct dn_list);
1519                 if (list2 == NULL) {
1520                         return ldb_module_oom(module);
1521                 }
1522
1523                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
1524
1525                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1526                         /* X && 0 == 0 */
1527                         list->dn = NULL;
1528                         list->count = 0;
1529                         talloc_free(list2);
1530                         return LDB_ERR_NO_SUCH_OBJECT;
1531                 }
1532
1533                 if (ret != LDB_SUCCESS) {
1534                         /* this didn't adding anything */
1535                         talloc_free(list2);
1536                         continue;
1537                 }
1538
1539                 if (!found) {
1540                         talloc_reparent(list2, list, list->dn);
1541                         list->dn = list2->dn;
1542                         list->count = list2->count;
1543                         found = true;
1544                 } else if (!list_intersect(ldb, ltdb,
1545                                            list, list2)) {
1546                         talloc_free(list2);
1547                         return LDB_ERR_OPERATIONS_ERROR;
1548                 }
1549
1550                 if (list->count == 0) {
1551                         list->dn = NULL;
1552                         return LDB_ERR_NO_SUCH_OBJECT;
1553                 }
1554
1555                 if (list->count < 2) {
1556                         /* it isn't worth loading the next part of the tree */
1557                         return LDB_SUCCESS;
1558                 }
1559         }
1560
1561         if (!found) {
1562                 /* none of the attributes were indexed */
1563                 return LDB_ERR_OPERATIONS_ERROR;
1564         }
1565
1566         return LDB_SUCCESS;
1567 }
1568
1569 /*
1570   return a list of matching objects using a one-level index
1571  */
1572 static int ltdb_index_dn_attr(struct ldb_module *module,
1573                               struct ltdb_private *ltdb,
1574                               const char *attr,
1575                               struct ldb_dn *dn,
1576                               struct dn_list *list,
1577                               enum key_truncation *truncation)
1578 {
1579         struct ldb_context *ldb;
1580         struct ldb_dn *key;
1581         struct ldb_val val;
1582         int ret;
1583
1584         ldb = ldb_module_get_ctx(module);
1585
1586         /* work out the index key from the parent DN */
1587         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1588         val.length = strlen((char *)val.data);
1589         key = ltdb_index_key(ldb, ltdb, attr, &val, NULL, truncation);
1590         if (!key) {
1591                 ldb_oom(ldb);
1592                 return LDB_ERR_OPERATIONS_ERROR;
1593         }
1594
1595         ret = ltdb_dn_list_load(module, ltdb, key, list);
1596         talloc_free(key);
1597         if (ret != LDB_SUCCESS) {
1598                 return ret;
1599         }
1600
1601         if (list->count == 0) {
1602                 return LDB_ERR_NO_SUCH_OBJECT;
1603         }
1604
1605         return LDB_SUCCESS;
1606 }
1607
1608 /*
1609   return a list of matching objects using a one-level index
1610  */
1611 static int ltdb_index_dn_one(struct ldb_module *module,
1612                              struct ltdb_private *ltdb,
1613                              struct ldb_dn *parent_dn,
1614                              struct dn_list *list,
1615                              enum key_truncation *truncation)
1616 {
1617         /* Ensure we do not shortcut on intersection for this list */
1618         list->strict = true;
1619         return ltdb_index_dn_attr(module, ltdb,
1620                                   LTDB_IDXONE, parent_dn, list, truncation);
1621
1622 }
1623
1624 /*
1625   return a list of matching objects using the DN index
1626  */
1627 static int ltdb_index_dn_base_dn(struct ldb_module *module,
1628                                  struct ltdb_private *ltdb,
1629                                  struct ldb_dn *base_dn,
1630                                  struct dn_list *dn_list,
1631                                  enum key_truncation *truncation)
1632 {
1633         const struct ldb_val *guid_val = NULL;
1634         if (ltdb->cache->GUID_index_attribute == NULL) {
1635                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1636                 if (dn_list->dn == NULL) {
1637                         return ldb_module_oom(module);
1638                 }
1639                 dn_list->dn[0].data = discard_const_p(unsigned char,
1640                                                       ldb_dn_get_linearized(base_dn));
1641                 if (dn_list->dn[0].data == NULL) {
1642                         return ldb_module_oom(module);
1643                 }
1644                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1645                 dn_list->count = 1;
1646
1647                 return LDB_SUCCESS;
1648         }
1649
1650         if (ltdb->cache->GUID_index_dn_component != NULL) {
1651                 guid_val = ldb_dn_get_extended_component(base_dn,
1652                                                          ltdb->cache->GUID_index_dn_component);
1653         }
1654
1655         if (guid_val != NULL) {
1656                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1657                 if (dn_list->dn == NULL) {
1658                         return ldb_module_oom(module);
1659                 }
1660                 dn_list->dn[0].data = guid_val->data;
1661                 dn_list->dn[0].length = guid_val->length;
1662                 dn_list->count = 1;
1663
1664                 return LDB_SUCCESS;
1665         }
1666
1667         return ltdb_index_dn_attr(module, ltdb,
1668                                   LTDB_IDXDN, base_dn, dn_list, truncation);
1669 }
1670
1671 /*
1672   return a list of dn's that might match a indexed search or
1673   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1674  */
1675 static int ltdb_index_dn(struct ldb_module *module,
1676                          struct ltdb_private *ltdb,
1677                          const struct ldb_parse_tree *tree,
1678                          struct dn_list *list)
1679 {
1680         int ret = LDB_ERR_OPERATIONS_ERROR;
1681
1682         switch (tree->operation) {
1683         case LDB_OP_AND:
1684                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1685                 break;
1686
1687         case LDB_OP_OR:
1688                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1689                 break;
1690
1691         case LDB_OP_NOT:
1692                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1693                 break;
1694
1695         case LDB_OP_EQUALITY:
1696                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1697                 break;
1698
1699         case LDB_OP_SUBSTRING:
1700         case LDB_OP_GREATER:
1701         case LDB_OP_LESS:
1702         case LDB_OP_PRESENT:
1703         case LDB_OP_APPROX:
1704         case LDB_OP_EXTENDED:
1705                 /* we can't index with fancy bitops yet */
1706                 ret = LDB_ERR_OPERATIONS_ERROR;
1707                 break;
1708         }
1709
1710         return ret;
1711 }
1712
1713 /*
1714   filter a candidate dn_list from an indexed search into a set of results
1715   extracting just the given attributes
1716 */
1717 static int ltdb_index_filter(struct ltdb_private *ltdb,
1718                              const struct dn_list *dn_list,
1719                              struct ltdb_context *ac,
1720                              uint32_t *match_count,
1721                              enum key_truncation scope_one_truncation)
1722 {
1723         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1724         struct ldb_message *msg;
1725         struct ldb_message *filtered_msg;
1726         unsigned int i;
1727         unsigned int num_keys = 0;
1728         uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
1729         TDB_DATA *keys = NULL;
1730
1731         /*
1732          * We have to allocate the key list (rather than just walk the
1733          * caller supplied list) as the callback could change the list
1734          * (by modifying an indexed attribute hosted in the in-memory
1735          * index cache!)
1736          */
1737         keys = talloc_array(ac, TDB_DATA, dn_list->count);
1738         if (keys == NULL) {
1739                 return ldb_module_oom(ac->module);
1740         }
1741
1742         if (ltdb->cache->GUID_index_attribute != NULL) {
1743                 /*
1744                  * We speculate that the keys will be GUID based and so
1745                  * pre-fill in enough space for a GUID (avoiding a pile of
1746                  * small allocations)
1747                  */
1748                 struct guid_tdb_key {
1749                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
1750                 } *key_values = NULL;
1751
1752                 key_values = talloc_array(keys,
1753                                           struct guid_tdb_key,
1754                                           dn_list->count);
1755
1756                 for (i = 0; i < dn_list->count; i++) {
1757                         keys[i].dptr = key_values[i].guid_key;
1758                         keys[i].dsize = sizeof(key_values[i].guid_key);
1759                 }
1760                 if (key_values == NULL) {
1761                         return ldb_module_oom(ac->module);
1762                 }
1763         } else {
1764                 for (i = 0; i < dn_list->count; i++) {
1765                         keys[i].dptr = NULL;
1766                         keys[i].dsize = 0;
1767                 }
1768         }
1769
1770         for (i = 0; i < dn_list->count; i++) {
1771                 int ret;
1772
1773                 ret = ltdb_idx_to_key(ac->module,
1774                                       ltdb,
1775                                       keys,
1776                                       &dn_list->dn[i],
1777                                       &keys[num_keys]);
1778                 if (ret != LDB_SUCCESS) {
1779                         return ret;
1780                 }
1781
1782                 if (ltdb->cache->GUID_index_attribute != NULL) {
1783                         /*
1784                          * If we are in GUID index mode, then the dn_list is
1785                          * sorted.  If we got a duplicate, forget about it, as
1786                          * otherwise we would send the same entry back more
1787                          * than once.
1788                          *
1789                          * This is needed in the truncated DN case, or if a
1790                          * duplicate was forced in via
1791                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1792                          */
1793
1794                         if (memcmp(previous_guid_key,
1795                                    keys[num_keys].dptr,
1796                                    sizeof(previous_guid_key)) == 0) {
1797                                 continue;
1798                         }
1799
1800                         memcpy(previous_guid_key,
1801                                keys[num_keys].dptr,
1802                                sizeof(previous_guid_key));
1803                 }
1804                 num_keys++;
1805         }
1806
1807
1808         /*
1809          * Now that the list is a safe copy, send the callbacks
1810          */
1811         for (i = 0; i < num_keys; i++) {
1812                 int ret;
1813                 bool matched;
1814                 msg = ldb_msg_new(ac);
1815                 if (!msg) {
1816                         return LDB_ERR_OPERATIONS_ERROR;
1817                 }
1818
1819                 ret = ltdb_search_key(ac->module, ltdb,
1820                                       keys[i], msg,
1821                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1822                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1823                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1824                         /*
1825                          * the record has disappeared? yes, this can
1826                          * happen if the entry is deleted by something
1827                          * operating in the callback (not another
1828                          * process, as we have a read lock)
1829                          */
1830                         talloc_free(msg);
1831                         continue;
1832                 }
1833
1834                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1835                         /* an internal error */
1836                         talloc_free(msg);
1837                         return LDB_ERR_OPERATIONS_ERROR;
1838                 }
1839
1840                 /*
1841                  * We trust the index for LDB_SCOPE_ONELEVEL
1842                  * unless the index key has been truncated.
1843                  *
1844                  * LDB_SCOPE_BASE is not passed in by our only caller.
1845                  */
1846                 if (ac->scope == LDB_SCOPE_ONELEVEL
1847                     && ltdb->cache->one_level_indexes
1848                     && scope_one_truncation == KEY_NOT_TRUNCATED) {
1849                         ret = ldb_match_message(ldb, msg, ac->tree,
1850                                                 ac->scope, &matched);
1851                 } else {
1852                         ret = ldb_match_msg_error(ldb, msg,
1853                                                   ac->tree, ac->base,
1854                                                   ac->scope, &matched);
1855                 }
1856
1857                 if (ret != LDB_SUCCESS) {
1858                         talloc_free(msg);
1859                         return ret;
1860                 }
1861                 if (!matched) {
1862                         talloc_free(msg);
1863                         continue;
1864                 }
1865
1866                 /* filter the attributes that the user wants */
1867                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1868
1869                 talloc_free(msg);
1870
1871                 if (ret == -1) {
1872                         return LDB_ERR_OPERATIONS_ERROR;
1873                 }
1874
1875                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1876                 if (ret != LDB_SUCCESS) {
1877                         /* Regardless of success or failure, the msg
1878                          * is the callbacks responsiblity, and should
1879                          * not be talloc_free()'ed */
1880                         ac->request_terminated = true;
1881                         return ret;
1882                 }
1883
1884                 (*match_count)++;
1885         }
1886
1887         TALLOC_FREE(keys);
1888         return LDB_SUCCESS;
1889 }
1890
1891 /*
1892   sort a DN list
1893  */
1894 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
1895                               struct dn_list *list)
1896 {
1897         if (list->count < 2) {
1898                 return;
1899         }
1900
1901         /* We know the list is sorted when using the GUID index */
1902         if (ltdb->cache->GUID_index_attribute != NULL) {
1903                 return;
1904         }
1905
1906         TYPESAFE_QSORT(list->dn, list->count,
1907                        ldb_val_equal_exact_for_qsort);
1908 }
1909
1910 /*
1911   search the database with a LDAP-like expression using indexes
1912   returns -1 if an indexed search is not possible, in which
1913   case the caller should call ltdb_search_full()
1914 */
1915 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1916 {
1917         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1918         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1919         struct dn_list *dn_list;
1920         int ret;
1921         enum ldb_scope index_scope;
1922         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1923
1924         /* see if indexing is enabled */
1925         if (!ltdb->cache->attribute_indexes &&
1926             !ltdb->cache->one_level_indexes &&
1927             ac->scope != LDB_SCOPE_BASE) {
1928                 /* fallback to a full search */
1929                 return LDB_ERR_OPERATIONS_ERROR;
1930         }
1931
1932         dn_list = talloc_zero(ac, struct dn_list);
1933         if (dn_list == NULL) {
1934                 return ldb_module_oom(ac->module);
1935         }
1936
1937         /*
1938          * For the purposes of selecting the switch arm below, if we
1939          * don't have a one-level index then treat it like a subtree
1940          * search
1941          */
1942         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1943             !ltdb->cache->one_level_indexes) {
1944                 index_scope = LDB_SCOPE_SUBTREE;
1945         } else {
1946                 index_scope = ac->scope;
1947         }
1948
1949         switch (index_scope) {
1950         case LDB_SCOPE_BASE:
1951                 /*
1952                  * The only caller will have filtered the operation out
1953                  * so we should never get here
1954                  */
1955                 return ldb_operr(ldb);
1956
1957         case LDB_SCOPE_ONELEVEL:
1958                 /*
1959                  * If we ever start to also load the index values for
1960                  * the tree, we must ensure we strictly intersect with
1961                  * this list, as we trust the ONELEVEL index
1962                  */
1963                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list,
1964                                         &scope_one_truncation);
1965                 if (ret != LDB_SUCCESS) {
1966                         talloc_free(dn_list);
1967                         return ret;
1968                 }
1969
1970                 /*
1971                  * If we have too many matches, running the filter
1972                  * tree over the SCOPE_ONELEVEL can be quite expensive
1973                  * so we now check the filter tree index as well.
1974                  *
1975                  * We only do this in the GUID index mode, which is
1976                  * O(n*log(m)) otherwise the intersection below will
1977                  * be too costly at O(n*m).
1978                  *
1979                  * We don't set a heuristic for 'too many' but instead
1980                  * do it always and rely on the index lookup being
1981                  * fast enough in the small case.
1982                  */
1983                 if (ltdb->cache->GUID_index_attribute != NULL) {
1984                         struct dn_list *idx_one_tree_list
1985                                 = talloc_zero(ac, struct dn_list);
1986                         if (idx_one_tree_list == NULL) {
1987                                 return ldb_module_oom(ac->module);
1988                         }
1989
1990                         if (!ltdb->cache->attribute_indexes) {
1991                                 talloc_free(idx_one_tree_list);
1992                                 talloc_free(dn_list);
1993                                 return LDB_ERR_OPERATIONS_ERROR;
1994                         }
1995                         /*
1996                          * Here we load the index for the tree.
1997                          *
1998                          * We only care if this is successful, if the
1999                          * index can't trim the result list down then
2000                          * the ONELEVEL index is still good enough.
2001                          */
2002                         ret = ltdb_index_dn(ac->module, ltdb, ac->tree,
2003                                             idx_one_tree_list);
2004                         if (ret == LDB_SUCCESS) {
2005                                 if (!list_intersect(ldb, ltdb,
2006                                                     dn_list,
2007                                                     idx_one_tree_list)) {
2008                                         talloc_free(idx_one_tree_list);
2009                                         talloc_free(dn_list);
2010                                         return LDB_ERR_OPERATIONS_ERROR;
2011                                 }
2012                         }
2013                 }
2014                 break;
2015
2016         case LDB_SCOPE_SUBTREE:
2017         case LDB_SCOPE_DEFAULT:
2018                 if (!ltdb->cache->attribute_indexes) {
2019                         talloc_free(dn_list);
2020                         return LDB_ERR_OPERATIONS_ERROR;
2021                 }
2022                 /*
2023                  * Here we load the index for the tree.  We have no
2024                  * index for the subtree.
2025                  */
2026                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
2027                 if (ret != LDB_SUCCESS) {
2028                         talloc_free(dn_list);
2029                         return ret;
2030                 }
2031                 break;
2032         }
2033
2034         /*
2035          * It is critical that this function do the re-filter even
2036          * on things found by the index as the index can over-match
2037          * in cases of truncation (as well as when it decides it is
2038          * not worth further filtering)
2039          *
2040          * If this changes, then the index code above would need to
2041          * pass up a flag to say if any index was truncated during
2042          * processing as the truncation here refers only to the
2043          * SCOPE_ONELEVEL index.
2044          */
2045         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count,
2046                                 scope_one_truncation);
2047         talloc_free(dn_list);
2048         return ret;
2049 }
2050
2051 /**
2052  * @brief Add a DN in the index list of a given attribute name/value pair
2053  *
2054  * This function will add the DN in the index list for the index for
2055  * the given attribute name and value.
2056  *
2057  * @param[in]  module       A ldb_module structure
2058  *
2059  * @param[in]  dn           The string representation of the DN as it
2060  *                          will be stored in the index entry
2061  *
2062  * @param[in]  el           A ldb_message_element array, one of the entry
2063  *                          referred by the v_idx is the attribute name and
2064  *                          value pair which will be used to construct the
2065  *                          index name
2066  *
2067  * @param[in]  v_idx        The index of element in the el array to use
2068  *
2069  * @return                  An ldb error code
2070  */
2071 static int ltdb_index_add1(struct ldb_module *module,
2072                            struct ltdb_private *ltdb,
2073                            const struct ldb_message *msg,
2074                            struct ldb_message_element *el, int v_idx)
2075 {
2076         struct ldb_context *ldb;
2077         struct ldb_dn *dn_key;
2078         int ret;
2079         const struct ldb_schema_attribute *a;
2080         struct dn_list *list;
2081         unsigned alloc_len;
2082         enum key_truncation truncation = KEY_TRUNCATED;
2083
2084
2085         ldb = ldb_module_get_ctx(module);
2086
2087         list = talloc_zero(module, struct dn_list);
2088         if (list == NULL) {
2089                 return LDB_ERR_OPERATIONS_ERROR;
2090         }
2091
2092         dn_key = ltdb_index_key(ldb, ltdb,
2093                                 el->name, &el->values[v_idx], &a, &truncation);
2094         if (!dn_key) {
2095                 talloc_free(list);
2096                 return LDB_ERR_OPERATIONS_ERROR;
2097         }
2098         /*
2099          * Samba only maintains unique indexes on the objectSID and objectGUID
2100          * so if a unique index key exceeds the maximum length there is a
2101          * problem.
2102          */
2103         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2104                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2105                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2106
2107                 ldb_asprintf_errstring(
2108                         ldb,
2109                         __location__ ": unique index key on %s in %s, "
2110                         "exceeds maximum key length of %u (encoded).",
2111                         el->name,
2112                         ldb_dn_get_linearized(msg->dn),
2113                         ltdb->max_key_length);
2114                 talloc_free(list);
2115                 return LDB_ERR_CONSTRAINT_VIOLATION;
2116         }
2117         talloc_steal(list, dn_key);
2118
2119         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2120         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2121                 talloc_free(list);
2122                 return ret;
2123         }
2124
2125         /*
2126          * Check for duplicates in the @IDXDN DN -> GUID record
2127          *
2128          * This is very normal, it just means a duplicate DN creation
2129          * was attempted, so don't set the error string or print scary
2130          * messages.
2131          */
2132         if (list->count > 0 &&
2133             ldb_attr_cmp(el->name, LTDB_IDXDN) == 0 &&
2134             truncation == KEY_NOT_TRUNCATED) {
2135
2136                 talloc_free(list);
2137                 return LDB_ERR_CONSTRAINT_VIOLATION;
2138
2139         } else if (list->count > 0
2140                    && ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
2141
2142                 /*
2143                  * At least one existing entry in the DN->GUID index, which
2144                  * arises when the DN indexes have been truncated
2145                  *
2146                  * So need to pull the DN's to check if it's really a duplicate
2147                  */
2148                 int i;
2149                 for (i=0; i < list->count; i++) {
2150                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
2151                         TDB_DATA key = {
2152                                 .dptr = guid_key,
2153                                 .dsize = sizeof(guid_key)
2154                         };
2155                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2156                         struct ldb_message *rec = ldb_msg_new(ldb);
2157                         if (rec == NULL) {
2158                                 return LDB_ERR_OPERATIONS_ERROR;
2159                         }
2160
2161                         ret = ltdb_idx_to_key(module, ltdb,
2162                                               ldb, &list->dn[i],
2163                                               &key);
2164                         if (ret != LDB_SUCCESS) {
2165                                 TALLOC_FREE(list);
2166                                 TALLOC_FREE(rec);
2167                                 return ret;
2168                         }
2169
2170                         ret = ltdb_search_key(module, ltdb, key,
2171                                               rec, flags);
2172                         if (key.dptr != guid_key) {
2173                                 TALLOC_FREE(key.dptr);
2174                         }
2175                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2176                                 /*
2177                                  * the record has disappeared?
2178                                  * yes, this can happen
2179                                  */
2180                                 talloc_free(rec);
2181                                 continue;
2182                         }
2183
2184                         if (ret != LDB_SUCCESS) {
2185                                 /* an internal error */
2186                                 TALLOC_FREE(rec);
2187                                 TALLOC_FREE(list);
2188                                 return LDB_ERR_OPERATIONS_ERROR;
2189                         }
2190                         /*
2191                          * The DN we are trying to add to the DB and index
2192                          * is already here, so we must deny the addition
2193                          */
2194                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2195                                 TALLOC_FREE(rec);
2196                                 TALLOC_FREE(list);
2197                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2198                         }
2199                 }
2200         }
2201
2202         /*
2203          * Check for duplicates in unique indexes
2204          *
2205          * We don't need to do a loop test like the @IDXDN case
2206          * above as we have a ban on long unique index values
2207          * at the start of this function.
2208          */
2209         if (list->count > 0 &&
2210             ((a != NULL
2211               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2212                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2213                 /*
2214                  * We do not want to print info about a possibly
2215                  * confidential DN that the conflict was with in the
2216                  * user-visible error string
2217                  */
2218
2219                 if (ltdb->cache->GUID_index_attribute == NULL) {
2220                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2221                                   __location__
2222                                   ": unique index violation on %s in %s, "
2223                                   "conficts with %*.*s in %s",
2224                                   el->name, ldb_dn_get_linearized(msg->dn),
2225                                   (int)list->dn[0].length,
2226                                   (int)list->dn[0].length,
2227                                   list->dn[0].data,
2228                                   ldb_dn_get_linearized(dn_key));
2229                 } else {
2230                         /* This can't fail, gives a default at worst */
2231                         const struct ldb_schema_attribute *attr
2232                                 = ldb_schema_attribute_by_name(
2233                                         ldb,
2234                                         ltdb->cache->GUID_index_attribute);
2235                         struct ldb_val v;
2236                         ret = attr->syntax->ldif_write_fn(ldb, list,
2237                                                           &list->dn[0], &v);
2238                         if (ret == LDB_SUCCESS) {
2239                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2240                                           __location__
2241                                           ": unique index violation on %s in "
2242                                           "%s, conficts with %s %*.*s in %s",
2243                                           el->name,
2244                                           ldb_dn_get_linearized(msg->dn),
2245                                           ltdb->cache->GUID_index_attribute,
2246                                           (int)v.length,
2247                                           (int)v.length,
2248                                           v.data,
2249                                           ldb_dn_get_linearized(dn_key));
2250                         }
2251                 }
2252                 ldb_asprintf_errstring(ldb,
2253                                        __location__ ": unique index violation "
2254                                        "on %s in %s",
2255                                        el->name,
2256                                        ldb_dn_get_linearized(msg->dn));
2257                 talloc_free(list);
2258                 return LDB_ERR_CONSTRAINT_VIOLATION;
2259         }
2260
2261         /* overallocate the list a bit, to reduce the number of
2262          * realloc trigered copies */
2263         alloc_len = ((list->count+1)+7) & ~7;
2264         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2265         if (list->dn == NULL) {
2266                 talloc_free(list);
2267                 return LDB_ERR_OPERATIONS_ERROR;
2268         }
2269
2270         if (ltdb->cache->GUID_index_attribute == NULL) {
2271                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2272                 list->dn[list->count].data
2273                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2274                 if (list->dn[list->count].data == NULL) {
2275                         talloc_free(list);
2276                         return LDB_ERR_OPERATIONS_ERROR;
2277                 }
2278                 list->dn[list->count].length = strlen(dn_str);
2279         } else {
2280                 const struct ldb_val *key_val;
2281                 struct ldb_val *exact = NULL, *next = NULL;
2282                 key_val = ldb_msg_find_ldb_val(msg,
2283                                                ltdb->cache->GUID_index_attribute);
2284                 if (key_val == NULL) {
2285                         talloc_free(list);
2286                         return ldb_module_operr(module);
2287                 }
2288
2289                 if (key_val->length != LTDB_GUID_SIZE) {
2290                         talloc_free(list);
2291                         return ldb_module_operr(module);
2292                 }
2293
2294                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2295                                         *key_val, ldb_val_equal_exact_ordered,
2296                                         exact, next);
2297
2298                 /*
2299                  * Give a warning rather than fail, this could be a
2300                  * duplicate value in the record allowed by a caller
2301                  * forcing in the value with
2302                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2303                  */
2304                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2305                         /* This can't fail, gives a default at worst */
2306                         const struct ldb_schema_attribute *attr
2307                                 = ldb_schema_attribute_by_name(
2308                                         ldb,
2309                                         ltdb->cache->GUID_index_attribute);
2310                         struct ldb_val v;
2311                         ret = attr->syntax->ldif_write_fn(ldb, list,
2312                                                           exact, &v);
2313                         if (ret == LDB_SUCCESS) {
2314                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2315                                           __location__
2316                                           ": duplicate attribute value in %s "
2317                                           "for index on %s, "
2318                                           "duplicate of %s %*.*s in %s",
2319                                           ldb_dn_get_linearized(msg->dn),
2320                                           el->name,
2321                                           ltdb->cache->GUID_index_attribute,
2322                                           (int)v.length,
2323                                           (int)v.length,
2324                                           v.data,
2325                                           ldb_dn_get_linearized(dn_key));
2326                         }
2327                 }
2328
2329                 if (next == NULL) {
2330                         next = &list->dn[list->count];
2331                 } else {
2332                         memmove(&next[1], next,
2333                                 sizeof(*next) * (list->count - (next - list->dn)));
2334                 }
2335                 *next = ldb_val_dup(list->dn, key_val);
2336                 if (next->data == NULL) {
2337                         talloc_free(list);
2338                         return ldb_module_operr(module);
2339                 }
2340         }
2341         list->count++;
2342
2343         ret = ltdb_dn_list_store(module, dn_key, list);
2344
2345         talloc_free(list);
2346
2347         return ret;
2348 }
2349
2350 /*
2351   add index entries for one elements in a message
2352  */
2353 static int ltdb_index_add_el(struct ldb_module *module,
2354                              struct ltdb_private *ltdb,
2355                              const struct ldb_message *msg,
2356                              struct ldb_message_element *el)
2357 {
2358         unsigned int i;
2359         for (i = 0; i < el->num_values; i++) {
2360                 int ret = ltdb_index_add1(module, ltdb,
2361                                           msg, el, i);
2362                 if (ret != LDB_SUCCESS) {
2363                         return ret;
2364                 }
2365         }
2366
2367         return LDB_SUCCESS;
2368 }
2369
2370 /*
2371   add index entries for all elements in a message
2372  */
2373 static int ltdb_index_add_all(struct ldb_module *module,
2374                               struct ltdb_private *ltdb,
2375                               const struct ldb_message *msg)
2376 {
2377         struct ldb_message_element *elements = msg->elements;
2378         unsigned int i;
2379         const char *dn_str;
2380         int ret;
2381
2382         if (ldb_dn_is_special(msg->dn)) {
2383                 return LDB_SUCCESS;
2384         }
2385
2386         dn_str = ldb_dn_get_linearized(msg->dn);
2387         if (dn_str == NULL) {
2388                 return LDB_ERR_OPERATIONS_ERROR;
2389         }
2390
2391         ret = ltdb_write_index_dn_guid(module, msg, 1);
2392         if (ret != LDB_SUCCESS) {
2393                 return ret;
2394         }
2395
2396         if (!ltdb->cache->attribute_indexes) {
2397                 /* no indexed fields */
2398                 return LDB_SUCCESS;
2399         }
2400
2401         for (i = 0; i < msg->num_elements; i++) {
2402                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
2403                         continue;
2404                 }
2405                 ret = ltdb_index_add_el(module, ltdb,
2406                                         msg, &elements[i]);
2407                 if (ret != LDB_SUCCESS) {
2408                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2409                         ldb_asprintf_errstring(ldb,
2410                                                __location__ ": Failed to re-index %s in %s - %s",
2411                                                elements[i].name, dn_str,
2412                                                ldb_errstring(ldb));
2413                         return ret;
2414                 }
2415         }
2416
2417         return LDB_SUCCESS;
2418 }
2419
2420
2421 /*
2422   insert a DN index for a message
2423 */
2424 static int ltdb_modify_index_dn(struct ldb_module *module,
2425                                 struct ltdb_private *ltdb,
2426                                 const struct ldb_message *msg,
2427                                 struct ldb_dn *dn,
2428                                 const char *index, int add)
2429 {
2430         struct ldb_message_element el;
2431         struct ldb_val val;
2432         int ret;
2433
2434         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2435         if (val.data == NULL) {
2436                 const char *dn_str = ldb_dn_get_linearized(dn);
2437                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2438                                        __location__
2439                                        ": Failed to modify %s "
2440                                        "against %s in %s: failed "
2441                                        "to get casefold DN",
2442                                        index,
2443                                        ltdb->cache->GUID_index_attribute,
2444                                        dn_str);
2445                 return LDB_ERR_OPERATIONS_ERROR;
2446         }
2447
2448         val.length = strlen((char *)val.data);
2449         el.name = index;
2450         el.values = &val;
2451         el.num_values = 1;
2452
2453         if (add) {
2454                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
2455         } else { /* delete */
2456                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
2457         }
2458
2459         if (ret != LDB_SUCCESS) {
2460                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2461                 const char *dn_str = ldb_dn_get_linearized(dn);
2462                 ldb_asprintf_errstring(ldb,
2463                                        __location__
2464                                        ": Failed to modify %s "
2465                                        "against %s in %s - %s",
2466                                        index,
2467                                        ltdb->cache->GUID_index_attribute,
2468                                        dn_str, ldb_errstring(ldb));
2469                 return ret;
2470         }
2471         return ret;
2472 }
2473
2474 /*
2475   insert a one level index for a message
2476 */
2477 static int ltdb_index_onelevel(struct ldb_module *module,
2478                                const struct ldb_message *msg, int add)
2479 {
2480         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2481                                                     struct ltdb_private);
2482         struct ldb_dn *pdn;
2483         int ret;
2484
2485         /* We index for ONE Level only if requested */
2486         if (!ltdb->cache->one_level_indexes) {
2487                 return LDB_SUCCESS;
2488         }
2489
2490         pdn = ldb_dn_get_parent(module, msg->dn);
2491         if (pdn == NULL) {
2492                 return LDB_ERR_OPERATIONS_ERROR;
2493         }
2494         ret = ltdb_modify_index_dn(module, ltdb,
2495                                    msg, pdn, LTDB_IDXONE, add);
2496
2497         talloc_free(pdn);
2498
2499         return ret;
2500 }
2501
2502 /*
2503   insert a one level index for a message
2504 */
2505 static int ltdb_write_index_dn_guid(struct ldb_module *module,
2506                                     const struct ldb_message *msg,
2507                                     int add)
2508 {
2509         int ret;
2510         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2511                                                     struct ltdb_private);
2512
2513         /* We index for DN only if using a GUID index */
2514         if (ltdb->cache->GUID_index_attribute == NULL) {
2515                 return LDB_SUCCESS;
2516         }
2517
2518         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
2519                                    LTDB_IDXDN, add);
2520
2521         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2522                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2523                                        "Entry %s already exists",
2524                                        ldb_dn_get_linearized(msg->dn));
2525                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2526         }
2527         return ret;
2528 }
2529
2530 /*
2531   add the index entries for a new element in a record
2532   The caller guarantees that these element values are not yet indexed
2533 */
2534 int ltdb_index_add_element(struct ldb_module *module,
2535                            struct ltdb_private *ltdb,
2536                            const struct ldb_message *msg,
2537                            struct ldb_message_element *el)
2538 {
2539         if (ldb_dn_is_special(msg->dn)) {
2540                 return LDB_SUCCESS;
2541         }
2542         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2543                 return LDB_SUCCESS;
2544         }
2545         return ltdb_index_add_el(module, ltdb, msg, el);
2546 }
2547
2548 /*
2549   add the index entries for a new record
2550 */
2551 int ltdb_index_add_new(struct ldb_module *module,
2552                        struct ltdb_private *ltdb,
2553                        const struct ldb_message *msg)
2554 {
2555         int ret;
2556
2557         if (ldb_dn_is_special(msg->dn)) {
2558                 return LDB_SUCCESS;
2559         }
2560
2561         ret = ltdb_index_add_all(module, ltdb, msg);
2562         if (ret != LDB_SUCCESS) {
2563                 /*
2564                  * Because we can't trust the caller to be doing
2565                  * transactions properly, clean up any index for this
2566                  * entry rather than relying on a transaction
2567                  * cleanup
2568                  */
2569
2570                 ltdb_index_delete(module, msg);
2571                 return ret;
2572         }
2573
2574         ret = ltdb_index_onelevel(module, msg, 1);
2575         if (ret != LDB_SUCCESS) {
2576                 /*
2577                  * Because we can't trust the caller to be doing
2578                  * transactions properly, clean up any index for this
2579                  * entry rather than relying on a transaction
2580                  * cleanup
2581                  */
2582                 ltdb_index_delete(module, msg);
2583                 return ret;
2584         }
2585         return ret;
2586 }
2587
2588
2589 /*
2590   delete an index entry for one message element
2591 */
2592 int ltdb_index_del_value(struct ldb_module *module,
2593                          struct ltdb_private *ltdb,
2594                          const struct ldb_message *msg,
2595                          struct ldb_message_element *el, unsigned int v_idx)
2596 {
2597         struct ldb_context *ldb;
2598         struct ldb_dn *dn_key;
2599         const char *dn_str;
2600         int ret, i;
2601         unsigned int j;
2602         struct dn_list *list;
2603         struct ldb_dn *dn = msg->dn;
2604         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2605
2606         ldb = ldb_module_get_ctx(module);
2607
2608         dn_str = ldb_dn_get_linearized(dn);
2609         if (dn_str == NULL) {
2610                 return LDB_ERR_OPERATIONS_ERROR;
2611         }
2612
2613         if (dn_str[0] == '@') {
2614                 return LDB_SUCCESS;
2615         }
2616
2617         dn_key = ltdb_index_key(ldb, ltdb,
2618                                 el->name, &el->values[v_idx],
2619                                 NULL, &truncation);
2620         /*
2621          * We ignore key truncation in ltdb_index_add1() so
2622          * match that by ignoring it here as well
2623          *
2624          * Multiple values are legitimate and accepted
2625          */
2626         if (!dn_key) {
2627                 return LDB_ERR_OPERATIONS_ERROR;
2628         }
2629
2630         list = talloc_zero(dn_key, struct dn_list);
2631         if (list == NULL) {
2632                 talloc_free(dn_key);
2633                 return LDB_ERR_OPERATIONS_ERROR;
2634         }
2635
2636         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2637         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2638                 /* it wasn't indexed. Did we have an earlier error? If we did then
2639                    its gone now */
2640                 talloc_free(dn_key);
2641                 return LDB_SUCCESS;
2642         }
2643
2644         if (ret != LDB_SUCCESS) {
2645                 talloc_free(dn_key);
2646                 return ret;
2647         }
2648
2649         /*
2650          * Find one of the values matching this message to remove
2651          */
2652         i = ltdb_dn_list_find_msg(ltdb, list, msg);
2653         if (i == -1) {
2654                 /* nothing to delete */
2655                 talloc_free(dn_key);
2656                 return LDB_SUCCESS;
2657         }
2658
2659         j = (unsigned int) i;
2660         if (j != list->count - 1) {
2661                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2662         }
2663         list->count--;
2664         if (list->count == 0) {
2665                 talloc_free(list->dn);
2666                 list->dn = NULL;
2667         } else {
2668                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2669         }
2670
2671         ret = ltdb_dn_list_store(module, dn_key, list);
2672
2673         talloc_free(dn_key);
2674
2675         return ret;
2676 }
2677
2678 /*
2679   delete the index entries for a element
2680   return -1 on failure
2681 */
2682 int ltdb_index_del_element(struct ldb_module *module,
2683                            struct ltdb_private *ltdb,
2684                            const struct ldb_message *msg,
2685                            struct ldb_message_element *el)
2686 {
2687         const char *dn_str;
2688         int ret;
2689         unsigned int i;
2690
2691         if (!ltdb->cache->attribute_indexes) {
2692                 /* no indexed fields */
2693                 return LDB_SUCCESS;
2694         }
2695
2696         dn_str = ldb_dn_get_linearized(msg->dn);
2697         if (dn_str == NULL) {
2698                 return LDB_ERR_OPERATIONS_ERROR;
2699         }
2700
2701         if (dn_str[0] == '@') {
2702                 return LDB_SUCCESS;
2703         }
2704
2705         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2706                 return LDB_SUCCESS;
2707         }
2708         for (i = 0; i < el->num_values; i++) {
2709                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
2710                 if (ret != LDB_SUCCESS) {
2711                         return ret;
2712                 }
2713         }
2714
2715         return LDB_SUCCESS;
2716 }
2717
2718 /*
2719   delete the index entries for a record
2720   return -1 on failure
2721 */
2722 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
2723 {
2724         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2725         int ret;
2726         unsigned int i;
2727
2728         if (ldb_dn_is_special(msg->dn)) {
2729                 return LDB_SUCCESS;
2730         }
2731
2732         ret = ltdb_index_onelevel(module, msg, 0);
2733         if (ret != LDB_SUCCESS) {
2734                 return ret;
2735         }
2736
2737         ret = ltdb_write_index_dn_guid(module, msg, 0);
2738         if (ret != LDB_SUCCESS) {
2739                 return ret;
2740         }
2741
2742         if (!ltdb->cache->attribute_indexes) {
2743                 /* no indexed fields */
2744                 return LDB_SUCCESS;
2745         }
2746
2747         for (i = 0; i < msg->num_elements; i++) {
2748                 ret = ltdb_index_del_element(module, ltdb,
2749                                              msg, &msg->elements[i]);
2750                 if (ret != LDB_SUCCESS) {
2751                         return ret;
2752                 }
2753         }
2754
2755         return LDB_SUCCESS;
2756 }
2757
2758
2759 /*
2760   traversal function that deletes all @INDEX records in the in-memory
2761   TDB.
2762
2763   This does not touch the actual DB, that is done at transaction
2764   commit, which in turn greatly reduces DB churn as we will likely
2765   be able to do a direct update into the old record.
2766 */
2767 static int delete_index(struct ltdb_private *ltdb, struct ldb_val key, struct ldb_val data, void *state)
2768 {
2769         struct ldb_module *module = state;
2770         const char *dnstr = "DN=" LTDB_INDEX ":";
2771         struct dn_list list;
2772         struct ldb_dn *dn;
2773         struct ldb_val v;
2774         int ret;
2775
2776         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2777                 return 0;
2778         }
2779         /* we need to put a empty list in the internal tdb for this
2780          * index entry */
2781         list.dn = NULL;
2782         list.count = 0;
2783
2784         /* the offset of 3 is to remove the DN= prefix. */
2785         v.data = key.data + 3;
2786         v.length = strnlen((char *)key.data, key.length) - 3;
2787
2788         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
2789
2790         /*
2791          * This does not actually touch the DB quite yet, just
2792          * the in-memory index cache
2793          */
2794         ret = ltdb_dn_list_store(module, dn, &list);
2795         if (ret != LDB_SUCCESS) {
2796                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2797                                        "Unable to store null index for %s\n",
2798                                                 ldb_dn_get_linearized(dn));
2799                 talloc_free(dn);
2800                 return -1;
2801         }
2802         talloc_free(dn);
2803         return 0;
2804 }
2805
2806 /*
2807   traversal function that adds @INDEX records during a re index TODO wrong comment
2808 */
2809 static int re_key(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2810 {
2811         struct ldb_context *ldb;
2812         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2813         struct ldb_module *module = ctx->module;
2814         struct ldb_message *msg;
2815         unsigned int nb_elements_in_db;
2816         int ret;
2817         TDB_DATA key2;
2818         bool is_record;
2819         TDB_DATA key = {
2820                 .dptr = ldb_key.data,
2821                 .dsize = ldb_key.length
2822         };
2823         
2824         ldb = ldb_module_get_ctx(module);
2825
2826         if (key.dsize > 4 &&
2827             memcmp(key.dptr, "DN=@", 4) == 0) {
2828                 return 0;
2829         }
2830
2831         is_record = ltdb_key_is_record(key);
2832         if (is_record == false) {
2833                 return 0;
2834         }
2835         
2836         msg = ldb_msg_new(module);
2837         if (msg == NULL) {
2838                 return -1;
2839         }
2840
2841         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2842                                                    msg,
2843                                                    NULL, 0,
2844                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2845                                                    &nb_elements_in_db);
2846         if (ret != 0) {
2847                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2848                                                 ldb_dn_get_linearized(msg->dn));
2849                 ctx->error = ret;
2850                 talloc_free(msg);
2851                 return -1;
2852         }
2853
2854         if (msg->dn == NULL) {
2855                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2856                           "Refusing to re-index as GUID "
2857                           "key %*.*s with no DN\n",
2858                           (int)key.dsize, (int)key.dsize,
2859                           (char *)key.dptr);
2860                 talloc_free(msg);
2861                 return -1;
2862         }
2863         
2864         /* check if the DN key has changed, perhaps due to the case
2865            insensitivity of an element changing, or a change from DN
2866            to GUID keys */
2867         key2 = ltdb_key_msg(module, msg, msg);
2868         if (key2.dptr == NULL) {
2869                 /* probably a corrupt record ... darn */
2870                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2871                                                 ldb_dn_get_linearized(msg->dn));
2872                 talloc_free(msg);
2873                 return 0;
2874         }
2875         if (key.dsize != key2.dsize ||
2876             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
2877                 struct ldb_val ldb_key2 = {
2878                         .data = key2.dptr,
2879                         .length = key2.dsize
2880                 };
2881                 ltdb->kv_ops->update_in_iterate(ltdb, ldb_key, ldb_key2, val, ctx);
2882         }
2883         talloc_free(key2.dptr);
2884
2885         talloc_free(msg);
2886
2887         ctx->count++;
2888         if (ctx->count % 10000 == 0) {
2889                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2890                           "Reindexing: re-keyed %u records so far",
2891                           ctx->count);
2892         }
2893
2894         return 0;
2895 }
2896
2897 /*
2898   traversal function that adds @INDEX records during a re index
2899 */
2900 static int re_index(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2901 {
2902         struct ldb_context *ldb;
2903         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2904         struct ldb_module *module = ctx->module;
2905         struct ldb_message *msg;
2906         unsigned int nb_elements_in_db;
2907         TDB_DATA key = {
2908                 .dptr = ldb_key.data,
2909                 .dsize = ldb_key.length
2910         };
2911         int ret;
2912         bool is_record;
2913         
2914         ldb = ldb_module_get_ctx(module);
2915
2916         if (key.dsize > 4 &&
2917             memcmp(key.dptr, "DN=@", 4) == 0) {
2918                 return 0;
2919         }
2920
2921         is_record = ltdb_key_is_record(key);
2922         if (is_record == false) {
2923                 return 0;
2924         }
2925         
2926         msg = ldb_msg_new(module);
2927         if (msg == NULL) {
2928                 return -1;
2929         }
2930
2931         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2932                                                    msg,
2933                                                    NULL, 0,
2934                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2935                                                    &nb_elements_in_db);
2936         if (ret != 0) {
2937                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2938                                                 ldb_dn_get_linearized(msg->dn));
2939                 ctx->error = ret;
2940                 talloc_free(msg);
2941                 return -1;
2942         }
2943
2944         if (msg->dn == NULL) {
2945                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2946                           "Refusing to re-index as GUID "
2947                           "key %*.*s with no DN\n",
2948                           (int)key.dsize, (int)key.dsize,
2949                           (char *)key.dptr);
2950                 talloc_free(msg);
2951                 return -1;
2952         }
2953
2954         ret = ltdb_index_onelevel(module, msg, 1);
2955         if (ret != LDB_SUCCESS) {
2956                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2957                           "Adding special ONE LEVEL index failed (%s)!",
2958                                                 ldb_dn_get_linearized(msg->dn));
2959                 talloc_free(msg);
2960                 return -1;
2961         }
2962
2963         ret = ltdb_index_add_all(module, ltdb, msg);
2964
2965         if (ret != LDB_SUCCESS) {
2966                 ctx->error = ret;
2967                 talloc_free(msg);
2968                 return -1;
2969         }
2970
2971         talloc_free(msg);
2972
2973         ctx->count++;
2974         if (ctx->count % 10000 == 0) {
2975                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2976                           "Reindexing: re-indexed %u records so far",
2977                           ctx->count);
2978         }
2979
2980         return 0;
2981 }
2982
2983 /*
2984   force a complete reindex of the database
2985 */
2986 int ltdb_reindex(struct ldb_module *module)
2987 {
2988         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2989         int ret;
2990         struct ltdb_reindex_context ctx;
2991
2992         /*
2993          * Only triggered after a modification, but make clear we do
2994          * not re-index a read-only DB
2995          */
2996         if (ltdb->read_only) {
2997                 return LDB_ERR_UNWILLING_TO_PERFORM;
2998         }
2999
3000         if (ltdb_cache_reload(module) != 0) {
3001                 return LDB_ERR_OPERATIONS_ERROR;
3002         }
3003
3004         /*
3005          * Ensure we read (and so remove) the entries from the real
3006          * DB, no values stored so far are any use as we want to do a
3007          * re-index
3008          */
3009         ltdb_index_transaction_cancel(module);
3010
3011         ret = ltdb_index_transaction_start(module);
3012         if (ret != LDB_SUCCESS) {
3013                 return ret;
3014         }
3015
3016         /* first traverse the database deleting any @INDEX records by
3017          * putting NULL entries in the in-memory tdb
3018          */
3019         ret = ltdb->kv_ops->iterate(ltdb, delete_index, module);
3020         if (ret < 0) {
3021                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3022                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3023                                        ldb_errstring(ldb));
3024                 return LDB_ERR_OPERATIONS_ERROR;
3025         }
3026
3027         ctx.module = module;
3028         ctx.error = 0;
3029         ctx.count = 0;
3030
3031         ret = ltdb->kv_ops->iterate(ltdb, re_key, &ctx);
3032         if (ret < 0) {
3033                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3034                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3035                                        ldb_errstring(ldb));
3036                 return LDB_ERR_OPERATIONS_ERROR;
3037         }
3038
3039         if (ctx.error != LDB_SUCCESS) {
3040                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3041                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3042                 return ctx.error;
3043         }
3044
3045         ctx.error = 0;
3046         ctx.count = 0;
3047
3048         /* now traverse adding any indexes for normal LDB records */
3049         ret = ltdb->kv_ops->iterate(ltdb, re_index, &ctx);
3050         if (ret < 0) {
3051                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3052                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3053                                        ldb_errstring(ldb));
3054                 return LDB_ERR_OPERATIONS_ERROR;
3055         }
3056
3057         if (ctx.error != LDB_SUCCESS) {
3058                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3059                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3060                 return ctx.error;
3061         }
3062
3063         if (ctx.count > 10000) {
3064                 ldb_debug(ldb_module_get_ctx(module),
3065                           LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
3066                           "final index write-out will be in transaction commit",
3067                           ltdb->kv_ops->name(ltdb));
3068         }
3069         return LDB_SUCCESS;
3070 }