9c65b6fb92a68afd4aaf5b0f1e5105c9b58270bf
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151
152 struct dn_list {
153         unsigned int count;
154         struct ldb_val *dn;
155         /*
156          * Do not optimise the intersection of this list,
157          * we must never return an entry not in this
158          * list.  This allows the index for
159          * SCOPE_ONELEVEL to be trusted.
160          */
161         bool strict;
162 };
163
164 struct ldb_kv_idxptr {
165         struct tdb_context *itdb;
166         int error;
167 };
168
169 enum key_truncation {
170         KEY_NOT_TRUNCATED,
171         KEY_TRUNCATED,
172 };
173
174 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
175                                       const struct ldb_message *msg,
176                                       int add);
177 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
178                                    struct ldb_kv_private *ldb_kv,
179                                    struct ldb_dn *base_dn,
180                                    struct dn_list *dn_list,
181                                    enum key_truncation *truncation);
182
183 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
184                                 struct dn_list *list);
185
186 /* we put a @IDXVERSION attribute on index entries. This
187    allows us to tell if it was written by an older version
188 */
189 #define LDB_KV_INDEXING_VERSION 2
190
191 #define LDB_KV_GUID_INDEXING_VERSION 3
192
193 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
194 {
195         if (ldb_kv->max_key_length == 0) {
196                 return UINT_MAX;
197         }
198         return ldb_kv->max_key_length;
199 }
200
201 /* enable the idxptr mode when transactions start */
202 int ldb_kv_index_transaction_start(struct ldb_module *module)
203 {
204         struct ldb_kv_private *ldb_kv = talloc_get_type(
205             ldb_module_get_private(module), struct ldb_kv_private);
206         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
207         if (ldb_kv->idxptr == NULL) {
208                 return ldb_oom(ldb_module_get_ctx(module));
209         }
210
211         return LDB_SUCCESS;
212 }
213
214 /*
215   see if two ldb_val structures contain exactly the same data
216   return -1 or 1 for a mismatch, 0 for match
217 */
218 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
219                                          const struct ldb_val *v2)
220 {
221         if (v1->length > v2->length) {
222                 return -1;
223         }
224         if (v1->length < v2->length) {
225                 return 1;
226         }
227         return memcmp(v1->data, v2->data, v1->length);
228 }
229
230 /*
231   see if two ldb_val structures contain exactly the same data
232   return -1 or 1 for a mismatch, 0 for match
233 */
234 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
235                                        const struct ldb_val *v2)
236 {
237         if (v1.length > v2->length) {
238                 return -1;
239         }
240         if (v1.length < v2->length) {
241                 return 1;
242         }
243         return memcmp(v1.data, v2->data, v1.length);
244 }
245
246
247 /*
248   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
249   binary-safe comparison for the 'dn' returns -1 if not found
250
251   This is therefore safe when the value is a GUID in the future
252  */
253 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
254                                    const struct dn_list *list,
255                                    const struct ldb_val *v)
256 {
257         unsigned int i;
258         struct ldb_val *exact = NULL, *next = NULL;
259
260         if (ldb_kv->cache->GUID_index_attribute == NULL) {
261                 for (i=0; i<list->count; i++) {
262                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
263                                 return i;
264                         }
265                 }
266                 return -1;
267         }
268
269         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
270                                 *v, ldb_val_equal_exact_ordered,
271                                 exact, next);
272         if (exact == NULL) {
273                 return -1;
274         }
275         /* Not required, but keeps the compiler quiet */
276         if (next != NULL) {
277                 return -1;
278         }
279
280         i = exact - list->dn;
281         return i;
282 }
283
284 /*
285   find a entry in a dn_list. Uses a case sensitive comparison with the dn
286   returns -1 if not found
287  */
288 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
289                                    struct dn_list *list,
290                                    const struct ldb_message *msg)
291 {
292         struct ldb_val v;
293         const struct ldb_val *key_val;
294         if (ldb_kv->cache->GUID_index_attribute == NULL) {
295                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
296                 v.data = discard_const_p(unsigned char, dn_str);
297                 v.length = strlen(dn_str);
298         } else {
299                 key_val = ldb_msg_find_ldb_val(
300                     msg, ldb_kv->cache->GUID_index_attribute);
301                 if (key_val == NULL) {
302                         return -1;
303                 }
304                 v = *key_val;
305         }
306         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
307 }
308
309 /*
310   this is effectively a cast function, but with lots of paranoia
311   checks and also copes with CPUs that are fussy about pointer
312   alignment
313  */
314 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
315                                            TDB_DATA rec,
316                                            bool check_parent)
317 {
318         struct dn_list *list;
319         if (rec.dsize != sizeof(void *)) {
320                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
321                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
322                 return NULL;
323         }
324         /* note that we can't just use a cast here, as rec.dptr may
325            not be aligned sufficiently for a pointer. A cast would cause
326            platforms like some ARM CPUs to crash */
327         memcpy(&list, rec.dptr, sizeof(void *));
328         list = talloc_get_type(list, struct dn_list);
329         if (list == NULL) {
330                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
331                                        "Bad type '%s' for idxptr",
332                                        talloc_get_name(list));
333                 return NULL;
334         }
335         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
336                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
337                                        "Bad parent '%s' for idxptr",
338                                        talloc_get_name(talloc_parent(list->dn)));
339                 return NULL;
340         }
341         return list;
342 }
343
344 /*
345   return the @IDX list in an index entry for a dn as a
346   struct dn_list
347  */
348 static int ldb_kv_dn_list_load(struct ldb_module *module,
349                                struct ldb_kv_private *ldb_kv,
350                                struct ldb_dn *dn,
351                                struct dn_list *list)
352 {
353         struct ldb_message *msg;
354         int ret, version;
355         struct ldb_message_element *el;
356         TDB_DATA rec;
357         struct dn_list *list2;
358         TDB_DATA key;
359
360         list->dn = NULL;
361         list->count = 0;
362
363         /* see if we have any in-memory index entries */
364         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
365                 goto normal_index;
366         }
367
368         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
369         key.dsize = strlen((char *)key.dptr);
370
371         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
372         if (rec.dptr == NULL) {
373                 goto normal_index;
374         }
375
376         /* we've found an in-memory index entry */
377         list2 = ldb_kv_index_idxptr(module, rec, true);
378         if (list2 == NULL) {
379                 free(rec.dptr);
380                 return LDB_ERR_OPERATIONS_ERROR;
381         }
382         free(rec.dptr);
383
384         *list = *list2;
385         return LDB_SUCCESS;
386
387 normal_index:
388         msg = ldb_msg_new(list);
389         if (msg == NULL) {
390                 return LDB_ERR_OPERATIONS_ERROR;
391         }
392
393         ret = ldb_kv_search_dn1(module,
394                                 dn,
395                                 msg,
396                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
397                                     LDB_UNPACK_DATA_FLAG_NO_DN);
398         if (ret != LDB_SUCCESS) {
399                 talloc_free(msg);
400                 return ret;
401         }
402
403         el = ldb_msg_find_element(msg, LDB_KV_IDX);
404         if (!el) {
405                 talloc_free(msg);
406                 return LDB_SUCCESS;
407         }
408
409         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
410
411         /*
412          * we avoid copying the strings by stealing the list.  We have
413          * to steal msg onto el->values (which looks odd) because we
414          * asked for the memory to be allocated on msg, not on each
415          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
416          */
417         if (ldb_kv->cache->GUID_index_attribute == NULL) {
418                 /* check indexing version number */
419                 if (version != LDB_KV_INDEXING_VERSION) {
420                         ldb_debug_set(ldb_module_get_ctx(module),
421                                       LDB_DEBUG_ERROR,
422                                       "Wrong DN index version %d "
423                                       "expected %d for %s",
424                                       version, LDB_KV_INDEXING_VERSION,
425                                       ldb_dn_get_linearized(dn));
426                         talloc_free(msg);
427                         return LDB_ERR_OPERATIONS_ERROR;
428                 }
429
430                 talloc_steal(el->values, msg);
431                 list->dn = talloc_steal(list, el->values);
432                 list->count = el->num_values;
433         } else {
434                 unsigned int i;
435                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
436                         /* This is quite likely during the DB startup
437                            on first upgrade to using a GUID index */
438                         ldb_debug_set(ldb_module_get_ctx(module),
439                                       LDB_DEBUG_ERROR,
440                                       "Wrong GUID index version %d "
441                                       "expected %d for %s",
442                                       version, LDB_KV_GUID_INDEXING_VERSION,
443                                       ldb_dn_get_linearized(dn));
444                         talloc_free(msg);
445                         return LDB_ERR_OPERATIONS_ERROR;
446                 }
447
448                 if (el->num_values == 0) {
449                         talloc_free(msg);
450                         return LDB_ERR_OPERATIONS_ERROR;
451                 }
452
453                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
454                         talloc_free(msg);
455                         return LDB_ERR_OPERATIONS_ERROR;
456                 }
457
458                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
459                 list->dn = talloc_array(list, struct ldb_val, list->count);
460                 if (list->dn == NULL) {
461                         talloc_free(msg);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464
465                 /*
466                  * The actual data is on msg, due to
467                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
468                  */
469                 talloc_steal(list->dn, msg);
470                 for (i = 0; i < list->count; i++) {
471                         list->dn[i].data
472                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
473                         list->dn[i].length = LDB_KV_GUID_SIZE;
474                 }
475         }
476
477         /* We don't need msg->elements any more */
478         talloc_free(msg->elements);
479         return LDB_SUCCESS;
480 }
481
482 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
483                            struct ldb_kv_private *ldb_kv,
484                            TALLOC_CTX *mem_ctx,
485                            struct ldb_dn *dn,
486                            struct ldb_val *ldb_key)
487 {
488         struct ldb_context *ldb = ldb_module_get_ctx(module);
489         int ret;
490         int index = 0;
491         enum key_truncation truncation = KEY_NOT_TRUNCATED;
492         struct dn_list *list = talloc(mem_ctx, struct dn_list);
493         if (list == NULL) {
494                 ldb_oom(ldb);
495                 return LDB_ERR_OPERATIONS_ERROR;
496         }
497
498         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
499         if (ret != LDB_SUCCESS) {
500                 TALLOC_FREE(list);
501                 return ret;
502         }
503
504         if (list->count == 0) {
505                 TALLOC_FREE(list);
506                 return LDB_ERR_NO_SUCH_OBJECT;
507         }
508
509         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
510                 const char *dn_str = ldb_dn_get_linearized(dn);
511                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
512                                        __location__
513                                        ": Failed to read DN index "
514                                        "against %s for %s: too many "
515                                        "values (%u > 1)",
516                                        ldb_kv->cache->GUID_index_attribute,
517                                        dn_str,
518                                        list->count);
519                 TALLOC_FREE(list);
520                 return LDB_ERR_CONSTRAINT_VIOLATION;
521         }
522
523         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
524                 /*
525                  * DN key has been truncated, need to inspect the actual
526                  * records to locate the actual DN
527                  */
528                 int i;
529                 index = -1;
530                 for (i=0; i < list->count; i++) {
531                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
532                         struct ldb_val key = {
533                                 .data = guid_key,
534                                 .length = sizeof(guid_key)
535                         };
536                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
537                         struct ldb_message *rec = ldb_msg_new(ldb);
538                         if (rec == NULL) {
539                                 TALLOC_FREE(list);
540                                 return LDB_ERR_OPERATIONS_ERROR;
541                         }
542
543                         ret = ldb_kv_idx_to_key(
544                             module, ldb_kv, ldb, &list->dn[i], &key);
545                         if (ret != LDB_SUCCESS) {
546                                 TALLOC_FREE(list);
547                                 TALLOC_FREE(rec);
548                                 return ret;
549                         }
550
551                         ret =
552                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
553                         if (key.data != guid_key) {
554                                 TALLOC_FREE(key.data);
555                         }
556                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
557                                 /*
558                                  * the record has disappeared?
559                                  * yes, this can happen
560                                  */
561                                 TALLOC_FREE(rec);
562                                 continue;
563                         }
564
565                         if (ret != LDB_SUCCESS) {
566                                 /* an internal error */
567                                 TALLOC_FREE(rec);
568                                 TALLOC_FREE(list);
569                                 return LDB_ERR_OPERATIONS_ERROR;
570                         }
571
572                         /*
573                          * We found the actual DN that we wanted from in the
574                          * multiple values that matched the index
575                          * (due to truncation), so return that.
576                          *
577                          */
578                         if (ldb_dn_compare(dn, rec->dn) == 0) {
579                                 index = i;
580                                 TALLOC_FREE(rec);
581                                 break;
582                         }
583                 }
584
585                 /*
586                  * We matched the index but the actual DN we wanted
587                  * was not here.
588                  */
589                 if (index == -1) {
590                         TALLOC_FREE(list);
591                         return LDB_ERR_NO_SUCH_OBJECT;
592                 }
593         }
594
595         /* The ldb_key memory is allocated by the caller */
596         ret = ldb_kv_guid_to_key(module, ldb_kv, &list->dn[index], ldb_key);
597         TALLOC_FREE(list);
598
599         if (ret != LDB_SUCCESS) {
600                 return LDB_ERR_OPERATIONS_ERROR;
601         }
602
603         return LDB_SUCCESS;
604 }
605
606
607
608 /*
609   save a dn_list into a full @IDX style record
610  */
611 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
612                                      struct ldb_kv_private *ldb_kv,
613                                      struct ldb_dn *dn,
614                                      struct dn_list *list)
615 {
616         struct ldb_message *msg;
617         int ret;
618
619         msg = ldb_msg_new(module);
620         if (!msg) {
621                 return ldb_module_oom(module);
622         }
623
624         msg->dn = dn;
625
626         if (list->count == 0) {
627                 ret = ldb_kv_delete_noindex(module, msg);
628                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
629                         ret = LDB_SUCCESS;
630                 }
631                 talloc_free(msg);
632                 return ret;
633         }
634
635         if (ldb_kv->cache->GUID_index_attribute == NULL) {
636                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
637                                       LDB_KV_INDEXING_VERSION);
638                 if (ret != LDB_SUCCESS) {
639                         talloc_free(msg);
640                         return ldb_module_oom(module);
641                 }
642         } else {
643                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
644                                       LDB_KV_GUID_INDEXING_VERSION);
645                 if (ret != LDB_SUCCESS) {
646                         talloc_free(msg);
647                         return ldb_module_oom(module);
648                 }
649         }
650
651         if (list->count > 0) {
652                 struct ldb_message_element *el;
653
654                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
655                 if (ret != LDB_SUCCESS) {
656                         talloc_free(msg);
657                         return ldb_module_oom(module);
658                 }
659
660                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
661                         el->values = list->dn;
662                         el->num_values = list->count;
663                 } else {
664                         struct ldb_val v;
665                         unsigned int i;
666                         el->values = talloc_array(msg,
667                                                   struct ldb_val, 1);
668                         if (el->values == NULL) {
669                                 talloc_free(msg);
670                                 return ldb_module_oom(module);
671                         }
672
673                         v.data = talloc_array_size(el->values,
674                                                    list->count,
675                                                    LDB_KV_GUID_SIZE);
676                         if (v.data == NULL) {
677                                 talloc_free(msg);
678                                 return ldb_module_oom(module);
679                         }
680
681                         v.length = talloc_get_size(v.data);
682
683                         for (i = 0; i < list->count; i++) {
684                                 if (list->dn[i].length !=
685                                     LDB_KV_GUID_SIZE) {
686                                         talloc_free(msg);
687                                         return ldb_module_operr(module);
688                                 }
689                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
690                                        list->dn[i].data,
691                                        LDB_KV_GUID_SIZE);
692                         }
693                         el->values[0] = v;
694                         el->num_values = 1;
695                 }
696         }
697
698         ret = ldb_kv_store(module, msg, TDB_REPLACE);
699         talloc_free(msg);
700         return ret;
701 }
702
703 /*
704   save a dn_list into the database, in either @IDX or internal format
705  */
706 static int ldb_kv_dn_list_store(struct ldb_module *module,
707                                 struct ldb_dn *dn,
708                                 struct dn_list *list)
709 {
710         struct ldb_kv_private *ldb_kv = talloc_get_type(
711             ldb_module_get_private(module), struct ldb_kv_private);
712         TDB_DATA rec, key;
713         int ret;
714         struct dn_list *list2;
715
716         if (ldb_kv->idxptr == NULL) {
717                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
718         }
719
720         if (ldb_kv->idxptr->itdb == NULL) {
721                 ldb_kv->idxptr->itdb =
722                     tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
723                 if (ldb_kv->idxptr->itdb == NULL) {
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726         }
727
728         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
729         if (key.dptr == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         key.dsize = strlen((char *)key.dptr);
733
734         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
735         if (rec.dptr != NULL) {
736                 list2 = ldb_kv_index_idxptr(module, rec, false);
737                 if (list2 == NULL) {
738                         free(rec.dptr);
739                         return LDB_ERR_OPERATIONS_ERROR;
740                 }
741                 free(rec.dptr);
742                 list2->dn = talloc_steal(list2, list->dn);
743                 list2->count = list->count;
744                 return LDB_SUCCESS;
745         }
746
747         list2 = talloc(ldb_kv->idxptr, struct dn_list);
748         if (list2 == NULL) {
749                 return LDB_ERR_OPERATIONS_ERROR;
750         }
751         list2->dn = talloc_steal(list2, list->dn);
752         list2->count = list->count;
753
754         rec.dptr = (uint8_t *)&list2;
755         rec.dsize = sizeof(void *);
756
757
758         /*
759          * This is not a store into the main DB, but into an in-memory
760          * TDB, so we don't need a guard on ltdb->read_only
761          */
762         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
763         if (ret != 0) {
764                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
765         }
766         return LDB_SUCCESS;
767 }
768
769 /*
770   traverse function for storing the in-memory index entries on disk
771  */
772 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
773                                        TDB_DATA key,
774                                        TDB_DATA data,
775                                        void *state)
776 {
777         struct ldb_module *module = state;
778         struct ldb_kv_private *ldb_kv = talloc_get_type(
779             ldb_module_get_private(module), struct ldb_kv_private);
780         struct ldb_dn *dn;
781         struct ldb_context *ldb = ldb_module_get_ctx(module);
782         struct ldb_val v;
783         struct dn_list *list;
784
785         list = ldb_kv_index_idxptr(module, data, true);
786         if (list == NULL) {
787                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
788                 return -1;
789         }
790
791         v.data = key.dptr;
792         v.length = strnlen((char *)key.dptr, key.dsize);
793
794         dn = ldb_dn_from_ldb_val(module, ldb, &v);
795         if (dn == NULL) {
796                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
797                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
798                 return -1;
799         }
800
801         ldb_kv->idxptr->error =
802             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
803         talloc_free(dn);
804         if (ldb_kv->idxptr->error != 0) {
805                 return -1;
806         }
807         return 0;
808 }
809
810 /* cleanup the idxptr mode when transaction commits */
811 int ldb_kv_index_transaction_commit(struct ldb_module *module)
812 {
813         struct ldb_kv_private *ldb_kv = talloc_get_type(
814             ldb_module_get_private(module), struct ldb_kv_private);
815         int ret;
816
817         struct ldb_context *ldb = ldb_module_get_ctx(module);
818
819         ldb_reset_err_string(ldb);
820
821         if (ldb_kv->idxptr->itdb) {
822                 tdb_traverse(
823                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
824                 tdb_close(ldb_kv->idxptr->itdb);
825         }
826
827         ret = ldb_kv->idxptr->error;
828         if (ret != LDB_SUCCESS) {
829                 if (!ldb_errstring(ldb)) {
830                         ldb_set_errstring(ldb, ldb_strerror(ret));
831                 }
832                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
833         }
834
835         talloc_free(ldb_kv->idxptr);
836         ldb_kv->idxptr = NULL;
837         return ret;
838 }
839
840 /* cleanup the idxptr mode when transaction cancels */
841 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
842 {
843         struct ldb_kv_private *ldb_kv = talloc_get_type(
844             ldb_module_get_private(module), struct ldb_kv_private);
845         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
846                 tdb_close(ldb_kv->idxptr->itdb);
847         }
848         talloc_free(ldb_kv->idxptr);
849         ldb_kv->idxptr = NULL;
850         return LDB_SUCCESS;
851 }
852
853
854 /*
855   return the dn key to be used for an index
856   the caller is responsible for freeing
857 */
858 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
859                                        struct ldb_kv_private *ldb_kv,
860                                        const char *attr,
861                                        const struct ldb_val *value,
862                                        const struct ldb_schema_attribute **ap,
863                                        enum key_truncation *truncation)
864 {
865         struct ldb_dn *ret;
866         struct ldb_val v;
867         const struct ldb_schema_attribute *a = NULL;
868         char *attr_folded = NULL;
869         const char *attr_for_dn = NULL;
870         int r;
871         bool should_b64_encode;
872
873         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
874         size_t key_len = 0;
875         size_t attr_len = 0;
876         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
877         unsigned frmt_len = 0;
878         const size_t additional_key_length = 4;
879         unsigned int num_separators = 3; /* Estimate for overflow check */
880         const size_t min_data = 1;
881         const size_t min_key_length = additional_key_length
882                 + indx_len + num_separators + min_data;
883
884         if (attr[0] == '@') {
885                 attr_for_dn = attr;
886                 v = *value;
887                 if (ap != NULL) {
888                         *ap = NULL;
889                 }
890         } else {
891                 attr_folded = ldb_attr_casefold(ldb, attr);
892                 if (!attr_folded) {
893                         return NULL;
894                 }
895
896                 attr_for_dn = attr_folded;
897
898                 a = ldb_schema_attribute_by_name(ldb, attr);
899                 if (ap) {
900                         *ap = a;
901                 }
902                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
903                 if (r != LDB_SUCCESS) {
904                         const char *errstr = ldb_errstring(ldb);
905                         /* canonicalisation can be refused. For
906                            example, a attribute that takes wildcards
907                            will refuse to canonicalise if the value
908                            contains a wildcard */
909                         ldb_asprintf_errstring(ldb,
910                                                "Failed to create index "
911                                                "key for attribute '%s':%s%s%s",
912                                                attr, ldb_strerror(r),
913                                                (errstr?":":""),
914                                                (errstr?errstr:""));
915                         talloc_free(attr_folded);
916                         return NULL;
917                 }
918         }
919         attr_len = strlen(attr_for_dn);
920
921         /*
922          * Check if there is any hope this will fit into the DB.
923          * Overflow here is not actually critical the code below
924          * checks again to make the printf and the DB does another
925          * check for too long keys
926          */
927         if (max_key_length - attr_len < min_key_length) {
928                 ldb_asprintf_errstring(
929                         ldb,
930                         __location__ ": max_key_length "
931                         "is too small (%u) < (%u)",
932                         max_key_length,
933                         (unsigned)(min_key_length + attr_len));
934                 talloc_free(attr_folded);
935                 return NULL;
936         }
937
938         /*
939          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
940          * "DN=" and a trailing string terminator
941          */
942         max_key_length -= additional_key_length;
943
944         /*
945          * We do not base 64 encode a DN in a key, it has already been
946          * casefold and lineraized, that is good enough.  That already
947          * avoids embedded NUL etc.
948          */
949         if (ldb_kv->cache->GUID_index_attribute != NULL) {
950                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
951                         should_b64_encode = false;
952                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
953                         /*
954                          * We can only change the behaviour for IDXONE
955                          * when the GUID index is enabled
956                          */
957                         should_b64_encode = false;
958                 } else {
959                         should_b64_encode
960                                 = ldb_should_b64_encode(ldb, &v);
961                 }
962         } else {
963                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
964         }
965
966         if (should_b64_encode) {
967                 size_t vstr_len = 0;
968                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
969                 if (!vstr) {
970                         talloc_free(attr_folded);
971                         return NULL;
972                 }
973                 vstr_len = strlen(vstr);
974                 /*
975                  * Overflow here is not critical as we only use this
976                  * to choose the printf truncation
977                  */
978                 key_len = num_separators + indx_len + attr_len + vstr_len;
979                 if (key_len > max_key_length) {
980                         size_t excess = key_len - max_key_length;
981                         frmt_len = vstr_len - excess;
982                         *truncation = KEY_TRUNCATED;
983                         /*
984                         * Truncated keys are placed in a separate key space
985                         * from the non truncated keys
986                         * Note: the double hash "##" is not a typo and
987                         * indicates that the following value is base64 encoded
988                         */
989                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
990                                              LDB_KV_INDEX, attr_for_dn,
991                                              frmt_len, vstr);
992                 } else {
993                         frmt_len = vstr_len;
994                         *truncation = KEY_NOT_TRUNCATED;
995                         /*
996                          * Note: the double colon "::" is not a typo and
997                          * indicates that the following value is base64 encoded
998                          */
999                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1000                                              LDB_KV_INDEX, attr_for_dn,
1001                                              frmt_len, vstr);
1002                 }
1003                 talloc_free(vstr);
1004         } else {
1005                 /* Only need two seperators */
1006                 num_separators = 2;
1007
1008                 /*
1009                  * Overflow here is not critical as we only use this
1010                  * to choose the printf truncation
1011                  */
1012                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1013                 if (key_len > max_key_length) {
1014                         size_t excess = key_len - max_key_length;
1015                         frmt_len = v.length - excess;
1016                         *truncation = KEY_TRUNCATED;
1017                         /*
1018                          * Truncated keys are placed in a separate key space
1019                          * from the non truncated keys
1020                          */
1021                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1022                                              LDB_KV_INDEX, attr_for_dn,
1023                                              frmt_len, (char *)v.data);
1024                 } else {
1025                         frmt_len = v.length;
1026                         *truncation = KEY_NOT_TRUNCATED;
1027                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1028                                              LDB_KV_INDEX, attr_for_dn,
1029                                              frmt_len, (char *)v.data);
1030                 }
1031         }
1032
1033         if (v.data != value->data) {
1034                 talloc_free(v.data);
1035         }
1036         talloc_free(attr_folded);
1037
1038         return ret;
1039 }
1040
1041 /*
1042   see if a attribute value is in the list of indexed attributes
1043 */
1044 static bool ldb_kv_is_indexed(struct ldb_module *module,
1045                               struct ldb_kv_private *ldb_kv,
1046                               const char *attr)
1047 {
1048         struct ldb_context *ldb = ldb_module_get_ctx(module);
1049         unsigned int i;
1050         struct ldb_message_element *el;
1051
1052         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1053             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1054                 /* Implicity covered, this is the index key */
1055                 return false;
1056         }
1057         if (ldb->schema.index_handler_override) {
1058                 const struct ldb_schema_attribute *a
1059                         = ldb_schema_attribute_by_name(ldb, attr);
1060
1061                 if (a == NULL) {
1062                         return false;
1063                 }
1064
1065                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1066                         return true;
1067                 } else {
1068                         return false;
1069                 }
1070         }
1071
1072         if (!ldb_kv->cache->attribute_indexes) {
1073                 return false;
1074         }
1075
1076         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1077         if (el == NULL) {
1078                 return false;
1079         }
1080
1081         /* TODO: this is too expensive! At least use a binary search */
1082         for (i=0; i<el->num_values; i++) {
1083                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1084                         return true;
1085                 }
1086         }
1087         return false;
1088 }
1089
1090 /*
1091   in the following logic functions, the return value is treated as
1092   follows:
1093
1094      LDB_SUCCESS: we found some matching index values
1095
1096      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1097
1098      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1099                                we'll need a full search
1100  */
1101
1102 /*
1103   return a list of dn's that might match a simple indexed search (an
1104   equality search only)
1105  */
1106 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1107                                   struct ldb_kv_private *ldb_kv,
1108                                   const struct ldb_parse_tree *tree,
1109                                   struct dn_list *list)
1110 {
1111         struct ldb_context *ldb;
1112         struct ldb_dn *dn;
1113         int ret;
1114         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1115
1116         ldb = ldb_module_get_ctx(module);
1117
1118         list->count = 0;
1119         list->dn = NULL;
1120
1121         /* if the attribute isn't in the list of indexed attributes then
1122            this node needs a full search */
1123         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1124                 return LDB_ERR_OPERATIONS_ERROR;
1125         }
1126
1127         /* the attribute is indexed. Pull the list of DNs that match the
1128            search criterion */
1129         dn = ldb_kv_index_key(ldb,
1130                               ldb_kv,
1131                               tree->u.equality.attr,
1132                               &tree->u.equality.value,
1133                               NULL,
1134                               &truncation);
1135         /*
1136          * We ignore truncation here and allow multi-valued matches
1137          * as ltdb_search_indexed will filter out the wrong one in
1138          * ltdb_index_filter() which calls ldb_match_message().
1139          */
1140         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1141
1142         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1143         talloc_free(dn);
1144         return ret;
1145 }
1146
1147 static bool list_union(struct ldb_context *ldb,
1148                        struct ldb_kv_private *ldb_kv,
1149                        struct dn_list *list,
1150                        struct dn_list *list2);
1151
1152 /*
1153   return a list of dn's that might match a leaf indexed search
1154  */
1155 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1156                                 struct ldb_kv_private *ldb_kv,
1157                                 const struct ldb_parse_tree *tree,
1158                                 struct dn_list *list)
1159 {
1160         if (ldb_kv->disallow_dn_filter &&
1161             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1162                 /* in AD mode we do not support "(dn=...)" search filters */
1163                 list->dn = NULL;
1164                 list->count = 0;
1165                 return LDB_SUCCESS;
1166         }
1167         if (tree->u.equality.attr[0] == '@') {
1168                 /* Do not allow a indexed search against an @ */
1169                 list->dn = NULL;
1170                 list->count = 0;
1171                 return LDB_SUCCESS;
1172         }
1173         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1174                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1175                 bool valid_dn = false;
1176                 struct ldb_dn *dn
1177                         = ldb_dn_from_ldb_val(list,
1178                                               ldb_module_get_ctx(module),
1179                                               &tree->u.equality.value);
1180                 if (dn == NULL) {
1181                         /* If we can't parse it, no match */
1182                         list->dn = NULL;
1183                         list->count = 0;
1184                         return LDB_SUCCESS;
1185                 }
1186
1187                 valid_dn = ldb_dn_validate(dn);
1188                 if (valid_dn == false) {
1189                         /* If we can't parse it, no match */
1190                         list->dn = NULL;
1191                         list->count = 0;
1192                         return LDB_SUCCESS;
1193                 }
1194
1195                 /*
1196                  * Re-use the same code we use for a SCOPE_BASE
1197                  * search
1198                  *
1199                  * We can't call TALLOC_FREE(dn) as this must belong
1200                  * to list for the memory to remain valid.
1201                  */
1202                 return ldb_kv_index_dn_base_dn(
1203                     module, ldb_kv, dn, list, &truncation);
1204                 /*
1205                  * We ignore truncation here and allow multi-valued matches
1206                  * as ltdb_search_indexed will filter out the wrong one in
1207                  * ltdb_index_filter() which calls ldb_match_message().
1208                  */
1209
1210         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1211                    (ldb_attr_cmp(tree->u.equality.attr,
1212                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1213                 int ret;
1214                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1215                 list->dn = talloc_array(list, struct ldb_val, 1);
1216                 if (list->dn == NULL) {
1217                         ldb_module_oom(module);
1218                         return LDB_ERR_OPERATIONS_ERROR;
1219                 }
1220                 /*
1221                  * We need to go via the canonicalise_fn() to
1222                  * ensure we get the index in binary, rather
1223                  * than a string
1224                  */
1225                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1226                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1227                 if (ret != LDB_SUCCESS) {
1228                         return LDB_ERR_OPERATIONS_ERROR;
1229                 }
1230                 list->count = 1;
1231                 return LDB_SUCCESS;
1232         }
1233
1234         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1235 }
1236
1237
1238 /*
1239   list intersection
1240   list = list & list2
1241 */
1242 static bool list_intersect(struct ldb_context *ldb,
1243                            struct ldb_kv_private *ldb_kv,
1244                            struct dn_list *list,
1245                            const struct dn_list *list2)
1246 {
1247         const struct dn_list *short_list, *long_list;
1248         struct dn_list *list3;
1249         unsigned int i;
1250
1251         if (list->count == 0) {
1252                 /* 0 & X == 0 */
1253                 return true;
1254         }
1255         if (list2->count == 0) {
1256                 /* X & 0 == 0 */
1257                 list->count = 0;
1258                 list->dn = NULL;
1259                 return true;
1260         }
1261
1262         /* the indexing code is allowed to return a longer list than
1263            what really matches, as all results are filtered by the
1264            full expression at the end - this shortcut avoids a lot of
1265            work in some cases */
1266         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1267                 return true;
1268         }
1269         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1270                 list->count = list2->count;
1271                 list->dn = list2->dn;
1272                 /* note that list2 may not be the parent of list2->dn,
1273                    as list2->dn may be owned by ltdb->idxptr. In that
1274                    case we expect this reparent call to fail, which is
1275                    OK */
1276                 talloc_reparent(list2, list, list2->dn);
1277                 return true;
1278         }
1279
1280         if (list->count > list2->count) {
1281                 short_list = list2;
1282                 long_list = list;
1283         } else {
1284                 short_list = list;
1285                 long_list = list2;
1286         }
1287
1288         list3 = talloc_zero(list, struct dn_list);
1289         if (list3 == NULL) {
1290                 return false;
1291         }
1292
1293         list3->dn = talloc_array(list3, struct ldb_val,
1294                                  MIN(list->count, list2->count));
1295         if (!list3->dn) {
1296                 talloc_free(list3);
1297                 return false;
1298         }
1299         list3->count = 0;
1300
1301         for (i=0;i<short_list->count;i++) {
1302                 /* For the GUID index case, this is a binary search */
1303                 if (ldb_kv_dn_list_find_val(
1304                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1305                         list3->dn[list3->count] = short_list->dn[i];
1306                         list3->count++;
1307                 }
1308         }
1309
1310         list->strict |= list2->strict;
1311         list->dn = talloc_steal(list, list3->dn);
1312         list->count = list3->count;
1313         talloc_free(list3);
1314
1315         return true;
1316 }
1317
1318
1319 /*
1320   list union
1321   list = list | list2
1322 */
1323 static bool list_union(struct ldb_context *ldb,
1324                        struct ldb_kv_private *ldb_kv,
1325                        struct dn_list *list,
1326                        struct dn_list *list2)
1327 {
1328         struct ldb_val *dn3;
1329         unsigned int i = 0, j = 0, k = 0;
1330
1331         if (list2->count == 0) {
1332                 /* X | 0 == X */
1333                 return true;
1334         }
1335
1336         if (list->count == 0) {
1337                 /* 0 | X == X */
1338                 list->count = list2->count;
1339                 list->dn = list2->dn;
1340                 /* note that list2 may not be the parent of list2->dn,
1341                    as list2->dn may be owned by ltdb->idxptr. In that
1342                    case we expect this reparent call to fail, which is
1343                    OK */
1344                 talloc_reparent(list2, list, list2->dn);
1345                 return true;
1346         }
1347
1348         /*
1349          * Sort the lists (if not in GUID DN mode) so we can do
1350          * the de-duplication during the merge
1351          *
1352          * NOTE: This can sort the in-memory index values, as list or
1353          * list2 might not be a copy!
1354          */
1355         ldb_kv_dn_list_sort(ldb_kv, list);
1356         ldb_kv_dn_list_sort(ldb_kv, list2);
1357
1358         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1359         if (!dn3) {
1360                 ldb_oom(ldb);
1361                 return false;
1362         }
1363
1364         while (i < list->count || j < list2->count) {
1365                 int cmp;
1366                 if (i >= list->count) {
1367                         cmp = 1;
1368                 } else if (j >= list2->count) {
1369                         cmp = -1;
1370                 } else {
1371                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1372                                                           &list2->dn[j]);
1373                 }
1374
1375                 if (cmp < 0) {
1376                         /* Take list */
1377                         dn3[k] = list->dn[i];
1378                         i++;
1379                         k++;
1380                 } else if (cmp > 0) {
1381                         /* Take list2 */
1382                         dn3[k] = list2->dn[j];
1383                         j++;
1384                         k++;
1385                 } else {
1386                         /* Equal, take list */
1387                         dn3[k] = list->dn[i];
1388                         i++;
1389                         j++;
1390                         k++;
1391                 }
1392         }
1393
1394         list->dn = dn3;
1395         list->count = k;
1396
1397         return true;
1398 }
1399
1400 static int ldb_kv_index_dn(struct ldb_module *module,
1401                            struct ldb_kv_private *ldb_kv,
1402                            const struct ldb_parse_tree *tree,
1403                            struct dn_list *list);
1404
1405 /*
1406   process an OR list (a union)
1407  */
1408 static int ldb_kv_index_dn_or(struct ldb_module *module,
1409                               struct ldb_kv_private *ldb_kv,
1410                               const struct ldb_parse_tree *tree,
1411                               struct dn_list *list)
1412 {
1413         struct ldb_context *ldb;
1414         unsigned int i;
1415
1416         ldb = ldb_module_get_ctx(module);
1417
1418         list->dn = NULL;
1419         list->count = 0;
1420
1421         for (i=0; i<tree->u.list.num_elements; i++) {
1422                 struct dn_list *list2;
1423                 int ret;
1424
1425                 list2 = talloc_zero(list, struct dn_list);
1426                 if (list2 == NULL) {
1427                         return LDB_ERR_OPERATIONS_ERROR;
1428                 }
1429
1430                 ret = ldb_kv_index_dn(
1431                     module, ldb_kv, tree->u.list.elements[i], list2);
1432
1433                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1434                         /* X || 0 == X */
1435                         talloc_free(list2);
1436                         continue;
1437                 }
1438
1439                 if (ret != LDB_SUCCESS) {
1440                         /* X || * == * */
1441                         talloc_free(list2);
1442                         return ret;
1443                 }
1444
1445                 if (!list_union(ldb, ldb_kv, list, list2)) {
1446                         talloc_free(list2);
1447                         return LDB_ERR_OPERATIONS_ERROR;
1448                 }
1449         }
1450
1451         if (list->count == 0) {
1452                 return LDB_ERR_NO_SUCH_OBJECT;
1453         }
1454
1455         return LDB_SUCCESS;
1456 }
1457
1458
1459 /*
1460   NOT an index results
1461  */
1462 static int ldb_kv_index_dn_not(struct ldb_module *module,
1463                                struct ldb_kv_private *ldb_kv,
1464                                const struct ldb_parse_tree *tree,
1465                                struct dn_list *list)
1466 {
1467         /* the only way to do an indexed not would be if we could
1468            negate the not via another not or if we knew the total
1469            number of database elements so we could know that the
1470            existing expression covered the whole database.
1471
1472            instead, we just give up, and rely on a full index scan
1473            (unless an outer & manages to reduce the list)
1474         */
1475         return LDB_ERR_OPERATIONS_ERROR;
1476 }
1477
1478 /*
1479  * These things are unique, so avoid a full scan if this is a search
1480  * by GUID, DN or a unique attribute
1481  */
1482 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1483                                 struct ldb_kv_private *ldb_kv,
1484                                 const char *attr)
1485 {
1486         const struct ldb_schema_attribute *a;
1487         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1488                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1489                     0) {
1490                         return true;
1491                 }
1492         }
1493         if (ldb_attr_dn(attr) == 0) {
1494                 return true;
1495         }
1496
1497         a = ldb_schema_attribute_by_name(ldb, attr);
1498         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1499                 return true;
1500         }
1501         return false;
1502 }
1503
1504 /*
1505   process an AND expression (intersection)
1506  */
1507 static int ldb_kv_index_dn_and(struct ldb_module *module,
1508                                struct ldb_kv_private *ldb_kv,
1509                                const struct ldb_parse_tree *tree,
1510                                struct dn_list *list)
1511 {
1512         struct ldb_context *ldb;
1513         unsigned int i;
1514         bool found;
1515
1516         ldb = ldb_module_get_ctx(module);
1517
1518         list->dn = NULL;
1519         list->count = 0;
1520
1521         /* in the first pass we only look for unique simple
1522            equality tests, in the hope of avoiding having to look
1523            at any others */
1524         for (i=0; i<tree->u.list.num_elements; i++) {
1525                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1526                 int ret;
1527
1528                 if (subtree->operation != LDB_OP_EQUALITY ||
1529                     !ldb_kv_index_unique(
1530                         ldb, ldb_kv, subtree->u.equality.attr)) {
1531                         continue;
1532                 }
1533
1534                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1535                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1536                         /* 0 && X == 0 */
1537                         return LDB_ERR_NO_SUCH_OBJECT;
1538                 }
1539                 if (ret == LDB_SUCCESS) {
1540                         /* a unique index match means we can
1541                          * stop. Note that we don't care if we return
1542                          * a few too many objects, due to later
1543                          * filtering */
1544                         return LDB_SUCCESS;
1545                 }
1546         }
1547
1548         /* now do a full intersection */
1549         found = false;
1550
1551         for (i=0; i<tree->u.list.num_elements; i++) {
1552                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1553                 struct dn_list *list2;
1554                 int ret;
1555
1556                 list2 = talloc_zero(list, struct dn_list);
1557                 if (list2 == NULL) {
1558                         return ldb_module_oom(module);
1559                 }
1560
1561                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1562
1563                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1564                         /* X && 0 == 0 */
1565                         list->dn = NULL;
1566                         list->count = 0;
1567                         talloc_free(list2);
1568                         return LDB_ERR_NO_SUCH_OBJECT;
1569                 }
1570
1571                 if (ret != LDB_SUCCESS) {
1572                         /* this didn't adding anything */
1573                         talloc_free(list2);
1574                         continue;
1575                 }
1576
1577                 if (!found) {
1578                         talloc_reparent(list2, list, list->dn);
1579                         list->dn = list2->dn;
1580                         list->count = list2->count;
1581                         found = true;
1582                 } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
1583                         talloc_free(list2);
1584                         return LDB_ERR_OPERATIONS_ERROR;
1585                 }
1586
1587                 if (list->count == 0) {
1588                         list->dn = NULL;
1589                         return LDB_ERR_NO_SUCH_OBJECT;
1590                 }
1591
1592                 if (list->count < 2) {
1593                         /* it isn't worth loading the next part of the tree */
1594                         return LDB_SUCCESS;
1595                 }
1596         }
1597
1598         if (!found) {
1599                 /* none of the attributes were indexed */
1600                 return LDB_ERR_OPERATIONS_ERROR;
1601         }
1602
1603         return LDB_SUCCESS;
1604 }
1605
1606 /*
1607   return a list of matching objects using a one-level index
1608  */
1609 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1610                                 struct ldb_kv_private *ldb_kv,
1611                                 const char *attr,
1612                                 struct ldb_dn *dn,
1613                                 struct dn_list *list,
1614                                 enum key_truncation *truncation)
1615 {
1616         struct ldb_context *ldb;
1617         struct ldb_dn *key;
1618         struct ldb_val val;
1619         int ret;
1620
1621         ldb = ldb_module_get_ctx(module);
1622
1623         /* work out the index key from the parent DN */
1624         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1625         if (val.data == NULL) {
1626                 const char *dn_str = ldb_dn_get_linearized(dn);
1627                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1628                                        __location__
1629                                        ": Failed to get casefold DN "
1630                                        "from: %s",
1631                                        dn_str);
1632                 return LDB_ERR_OPERATIONS_ERROR;
1633         }
1634         val.length = strlen((char *)val.data);
1635         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1636         if (!key) {
1637                 ldb_oom(ldb);
1638                 return LDB_ERR_OPERATIONS_ERROR;
1639         }
1640
1641         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1642         talloc_free(key);
1643         if (ret != LDB_SUCCESS) {
1644                 return ret;
1645         }
1646
1647         if (list->count == 0) {
1648                 return LDB_ERR_NO_SUCH_OBJECT;
1649         }
1650
1651         return LDB_SUCCESS;
1652 }
1653
1654 /*
1655   return a list of matching objects using a one-level index
1656  */
1657 static int ldb_kv_index_dn_one(struct ldb_module *module,
1658                                struct ldb_kv_private *ldb_kv,
1659                                struct ldb_dn *parent_dn,
1660                                struct dn_list *list,
1661                                enum key_truncation *truncation)
1662 {
1663         /* Ensure we do not shortcut on intersection for this list */
1664         list->strict = true;
1665         return ldb_kv_index_dn_attr(
1666             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
1667 }
1668
1669 /*
1670   return a list of matching objects using the DN index
1671  */
1672 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
1673                                    struct ldb_kv_private *ldb_kv,
1674                                    struct ldb_dn *base_dn,
1675                                    struct dn_list *dn_list,
1676                                    enum key_truncation *truncation)
1677 {
1678         const struct ldb_val *guid_val = NULL;
1679         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1680                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1681                 if (dn_list->dn == NULL) {
1682                         return ldb_module_oom(module);
1683                 }
1684                 dn_list->dn[0].data = discard_const_p(unsigned char,
1685                                                       ldb_dn_get_linearized(base_dn));
1686                 if (dn_list->dn[0].data == NULL) {
1687                         talloc_free(dn_list->dn);
1688                         return ldb_module_oom(module);
1689                 }
1690                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1691                 dn_list->count = 1;
1692
1693                 return LDB_SUCCESS;
1694         }
1695
1696         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
1697                 guid_val = ldb_dn_get_extended_component(
1698                     base_dn, ldb_kv->cache->GUID_index_dn_component);
1699         }
1700
1701         if (guid_val != NULL) {
1702                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1703                 if (dn_list->dn == NULL) {
1704                         return ldb_module_oom(module);
1705                 }
1706                 dn_list->dn[0].data = guid_val->data;
1707                 dn_list->dn[0].length = guid_val->length;
1708                 dn_list->count = 1;
1709
1710                 return LDB_SUCCESS;
1711         }
1712
1713         return ldb_kv_index_dn_attr(
1714             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
1715 }
1716
1717 /*
1718   return a list of dn's that might match a indexed search or
1719   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1720  */
1721 static int ldb_kv_index_dn(struct ldb_module *module,
1722                            struct ldb_kv_private *ldb_kv,
1723                            const struct ldb_parse_tree *tree,
1724                            struct dn_list *list)
1725 {
1726         int ret = LDB_ERR_OPERATIONS_ERROR;
1727
1728         switch (tree->operation) {
1729         case LDB_OP_AND:
1730                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
1731                 break;
1732
1733         case LDB_OP_OR:
1734                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
1735                 break;
1736
1737         case LDB_OP_NOT:
1738                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
1739                 break;
1740
1741         case LDB_OP_EQUALITY:
1742                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
1743                 break;
1744
1745         case LDB_OP_SUBSTRING:
1746         case LDB_OP_GREATER:
1747         case LDB_OP_LESS:
1748         case LDB_OP_PRESENT:
1749         case LDB_OP_APPROX:
1750         case LDB_OP_EXTENDED:
1751                 /* we can't index with fancy bitops yet */
1752                 ret = LDB_ERR_OPERATIONS_ERROR;
1753                 break;
1754         }
1755
1756         return ret;
1757 }
1758
1759 /*
1760   filter a candidate dn_list from an indexed search into a set of results
1761   extracting just the given attributes
1762 */
1763 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
1764                                const struct dn_list *dn_list,
1765                                struct ldb_kv_context *ac,
1766                                uint32_t *match_count,
1767                                enum key_truncation scope_one_truncation)
1768 {
1769         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1770         struct ldb_message *msg;
1771         struct ldb_message *filtered_msg;
1772         unsigned int i;
1773         unsigned int num_keys = 0;
1774         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
1775         struct ldb_val *keys = NULL;
1776
1777         /*
1778          * We have to allocate the key list (rather than just walk the
1779          * caller supplied list) as the callback could change the list
1780          * (by modifying an indexed attribute hosted in the in-memory
1781          * index cache!)
1782          */
1783         keys = talloc_array(ac, struct ldb_val, dn_list->count);
1784         if (keys == NULL) {
1785                 return ldb_module_oom(ac->module);
1786         }
1787
1788         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1789                 /*
1790                  * We speculate that the keys will be GUID based and so
1791                  * pre-fill in enough space for a GUID (avoiding a pile of
1792                  * small allocations)
1793                  */
1794                 struct guid_tdb_key {
1795                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
1796                 } *key_values = NULL;
1797
1798                 key_values = talloc_array(keys,
1799                                           struct guid_tdb_key,
1800                                           dn_list->count);
1801
1802                 if (key_values == NULL) {
1803                         talloc_free(keys);
1804                         return ldb_module_oom(ac->module);
1805                 }
1806                 for (i = 0; i < dn_list->count; i++) {
1807                         keys[i].data = key_values[i].guid_key;
1808                         keys[i].length = sizeof(key_values[i].guid_key);
1809                 }
1810         } else {
1811                 for (i = 0; i < dn_list->count; i++) {
1812                         keys[i].data = NULL;
1813                         keys[i].length = 0;
1814                 }
1815         }
1816
1817         for (i = 0; i < dn_list->count; i++) {
1818                 int ret;
1819
1820                 ret = ldb_kv_idx_to_key(
1821                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
1822                 if (ret != LDB_SUCCESS) {
1823                         talloc_free(keys);
1824                         return ret;
1825                 }
1826
1827                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1828                         /*
1829                          * If we are in GUID index mode, then the dn_list is
1830                          * sorted.  If we got a duplicate, forget about it, as
1831                          * otherwise we would send the same entry back more
1832                          * than once.
1833                          *
1834                          * This is needed in the truncated DN case, or if a
1835                          * duplicate was forced in via
1836                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1837                          */
1838
1839                         if (memcmp(previous_guid_key,
1840                                    keys[num_keys].data,
1841                                    sizeof(previous_guid_key)) == 0) {
1842                                 continue;
1843                         }
1844
1845                         memcpy(previous_guid_key,
1846                                keys[num_keys].data,
1847                                sizeof(previous_guid_key));
1848                 }
1849                 num_keys++;
1850         }
1851
1852
1853         /*
1854          * Now that the list is a safe copy, send the callbacks
1855          */
1856         for (i = 0; i < num_keys; i++) {
1857                 int ret;
1858                 bool matched;
1859                 msg = ldb_msg_new(ac);
1860                 if (!msg) {
1861                         talloc_free(keys);
1862                         return LDB_ERR_OPERATIONS_ERROR;
1863                 }
1864
1865                 ret =
1866                     ldb_kv_search_key(ac->module,
1867                                       ldb_kv,
1868                                       keys[i],
1869                                       msg,
1870                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
1871                                           LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1872                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1873                         /*
1874                          * the record has disappeared? yes, this can
1875                          * happen if the entry is deleted by something
1876                          * operating in the callback (not another
1877                          * process, as we have a read lock)
1878                          */
1879                         talloc_free(msg);
1880                         continue;
1881                 }
1882
1883                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1884                         /* an internal error */
1885                         talloc_free(keys);
1886                         talloc_free(msg);
1887                         return LDB_ERR_OPERATIONS_ERROR;
1888                 }
1889
1890                 /*
1891                  * We trust the index for LDB_SCOPE_ONELEVEL
1892                  * unless the index key has been truncated.
1893                  *
1894                  * LDB_SCOPE_BASE is not passed in by our only caller.
1895                  */
1896                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
1897                     ldb_kv->cache->one_level_indexes &&
1898                     scope_one_truncation == KEY_NOT_TRUNCATED) {
1899                         ret = ldb_match_message(ldb, msg, ac->tree,
1900                                                 ac->scope, &matched);
1901                 } else {
1902                         ret = ldb_match_msg_error(ldb, msg,
1903                                                   ac->tree, ac->base,
1904                                                   ac->scope, &matched);
1905                 }
1906
1907                 if (ret != LDB_SUCCESS) {
1908                         talloc_free(keys);
1909                         talloc_free(msg);
1910                         return ret;
1911                 }
1912                 if (!matched) {
1913                         talloc_free(msg);
1914                         continue;
1915                 }
1916
1917                 /* filter the attributes that the user wants */
1918                 ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1919
1920                 talloc_free(msg);
1921
1922                 if (ret == -1) {
1923                         talloc_free(keys);
1924                         return LDB_ERR_OPERATIONS_ERROR;
1925                 }
1926
1927                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1928                 if (ret != LDB_SUCCESS) {
1929                         /* Regardless of success or failure, the msg
1930                          * is the callbacks responsiblity, and should
1931                          * not be talloc_free()'ed */
1932                         ac->request_terminated = true;
1933                         talloc_free(keys);
1934                         return ret;
1935                 }
1936
1937                 (*match_count)++;
1938         }
1939
1940         TALLOC_FREE(keys);
1941         return LDB_SUCCESS;
1942 }
1943
1944 /*
1945   sort a DN list
1946  */
1947 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
1948                                 struct dn_list *list)
1949 {
1950         if (list->count < 2) {
1951                 return;
1952         }
1953
1954         /* We know the list is sorted when using the GUID index */
1955         if (ltdb->cache->GUID_index_attribute != NULL) {
1956                 return;
1957         }
1958
1959         TYPESAFE_QSORT(list->dn, list->count,
1960                        ldb_val_equal_exact_for_qsort);
1961 }
1962
1963 /*
1964   search the database with a LDAP-like expression using indexes
1965   returns -1 if an indexed search is not possible, in which
1966   case the caller should call ltdb_search_full()
1967 */
1968 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
1969 {
1970         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1971         struct ldb_kv_private *ldb_kv = talloc_get_type(
1972             ldb_module_get_private(ac->module), struct ldb_kv_private);
1973         struct dn_list *dn_list;
1974         int ret;
1975         enum ldb_scope index_scope;
1976         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1977
1978         /* see if indexing is enabled */
1979         if (!ldb_kv->cache->attribute_indexes &&
1980             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
1981                 /* fallback to a full search */
1982                 return LDB_ERR_OPERATIONS_ERROR;
1983         }
1984
1985         dn_list = talloc_zero(ac, struct dn_list);
1986         if (dn_list == NULL) {
1987                 return ldb_module_oom(ac->module);
1988         }
1989
1990         /*
1991          * For the purposes of selecting the switch arm below, if we
1992          * don't have a one-level index then treat it like a subtree
1993          * search
1994          */
1995         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1996             !ldb_kv->cache->one_level_indexes) {
1997                 index_scope = LDB_SCOPE_SUBTREE;
1998         } else {
1999                 index_scope = ac->scope;
2000         }
2001
2002         switch (index_scope) {
2003         case LDB_SCOPE_BASE:
2004                 /*
2005                  * The only caller will have filtered the operation out
2006                  * so we should never get here
2007                  */
2008                 return ldb_operr(ldb);
2009
2010         case LDB_SCOPE_ONELEVEL:
2011                 ret = ldb_kv_index_dn_one(ac->module,
2012                                           ldb_kv,
2013                                           ac->base,
2014                                           dn_list,
2015                                           &scope_one_truncation);
2016                 if (ret != LDB_SUCCESS) {
2017                         talloc_free(dn_list);
2018                         return ret;
2019                 }
2020
2021                 /*
2022                  * If we have too many matches, running the filter
2023                  * tree over the SCOPE_ONELEVEL can be quite expensive
2024                  * so we now check the filter tree index as well.
2025                  *
2026                  * We only do this in the GUID index mode, which is
2027                  * O(n*log(m)) otherwise the intersection below will
2028                  * be too costly at O(n*m).
2029                  *
2030                  * We don't set a heuristic for 'too many' but instead
2031                  * do it always and rely on the index lookup being
2032                  * fast enough in the small case.
2033                  */
2034                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2035                         struct dn_list *idx_one_tree_list
2036                                 = talloc_zero(ac, struct dn_list);
2037                         if (idx_one_tree_list == NULL) {
2038                                 talloc_free(dn_list);
2039                                 return ldb_module_oom(ac->module);
2040                         }
2041
2042                         if (!ldb_kv->cache->attribute_indexes) {
2043                                 talloc_free(idx_one_tree_list);
2044                                 talloc_free(dn_list);
2045                                 return LDB_ERR_OPERATIONS_ERROR;
2046                         }
2047                         /*
2048                          * Here we load the index for the tree.
2049                          */
2050                         ret = ldb_kv_index_dn(
2051                             ac->module, ldb_kv, ac->tree, idx_one_tree_list);
2052
2053                         /*
2054                          * We can stop if we're sure the object doesn't exist
2055                          */
2056                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2057                                 talloc_free(idx_one_tree_list);
2058                                 talloc_free(dn_list);
2059                                 return LDB_ERR_NO_SUCH_OBJECT;
2060                         }
2061
2062                         /*
2063                          * We only care if this is successful, if the
2064                          * index can't trim the result list down then
2065                          * the ONELEVEL index is still good enough.
2066                          */
2067                         if (ret == LDB_SUCCESS) {
2068                                 if (!list_intersect(ldb,
2069                                                     ldb_kv,
2070                                                     dn_list,
2071                                                     idx_one_tree_list)) {
2072                                         talloc_free(idx_one_tree_list);
2073                                         talloc_free(dn_list);
2074                                         return LDB_ERR_OPERATIONS_ERROR;
2075                                 }
2076                         }
2077                 }
2078                 break;
2079
2080         case LDB_SCOPE_SUBTREE:
2081         case LDB_SCOPE_DEFAULT:
2082                 if (!ldb_kv->cache->attribute_indexes) {
2083                         talloc_free(dn_list);
2084                         return LDB_ERR_OPERATIONS_ERROR;
2085                 }
2086                 /*
2087                  * Here we load the index for the tree.  We have no
2088                  * index for the subtree.
2089                  */
2090                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2091                 if (ret != LDB_SUCCESS) {
2092                         talloc_free(dn_list);
2093                         return ret;
2094                 }
2095                 break;
2096         }
2097
2098         /*
2099          * It is critical that this function do the re-filter even
2100          * on things found by the index as the index can over-match
2101          * in cases of truncation (as well as when it decides it is
2102          * not worth further filtering)
2103          *
2104          * If this changes, then the index code above would need to
2105          * pass up a flag to say if any index was truncated during
2106          * processing as the truncation here refers only to the
2107          * SCOPE_ONELEVEL index.
2108          */
2109         ret = ldb_kv_index_filter(
2110             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2111         talloc_free(dn_list);
2112         return ret;
2113 }
2114
2115 /**
2116  * @brief Add a DN in the index list of a given attribute name/value pair
2117  *
2118  * This function will add the DN in the index list for the index for
2119  * the given attribute name and value.
2120  *
2121  * @param[in]  module       A ldb_module structure
2122  *
2123  * @param[in]  dn           The string representation of the DN as it
2124  *                          will be stored in the index entry
2125  *
2126  * @param[in]  el           A ldb_message_element array, one of the entry
2127  *                          referred by the v_idx is the attribute name and
2128  *                          value pair which will be used to construct the
2129  *                          index name
2130  *
2131  * @param[in]  v_idx        The index of element in the el array to use
2132  *
2133  * @return                  An ldb error code
2134  */
2135 static int ldb_kv_index_add1(struct ldb_module *module,
2136                              struct ldb_kv_private *ldb_kv,
2137                              const struct ldb_message *msg,
2138                              struct ldb_message_element *el,
2139                              int v_idx)
2140 {
2141         struct ldb_context *ldb;
2142         struct ldb_dn *dn_key;
2143         int ret;
2144         const struct ldb_schema_attribute *a;
2145         struct dn_list *list;
2146         unsigned alloc_len;
2147         enum key_truncation truncation = KEY_TRUNCATED;
2148
2149
2150         ldb = ldb_module_get_ctx(module);
2151
2152         list = talloc_zero(module, struct dn_list);
2153         if (list == NULL) {
2154                 return LDB_ERR_OPERATIONS_ERROR;
2155         }
2156
2157         dn_key = ldb_kv_index_key(
2158             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2159         if (!dn_key) {
2160                 talloc_free(list);
2161                 return LDB_ERR_OPERATIONS_ERROR;
2162         }
2163         /*
2164          * Samba only maintains unique indexes on the objectSID and objectGUID
2165          * so if a unique index key exceeds the maximum length there is a
2166          * problem.
2167          */
2168         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2169                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2170                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2171
2172                 ldb_asprintf_errstring(
2173                     ldb,
2174                     __location__ ": unique index key on %s in %s, "
2175                                  "exceeds maximum key length of %u (encoded).",
2176                     el->name,
2177                     ldb_dn_get_linearized(msg->dn),
2178                     ldb_kv->max_key_length);
2179                 talloc_free(list);
2180                 return LDB_ERR_CONSTRAINT_VIOLATION;
2181         }
2182         talloc_steal(list, dn_key);
2183
2184         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2185         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2186                 talloc_free(list);
2187                 return ret;
2188         }
2189
2190         /*
2191          * Check for duplicates in the @IDXDN DN -> GUID record
2192          *
2193          * This is very normal, it just means a duplicate DN creation
2194          * was attempted, so don't set the error string or print scary
2195          * messages.
2196          */
2197         if (list->count > 0 &&
2198             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2199             truncation == KEY_NOT_TRUNCATED) {
2200
2201                 talloc_free(list);
2202                 return LDB_ERR_CONSTRAINT_VIOLATION;
2203
2204         } else if (list->count > 0
2205                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2206
2207                 /*
2208                  * At least one existing entry in the DN->GUID index, which
2209                  * arises when the DN indexes have been truncated
2210                  *
2211                  * So need to pull the DN's to check if it's really a duplicate
2212                  */
2213                 int i;
2214                 for (i=0; i < list->count; i++) {
2215                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2216                         struct ldb_val key = {
2217                                 .data = guid_key,
2218                                 .length = sizeof(guid_key)
2219                         };
2220                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2221                         struct ldb_message *rec = ldb_msg_new(ldb);
2222                         if (rec == NULL) {
2223                                 return LDB_ERR_OPERATIONS_ERROR;
2224                         }
2225
2226                         ret = ldb_kv_idx_to_key(
2227                             module, ldb_kv, ldb, &list->dn[i], &key);
2228                         if (ret != LDB_SUCCESS) {
2229                                 TALLOC_FREE(list);
2230                                 TALLOC_FREE(rec);
2231                                 return ret;
2232                         }
2233
2234                         ret =
2235                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2236                         if (key.data != guid_key) {
2237                                 TALLOC_FREE(key.data);
2238                         }
2239                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2240                                 /*
2241                                  * the record has disappeared?
2242                                  * yes, this can happen
2243                                  */
2244                                 talloc_free(rec);
2245                                 continue;
2246                         }
2247
2248                         if (ret != LDB_SUCCESS) {
2249                                 /* an internal error */
2250                                 TALLOC_FREE(rec);
2251                                 TALLOC_FREE(list);
2252                                 return LDB_ERR_OPERATIONS_ERROR;
2253                         }
2254                         /*
2255                          * The DN we are trying to add to the DB and index
2256                          * is already here, so we must deny the addition
2257                          */
2258                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2259                                 TALLOC_FREE(rec);
2260                                 TALLOC_FREE(list);
2261                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2262                         }
2263                 }
2264         }
2265
2266         /*
2267          * Check for duplicates in unique indexes
2268          *
2269          * We don't need to do a loop test like the @IDXDN case
2270          * above as we have a ban on long unique index values
2271          * at the start of this function.
2272          */
2273         if (list->count > 0 &&
2274             ((a != NULL
2275               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2276                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2277                 /*
2278                  * We do not want to print info about a possibly
2279                  * confidential DN that the conflict was with in the
2280                  * user-visible error string
2281                  */
2282
2283                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2284                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2285                                   __location__
2286                                   ": unique index violation on %s in %s, "
2287                                   "conficts with %*.*s in %s",
2288                                   el->name, ldb_dn_get_linearized(msg->dn),
2289                                   (int)list->dn[0].length,
2290                                   (int)list->dn[0].length,
2291                                   list->dn[0].data,
2292                                   ldb_dn_get_linearized(dn_key));
2293                 } else {
2294                         /* This can't fail, gives a default at worst */
2295                         const struct ldb_schema_attribute *attr =
2296                             ldb_schema_attribute_by_name(
2297                                 ldb, ldb_kv->cache->GUID_index_attribute);
2298                         struct ldb_val v;
2299                         ret = attr->syntax->ldif_write_fn(ldb, list,
2300                                                           &list->dn[0], &v);
2301                         if (ret == LDB_SUCCESS) {
2302                                 ldb_debug(ldb,
2303                                           LDB_DEBUG_WARNING,
2304                                           __location__
2305                                           ": unique index violation on %s in "
2306                                           "%s, conficts with %s %*.*s in %s",
2307                                           el->name,
2308                                           ldb_dn_get_linearized(msg->dn),
2309                                           ldb_kv->cache->GUID_index_attribute,
2310                                           (int)v.length,
2311                                           (int)v.length,
2312                                           v.data,
2313                                           ldb_dn_get_linearized(dn_key));
2314                         }
2315                 }
2316                 ldb_asprintf_errstring(ldb,
2317                                        __location__ ": unique index violation "
2318                                        "on %s in %s",
2319                                        el->name,
2320                                        ldb_dn_get_linearized(msg->dn));
2321                 talloc_free(list);
2322                 return LDB_ERR_CONSTRAINT_VIOLATION;
2323         }
2324
2325         /* overallocate the list a bit, to reduce the number of
2326          * realloc trigered copies */
2327         alloc_len = ((list->count+1)+7) & ~7;
2328         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2329         if (list->dn == NULL) {
2330                 talloc_free(list);
2331                 return LDB_ERR_OPERATIONS_ERROR;
2332         }
2333
2334         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2335                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2336                 list->dn[list->count].data
2337                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2338                 if (list->dn[list->count].data == NULL) {
2339                         talloc_free(list);
2340                         return LDB_ERR_OPERATIONS_ERROR;
2341                 }
2342                 list->dn[list->count].length = strlen(dn_str);
2343         } else {
2344                 const struct ldb_val *key_val;
2345                 struct ldb_val *exact = NULL, *next = NULL;
2346                 key_val = ldb_msg_find_ldb_val(
2347                     msg, ldb_kv->cache->GUID_index_attribute);
2348                 if (key_val == NULL) {
2349                         talloc_free(list);
2350                         return ldb_module_operr(module);
2351                 }
2352
2353                 if (key_val->length != LDB_KV_GUID_SIZE) {
2354                         talloc_free(list);
2355                         return ldb_module_operr(module);
2356                 }
2357
2358                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2359                                         *key_val, ldb_val_equal_exact_ordered,
2360                                         exact, next);
2361
2362                 /*
2363                  * Give a warning rather than fail, this could be a
2364                  * duplicate value in the record allowed by a caller
2365                  * forcing in the value with
2366                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2367                  */
2368                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2369                         /* This can't fail, gives a default at worst */
2370                         const struct ldb_schema_attribute *attr =
2371                             ldb_schema_attribute_by_name(
2372                                 ldb, ldb_kv->cache->GUID_index_attribute);
2373                         struct ldb_val v;
2374                         ret = attr->syntax->ldif_write_fn(ldb, list,
2375                                                           exact, &v);
2376                         if (ret == LDB_SUCCESS) {
2377                                 ldb_debug(ldb,
2378                                           LDB_DEBUG_WARNING,
2379                                           __location__
2380                                           ": duplicate attribute value in %s "
2381                                           "for index on %s, "
2382                                           "duplicate of %s %*.*s in %s",
2383                                           ldb_dn_get_linearized(msg->dn),
2384                                           el->name,
2385                                           ldb_kv->cache->GUID_index_attribute,
2386                                           (int)v.length,
2387                                           (int)v.length,
2388                                           v.data,
2389                                           ldb_dn_get_linearized(dn_key));
2390                         }
2391                 }
2392
2393                 if (next == NULL) {
2394                         next = &list->dn[list->count];
2395                 } else {
2396                         memmove(&next[1], next,
2397                                 sizeof(*next) * (list->count - (next - list->dn)));
2398                 }
2399                 *next = ldb_val_dup(list->dn, key_val);
2400                 if (next->data == NULL) {
2401                         talloc_free(list);
2402                         return ldb_module_operr(module);
2403                 }
2404         }
2405         list->count++;
2406
2407         ret = ldb_kv_dn_list_store(module, dn_key, list);
2408
2409         talloc_free(list);
2410
2411         return ret;
2412 }
2413
2414 /*
2415   add index entries for one elements in a message
2416  */
2417 static int ldb_kv_index_add_el(struct ldb_module *module,
2418                                struct ldb_kv_private *ldb_kv,
2419                                const struct ldb_message *msg,
2420                                struct ldb_message_element *el)
2421 {
2422         unsigned int i;
2423         for (i = 0; i < el->num_values; i++) {
2424                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2425                 if (ret != LDB_SUCCESS) {
2426                         return ret;
2427                 }
2428         }
2429
2430         return LDB_SUCCESS;
2431 }
2432
2433 /*
2434   add index entries for all elements in a message
2435  */
2436 static int ldb_kv_index_add_all(struct ldb_module *module,
2437                                 struct ldb_kv_private *ldb_kv,
2438                                 const struct ldb_message *msg)
2439 {
2440         struct ldb_message_element *elements = msg->elements;
2441         unsigned int i;
2442         const char *dn_str;
2443         int ret;
2444
2445         if (ldb_dn_is_special(msg->dn)) {
2446                 return LDB_SUCCESS;
2447         }
2448
2449         dn_str = ldb_dn_get_linearized(msg->dn);
2450         if (dn_str == NULL) {
2451                 return LDB_ERR_OPERATIONS_ERROR;
2452         }
2453
2454         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2455         if (ret != LDB_SUCCESS) {
2456                 return ret;
2457         }
2458
2459         if (!ldb_kv->cache->attribute_indexes) {
2460                 /* no indexed fields */
2461                 return LDB_SUCCESS;
2462         }
2463
2464         for (i = 0; i < msg->num_elements; i++) {
2465                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2466                         continue;
2467                 }
2468                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2469                 if (ret != LDB_SUCCESS) {
2470                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2471                         ldb_asprintf_errstring(ldb,
2472                                                __location__ ": Failed to re-index %s in %s - %s",
2473                                                elements[i].name, dn_str,
2474                                                ldb_errstring(ldb));
2475                         return ret;
2476                 }
2477         }
2478
2479         return LDB_SUCCESS;
2480 }
2481
2482
2483 /*
2484   insert a DN index for a message
2485 */
2486 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2487                                   struct ldb_kv_private *ldb_kv,
2488                                   const struct ldb_message *msg,
2489                                   struct ldb_dn *dn,
2490                                   const char *index,
2491                                   int add)
2492 {
2493         struct ldb_message_element el;
2494         struct ldb_val val;
2495         int ret;
2496
2497         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2498         if (val.data == NULL) {
2499                 const char *dn_str = ldb_dn_get_linearized(dn);
2500                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2501                                        __location__ ": Failed to modify %s "
2502                                                     "against %s in %s: failed "
2503                                                     "to get casefold DN",
2504                                        index,
2505                                        ldb_kv->cache->GUID_index_attribute,
2506                                        dn_str);
2507                 return LDB_ERR_OPERATIONS_ERROR;
2508         }
2509
2510         val.length = strlen((char *)val.data);
2511         el.name = index;
2512         el.values = &val;
2513         el.num_values = 1;
2514
2515         if (add) {
2516                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2517         } else { /* delete */
2518                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2519         }
2520
2521         if (ret != LDB_SUCCESS) {
2522                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2523                 const char *dn_str = ldb_dn_get_linearized(dn);
2524                 ldb_asprintf_errstring(ldb,
2525                                        __location__ ": Failed to modify %s "
2526                                                     "against %s in %s - %s",
2527                                        index,
2528                                        ldb_kv->cache->GUID_index_attribute,
2529                                        dn_str,
2530                                        ldb_errstring(ldb));
2531                 return ret;
2532         }
2533         return ret;
2534 }
2535
2536 /*
2537   insert a one level index for a message
2538 */
2539 static int ldb_kv_index_onelevel(struct ldb_module *module,
2540                                  const struct ldb_message *msg,
2541                                  int add)
2542 {
2543         struct ldb_kv_private *ldb_kv = talloc_get_type(
2544             ldb_module_get_private(module), struct ldb_kv_private);
2545         struct ldb_dn *pdn;
2546         int ret;
2547
2548         /* We index for ONE Level only if requested */
2549         if (!ldb_kv->cache->one_level_indexes) {
2550                 return LDB_SUCCESS;
2551         }
2552
2553         pdn = ldb_dn_get_parent(module, msg->dn);
2554         if (pdn == NULL) {
2555                 return LDB_ERR_OPERATIONS_ERROR;
2556         }
2557         ret =
2558             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2559
2560         talloc_free(pdn);
2561
2562         return ret;
2563 }
2564
2565 /*
2566   insert a one level index for a message
2567 */
2568 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2569                                       const struct ldb_message *msg,
2570                                       int add)
2571 {
2572         int ret;
2573         struct ldb_kv_private *ldb_kv = talloc_get_type(
2574             ldb_module_get_private(module), struct ldb_kv_private);
2575
2576         /* We index for DN only if using a GUID index */
2577         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2578                 return LDB_SUCCESS;
2579         }
2580
2581         ret = ldb_kv_modify_index_dn(
2582             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2583
2584         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2585                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2586                                        "Entry %s already exists",
2587                                        ldb_dn_get_linearized(msg->dn));
2588                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2589         }
2590         return ret;
2591 }
2592
2593 /*
2594   add the index entries for a new element in a record
2595   The caller guarantees that these element values are not yet indexed
2596 */
2597 int ldb_kv_index_add_element(struct ldb_module *module,
2598                              struct ldb_kv_private *ldb_kv,
2599                              const struct ldb_message *msg,
2600                              struct ldb_message_element *el)
2601 {
2602         if (ldb_dn_is_special(msg->dn)) {
2603                 return LDB_SUCCESS;
2604         }
2605         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2606                 return LDB_SUCCESS;
2607         }
2608         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2609 }
2610
2611 /*
2612   add the index entries for a new record
2613 */
2614 int ldb_kv_index_add_new(struct ldb_module *module,
2615                          struct ldb_kv_private *ldb_kv,
2616                          const struct ldb_message *msg)
2617 {
2618         int ret;
2619
2620         if (ldb_dn_is_special(msg->dn)) {
2621                 return LDB_SUCCESS;
2622         }
2623
2624         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2625         if (ret != LDB_SUCCESS) {
2626                 /*
2627                  * Because we can't trust the caller to be doing
2628                  * transactions properly, clean up any index for this
2629                  * entry rather than relying on a transaction
2630                  * cleanup
2631                  */
2632
2633                 ldb_kv_index_delete(module, msg);
2634                 return ret;
2635         }
2636
2637         ret = ldb_kv_index_onelevel(module, msg, 1);
2638         if (ret != LDB_SUCCESS) {
2639                 /*
2640                  * Because we can't trust the caller to be doing
2641                  * transactions properly, clean up any index for this
2642                  * entry rather than relying on a transaction
2643                  * cleanup
2644                  */
2645                 ldb_kv_index_delete(module, msg);
2646                 return ret;
2647         }
2648         return ret;
2649 }
2650
2651
2652 /*
2653   delete an index entry for one message element
2654 */
2655 int ldb_kv_index_del_value(struct ldb_module *module,
2656                            struct ldb_kv_private *ldb_kv,
2657                            const struct ldb_message *msg,
2658                            struct ldb_message_element *el,
2659                            unsigned int v_idx)
2660 {
2661         struct ldb_context *ldb;
2662         struct ldb_dn *dn_key;
2663         const char *dn_str;
2664         int ret, i;
2665         unsigned int j;
2666         struct dn_list *list;
2667         struct ldb_dn *dn = msg->dn;
2668         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2669
2670         ldb = ldb_module_get_ctx(module);
2671
2672         dn_str = ldb_dn_get_linearized(dn);
2673         if (dn_str == NULL) {
2674                 return LDB_ERR_OPERATIONS_ERROR;
2675         }
2676
2677         if (dn_str[0] == '@') {
2678                 return LDB_SUCCESS;
2679         }
2680
2681         dn_key = ldb_kv_index_key(
2682             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
2683         /*
2684          * We ignore key truncation in ltdb_index_add1() so
2685          * match that by ignoring it here as well
2686          *
2687          * Multiple values are legitimate and accepted
2688          */
2689         if (!dn_key) {
2690                 return LDB_ERR_OPERATIONS_ERROR;
2691         }
2692
2693         list = talloc_zero(dn_key, struct dn_list);
2694         if (list == NULL) {
2695                 talloc_free(dn_key);
2696                 return LDB_ERR_OPERATIONS_ERROR;
2697         }
2698
2699         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2700         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2701                 /* it wasn't indexed. Did we have an earlier error? If we did then
2702                    its gone now */
2703                 talloc_free(dn_key);
2704                 return LDB_SUCCESS;
2705         }
2706
2707         if (ret != LDB_SUCCESS) {
2708                 talloc_free(dn_key);
2709                 return ret;
2710         }
2711
2712         /*
2713          * Find one of the values matching this message to remove
2714          */
2715         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
2716         if (i == -1) {
2717                 /* nothing to delete */
2718                 talloc_free(dn_key);
2719                 return LDB_SUCCESS;
2720         }
2721
2722         j = (unsigned int) i;
2723         if (j != list->count - 1) {
2724                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2725         }
2726         list->count--;
2727         if (list->count == 0) {
2728                 talloc_free(list->dn);
2729                 list->dn = NULL;
2730         } else {
2731                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2732         }
2733
2734         ret = ldb_kv_dn_list_store(module, dn_key, list);
2735
2736         talloc_free(dn_key);
2737
2738         return ret;
2739 }
2740
2741 /*
2742   delete the index entries for a element
2743   return -1 on failure
2744 */
2745 int ldb_kv_index_del_element(struct ldb_module *module,
2746                              struct ldb_kv_private *ldb_kv,
2747                              const struct ldb_message *msg,
2748                              struct ldb_message_element *el)
2749 {
2750         const char *dn_str;
2751         int ret;
2752         unsigned int i;
2753
2754         if (!ldb_kv->cache->attribute_indexes) {
2755                 /* no indexed fields */
2756                 return LDB_SUCCESS;
2757         }
2758
2759         dn_str = ldb_dn_get_linearized(msg->dn);
2760         if (dn_str == NULL) {
2761                 return LDB_ERR_OPERATIONS_ERROR;
2762         }
2763
2764         if (dn_str[0] == '@') {
2765                 return LDB_SUCCESS;
2766         }
2767
2768         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2769                 return LDB_SUCCESS;
2770         }
2771         for (i = 0; i < el->num_values; i++) {
2772                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
2773                 if (ret != LDB_SUCCESS) {
2774                         return ret;
2775                 }
2776         }
2777
2778         return LDB_SUCCESS;
2779 }
2780
2781 /*
2782   delete the index entries for a record
2783   return -1 on failure
2784 */
2785 int ldb_kv_index_delete(struct ldb_module *module,
2786                         const struct ldb_message *msg)
2787 {
2788         struct ldb_kv_private *ldb_kv = talloc_get_type(
2789             ldb_module_get_private(module), struct ldb_kv_private);
2790         int ret;
2791         unsigned int i;
2792
2793         if (ldb_dn_is_special(msg->dn)) {
2794                 return LDB_SUCCESS;
2795         }
2796
2797         ret = ldb_kv_index_onelevel(module, msg, 0);
2798         if (ret != LDB_SUCCESS) {
2799                 return ret;
2800         }
2801
2802         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
2803         if (ret != LDB_SUCCESS) {
2804                 return ret;
2805         }
2806
2807         if (!ldb_kv->cache->attribute_indexes) {
2808                 /* no indexed fields */
2809                 return LDB_SUCCESS;
2810         }
2811
2812         for (i = 0; i < msg->num_elements; i++) {
2813                 ret = ldb_kv_index_del_element(
2814                     module, ldb_kv, msg, &msg->elements[i]);
2815                 if (ret != LDB_SUCCESS) {
2816                         return ret;
2817                 }
2818         }
2819
2820         return LDB_SUCCESS;
2821 }
2822
2823
2824 /*
2825   traversal function that deletes all @INDEX records in the in-memory
2826   TDB.
2827
2828   This does not touch the actual DB, that is done at transaction
2829   commit, which in turn greatly reduces DB churn as we will likely
2830   be able to do a direct update into the old record.
2831 */
2832 static int delete_index(struct ldb_kv_private *ldb_kv,
2833                         struct ldb_val key,
2834                         struct ldb_val data,
2835                         void *state)
2836 {
2837         struct ldb_module *module = state;
2838         const char *dnstr = "DN=" LDB_KV_INDEX ":";
2839         struct dn_list list;
2840         struct ldb_dn *dn;
2841         struct ldb_val v;
2842         int ret;
2843
2844         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2845                 return 0;
2846         }
2847         /* we need to put a empty list in the internal tdb for this
2848          * index entry */
2849         list.dn = NULL;
2850         list.count = 0;
2851
2852         /* the offset of 3 is to remove the DN= prefix. */
2853         v.data = key.data + 3;
2854         v.length = strnlen((char *)key.data, key.length) - 3;
2855
2856         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
2857
2858         /*
2859          * This does not actually touch the DB quite yet, just
2860          * the in-memory index cache
2861          */
2862         ret = ldb_kv_dn_list_store(module, dn, &list);
2863         if (ret != LDB_SUCCESS) {
2864                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2865                                        "Unable to store null index for %s\n",
2866                                                 ldb_dn_get_linearized(dn));
2867                 talloc_free(dn);
2868                 return -1;
2869         }
2870         talloc_free(dn);
2871         return 0;
2872 }
2873
2874 /*
2875   traversal function that adds @INDEX records during a re index TODO wrong comment
2876 */
2877 static int re_key(struct ldb_kv_private *ldb_kv,
2878                   struct ldb_val key,
2879                   struct ldb_val val,
2880                   void *state)
2881 {
2882         struct ldb_context *ldb;
2883         struct ldb_kv_reindex_context *ctx =
2884             (struct ldb_kv_reindex_context *)state;
2885         struct ldb_module *module = ctx->module;
2886         struct ldb_message *msg;
2887         unsigned int nb_elements_in_db;
2888         int ret;
2889         struct ldb_val key2;
2890         bool is_record;
2891
2892         ldb = ldb_module_get_ctx(module);
2893
2894         if (key.length > 4 &&
2895             memcmp(key.data, "DN=@", 4) == 0) {
2896                 return 0;
2897         }
2898
2899         is_record = ldb_kv_key_is_record(key);
2900         if (is_record == false) {
2901                 return 0;
2902         }
2903
2904         msg = ldb_msg_new(module);
2905         if (msg == NULL) {
2906                 return -1;
2907         }
2908
2909         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2910                                                    msg,
2911                                                    NULL, 0,
2912                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2913                                                    &nb_elements_in_db);
2914         if (ret != 0) {
2915                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2916                                                 ldb_dn_get_linearized(msg->dn));
2917                 ctx->error = ret;
2918                 talloc_free(msg);
2919                 return -1;
2920         }
2921
2922         if (msg->dn == NULL) {
2923                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2924                           "Refusing to re-index as GUID "
2925                           "key %*.*s with no DN\n",
2926                           (int)key.length, (int)key.length,
2927                           (char *)key.data);
2928                 talloc_free(msg);
2929                 return -1;
2930         }
2931
2932         /* check if the DN key has changed, perhaps due to the case
2933            insensitivity of an element changing, or a change from DN
2934            to GUID keys */
2935         key2 = ldb_kv_key_msg(module, msg, msg);
2936         if (key2.data == NULL) {
2937                 /* probably a corrupt record ... darn */
2938                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2939                                                 ldb_dn_get_linearized(msg->dn));
2940                 talloc_free(msg);
2941                 return 0;
2942         }
2943         if (key.length != key2.length ||
2944             (memcmp(key.data, key2.data, key.length) != 0)) {
2945                 ldb_kv->kv_ops->update_in_iterate(
2946                     ldb_kv, key, key2, val, ctx);
2947         }
2948         talloc_free(key2.data);
2949
2950         talloc_free(msg);
2951
2952         ctx->count++;
2953         if (ctx->count % 10000 == 0) {
2954                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2955                           "Reindexing: re-keyed %u records so far",
2956                           ctx->count);
2957         }
2958
2959         return 0;
2960 }
2961
2962 /*
2963   traversal function that adds @INDEX records during a re index
2964 */
2965 static int re_index(struct ldb_kv_private *ldb_kv,
2966                     struct ldb_val key,
2967                     struct ldb_val val,
2968                     void *state)
2969 {
2970         struct ldb_context *ldb;
2971         struct ldb_kv_reindex_context *ctx =
2972             (struct ldb_kv_reindex_context *)state;
2973         struct ldb_module *module = ctx->module;
2974         struct ldb_message *msg;
2975         unsigned int nb_elements_in_db;
2976         int ret;
2977         bool is_record;
2978
2979         ldb = ldb_module_get_ctx(module);
2980
2981         if (key.length > 4 &&
2982             memcmp(key.data, "DN=@", 4) == 0) {
2983                 return 0;
2984         }
2985
2986         is_record = ldb_kv_key_is_record(key);
2987         if (is_record == false) {
2988                 return 0;
2989         }
2990
2991         msg = ldb_msg_new(module);
2992         if (msg == NULL) {
2993                 return -1;
2994         }
2995
2996         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2997                                                    msg,
2998                                                    NULL, 0,
2999                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
3000                                                    &nb_elements_in_db);
3001         if (ret != 0) {
3002                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3003                                                 ldb_dn_get_linearized(msg->dn));
3004                 ctx->error = ret;
3005                 talloc_free(msg);
3006                 return -1;
3007         }
3008
3009         if (msg->dn == NULL) {
3010                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3011                           "Refusing to re-index as GUID "
3012                           "key %*.*s with no DN\n",
3013                           (int)key.length, (int)key.length,
3014                           (char *)key.data);
3015                 talloc_free(msg);
3016                 return -1;
3017         }
3018
3019         ret = ldb_kv_index_onelevel(module, msg, 1);
3020         if (ret != LDB_SUCCESS) {
3021                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3022                           "Adding special ONE LEVEL index failed (%s)!",
3023                                                 ldb_dn_get_linearized(msg->dn));
3024                 talloc_free(msg);
3025                 return -1;
3026         }
3027
3028         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3029
3030         if (ret != LDB_SUCCESS) {
3031                 ctx->error = ret;
3032                 talloc_free(msg);
3033                 return -1;
3034         }
3035
3036         talloc_free(msg);
3037
3038         ctx->count++;
3039         if (ctx->count % 10000 == 0) {
3040                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3041                           "Reindexing: re-indexed %u records so far",
3042                           ctx->count);
3043         }
3044
3045         return 0;
3046 }
3047
3048 /*
3049   force a complete reindex of the database
3050 */
3051 int ldb_kv_reindex(struct ldb_module *module)
3052 {
3053         struct ldb_kv_private *ldb_kv = talloc_get_type(
3054             ldb_module_get_private(module), struct ldb_kv_private);
3055         int ret;
3056         struct ldb_kv_reindex_context ctx;
3057
3058         /*
3059          * Only triggered after a modification, but make clear we do
3060          * not re-index a read-only DB
3061          */
3062         if (ldb_kv->read_only) {
3063                 return LDB_ERR_UNWILLING_TO_PERFORM;
3064         }
3065
3066         if (ldb_kv_cache_reload(module) != 0) {
3067                 return LDB_ERR_OPERATIONS_ERROR;
3068         }
3069
3070         /*
3071          * Ensure we read (and so remove) the entries from the real
3072          * DB, no values stored so far are any use as we want to do a
3073          * re-index
3074          */
3075         ldb_kv_index_transaction_cancel(module);
3076
3077         ret = ldb_kv_index_transaction_start(module);
3078         if (ret != LDB_SUCCESS) {
3079                 return ret;
3080         }
3081
3082         /* first traverse the database deleting any @INDEX records by
3083          * putting NULL entries in the in-memory tdb
3084          */
3085         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3086         if (ret < 0) {
3087                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3088                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3089                                        ldb_errstring(ldb));
3090                 return LDB_ERR_OPERATIONS_ERROR;
3091         }
3092
3093         ctx.module = module;
3094         ctx.error = 0;
3095         ctx.count = 0;
3096
3097         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3098         if (ret < 0) {
3099                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3100                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3101                                        ldb_errstring(ldb));
3102                 return LDB_ERR_OPERATIONS_ERROR;
3103         }
3104
3105         if (ctx.error != LDB_SUCCESS) {
3106                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3107                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3108                 return ctx.error;
3109         }
3110
3111         ctx.error = 0;
3112         ctx.count = 0;
3113
3114         /* now traverse adding any indexes for normal LDB records */
3115         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3116         if (ret < 0) {
3117                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3118                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3119                                        ldb_errstring(ldb));
3120                 return LDB_ERR_OPERATIONS_ERROR;
3121         }
3122
3123         if (ctx.error != LDB_SUCCESS) {
3124                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3125                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3126                 return ctx.error;
3127         }
3128
3129         if (ctx.count > 10000) {
3130                 ldb_debug(ldb_module_get_ctx(module),
3131                           LDB_DEBUG_WARNING,
3132                           "Reindexing: re_index successful on %s, "
3133                           "final index write-out will be in transaction commit",
3134                           ldb_kv->kv_ops->name(ldb_kv));
3135         }
3136         return LDB_SUCCESS;
3137 }