6c21c19d65473dc2356c3e846420af9b18eb3ab9
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151
152 struct dn_list {
153         unsigned int count;
154         struct ldb_val *dn;
155         /*
156          * Do not optimise the intersection of this list,
157          * we must never return an entry not in this
158          * list.  This allows the index for
159          * SCOPE_ONELEVEL to be trusted.
160          */
161         bool strict;
162 };
163
164 struct ldb_kv_idxptr {
165         struct tdb_context *itdb;
166         int error;
167 };
168
169 enum key_truncation {
170         KEY_NOT_TRUNCATED,
171         KEY_TRUNCATED,
172 };
173
174 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
175                                       const struct ldb_message *msg,
176                                       int add);
177 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
178                                    struct ldb_kv_private *ldb_kv,
179                                    struct ldb_dn *base_dn,
180                                    struct dn_list *dn_list,
181                                    enum key_truncation *truncation);
182
183 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
184                                 struct dn_list *list);
185
186 /* we put a @IDXVERSION attribute on index entries. This
187    allows us to tell if it was written by an older version
188 */
189 #define LDB_KV_INDEXING_VERSION 2
190
191 #define LDB_KV_GUID_INDEXING_VERSION 3
192
193 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
194 {
195         if (ldb_kv->max_key_length == 0) {
196                 return UINT_MAX;
197         }
198         return ldb_kv->max_key_length;
199 }
200
201 /* enable the idxptr mode when transactions start */
202 int ldb_kv_index_transaction_start(struct ldb_module *module)
203 {
204         struct ldb_kv_private *ldb_kv = talloc_get_type(
205             ldb_module_get_private(module), struct ldb_kv_private);
206         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
207         if (ldb_kv->idxptr == NULL) {
208                 return ldb_oom(ldb_module_get_ctx(module));
209         }
210
211         return LDB_SUCCESS;
212 }
213
214 /*
215   see if two ldb_val structures contain exactly the same data
216   return -1 or 1 for a mismatch, 0 for match
217 */
218 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
219                                          const struct ldb_val *v2)
220 {
221         if (v1->length > v2->length) {
222                 return -1;
223         }
224         if (v1->length < v2->length) {
225                 return 1;
226         }
227         return memcmp(v1->data, v2->data, v1->length);
228 }
229
230 /*
231   see if two ldb_val structures contain exactly the same data
232   return -1 or 1 for a mismatch, 0 for match
233 */
234 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
235                                        const struct ldb_val *v2)
236 {
237         if (v1.length > v2->length) {
238                 return -1;
239         }
240         if (v1.length < v2->length) {
241                 return 1;
242         }
243         return memcmp(v1.data, v2->data, v1.length);
244 }
245
246
247 /*
248   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
249   binary-safe comparison for the 'dn' returns -1 if not found
250
251   This is therefore safe when the value is a GUID in the future
252  */
253 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
254                                    const struct dn_list *list,
255                                    const struct ldb_val *v)
256 {
257         unsigned int i;
258         struct ldb_val *exact = NULL, *next = NULL;
259
260         if (ldb_kv->cache->GUID_index_attribute == NULL) {
261                 for (i=0; i<list->count; i++) {
262                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
263                                 return i;
264                         }
265                 }
266                 return -1;
267         }
268
269         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
270                                 *v, ldb_val_equal_exact_ordered,
271                                 exact, next);
272         if (exact == NULL) {
273                 return -1;
274         }
275         /* Not required, but keeps the compiler quiet */
276         if (next != NULL) {
277                 return -1;
278         }
279
280         i = exact - list->dn;
281         return i;
282 }
283
284 /*
285   find a entry in a dn_list. Uses a case sensitive comparison with the dn
286   returns -1 if not found
287  */
288 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
289                                    struct dn_list *list,
290                                    const struct ldb_message *msg)
291 {
292         struct ldb_val v;
293         const struct ldb_val *key_val;
294         if (ldb_kv->cache->GUID_index_attribute == NULL) {
295                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
296                 v.data = discard_const_p(unsigned char, dn_str);
297                 v.length = strlen(dn_str);
298         } else {
299                 key_val = ldb_msg_find_ldb_val(
300                     msg, ldb_kv->cache->GUID_index_attribute);
301                 if (key_val == NULL) {
302                         return -1;
303                 }
304                 v = *key_val;
305         }
306         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
307 }
308
309 /*
310   this is effectively a cast function, but with lots of paranoia
311   checks and also copes with CPUs that are fussy about pointer
312   alignment
313  */
314 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
315                                            TDB_DATA rec,
316                                            bool check_parent)
317 {
318         struct dn_list *list;
319         if (rec.dsize != sizeof(void *)) {
320                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
321                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
322                 return NULL;
323         }
324         /* note that we can't just use a cast here, as rec.dptr may
325            not be aligned sufficiently for a pointer. A cast would cause
326            platforms like some ARM CPUs to crash */
327         memcpy(&list, rec.dptr, sizeof(void *));
328         list = talloc_get_type(list, struct dn_list);
329         if (list == NULL) {
330                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
331                                        "Bad type '%s' for idxptr",
332                                        talloc_get_name(list));
333                 return NULL;
334         }
335         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
336                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
337                                        "Bad parent '%s' for idxptr",
338                                        talloc_get_name(talloc_parent(list->dn)));
339                 return NULL;
340         }
341         return list;
342 }
343
344 /*
345   return the @IDX list in an index entry for a dn as a
346   struct dn_list
347  */
348 static int ldb_kv_dn_list_load(struct ldb_module *module,
349                                struct ldb_kv_private *ldb_kv,
350                                struct ldb_dn *dn,
351                                struct dn_list *list)
352 {
353         struct ldb_message *msg;
354         int ret, version;
355         struct ldb_message_element *el;
356         TDB_DATA rec;
357         struct dn_list *list2;
358         TDB_DATA key;
359
360         list->dn = NULL;
361         list->count = 0;
362
363         /* see if we have any in-memory index entries */
364         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
365                 goto normal_index;
366         }
367
368         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
369         key.dsize = strlen((char *)key.dptr);
370
371         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
372         if (rec.dptr == NULL) {
373                 goto normal_index;
374         }
375
376         /* we've found an in-memory index entry */
377         list2 = ldb_kv_index_idxptr(module, rec, true);
378         if (list2 == NULL) {
379                 free(rec.dptr);
380                 return LDB_ERR_OPERATIONS_ERROR;
381         }
382         free(rec.dptr);
383
384         *list = *list2;
385         return LDB_SUCCESS;
386
387 normal_index:
388         msg = ldb_msg_new(list);
389         if (msg == NULL) {
390                 return LDB_ERR_OPERATIONS_ERROR;
391         }
392
393         ret = ldb_kv_search_dn1(module,
394                                 dn,
395                                 msg,
396                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
397                                     LDB_UNPACK_DATA_FLAG_NO_DN);
398         if (ret != LDB_SUCCESS) {
399                 talloc_free(msg);
400                 return ret;
401         }
402
403         el = ldb_msg_find_element(msg, LDB_KV_IDX);
404         if (!el) {
405                 talloc_free(msg);
406                 return LDB_SUCCESS;
407         }
408
409         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
410
411         /*
412          * we avoid copying the strings by stealing the list.  We have
413          * to steal msg onto el->values (which looks odd) because we
414          * asked for the memory to be allocated on msg, not on each
415          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
416          */
417         if (ldb_kv->cache->GUID_index_attribute == NULL) {
418                 /* check indexing version number */
419                 if (version != LDB_KV_INDEXING_VERSION) {
420                         ldb_debug_set(ldb_module_get_ctx(module),
421                                       LDB_DEBUG_ERROR,
422                                       "Wrong DN index version %d "
423                                       "expected %d for %s",
424                                       version, LDB_KV_INDEXING_VERSION,
425                                       ldb_dn_get_linearized(dn));
426                         talloc_free(msg);
427                         return LDB_ERR_OPERATIONS_ERROR;
428                 }
429
430                 talloc_steal(el->values, msg);
431                 list->dn = talloc_steal(list, el->values);
432                 list->count = el->num_values;
433         } else {
434                 unsigned int i;
435                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
436                         /* This is quite likely during the DB startup
437                            on first upgrade to using a GUID index */
438                         ldb_debug_set(ldb_module_get_ctx(module),
439                                       LDB_DEBUG_ERROR,
440                                       "Wrong GUID index version %d "
441                                       "expected %d for %s",
442                                       version, LDB_KV_GUID_INDEXING_VERSION,
443                                       ldb_dn_get_linearized(dn));
444                         talloc_free(msg);
445                         return LDB_ERR_OPERATIONS_ERROR;
446                 }
447
448                 if (el->num_values == 0) {
449                         talloc_free(msg);
450                         return LDB_ERR_OPERATIONS_ERROR;
451                 }
452
453                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
454                         talloc_free(msg);
455                         return LDB_ERR_OPERATIONS_ERROR;
456                 }
457
458                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
459                 list->dn = talloc_array(list, struct ldb_val, list->count);
460                 if (list->dn == NULL) {
461                         talloc_free(msg);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464
465                 /*
466                  * The actual data is on msg, due to
467                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
468                  */
469                 talloc_steal(list->dn, msg);
470                 for (i = 0; i < list->count; i++) {
471                         list->dn[i].data
472                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
473                         list->dn[i].length = LDB_KV_GUID_SIZE;
474                 }
475         }
476
477         /* We don't need msg->elements any more */
478         talloc_free(msg->elements);
479         return LDB_SUCCESS;
480 }
481
482 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
483                            struct ldb_kv_private *ldb_kv,
484                            TALLOC_CTX *mem_ctx,
485                            struct ldb_dn *dn,
486                            struct ldb_val *ldb_key)
487 {
488         struct ldb_context *ldb = ldb_module_get_ctx(module);
489         int ret;
490         int index = 0;
491         enum key_truncation truncation = KEY_NOT_TRUNCATED;
492         struct dn_list *list = talloc(mem_ctx, struct dn_list);
493         if (list == NULL) {
494                 ldb_oom(ldb);
495                 return LDB_ERR_OPERATIONS_ERROR;
496         }
497
498         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
499         if (ret != LDB_SUCCESS) {
500                 TALLOC_FREE(list);
501                 return ret;
502         }
503
504         if (list->count == 0) {
505                 TALLOC_FREE(list);
506                 return LDB_ERR_NO_SUCH_OBJECT;
507         }
508
509         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
510                 const char *dn_str = ldb_dn_get_linearized(dn);
511                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
512                                        __location__
513                                        ": Failed to read DN index "
514                                        "against %s for %s: too many "
515                                        "values (%u > 1)",
516                                        ldb_kv->cache->GUID_index_attribute,
517                                        dn_str,
518                                        list->count);
519                 TALLOC_FREE(list);
520                 return LDB_ERR_CONSTRAINT_VIOLATION;
521         }
522
523         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
524                 /*
525                  * DN key has been truncated, need to inspect the actual
526                  * records to locate the actual DN
527                  */
528                 int i;
529                 index = -1;
530                 for (i=0; i < list->count; i++) {
531                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
532                         struct ldb_val key = {
533                                 .data = guid_key,
534                                 .length = sizeof(guid_key)
535                         };
536                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
537                         struct ldb_message *rec = ldb_msg_new(ldb);
538                         if (rec == NULL) {
539                                 TALLOC_FREE(list);
540                                 return LDB_ERR_OPERATIONS_ERROR;
541                         }
542
543                         ret = ldb_kv_idx_to_key(
544                             module, ldb_kv, ldb, &list->dn[i], &key);
545                         if (ret != LDB_SUCCESS) {
546                                 TALLOC_FREE(list);
547                                 TALLOC_FREE(rec);
548                                 return ret;
549                         }
550
551                         ret =
552                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
553                         if (key.data != guid_key) {
554                                 TALLOC_FREE(key.data);
555                         }
556                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
557                                 /*
558                                  * the record has disappeared?
559                                  * yes, this can happen
560                                  */
561                                 TALLOC_FREE(rec);
562                                 continue;
563                         }
564
565                         if (ret != LDB_SUCCESS) {
566                                 /* an internal error */
567                                 TALLOC_FREE(rec);
568                                 TALLOC_FREE(list);
569                                 return LDB_ERR_OPERATIONS_ERROR;
570                         }
571
572                         /*
573                          * We found the actual DN that we wanted from in the
574                          * multiple values that matched the index
575                          * (due to truncation), so return that.
576                          *
577                          */
578                         if (ldb_dn_compare(dn, rec->dn) == 0) {
579                                 index = i;
580                                 TALLOC_FREE(rec);
581                                 break;
582                         }
583                 }
584
585                 /*
586                  * We matched the index but the actual DN we wanted
587                  * was not here.
588                  */
589                 if (index == -1) {
590                         TALLOC_FREE(list);
591                         return LDB_ERR_NO_SUCH_OBJECT;
592                 }
593         }
594
595         /* The ldb_key memory is allocated by the caller */
596         ret = ldb_kv_guid_to_key(module, ldb_kv, &list->dn[index], ldb_key);
597         TALLOC_FREE(list);
598
599         if (ret != LDB_SUCCESS) {
600                 return LDB_ERR_OPERATIONS_ERROR;
601         }
602
603         return LDB_SUCCESS;
604 }
605
606
607
608 /*
609   save a dn_list into a full @IDX style record
610  */
611 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
612                                      struct ldb_kv_private *ldb_kv,
613                                      struct ldb_dn *dn,
614                                      struct dn_list *list)
615 {
616         struct ldb_message *msg;
617         int ret;
618
619         msg = ldb_msg_new(module);
620         if (!msg) {
621                 return ldb_module_oom(module);
622         }
623
624         msg->dn = dn;
625
626         if (list->count == 0) {
627                 ret = ldb_kv_delete_noindex(module, msg);
628                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
629                         ret = LDB_SUCCESS;
630                 }
631                 talloc_free(msg);
632                 return ret;
633         }
634
635         if (ldb_kv->cache->GUID_index_attribute == NULL) {
636                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
637                                       LDB_KV_INDEXING_VERSION);
638                 if (ret != LDB_SUCCESS) {
639                         talloc_free(msg);
640                         return ldb_module_oom(module);
641                 }
642         } else {
643                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
644                                       LDB_KV_GUID_INDEXING_VERSION);
645                 if (ret != LDB_SUCCESS) {
646                         talloc_free(msg);
647                         return ldb_module_oom(module);
648                 }
649         }
650
651         if (list->count > 0) {
652                 struct ldb_message_element *el;
653
654                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
655                 if (ret != LDB_SUCCESS) {
656                         talloc_free(msg);
657                         return ldb_module_oom(module);
658                 }
659
660                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
661                         el->values = list->dn;
662                         el->num_values = list->count;
663                 } else {
664                         struct ldb_val v;
665                         unsigned int i;
666                         el->values = talloc_array(msg,
667                                                   struct ldb_val, 1);
668                         if (el->values == NULL) {
669                                 talloc_free(msg);
670                                 return ldb_module_oom(module);
671                         }
672
673                         v.data = talloc_array_size(el->values,
674                                                    list->count,
675                                                    LDB_KV_GUID_SIZE);
676                         if (v.data == NULL) {
677                                 talloc_free(msg);
678                                 return ldb_module_oom(module);
679                         }
680
681                         v.length = talloc_get_size(v.data);
682
683                         for (i = 0; i < list->count; i++) {
684                                 if (list->dn[i].length !=
685                                     LDB_KV_GUID_SIZE) {
686                                         talloc_free(msg);
687                                         return ldb_module_operr(module);
688                                 }
689                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
690                                        list->dn[i].data,
691                                        LDB_KV_GUID_SIZE);
692                         }
693                         el->values[0] = v;
694                         el->num_values = 1;
695                 }
696         }
697
698         ret = ldb_kv_store(module, msg, TDB_REPLACE);
699         talloc_free(msg);
700         return ret;
701 }
702
703 /*
704   save a dn_list into the database, in either @IDX or internal format
705  */
706 static int ldb_kv_dn_list_store(struct ldb_module *module,
707                                 struct ldb_dn *dn,
708                                 struct dn_list *list)
709 {
710         struct ldb_kv_private *ldb_kv = talloc_get_type(
711             ldb_module_get_private(module), struct ldb_kv_private);
712         TDB_DATA rec, key;
713         int ret;
714         struct dn_list *list2;
715
716         if (ldb_kv->idxptr == NULL) {
717                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
718         }
719
720         if (ldb_kv->idxptr->itdb == NULL) {
721                 ldb_kv->idxptr->itdb =
722                     tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
723                 if (ldb_kv->idxptr->itdb == NULL) {
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726         }
727
728         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
729         if (key.dptr == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         key.dsize = strlen((char *)key.dptr);
733
734         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
735         if (rec.dptr != NULL) {
736                 list2 = ldb_kv_index_idxptr(module, rec, false);
737                 if (list2 == NULL) {
738                         free(rec.dptr);
739                         return LDB_ERR_OPERATIONS_ERROR;
740                 }
741                 free(rec.dptr);
742                 list2->dn = talloc_steal(list2, list->dn);
743                 list2->count = list->count;
744                 return LDB_SUCCESS;
745         }
746
747         list2 = talloc(ldb_kv->idxptr, struct dn_list);
748         if (list2 == NULL) {
749                 return LDB_ERR_OPERATIONS_ERROR;
750         }
751         list2->dn = talloc_steal(list2, list->dn);
752         list2->count = list->count;
753
754         rec.dptr = (uint8_t *)&list2;
755         rec.dsize = sizeof(void *);
756
757
758         /*
759          * This is not a store into the main DB, but into an in-memory
760          * TDB, so we don't need a guard on ltdb->read_only
761          */
762         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
763         if (ret != 0) {
764                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
765         }
766         return LDB_SUCCESS;
767 }
768
769 /*
770   traverse function for storing the in-memory index entries on disk
771  */
772 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
773                                        TDB_DATA key,
774                                        TDB_DATA data,
775                                        void *state)
776 {
777         struct ldb_module *module = state;
778         struct ldb_kv_private *ldb_kv = talloc_get_type(
779             ldb_module_get_private(module), struct ldb_kv_private);
780         struct ldb_dn *dn;
781         struct ldb_context *ldb = ldb_module_get_ctx(module);
782         struct ldb_val v;
783         struct dn_list *list;
784
785         list = ldb_kv_index_idxptr(module, data, true);
786         if (list == NULL) {
787                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
788                 return -1;
789         }
790
791         v.data = key.dptr;
792         v.length = strnlen((char *)key.dptr, key.dsize);
793
794         dn = ldb_dn_from_ldb_val(module, ldb, &v);
795         if (dn == NULL) {
796                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
797                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
798                 return -1;
799         }
800
801         ldb_kv->idxptr->error =
802             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
803         talloc_free(dn);
804         if (ldb_kv->idxptr->error != 0) {
805                 return -1;
806         }
807         return 0;
808 }
809
810 /* cleanup the idxptr mode when transaction commits */
811 int ldb_kv_index_transaction_commit(struct ldb_module *module)
812 {
813         struct ldb_kv_private *ldb_kv = talloc_get_type(
814             ldb_module_get_private(module), struct ldb_kv_private);
815         int ret;
816
817         struct ldb_context *ldb = ldb_module_get_ctx(module);
818
819         ldb_reset_err_string(ldb);
820
821         if (ldb_kv->idxptr->itdb) {
822                 tdb_traverse(
823                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
824                 tdb_close(ldb_kv->idxptr->itdb);
825         }
826
827         ret = ldb_kv->idxptr->error;
828         if (ret != LDB_SUCCESS) {
829                 if (!ldb_errstring(ldb)) {
830                         ldb_set_errstring(ldb, ldb_strerror(ret));
831                 }
832                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
833         }
834
835         talloc_free(ldb_kv->idxptr);
836         ldb_kv->idxptr = NULL;
837         return ret;
838 }
839
840 /* cleanup the idxptr mode when transaction cancels */
841 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
842 {
843         struct ldb_kv_private *ldb_kv = talloc_get_type(
844             ldb_module_get_private(module), struct ldb_kv_private);
845         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
846                 tdb_close(ldb_kv->idxptr->itdb);
847         }
848         talloc_free(ldb_kv->idxptr);
849         ldb_kv->idxptr = NULL;
850         return LDB_SUCCESS;
851 }
852
853
854 /*
855   return the dn key to be used for an index
856   the caller is responsible for freeing
857 */
858 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
859                                        struct ldb_kv_private *ldb_kv,
860                                        const char *attr,
861                                        const struct ldb_val *value,
862                                        const struct ldb_schema_attribute **ap,
863                                        enum key_truncation *truncation)
864 {
865         struct ldb_dn *ret;
866         struct ldb_val v;
867         const struct ldb_schema_attribute *a = NULL;
868         char *attr_folded = NULL;
869         const char *attr_for_dn = NULL;
870         int r;
871         bool should_b64_encode;
872
873         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
874         size_t key_len = 0;
875         size_t attr_len = 0;
876         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
877         unsigned frmt_len = 0;
878         const size_t additional_key_length = 4;
879         unsigned int num_separators = 3; /* Estimate for overflow check */
880         const size_t min_data = 1;
881         const size_t min_key_length = additional_key_length
882                 + indx_len + num_separators + min_data;
883
884         if (attr[0] == '@') {
885                 attr_for_dn = attr;
886                 v = *value;
887                 if (ap != NULL) {
888                         *ap = NULL;
889                 }
890         } else {
891                 attr_folded = ldb_attr_casefold(ldb, attr);
892                 if (!attr_folded) {
893                         return NULL;
894                 }
895
896                 attr_for_dn = attr_folded;
897
898                 a = ldb_schema_attribute_by_name(ldb, attr);
899                 if (ap) {
900                         *ap = a;
901                 }
902                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
903                 if (r != LDB_SUCCESS) {
904                         const char *errstr = ldb_errstring(ldb);
905                         /* canonicalisation can be refused. For
906                            example, a attribute that takes wildcards
907                            will refuse to canonicalise if the value
908                            contains a wildcard */
909                         ldb_asprintf_errstring(ldb,
910                                                "Failed to create index "
911                                                "key for attribute '%s':%s%s%s",
912                                                attr, ldb_strerror(r),
913                                                (errstr?":":""),
914                                                (errstr?errstr:""));
915                         talloc_free(attr_folded);
916                         return NULL;
917                 }
918         }
919         attr_len = strlen(attr_for_dn);
920
921         /*
922          * Check if there is any hope this will fit into the DB.
923          * Overflow here is not actually critical the code below
924          * checks again to make the printf and the DB does another
925          * check for too long keys
926          */
927         if (max_key_length - attr_len < min_key_length) {
928                 ldb_asprintf_errstring(
929                         ldb,
930                         __location__ ": max_key_length "
931                         "is too small (%u) < (%u)",
932                         max_key_length,
933                         (unsigned)(min_key_length + attr_len));
934                 talloc_free(attr_folded);
935                 return NULL;
936         }
937
938         /*
939          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
940          * "DN=" and a trailing string terminator
941          */
942         max_key_length -= additional_key_length;
943
944         /*
945          * We do not base 64 encode a DN in a key, it has already been
946          * casefold and lineraized, that is good enough.  That already
947          * avoids embedded NUL etc.
948          */
949         if (ldb_kv->cache->GUID_index_attribute != NULL) {
950                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
951                         should_b64_encode = false;
952                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
953                         /*
954                          * We can only change the behaviour for IDXONE
955                          * when the GUID index is enabled
956                          */
957                         should_b64_encode = false;
958                 } else {
959                         should_b64_encode
960                                 = ldb_should_b64_encode(ldb, &v);
961                 }
962         } else {
963                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
964         }
965
966         if (should_b64_encode) {
967                 size_t vstr_len = 0;
968                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
969                 if (!vstr) {
970                         talloc_free(attr_folded);
971                         return NULL;
972                 }
973                 vstr_len = strlen(vstr);
974                 /*
975                  * Overflow here is not critical as we only use this
976                  * to choose the printf truncation
977                  */
978                 key_len = num_separators + indx_len + attr_len + vstr_len;
979                 if (key_len > max_key_length) {
980                         size_t excess = key_len - max_key_length;
981                         frmt_len = vstr_len - excess;
982                         *truncation = KEY_TRUNCATED;
983                         /*
984                         * Truncated keys are placed in a separate key space
985                         * from the non truncated keys
986                         * Note: the double hash "##" is not a typo and
987                         * indicates that the following value is base64 encoded
988                         */
989                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
990                                              LDB_KV_INDEX, attr_for_dn,
991                                              frmt_len, vstr);
992                 } else {
993                         frmt_len = vstr_len;
994                         *truncation = KEY_NOT_TRUNCATED;
995                         /*
996                          * Note: the double colon "::" is not a typo and
997                          * indicates that the following value is base64 encoded
998                          */
999                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1000                                              LDB_KV_INDEX, attr_for_dn,
1001                                              frmt_len, vstr);
1002                 }
1003                 talloc_free(vstr);
1004         } else {
1005                 /* Only need two seperators */
1006                 num_separators = 2;
1007
1008                 /*
1009                  * Overflow here is not critical as we only use this
1010                  * to choose the printf truncation
1011                  */
1012                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1013                 if (key_len > max_key_length) {
1014                         size_t excess = key_len - max_key_length;
1015                         frmt_len = v.length - excess;
1016                         *truncation = KEY_TRUNCATED;
1017                         /*
1018                          * Truncated keys are placed in a separate key space
1019                          * from the non truncated keys
1020                          */
1021                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1022                                              LDB_KV_INDEX, attr_for_dn,
1023                                              frmt_len, (char *)v.data);
1024                 } else {
1025                         frmt_len = v.length;
1026                         *truncation = KEY_NOT_TRUNCATED;
1027                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1028                                              LDB_KV_INDEX, attr_for_dn,
1029                                              frmt_len, (char *)v.data);
1030                 }
1031         }
1032
1033         if (v.data != value->data) {
1034                 talloc_free(v.data);
1035         }
1036         talloc_free(attr_folded);
1037
1038         return ret;
1039 }
1040
1041 /*
1042   see if a attribute value is in the list of indexed attributes
1043 */
1044 static bool ldb_kv_is_indexed(struct ldb_module *module,
1045                               struct ldb_kv_private *ldb_kv,
1046                               const char *attr)
1047 {
1048         struct ldb_context *ldb = ldb_module_get_ctx(module);
1049         unsigned int i;
1050         struct ldb_message_element *el;
1051
1052         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1053             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1054                 /* Implicity covered, this is the index key */
1055                 return false;
1056         }
1057         if (ldb->schema.index_handler_override) {
1058                 const struct ldb_schema_attribute *a
1059                         = ldb_schema_attribute_by_name(ldb, attr);
1060
1061                 if (a == NULL) {
1062                         return false;
1063                 }
1064
1065                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1066                         return true;
1067                 } else {
1068                         return false;
1069                 }
1070         }
1071
1072         if (!ldb_kv->cache->attribute_indexes) {
1073                 return false;
1074         }
1075
1076         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1077         if (el == NULL) {
1078                 return false;
1079         }
1080
1081         /* TODO: this is too expensive! At least use a binary search */
1082         for (i=0; i<el->num_values; i++) {
1083                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1084                         return true;
1085                 }
1086         }
1087         return false;
1088 }
1089
1090 /*
1091   in the following logic functions, the return value is treated as
1092   follows:
1093
1094      LDB_SUCCESS: we found some matching index values
1095
1096      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1097
1098      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1099                                we'll need a full search
1100  */
1101
1102 /*
1103   return a list of dn's that might match a simple indexed search (an
1104   equality search only)
1105  */
1106 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1107                                   struct ldb_kv_private *ldb_kv,
1108                                   const struct ldb_parse_tree *tree,
1109                                   struct dn_list *list)
1110 {
1111         struct ldb_context *ldb;
1112         struct ldb_dn *dn;
1113         int ret;
1114         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1115
1116         ldb = ldb_module_get_ctx(module);
1117
1118         list->count = 0;
1119         list->dn = NULL;
1120
1121         /* if the attribute isn't in the list of indexed attributes then
1122            this node needs a full search */
1123         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1124                 return LDB_ERR_OPERATIONS_ERROR;
1125         }
1126
1127         /* the attribute is indexed. Pull the list of DNs that match the
1128            search criterion */
1129         dn = ldb_kv_index_key(ldb,
1130                               ldb_kv,
1131                               tree->u.equality.attr,
1132                               &tree->u.equality.value,
1133                               NULL,
1134                               &truncation);
1135         /*
1136          * We ignore truncation here and allow multi-valued matches
1137          * as ltdb_search_indexed will filter out the wrong one in
1138          * ltdb_index_filter() which calls ldb_match_message().
1139          */
1140         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1141
1142         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1143         talloc_free(dn);
1144         return ret;
1145 }
1146
1147 static bool list_union(struct ldb_context *ldb,
1148                        struct ldb_kv_private *ldb_kv,
1149                        struct dn_list *list,
1150                        struct dn_list *list2);
1151
1152 /*
1153   return a list of dn's that might match a leaf indexed search
1154  */
1155 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1156                                 struct ldb_kv_private *ldb_kv,
1157                                 const struct ldb_parse_tree *tree,
1158                                 struct dn_list *list)
1159 {
1160         if (ldb_kv->disallow_dn_filter &&
1161             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1162                 /* in AD mode we do not support "(dn=...)" search filters */
1163                 list->dn = NULL;
1164                 list->count = 0;
1165                 return LDB_SUCCESS;
1166         }
1167         if (tree->u.equality.attr[0] == '@') {
1168                 /* Do not allow a indexed search against an @ */
1169                 list->dn = NULL;
1170                 list->count = 0;
1171                 return LDB_SUCCESS;
1172         }
1173         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1174                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1175                 bool valid_dn = false;
1176                 struct ldb_dn *dn
1177                         = ldb_dn_from_ldb_val(list,
1178                                               ldb_module_get_ctx(module),
1179                                               &tree->u.equality.value);
1180                 if (dn == NULL) {
1181                         /* If we can't parse it, no match */
1182                         list->dn = NULL;
1183                         list->count = 0;
1184                         return LDB_SUCCESS;
1185                 }
1186
1187                 valid_dn = ldb_dn_validate(dn);
1188                 if (valid_dn == false) {
1189                         /* If we can't parse it, no match */
1190                         list->dn = NULL;
1191                         list->count = 0;
1192                         return LDB_SUCCESS;
1193                 }
1194
1195                 /*
1196                  * Re-use the same code we use for a SCOPE_BASE
1197                  * search
1198                  *
1199                  * We can't call TALLOC_FREE(dn) as this must belong
1200                  * to list for the memory to remain valid.
1201                  */
1202                 return ldb_kv_index_dn_base_dn(
1203                     module, ldb_kv, dn, list, &truncation);
1204                 /*
1205                  * We ignore truncation here and allow multi-valued matches
1206                  * as ltdb_search_indexed will filter out the wrong one in
1207                  * ltdb_index_filter() which calls ldb_match_message().
1208                  */
1209
1210         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1211                    (ldb_attr_cmp(tree->u.equality.attr,
1212                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1213                 int ret;
1214                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1215                 list->dn = talloc_array(list, struct ldb_val, 1);
1216                 if (list->dn == NULL) {
1217                         ldb_module_oom(module);
1218                         return LDB_ERR_OPERATIONS_ERROR;
1219                 }
1220                 /*
1221                  * We need to go via the canonicalise_fn() to
1222                  * ensure we get the index in binary, rather
1223                  * than a string
1224                  */
1225                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1226                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1227                 if (ret != LDB_SUCCESS) {
1228                         return LDB_ERR_OPERATIONS_ERROR;
1229                 }
1230                 list->count = 1;
1231                 return LDB_SUCCESS;
1232         }
1233
1234         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1235 }
1236
1237
1238 /*
1239   list intersection
1240   list = list & list2
1241 */
1242 static bool list_intersect(struct ldb_context *ldb,
1243                            struct ldb_kv_private *ldb_kv,
1244                            struct dn_list *list,
1245                            const struct dn_list *list2)
1246 {
1247         const struct dn_list *short_list, *long_list;
1248         struct dn_list *list3;
1249         unsigned int i;
1250
1251         if (list->count == 0) {
1252                 /* 0 & X == 0 */
1253                 return true;
1254         }
1255         if (list2->count == 0) {
1256                 /* X & 0 == 0 */
1257                 list->count = 0;
1258                 list->dn = NULL;
1259                 return true;
1260         }
1261
1262         /* the indexing code is allowed to return a longer list than
1263            what really matches, as all results are filtered by the
1264            full expression at the end - this shortcut avoids a lot of
1265            work in some cases */
1266         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1267                 return true;
1268         }
1269         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1270                 list->count = list2->count;
1271                 list->dn = list2->dn;
1272                 /* note that list2 may not be the parent of list2->dn,
1273                    as list2->dn may be owned by ltdb->idxptr. In that
1274                    case we expect this reparent call to fail, which is
1275                    OK */
1276                 talloc_reparent(list2, list, list2->dn);
1277                 return true;
1278         }
1279
1280         if (list->count > list2->count) {
1281                 short_list = list2;
1282                 long_list = list;
1283         } else {
1284                 short_list = list;
1285                 long_list = list2;
1286         }
1287
1288         list3 = talloc_zero(list, struct dn_list);
1289         if (list3 == NULL) {
1290                 return false;
1291         }
1292
1293         list3->dn = talloc_array(list3, struct ldb_val,
1294                                  MIN(list->count, list2->count));
1295         if (!list3->dn) {
1296                 talloc_free(list3);
1297                 return false;
1298         }
1299         list3->count = 0;
1300
1301         for (i=0;i<short_list->count;i++) {
1302                 /* For the GUID index case, this is a binary search */
1303                 if (ldb_kv_dn_list_find_val(
1304                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1305                         list3->dn[list3->count] = short_list->dn[i];
1306                         list3->count++;
1307                 }
1308         }
1309
1310         list->strict |= list2->strict;
1311         list->dn = talloc_steal(list, list3->dn);
1312         list->count = list3->count;
1313         talloc_free(list3);
1314
1315         return true;
1316 }
1317
1318
1319 /*
1320   list union
1321   list = list | list2
1322 */
1323 static bool list_union(struct ldb_context *ldb,
1324                        struct ldb_kv_private *ldb_kv,
1325                        struct dn_list *list,
1326                        struct dn_list *list2)
1327 {
1328         struct ldb_val *dn3;
1329         unsigned int i = 0, j = 0, k = 0;
1330
1331         if (list2->count == 0) {
1332                 /* X | 0 == X */
1333                 return true;
1334         }
1335
1336         if (list->count == 0) {
1337                 /* 0 | X == X */
1338                 list->count = list2->count;
1339                 list->dn = list2->dn;
1340                 /* note that list2 may not be the parent of list2->dn,
1341                    as list2->dn may be owned by ltdb->idxptr. In that
1342                    case we expect this reparent call to fail, which is
1343                    OK */
1344                 talloc_reparent(list2, list, list2->dn);
1345                 return true;
1346         }
1347
1348         /*
1349          * Sort the lists (if not in GUID DN mode) so we can do
1350          * the de-duplication during the merge
1351          *
1352          * NOTE: This can sort the in-memory index values, as list or
1353          * list2 might not be a copy!
1354          */
1355         ldb_kv_dn_list_sort(ldb_kv, list);
1356         ldb_kv_dn_list_sort(ldb_kv, list2);
1357
1358         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1359         if (!dn3) {
1360                 ldb_oom(ldb);
1361                 return false;
1362         }
1363
1364         while (i < list->count || j < list2->count) {
1365                 int cmp;
1366                 if (i >= list->count) {
1367                         cmp = 1;
1368                 } else if (j >= list2->count) {
1369                         cmp = -1;
1370                 } else {
1371                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1372                                                           &list2->dn[j]);
1373                 }
1374
1375                 if (cmp < 0) {
1376                         /* Take list */
1377                         dn3[k] = list->dn[i];
1378                         i++;
1379                         k++;
1380                 } else if (cmp > 0) {
1381                         /* Take list2 */
1382                         dn3[k] = list2->dn[j];
1383                         j++;
1384                         k++;
1385                 } else {
1386                         /* Equal, take list */
1387                         dn3[k] = list->dn[i];
1388                         i++;
1389                         j++;
1390                         k++;
1391                 }
1392         }
1393
1394         list->dn = dn3;
1395         list->count = k;
1396
1397         return true;
1398 }
1399
1400 static int ldb_kv_index_dn(struct ldb_module *module,
1401                            struct ldb_kv_private *ldb_kv,
1402                            const struct ldb_parse_tree *tree,
1403                            struct dn_list *list);
1404
1405 /*
1406   process an OR list (a union)
1407  */
1408 static int ldb_kv_index_dn_or(struct ldb_module *module,
1409                               struct ldb_kv_private *ldb_kv,
1410                               const struct ldb_parse_tree *tree,
1411                               struct dn_list *list)
1412 {
1413         struct ldb_context *ldb;
1414         unsigned int i;
1415
1416         ldb = ldb_module_get_ctx(module);
1417
1418         list->dn = NULL;
1419         list->count = 0;
1420
1421         for (i=0; i<tree->u.list.num_elements; i++) {
1422                 struct dn_list *list2;
1423                 int ret;
1424
1425                 list2 = talloc_zero(list, struct dn_list);
1426                 if (list2 == NULL) {
1427                         return LDB_ERR_OPERATIONS_ERROR;
1428                 }
1429
1430                 ret = ldb_kv_index_dn(
1431                     module, ldb_kv, tree->u.list.elements[i], list2);
1432
1433                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1434                         /* X || 0 == X */
1435                         talloc_free(list2);
1436                         continue;
1437                 }
1438
1439                 if (ret != LDB_SUCCESS) {
1440                         /* X || * == * */
1441                         talloc_free(list2);
1442                         return ret;
1443                 }
1444
1445                 if (!list_union(ldb, ldb_kv, list, list2)) {
1446                         talloc_free(list2);
1447                         return LDB_ERR_OPERATIONS_ERROR;
1448                 }
1449         }
1450
1451         if (list->count == 0) {
1452                 return LDB_ERR_NO_SUCH_OBJECT;
1453         }
1454
1455         return LDB_SUCCESS;
1456 }
1457
1458
1459 /*
1460   NOT an index results
1461  */
1462 static int ldb_kv_index_dn_not(struct ldb_module *module,
1463                                struct ldb_kv_private *ldb_kv,
1464                                const struct ldb_parse_tree *tree,
1465                                struct dn_list *list)
1466 {
1467         /* the only way to do an indexed not would be if we could
1468            negate the not via another not or if we knew the total
1469            number of database elements so we could know that the
1470            existing expression covered the whole database.
1471
1472            instead, we just give up, and rely on a full index scan
1473            (unless an outer & manages to reduce the list)
1474         */
1475         return LDB_ERR_OPERATIONS_ERROR;
1476 }
1477
1478 /*
1479  * These things are unique, so avoid a full scan if this is a search
1480  * by GUID, DN or a unique attribute
1481  */
1482 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1483                                 struct ldb_kv_private *ldb_kv,
1484                                 const char *attr)
1485 {
1486         const struct ldb_schema_attribute *a;
1487         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1488                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1489                     0) {
1490                         return true;
1491                 }
1492         }
1493         if (ldb_attr_dn(attr) == 0) {
1494                 return true;
1495         }
1496
1497         a = ldb_schema_attribute_by_name(ldb, attr);
1498         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1499                 return true;
1500         }
1501         return false;
1502 }
1503
1504 /*
1505   process an AND expression (intersection)
1506  */
1507 static int ldb_kv_index_dn_and(struct ldb_module *module,
1508                                struct ldb_kv_private *ldb_kv,
1509                                const struct ldb_parse_tree *tree,
1510                                struct dn_list *list)
1511 {
1512         struct ldb_context *ldb;
1513         unsigned int i;
1514         bool found;
1515
1516         ldb = ldb_module_get_ctx(module);
1517
1518         list->dn = NULL;
1519         list->count = 0;
1520
1521         /* in the first pass we only look for unique simple
1522            equality tests, in the hope of avoiding having to look
1523            at any others */
1524         for (i=0; i<tree->u.list.num_elements; i++) {
1525                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1526                 int ret;
1527
1528                 if (subtree->operation != LDB_OP_EQUALITY ||
1529                     !ldb_kv_index_unique(
1530                         ldb, ldb_kv, subtree->u.equality.attr)) {
1531                         continue;
1532                 }
1533
1534                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1535                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1536                         /* 0 && X == 0 */
1537                         return LDB_ERR_NO_SUCH_OBJECT;
1538                 }
1539                 if (ret == LDB_SUCCESS) {
1540                         /* a unique index match means we can
1541                          * stop. Note that we don't care if we return
1542                          * a few too many objects, due to later
1543                          * filtering */
1544                         return LDB_SUCCESS;
1545                 }
1546         }
1547
1548         /* now do a full intersection */
1549         found = false;
1550
1551         for (i=0; i<tree->u.list.num_elements; i++) {
1552                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1553                 struct dn_list *list2;
1554                 int ret;
1555
1556                 list2 = talloc_zero(list, struct dn_list);
1557                 if (list2 == NULL) {
1558                         return ldb_module_oom(module);
1559                 }
1560
1561                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1562
1563                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1564                         /* X && 0 == 0 */
1565                         list->dn = NULL;
1566                         list->count = 0;
1567                         talloc_free(list2);
1568                         return LDB_ERR_NO_SUCH_OBJECT;
1569                 }
1570
1571                 if (ret != LDB_SUCCESS) {
1572                         /* this didn't adding anything */
1573                         talloc_free(list2);
1574                         continue;
1575                 }
1576
1577                 if (!found) {
1578                         talloc_reparent(list2, list, list->dn);
1579                         list->dn = list2->dn;
1580                         list->count = list2->count;
1581                         found = true;
1582                 } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
1583                         talloc_free(list2);
1584                         return LDB_ERR_OPERATIONS_ERROR;
1585                 }
1586
1587                 if (list->count == 0) {
1588                         list->dn = NULL;
1589                         return LDB_ERR_NO_SUCH_OBJECT;
1590                 }
1591
1592                 if (list->count < 2) {
1593                         /* it isn't worth loading the next part of the tree */
1594                         return LDB_SUCCESS;
1595                 }
1596         }
1597
1598         if (!found) {
1599                 /* none of the attributes were indexed */
1600                 return LDB_ERR_OPERATIONS_ERROR;
1601         }
1602
1603         return LDB_SUCCESS;
1604 }
1605
1606 /*
1607   return a list of matching objects using a one-level index
1608  */
1609 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1610                                 struct ldb_kv_private *ldb_kv,
1611                                 const char *attr,
1612                                 struct ldb_dn *dn,
1613                                 struct dn_list *list,
1614                                 enum key_truncation *truncation)
1615 {
1616         struct ldb_context *ldb;
1617         struct ldb_dn *key;
1618         struct ldb_val val;
1619         int ret;
1620
1621         ldb = ldb_module_get_ctx(module);
1622
1623         /* work out the index key from the parent DN */
1624         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1625         if (val.data == NULL) {
1626                 const char *dn_str = ldb_dn_get_linearized(dn);
1627                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1628                                        __location__
1629                                        ": Failed to get casefold DN "
1630                                        "from: %s",
1631                                        dn_str);
1632                 return LDB_ERR_OPERATIONS_ERROR;
1633         }
1634         val.length = strlen((char *)val.data);
1635         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1636         if (!key) {
1637                 ldb_oom(ldb);
1638                 return LDB_ERR_OPERATIONS_ERROR;
1639         }
1640
1641         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1642         talloc_free(key);
1643         if (ret != LDB_SUCCESS) {
1644                 return ret;
1645         }
1646
1647         if (list->count == 0) {
1648                 return LDB_ERR_NO_SUCH_OBJECT;
1649         }
1650
1651         return LDB_SUCCESS;
1652 }
1653
1654 /*
1655   return a list of matching objects using a one-level index
1656  */
1657 static int ldb_kv_index_dn_one(struct ldb_module *module,
1658                                struct ldb_kv_private *ldb_kv,
1659                                struct ldb_dn *parent_dn,
1660                                struct dn_list *list,
1661                                enum key_truncation *truncation)
1662 {
1663         /* Ensure we do not shortcut on intersection for this list */
1664         list->strict = true;
1665         return ldb_kv_index_dn_attr(
1666             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
1667 }
1668
1669 /*
1670   return a list of matching objects using the DN index
1671  */
1672 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
1673                                    struct ldb_kv_private *ldb_kv,
1674                                    struct ldb_dn *base_dn,
1675                                    struct dn_list *dn_list,
1676                                    enum key_truncation *truncation)
1677 {
1678         const struct ldb_val *guid_val = NULL;
1679         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1680                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1681                 if (dn_list->dn == NULL) {
1682                         return ldb_module_oom(module);
1683                 }
1684                 dn_list->dn[0].data = discard_const_p(unsigned char,
1685                                                       ldb_dn_get_linearized(base_dn));
1686                 if (dn_list->dn[0].data == NULL) {
1687                         talloc_free(dn_list->dn);
1688                         return ldb_module_oom(module);
1689                 }
1690                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1691                 dn_list->count = 1;
1692
1693                 return LDB_SUCCESS;
1694         }
1695
1696         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
1697                 guid_val = ldb_dn_get_extended_component(
1698                     base_dn, ldb_kv->cache->GUID_index_dn_component);
1699         }
1700
1701         if (guid_val != NULL) {
1702                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1703                 if (dn_list->dn == NULL) {
1704                         return ldb_module_oom(module);
1705                 }
1706                 dn_list->dn[0].data = guid_val->data;
1707                 dn_list->dn[0].length = guid_val->length;
1708                 dn_list->count = 1;
1709
1710                 return LDB_SUCCESS;
1711         }
1712
1713         return ldb_kv_index_dn_attr(
1714             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
1715 }
1716
1717 /*
1718   return a list of dn's that might match a indexed search or
1719   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1720  */
1721 static int ldb_kv_index_dn(struct ldb_module *module,
1722                            struct ldb_kv_private *ldb_kv,
1723                            const struct ldb_parse_tree *tree,
1724                            struct dn_list *list)
1725 {
1726         int ret = LDB_ERR_OPERATIONS_ERROR;
1727
1728         switch (tree->operation) {
1729         case LDB_OP_AND:
1730                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
1731                 break;
1732
1733         case LDB_OP_OR:
1734                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
1735                 break;
1736
1737         case LDB_OP_NOT:
1738                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
1739                 break;
1740
1741         case LDB_OP_EQUALITY:
1742                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
1743                 break;
1744
1745         case LDB_OP_SUBSTRING:
1746         case LDB_OP_GREATER:
1747         case LDB_OP_LESS:
1748         case LDB_OP_PRESENT:
1749         case LDB_OP_APPROX:
1750         case LDB_OP_EXTENDED:
1751                 /* we can't index with fancy bitops yet */
1752                 ret = LDB_ERR_OPERATIONS_ERROR;
1753                 break;
1754         }
1755
1756         return ret;
1757 }
1758
1759 /*
1760   filter a candidate dn_list from an indexed search into a set of results
1761   extracting just the given attributes
1762 */
1763 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
1764                                const struct dn_list *dn_list,
1765                                struct ldb_kv_context *ac,
1766                                uint32_t *match_count,
1767                                enum key_truncation scope_one_truncation)
1768 {
1769         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1770         struct ldb_message *msg;
1771         struct ldb_message *filtered_msg;
1772         unsigned int i;
1773         unsigned int num_keys = 0;
1774         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
1775         struct ldb_val *keys = NULL;
1776
1777         /*
1778          * We have to allocate the key list (rather than just walk the
1779          * caller supplied list) as the callback could change the list
1780          * (by modifying an indexed attribute hosted in the in-memory
1781          * index cache!)
1782          */
1783         keys = talloc_array(ac, struct ldb_val, dn_list->count);
1784         if (keys == NULL) {
1785                 return ldb_module_oom(ac->module);
1786         }
1787
1788         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1789                 /*
1790                  * We speculate that the keys will be GUID based and so
1791                  * pre-fill in enough space for a GUID (avoiding a pile of
1792                  * small allocations)
1793                  */
1794                 struct guid_tdb_key {
1795                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
1796                 } *key_values = NULL;
1797
1798                 key_values = talloc_array(keys,
1799                                           struct guid_tdb_key,
1800                                           dn_list->count);
1801
1802                 if (key_values == NULL) {
1803                         talloc_free(keys);
1804                         return ldb_module_oom(ac->module);
1805                 }
1806                 for (i = 0; i < dn_list->count; i++) {
1807                         keys[i].data = key_values[i].guid_key;
1808                         keys[i].length = sizeof(key_values[i].guid_key);
1809                 }
1810         } else {
1811                 for (i = 0; i < dn_list->count; i++) {
1812                         keys[i].data = NULL;
1813                         keys[i].length = 0;
1814                 }
1815         }
1816
1817         for (i = 0; i < dn_list->count; i++) {
1818                 int ret;
1819
1820                 ret = ldb_kv_idx_to_key(
1821                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
1822                 if (ret != LDB_SUCCESS) {
1823                         talloc_free(keys);
1824                         return ret;
1825                 }
1826
1827                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1828                         /*
1829                          * If we are in GUID index mode, then the dn_list is
1830                          * sorted.  If we got a duplicate, forget about it, as
1831                          * otherwise we would send the same entry back more
1832                          * than once.
1833                          *
1834                          * This is needed in the truncated DN case, or if a
1835                          * duplicate was forced in via
1836                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1837                          */
1838
1839                         if (memcmp(previous_guid_key,
1840                                    keys[num_keys].data,
1841                                    sizeof(previous_guid_key)) == 0) {
1842                                 continue;
1843                         }
1844
1845                         memcpy(previous_guid_key,
1846                                keys[num_keys].data,
1847                                sizeof(previous_guid_key));
1848                 }
1849                 num_keys++;
1850         }
1851
1852
1853         /*
1854          * Now that the list is a safe copy, send the callbacks
1855          */
1856         for (i = 0; i < num_keys; i++) {
1857                 int ret;
1858                 bool matched;
1859                 msg = ldb_msg_new(ac);
1860                 if (!msg) {
1861                         talloc_free(keys);
1862                         return LDB_ERR_OPERATIONS_ERROR;
1863                 }
1864
1865                 ret =
1866                     ldb_kv_search_key(ac->module,
1867                                       ldb_kv,
1868                                       keys[i],
1869                                       msg,
1870                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
1871                                           LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1872                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1873                         /*
1874                          * the record has disappeared? yes, this can
1875                          * happen if the entry is deleted by something
1876                          * operating in the callback (not another
1877                          * process, as we have a read lock)
1878                          */
1879                         talloc_free(msg);
1880                         continue;
1881                 }
1882
1883                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1884                         /* an internal error */
1885                         talloc_free(keys);
1886                         talloc_free(msg);
1887                         return LDB_ERR_OPERATIONS_ERROR;
1888                 }
1889
1890                 /*
1891                  * We trust the index for LDB_SCOPE_ONELEVEL
1892                  * unless the index key has been truncated.
1893                  *
1894                  * LDB_SCOPE_BASE is not passed in by our only caller.
1895                  */
1896                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
1897                     ldb_kv->cache->one_level_indexes &&
1898                     scope_one_truncation == KEY_NOT_TRUNCATED) {
1899                         ret = ldb_match_message(ldb, msg, ac->tree,
1900                                                 ac->scope, &matched);
1901                 } else {
1902                         ret = ldb_match_msg_error(ldb, msg,
1903                                                   ac->tree, ac->base,
1904                                                   ac->scope, &matched);
1905                 }
1906
1907                 if (ret != LDB_SUCCESS) {
1908                         talloc_free(keys);
1909                         talloc_free(msg);
1910                         return ret;
1911                 }
1912                 if (!matched) {
1913                         talloc_free(msg);
1914                         continue;
1915                 }
1916
1917                 /* filter the attributes that the user wants */
1918                 ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1919
1920                 talloc_free(msg);
1921
1922                 if (ret == -1) {
1923                         talloc_free(keys);
1924                         return LDB_ERR_OPERATIONS_ERROR;
1925                 }
1926
1927                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1928                 if (ret != LDB_SUCCESS) {
1929                         /* Regardless of success or failure, the msg
1930                          * is the callbacks responsiblity, and should
1931                          * not be talloc_free()'ed */
1932                         ac->request_terminated = true;
1933                         talloc_free(keys);
1934                         return ret;
1935                 }
1936
1937                 (*match_count)++;
1938         }
1939
1940         TALLOC_FREE(keys);
1941         return LDB_SUCCESS;
1942 }
1943
1944 /*
1945   sort a DN list
1946  */
1947 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
1948                                 struct dn_list *list)
1949 {
1950         if (list->count < 2) {
1951                 return;
1952         }
1953
1954         /* We know the list is sorted when using the GUID index */
1955         if (ltdb->cache->GUID_index_attribute != NULL) {
1956                 return;
1957         }
1958
1959         TYPESAFE_QSORT(list->dn, list->count,
1960                        ldb_val_equal_exact_for_qsort);
1961 }
1962
1963 /*
1964   search the database with a LDAP-like expression using indexes
1965   returns -1 if an indexed search is not possible, in which
1966   case the caller should call ltdb_search_full()
1967 */
1968 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
1969 {
1970         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1971         struct ldb_kv_private *ldb_kv = talloc_get_type(
1972             ldb_module_get_private(ac->module), struct ldb_kv_private);
1973         struct dn_list *dn_list;
1974         int ret;
1975         enum ldb_scope index_scope;
1976         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1977
1978         /* see if indexing is enabled */
1979         if (!ldb_kv->cache->attribute_indexes &&
1980             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
1981                 /* fallback to a full search */
1982                 return LDB_ERR_OPERATIONS_ERROR;
1983         }
1984
1985         dn_list = talloc_zero(ac, struct dn_list);
1986         if (dn_list == NULL) {
1987                 return ldb_module_oom(ac->module);
1988         }
1989
1990         /*
1991          * For the purposes of selecting the switch arm below, if we
1992          * don't have a one-level index then treat it like a subtree
1993          * search
1994          */
1995         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1996             !ldb_kv->cache->one_level_indexes) {
1997                 index_scope = LDB_SCOPE_SUBTREE;
1998         } else {
1999                 index_scope = ac->scope;
2000         }
2001
2002         switch (index_scope) {
2003         case LDB_SCOPE_BASE:
2004                 /*
2005                  * The only caller will have filtered the operation out
2006                  * so we should never get here
2007                  */
2008                 return ldb_operr(ldb);
2009
2010         case LDB_SCOPE_ONELEVEL:
2011                 /*
2012                  * If we ever start to also load the index values for
2013                  * the tree, we must ensure we strictly intersect with
2014                  * this list, as we trust the ONELEVEL index
2015                  */
2016                 ret = ldb_kv_index_dn_one(ac->module,
2017                                           ldb_kv,
2018                                           ac->base,
2019                                           dn_list,
2020                                           &scope_one_truncation);
2021                 if (ret != LDB_SUCCESS) {
2022                         talloc_free(dn_list);
2023                         return ret;
2024                 }
2025
2026                 /*
2027                  * If we have too many matches, running the filter
2028                  * tree over the SCOPE_ONELEVEL can be quite expensive
2029                  * so we now check the filter tree index as well.
2030                  *
2031                  * We only do this in the GUID index mode, which is
2032                  * O(n*log(m)) otherwise the intersection below will
2033                  * be too costly at O(n*m).
2034                  *
2035                  * We don't set a heuristic for 'too many' but instead
2036                  * do it always and rely on the index lookup being
2037                  * fast enough in the small case.
2038                  */
2039                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2040                         struct dn_list *idx_one_tree_list
2041                                 = talloc_zero(ac, struct dn_list);
2042                         if (idx_one_tree_list == NULL) {
2043                                 talloc_free(dn_list);
2044                                 return ldb_module_oom(ac->module);
2045                         }
2046
2047                         if (!ldb_kv->cache->attribute_indexes) {
2048                                 talloc_free(idx_one_tree_list);
2049                                 talloc_free(dn_list);
2050                                 return LDB_ERR_OPERATIONS_ERROR;
2051                         }
2052                         /*
2053                          * Here we load the index for the tree.
2054                          *
2055                          * We only care if this is successful, if the
2056                          * index can't trim the result list down then
2057                          * the ONELEVEL index is still good enough.
2058                          */
2059                         ret = ldb_kv_index_dn(
2060                             ac->module, ldb_kv, ac->tree, idx_one_tree_list);
2061                         if (ret == LDB_SUCCESS) {
2062                                 if (!list_intersect(ldb,
2063                                                     ldb_kv,
2064                                                     dn_list,
2065                                                     idx_one_tree_list)) {
2066                                         talloc_free(idx_one_tree_list);
2067                                         talloc_free(dn_list);
2068                                         return LDB_ERR_OPERATIONS_ERROR;
2069                                 }
2070                         }
2071                 }
2072                 break;
2073
2074         case LDB_SCOPE_SUBTREE:
2075         case LDB_SCOPE_DEFAULT:
2076                 if (!ldb_kv->cache->attribute_indexes) {
2077                         talloc_free(dn_list);
2078                         return LDB_ERR_OPERATIONS_ERROR;
2079                 }
2080                 /*
2081                  * Here we load the index for the tree.  We have no
2082                  * index for the subtree.
2083                  */
2084                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2085                 if (ret != LDB_SUCCESS) {
2086                         talloc_free(dn_list);
2087                         return ret;
2088                 }
2089                 break;
2090         }
2091
2092         /*
2093          * It is critical that this function do the re-filter even
2094          * on things found by the index as the index can over-match
2095          * in cases of truncation (as well as when it decides it is
2096          * not worth further filtering)
2097          *
2098          * If this changes, then the index code above would need to
2099          * pass up a flag to say if any index was truncated during
2100          * processing as the truncation here refers only to the
2101          * SCOPE_ONELEVEL index.
2102          */
2103         ret = ldb_kv_index_filter(
2104             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2105         talloc_free(dn_list);
2106         return ret;
2107 }
2108
2109 /**
2110  * @brief Add a DN in the index list of a given attribute name/value pair
2111  *
2112  * This function will add the DN in the index list for the index for
2113  * the given attribute name and value.
2114  *
2115  * @param[in]  module       A ldb_module structure
2116  *
2117  * @param[in]  dn           The string representation of the DN as it
2118  *                          will be stored in the index entry
2119  *
2120  * @param[in]  el           A ldb_message_element array, one of the entry
2121  *                          referred by the v_idx is the attribute name and
2122  *                          value pair which will be used to construct the
2123  *                          index name
2124  *
2125  * @param[in]  v_idx        The index of element in the el array to use
2126  *
2127  * @return                  An ldb error code
2128  */
2129 static int ldb_kv_index_add1(struct ldb_module *module,
2130                              struct ldb_kv_private *ldb_kv,
2131                              const struct ldb_message *msg,
2132                              struct ldb_message_element *el,
2133                              int v_idx)
2134 {
2135         struct ldb_context *ldb;
2136         struct ldb_dn *dn_key;
2137         int ret;
2138         const struct ldb_schema_attribute *a;
2139         struct dn_list *list;
2140         unsigned alloc_len;
2141         enum key_truncation truncation = KEY_TRUNCATED;
2142
2143
2144         ldb = ldb_module_get_ctx(module);
2145
2146         list = talloc_zero(module, struct dn_list);
2147         if (list == NULL) {
2148                 return LDB_ERR_OPERATIONS_ERROR;
2149         }
2150
2151         dn_key = ldb_kv_index_key(
2152             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2153         if (!dn_key) {
2154                 talloc_free(list);
2155                 return LDB_ERR_OPERATIONS_ERROR;
2156         }
2157         /*
2158          * Samba only maintains unique indexes on the objectSID and objectGUID
2159          * so if a unique index key exceeds the maximum length there is a
2160          * problem.
2161          */
2162         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2163                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2164                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2165
2166                 ldb_asprintf_errstring(
2167                     ldb,
2168                     __location__ ": unique index key on %s in %s, "
2169                                  "exceeds maximum key length of %u (encoded).",
2170                     el->name,
2171                     ldb_dn_get_linearized(msg->dn),
2172                     ldb_kv->max_key_length);
2173                 talloc_free(list);
2174                 return LDB_ERR_CONSTRAINT_VIOLATION;
2175         }
2176         talloc_steal(list, dn_key);
2177
2178         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2179         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2180                 talloc_free(list);
2181                 return ret;
2182         }
2183
2184         /*
2185          * Check for duplicates in the @IDXDN DN -> GUID record
2186          *
2187          * This is very normal, it just means a duplicate DN creation
2188          * was attempted, so don't set the error string or print scary
2189          * messages.
2190          */
2191         if (list->count > 0 &&
2192             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2193             truncation == KEY_NOT_TRUNCATED) {
2194
2195                 talloc_free(list);
2196                 return LDB_ERR_CONSTRAINT_VIOLATION;
2197
2198         } else if (list->count > 0
2199                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2200
2201                 /*
2202                  * At least one existing entry in the DN->GUID index, which
2203                  * arises when the DN indexes have been truncated
2204                  *
2205                  * So need to pull the DN's to check if it's really a duplicate
2206                  */
2207                 int i;
2208                 for (i=0; i < list->count; i++) {
2209                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2210                         struct ldb_val key = {
2211                                 .data = guid_key,
2212                                 .length = sizeof(guid_key)
2213                         };
2214                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2215                         struct ldb_message *rec = ldb_msg_new(ldb);
2216                         if (rec == NULL) {
2217                                 return LDB_ERR_OPERATIONS_ERROR;
2218                         }
2219
2220                         ret = ldb_kv_idx_to_key(
2221                             module, ldb_kv, ldb, &list->dn[i], &key);
2222                         if (ret != LDB_SUCCESS) {
2223                                 TALLOC_FREE(list);
2224                                 TALLOC_FREE(rec);
2225                                 return ret;
2226                         }
2227
2228                         ret =
2229                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2230                         if (key.data != guid_key) {
2231                                 TALLOC_FREE(key.data);
2232                         }
2233                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2234                                 /*
2235                                  * the record has disappeared?
2236                                  * yes, this can happen
2237                                  */
2238                                 talloc_free(rec);
2239                                 continue;
2240                         }
2241
2242                         if (ret != LDB_SUCCESS) {
2243                                 /* an internal error */
2244                                 TALLOC_FREE(rec);
2245                                 TALLOC_FREE(list);
2246                                 return LDB_ERR_OPERATIONS_ERROR;
2247                         }
2248                         /*
2249                          * The DN we are trying to add to the DB and index
2250                          * is already here, so we must deny the addition
2251                          */
2252                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2253                                 TALLOC_FREE(rec);
2254                                 TALLOC_FREE(list);
2255                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2256                         }
2257                 }
2258         }
2259
2260         /*
2261          * Check for duplicates in unique indexes
2262          *
2263          * We don't need to do a loop test like the @IDXDN case
2264          * above as we have a ban on long unique index values
2265          * at the start of this function.
2266          */
2267         if (list->count > 0 &&
2268             ((a != NULL
2269               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2270                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2271                 /*
2272                  * We do not want to print info about a possibly
2273                  * confidential DN that the conflict was with in the
2274                  * user-visible error string
2275                  */
2276
2277                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2278                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2279                                   __location__
2280                                   ": unique index violation on %s in %s, "
2281                                   "conficts with %*.*s in %s",
2282                                   el->name, ldb_dn_get_linearized(msg->dn),
2283                                   (int)list->dn[0].length,
2284                                   (int)list->dn[0].length,
2285                                   list->dn[0].data,
2286                                   ldb_dn_get_linearized(dn_key));
2287                 } else {
2288                         /* This can't fail, gives a default at worst */
2289                         const struct ldb_schema_attribute *attr =
2290                             ldb_schema_attribute_by_name(
2291                                 ldb, ldb_kv->cache->GUID_index_attribute);
2292                         struct ldb_val v;
2293                         ret = attr->syntax->ldif_write_fn(ldb, list,
2294                                                           &list->dn[0], &v);
2295                         if (ret == LDB_SUCCESS) {
2296                                 ldb_debug(ldb,
2297                                           LDB_DEBUG_WARNING,
2298                                           __location__
2299                                           ": unique index violation on %s in "
2300                                           "%s, conficts with %s %*.*s in %s",
2301                                           el->name,
2302                                           ldb_dn_get_linearized(msg->dn),
2303                                           ldb_kv->cache->GUID_index_attribute,
2304                                           (int)v.length,
2305                                           (int)v.length,
2306                                           v.data,
2307                                           ldb_dn_get_linearized(dn_key));
2308                         }
2309                 }
2310                 ldb_asprintf_errstring(ldb,
2311                                        __location__ ": unique index violation "
2312                                        "on %s in %s",
2313                                        el->name,
2314                                        ldb_dn_get_linearized(msg->dn));
2315                 talloc_free(list);
2316                 return LDB_ERR_CONSTRAINT_VIOLATION;
2317         }
2318
2319         /* overallocate the list a bit, to reduce the number of
2320          * realloc trigered copies */
2321         alloc_len = ((list->count+1)+7) & ~7;
2322         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2323         if (list->dn == NULL) {
2324                 talloc_free(list);
2325                 return LDB_ERR_OPERATIONS_ERROR;
2326         }
2327
2328         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2329                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2330                 list->dn[list->count].data
2331                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2332                 if (list->dn[list->count].data == NULL) {
2333                         talloc_free(list);
2334                         return LDB_ERR_OPERATIONS_ERROR;
2335                 }
2336                 list->dn[list->count].length = strlen(dn_str);
2337         } else {
2338                 const struct ldb_val *key_val;
2339                 struct ldb_val *exact = NULL, *next = NULL;
2340                 key_val = ldb_msg_find_ldb_val(
2341                     msg, ldb_kv->cache->GUID_index_attribute);
2342                 if (key_val == NULL) {
2343                         talloc_free(list);
2344                         return ldb_module_operr(module);
2345                 }
2346
2347                 if (key_val->length != LDB_KV_GUID_SIZE) {
2348                         talloc_free(list);
2349                         return ldb_module_operr(module);
2350                 }
2351
2352                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2353                                         *key_val, ldb_val_equal_exact_ordered,
2354                                         exact, next);
2355
2356                 /*
2357                  * Give a warning rather than fail, this could be a
2358                  * duplicate value in the record allowed by a caller
2359                  * forcing in the value with
2360                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2361                  */
2362                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2363                         /* This can't fail, gives a default at worst */
2364                         const struct ldb_schema_attribute *attr =
2365                             ldb_schema_attribute_by_name(
2366                                 ldb, ldb_kv->cache->GUID_index_attribute);
2367                         struct ldb_val v;
2368                         ret = attr->syntax->ldif_write_fn(ldb, list,
2369                                                           exact, &v);
2370                         if (ret == LDB_SUCCESS) {
2371                                 ldb_debug(ldb,
2372                                           LDB_DEBUG_WARNING,
2373                                           __location__
2374                                           ": duplicate attribute value in %s "
2375                                           "for index on %s, "
2376                                           "duplicate of %s %*.*s in %s",
2377                                           ldb_dn_get_linearized(msg->dn),
2378                                           el->name,
2379                                           ldb_kv->cache->GUID_index_attribute,
2380                                           (int)v.length,
2381                                           (int)v.length,
2382                                           v.data,
2383                                           ldb_dn_get_linearized(dn_key));
2384                         }
2385                 }
2386
2387                 if (next == NULL) {
2388                         next = &list->dn[list->count];
2389                 } else {
2390                         memmove(&next[1], next,
2391                                 sizeof(*next) * (list->count - (next - list->dn)));
2392                 }
2393                 *next = ldb_val_dup(list->dn, key_val);
2394                 if (next->data == NULL) {
2395                         talloc_free(list);
2396                         return ldb_module_operr(module);
2397                 }
2398         }
2399         list->count++;
2400
2401         ret = ldb_kv_dn_list_store(module, dn_key, list);
2402
2403         talloc_free(list);
2404
2405         return ret;
2406 }
2407
2408 /*
2409   add index entries for one elements in a message
2410  */
2411 static int ldb_kv_index_add_el(struct ldb_module *module,
2412                                struct ldb_kv_private *ldb_kv,
2413                                const struct ldb_message *msg,
2414                                struct ldb_message_element *el)
2415 {
2416         unsigned int i;
2417         for (i = 0; i < el->num_values; i++) {
2418                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2419                 if (ret != LDB_SUCCESS) {
2420                         return ret;
2421                 }
2422         }
2423
2424         return LDB_SUCCESS;
2425 }
2426
2427 /*
2428   add index entries for all elements in a message
2429  */
2430 static int ldb_kv_index_add_all(struct ldb_module *module,
2431                                 struct ldb_kv_private *ldb_kv,
2432                                 const struct ldb_message *msg)
2433 {
2434         struct ldb_message_element *elements = msg->elements;
2435         unsigned int i;
2436         const char *dn_str;
2437         int ret;
2438
2439         if (ldb_dn_is_special(msg->dn)) {
2440                 return LDB_SUCCESS;
2441         }
2442
2443         dn_str = ldb_dn_get_linearized(msg->dn);
2444         if (dn_str == NULL) {
2445                 return LDB_ERR_OPERATIONS_ERROR;
2446         }
2447
2448         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2449         if (ret != LDB_SUCCESS) {
2450                 return ret;
2451         }
2452
2453         if (!ldb_kv->cache->attribute_indexes) {
2454                 /* no indexed fields */
2455                 return LDB_SUCCESS;
2456         }
2457
2458         for (i = 0; i < msg->num_elements; i++) {
2459                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2460                         continue;
2461                 }
2462                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2463                 if (ret != LDB_SUCCESS) {
2464                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2465                         ldb_asprintf_errstring(ldb,
2466                                                __location__ ": Failed to re-index %s in %s - %s",
2467                                                elements[i].name, dn_str,
2468                                                ldb_errstring(ldb));
2469                         return ret;
2470                 }
2471         }
2472
2473         return LDB_SUCCESS;
2474 }
2475
2476
2477 /*
2478   insert a DN index for a message
2479 */
2480 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2481                                   struct ldb_kv_private *ldb_kv,
2482                                   const struct ldb_message *msg,
2483                                   struct ldb_dn *dn,
2484                                   const char *index,
2485                                   int add)
2486 {
2487         struct ldb_message_element el;
2488         struct ldb_val val;
2489         int ret;
2490
2491         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2492         if (val.data == NULL) {
2493                 const char *dn_str = ldb_dn_get_linearized(dn);
2494                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2495                                        __location__ ": Failed to modify %s "
2496                                                     "against %s in %s: failed "
2497                                                     "to get casefold DN",
2498                                        index,
2499                                        ldb_kv->cache->GUID_index_attribute,
2500                                        dn_str);
2501                 return LDB_ERR_OPERATIONS_ERROR;
2502         }
2503
2504         val.length = strlen((char *)val.data);
2505         el.name = index;
2506         el.values = &val;
2507         el.num_values = 1;
2508
2509         if (add) {
2510                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2511         } else { /* delete */
2512                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2513         }
2514
2515         if (ret != LDB_SUCCESS) {
2516                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2517                 const char *dn_str = ldb_dn_get_linearized(dn);
2518                 ldb_asprintf_errstring(ldb,
2519                                        __location__ ": Failed to modify %s "
2520                                                     "against %s in %s - %s",
2521                                        index,
2522                                        ldb_kv->cache->GUID_index_attribute,
2523                                        dn_str,
2524                                        ldb_errstring(ldb));
2525                 return ret;
2526         }
2527         return ret;
2528 }
2529
2530 /*
2531   insert a one level index for a message
2532 */
2533 static int ldb_kv_index_onelevel(struct ldb_module *module,
2534                                  const struct ldb_message *msg,
2535                                  int add)
2536 {
2537         struct ldb_kv_private *ldb_kv = talloc_get_type(
2538             ldb_module_get_private(module), struct ldb_kv_private);
2539         struct ldb_dn *pdn;
2540         int ret;
2541
2542         /* We index for ONE Level only if requested */
2543         if (!ldb_kv->cache->one_level_indexes) {
2544                 return LDB_SUCCESS;
2545         }
2546
2547         pdn = ldb_dn_get_parent(module, msg->dn);
2548         if (pdn == NULL) {
2549                 return LDB_ERR_OPERATIONS_ERROR;
2550         }
2551         ret =
2552             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2553
2554         talloc_free(pdn);
2555
2556         return ret;
2557 }
2558
2559 /*
2560   insert a one level index for a message
2561 */
2562 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2563                                       const struct ldb_message *msg,
2564                                       int add)
2565 {
2566         int ret;
2567         struct ldb_kv_private *ldb_kv = talloc_get_type(
2568             ldb_module_get_private(module), struct ldb_kv_private);
2569
2570         /* We index for DN only if using a GUID index */
2571         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2572                 return LDB_SUCCESS;
2573         }
2574
2575         ret = ldb_kv_modify_index_dn(
2576             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2577
2578         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2579                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2580                                        "Entry %s already exists",
2581                                        ldb_dn_get_linearized(msg->dn));
2582                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2583         }
2584         return ret;
2585 }
2586
2587 /*
2588   add the index entries for a new element in a record
2589   The caller guarantees that these element values are not yet indexed
2590 */
2591 int ldb_kv_index_add_element(struct ldb_module *module,
2592                              struct ldb_kv_private *ldb_kv,
2593                              const struct ldb_message *msg,
2594                              struct ldb_message_element *el)
2595 {
2596         if (ldb_dn_is_special(msg->dn)) {
2597                 return LDB_SUCCESS;
2598         }
2599         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2600                 return LDB_SUCCESS;
2601         }
2602         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2603 }
2604
2605 /*
2606   add the index entries for a new record
2607 */
2608 int ldb_kv_index_add_new(struct ldb_module *module,
2609                          struct ldb_kv_private *ldb_kv,
2610                          const struct ldb_message *msg)
2611 {
2612         int ret;
2613
2614         if (ldb_dn_is_special(msg->dn)) {
2615                 return LDB_SUCCESS;
2616         }
2617
2618         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2619         if (ret != LDB_SUCCESS) {
2620                 /*
2621                  * Because we can't trust the caller to be doing
2622                  * transactions properly, clean up any index for this
2623                  * entry rather than relying on a transaction
2624                  * cleanup
2625                  */
2626
2627                 ldb_kv_index_delete(module, msg);
2628                 return ret;
2629         }
2630
2631         ret = ldb_kv_index_onelevel(module, msg, 1);
2632         if (ret != LDB_SUCCESS) {
2633                 /*
2634                  * Because we can't trust the caller to be doing
2635                  * transactions properly, clean up any index for this
2636                  * entry rather than relying on a transaction
2637                  * cleanup
2638                  */
2639                 ldb_kv_index_delete(module, msg);
2640                 return ret;
2641         }
2642         return ret;
2643 }
2644
2645
2646 /*
2647   delete an index entry for one message element
2648 */
2649 int ldb_kv_index_del_value(struct ldb_module *module,
2650                            struct ldb_kv_private *ldb_kv,
2651                            const struct ldb_message *msg,
2652                            struct ldb_message_element *el,
2653                            unsigned int v_idx)
2654 {
2655         struct ldb_context *ldb;
2656         struct ldb_dn *dn_key;
2657         const char *dn_str;
2658         int ret, i;
2659         unsigned int j;
2660         struct dn_list *list;
2661         struct ldb_dn *dn = msg->dn;
2662         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2663
2664         ldb = ldb_module_get_ctx(module);
2665
2666         dn_str = ldb_dn_get_linearized(dn);
2667         if (dn_str == NULL) {
2668                 return LDB_ERR_OPERATIONS_ERROR;
2669         }
2670
2671         if (dn_str[0] == '@') {
2672                 return LDB_SUCCESS;
2673         }
2674
2675         dn_key = ldb_kv_index_key(
2676             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
2677         /*
2678          * We ignore key truncation in ltdb_index_add1() so
2679          * match that by ignoring it here as well
2680          *
2681          * Multiple values are legitimate and accepted
2682          */
2683         if (!dn_key) {
2684                 return LDB_ERR_OPERATIONS_ERROR;
2685         }
2686
2687         list = talloc_zero(dn_key, struct dn_list);
2688         if (list == NULL) {
2689                 talloc_free(dn_key);
2690                 return LDB_ERR_OPERATIONS_ERROR;
2691         }
2692
2693         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2694         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2695                 /* it wasn't indexed. Did we have an earlier error? If we did then
2696                    its gone now */
2697                 talloc_free(dn_key);
2698                 return LDB_SUCCESS;
2699         }
2700
2701         if (ret != LDB_SUCCESS) {
2702                 talloc_free(dn_key);
2703                 return ret;
2704         }
2705
2706         /*
2707          * Find one of the values matching this message to remove
2708          */
2709         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
2710         if (i == -1) {
2711                 /* nothing to delete */
2712                 talloc_free(dn_key);
2713                 return LDB_SUCCESS;
2714         }
2715
2716         j = (unsigned int) i;
2717         if (j != list->count - 1) {
2718                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2719         }
2720         list->count--;
2721         if (list->count == 0) {
2722                 talloc_free(list->dn);
2723                 list->dn = NULL;
2724         } else {
2725                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2726         }
2727
2728         ret = ldb_kv_dn_list_store(module, dn_key, list);
2729
2730         talloc_free(dn_key);
2731
2732         return ret;
2733 }
2734
2735 /*
2736   delete the index entries for a element
2737   return -1 on failure
2738 */
2739 int ldb_kv_index_del_element(struct ldb_module *module,
2740                              struct ldb_kv_private *ldb_kv,
2741                              const struct ldb_message *msg,
2742                              struct ldb_message_element *el)
2743 {
2744         const char *dn_str;
2745         int ret;
2746         unsigned int i;
2747
2748         if (!ldb_kv->cache->attribute_indexes) {
2749                 /* no indexed fields */
2750                 return LDB_SUCCESS;
2751         }
2752
2753         dn_str = ldb_dn_get_linearized(msg->dn);
2754         if (dn_str == NULL) {
2755                 return LDB_ERR_OPERATIONS_ERROR;
2756         }
2757
2758         if (dn_str[0] == '@') {
2759                 return LDB_SUCCESS;
2760         }
2761
2762         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2763                 return LDB_SUCCESS;
2764         }
2765         for (i = 0; i < el->num_values; i++) {
2766                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
2767                 if (ret != LDB_SUCCESS) {
2768                         return ret;
2769                 }
2770         }
2771
2772         return LDB_SUCCESS;
2773 }
2774
2775 /*
2776   delete the index entries for a record
2777   return -1 on failure
2778 */
2779 int ldb_kv_index_delete(struct ldb_module *module,
2780                         const struct ldb_message *msg)
2781 {
2782         struct ldb_kv_private *ldb_kv = talloc_get_type(
2783             ldb_module_get_private(module), struct ldb_kv_private);
2784         int ret;
2785         unsigned int i;
2786
2787         if (ldb_dn_is_special(msg->dn)) {
2788                 return LDB_SUCCESS;
2789         }
2790
2791         ret = ldb_kv_index_onelevel(module, msg, 0);
2792         if (ret != LDB_SUCCESS) {
2793                 return ret;
2794         }
2795
2796         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
2797         if (ret != LDB_SUCCESS) {
2798                 return ret;
2799         }
2800
2801         if (!ldb_kv->cache->attribute_indexes) {
2802                 /* no indexed fields */
2803                 return LDB_SUCCESS;
2804         }
2805
2806         for (i = 0; i < msg->num_elements; i++) {
2807                 ret = ldb_kv_index_del_element(
2808                     module, ldb_kv, msg, &msg->elements[i]);
2809                 if (ret != LDB_SUCCESS) {
2810                         return ret;
2811                 }
2812         }
2813
2814         return LDB_SUCCESS;
2815 }
2816
2817
2818 /*
2819   traversal function that deletes all @INDEX records in the in-memory
2820   TDB.
2821
2822   This does not touch the actual DB, that is done at transaction
2823   commit, which in turn greatly reduces DB churn as we will likely
2824   be able to do a direct update into the old record.
2825 */
2826 static int delete_index(struct ldb_kv_private *ldb_kv,
2827                         struct ldb_val key,
2828                         struct ldb_val data,
2829                         void *state)
2830 {
2831         struct ldb_module *module = state;
2832         const char *dnstr = "DN=" LDB_KV_INDEX ":";
2833         struct dn_list list;
2834         struct ldb_dn *dn;
2835         struct ldb_val v;
2836         int ret;
2837
2838         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2839                 return 0;
2840         }
2841         /* we need to put a empty list in the internal tdb for this
2842          * index entry */
2843         list.dn = NULL;
2844         list.count = 0;
2845
2846         /* the offset of 3 is to remove the DN= prefix. */
2847         v.data = key.data + 3;
2848         v.length = strnlen((char *)key.data, key.length) - 3;
2849
2850         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
2851
2852         /*
2853          * This does not actually touch the DB quite yet, just
2854          * the in-memory index cache
2855          */
2856         ret = ldb_kv_dn_list_store(module, dn, &list);
2857         if (ret != LDB_SUCCESS) {
2858                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2859                                        "Unable to store null index for %s\n",
2860                                                 ldb_dn_get_linearized(dn));
2861                 talloc_free(dn);
2862                 return -1;
2863         }
2864         talloc_free(dn);
2865         return 0;
2866 }
2867
2868 /*
2869   traversal function that adds @INDEX records during a re index TODO wrong comment
2870 */
2871 static int re_key(struct ldb_kv_private *ldb_kv,
2872                   struct ldb_val key,
2873                   struct ldb_val val,
2874                   void *state)
2875 {
2876         struct ldb_context *ldb;
2877         struct ldb_kv_reindex_context *ctx =
2878             (struct ldb_kv_reindex_context *)state;
2879         struct ldb_module *module = ctx->module;
2880         struct ldb_message *msg;
2881         unsigned int nb_elements_in_db;
2882         int ret;
2883         struct ldb_val key2;
2884         bool is_record;
2885
2886         ldb = ldb_module_get_ctx(module);
2887
2888         if (key.length > 4 &&
2889             memcmp(key.data, "DN=@", 4) == 0) {
2890                 return 0;
2891         }
2892
2893         is_record = ldb_kv_key_is_record(key);
2894         if (is_record == false) {
2895                 return 0;
2896         }
2897
2898         msg = ldb_msg_new(module);
2899         if (msg == NULL) {
2900                 return -1;
2901         }
2902
2903         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2904                                                    msg,
2905                                                    NULL, 0,
2906                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2907                                                    &nb_elements_in_db);
2908         if (ret != 0) {
2909                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2910                                                 ldb_dn_get_linearized(msg->dn));
2911                 ctx->error = ret;
2912                 talloc_free(msg);
2913                 return -1;
2914         }
2915
2916         if (msg->dn == NULL) {
2917                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2918                           "Refusing to re-index as GUID "
2919                           "key %*.*s with no DN\n",
2920                           (int)key.length, (int)key.length,
2921                           (char *)key.data);
2922                 talloc_free(msg);
2923                 return -1;
2924         }
2925
2926         /* check if the DN key has changed, perhaps due to the case
2927            insensitivity of an element changing, or a change from DN
2928            to GUID keys */
2929         key2 = ldb_kv_key_msg(module, msg, msg);
2930         if (key2.data == NULL) {
2931                 /* probably a corrupt record ... darn */
2932                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2933                                                 ldb_dn_get_linearized(msg->dn));
2934                 talloc_free(msg);
2935                 return 0;
2936         }
2937         if (key.length != key2.length ||
2938             (memcmp(key.data, key2.data, key.length) != 0)) {
2939                 ldb_kv->kv_ops->update_in_iterate(
2940                     ldb_kv, key, key2, val, ctx);
2941         }
2942         talloc_free(key2.data);
2943
2944         talloc_free(msg);
2945
2946         ctx->count++;
2947         if (ctx->count % 10000 == 0) {
2948                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2949                           "Reindexing: re-keyed %u records so far",
2950                           ctx->count);
2951         }
2952
2953         return 0;
2954 }
2955
2956 /*
2957   traversal function that adds @INDEX records during a re index
2958 */
2959 static int re_index(struct ldb_kv_private *ldb_kv,
2960                     struct ldb_val key,
2961                     struct ldb_val val,
2962                     void *state)
2963 {
2964         struct ldb_context *ldb;
2965         struct ldb_kv_reindex_context *ctx =
2966             (struct ldb_kv_reindex_context *)state;
2967         struct ldb_module *module = ctx->module;
2968         struct ldb_message *msg;
2969         unsigned int nb_elements_in_db;
2970         int ret;
2971         bool is_record;
2972
2973         ldb = ldb_module_get_ctx(module);
2974
2975         if (key.length > 4 &&
2976             memcmp(key.data, "DN=@", 4) == 0) {
2977                 return 0;
2978         }
2979