ldb_kv_index: Add a giant comment in regards to index_format_fn
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151
152 struct dn_list {
153         unsigned int count;
154         struct ldb_val *dn;
155         /*
156          * Do not optimise the intersection of this list,
157          * we must never return an entry not in this
158          * list.  This allows the index for
159          * SCOPE_ONELEVEL to be trusted.
160          */
161         bool strict;
162 };
163
164 struct ldb_kv_idxptr {
165         struct tdb_context *itdb;
166         int error;
167 };
168
169 enum key_truncation {
170         KEY_NOT_TRUNCATED,
171         KEY_TRUNCATED,
172 };
173
174 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
175                                       const struct ldb_message *msg,
176                                       int add);
177 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
178                                    struct ldb_kv_private *ldb_kv,
179                                    struct ldb_dn *base_dn,
180                                    struct dn_list *dn_list,
181                                    enum key_truncation *truncation);
182
183 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
184                                 struct dn_list *list);
185
186 /* we put a @IDXVERSION attribute on index entries. This
187    allows us to tell if it was written by an older version
188 */
189 #define LDB_KV_INDEXING_VERSION 2
190
191 #define LDB_KV_GUID_INDEXING_VERSION 3
192
193 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
194 {
195         if (ldb_kv->max_key_length == 0) {
196                 return UINT_MAX;
197         }
198         return ldb_kv->max_key_length;
199 }
200
201 /* enable the idxptr mode when transactions start */
202 int ldb_kv_index_transaction_start(
203         struct ldb_module *module,
204         size_t cache_size)
205 {
206         struct ldb_kv_private *ldb_kv = talloc_get_type(
207             ldb_module_get_private(module), struct ldb_kv_private);
208         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
209         if (ldb_kv->idxptr == NULL) {
210                 return ldb_oom(ldb_module_get_ctx(module));
211         }
212
213         ldb_kv->idxptr->itdb = tdb_open(
214                 NULL,
215                 cache_size,
216                 TDB_INTERNAL,
217                 O_RDWR,
218                 0);
219         if (ldb_kv->idxptr->itdb == NULL) {
220                 return LDB_ERR_OPERATIONS_ERROR;
221         }
222
223         return LDB_SUCCESS;
224 }
225
226 /*
227   see if two ldb_val structures contain exactly the same data
228   return -1 or 1 for a mismatch, 0 for match
229 */
230 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
231                                          const struct ldb_val *v2)
232 {
233         if (v1->length > v2->length) {
234                 return -1;
235         }
236         if (v1->length < v2->length) {
237                 return 1;
238         }
239         return memcmp(v1->data, v2->data, v1->length);
240 }
241
242 /*
243   see if two ldb_val structures contain exactly the same data
244   return -1 or 1 for a mismatch, 0 for match
245 */
246 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
247                                        const struct ldb_val *v2)
248 {
249         if (v1.length > v2->length) {
250                 return -1;
251         }
252         if (v1.length < v2->length) {
253                 return 1;
254         }
255         return memcmp(v1.data, v2->data, v1.length);
256 }
257
258
259 /*
260   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
261   binary-safe comparison for the 'dn' returns -1 if not found
262
263   This is therefore safe when the value is a GUID in the future
264  */
265 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
266                                    const struct dn_list *list,
267                                    const struct ldb_val *v)
268 {
269         unsigned int i;
270         struct ldb_val *exact = NULL, *next = NULL;
271
272         if (ldb_kv->cache->GUID_index_attribute == NULL) {
273                 for (i=0; i<list->count; i++) {
274                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
275                                 return i;
276                         }
277                 }
278                 return -1;
279         }
280
281         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
282                                 *v, ldb_val_equal_exact_ordered,
283                                 exact, next);
284         if (exact == NULL) {
285                 return -1;
286         }
287         /* Not required, but keeps the compiler quiet */
288         if (next != NULL) {
289                 return -1;
290         }
291
292         i = exact - list->dn;
293         return i;
294 }
295
296 /*
297   find a entry in a dn_list. Uses a case sensitive comparison with the dn
298   returns -1 if not found
299  */
300 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
301                                    struct dn_list *list,
302                                    const struct ldb_message *msg)
303 {
304         struct ldb_val v;
305         const struct ldb_val *key_val;
306         if (ldb_kv->cache->GUID_index_attribute == NULL) {
307                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
308                 v.data = discard_const_p(unsigned char, dn_str);
309                 v.length = strlen(dn_str);
310         } else {
311                 key_val = ldb_msg_find_ldb_val(
312                     msg, ldb_kv->cache->GUID_index_attribute);
313                 if (key_val == NULL) {
314                         return -1;
315                 }
316                 v = *key_val;
317         }
318         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
319 }
320
321 /*
322   this is effectively a cast function, but with lots of paranoia
323   checks and also copes with CPUs that are fussy about pointer
324   alignment
325  */
326 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
327                                            TDB_DATA rec,
328                                            bool check_parent)
329 {
330         struct dn_list *list;
331         if (rec.dsize != sizeof(void *)) {
332                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
333                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
334                 return NULL;
335         }
336         /* note that we can't just use a cast here, as rec.dptr may
337            not be aligned sufficiently for a pointer. A cast would cause
338            platforms like some ARM CPUs to crash */
339         memcpy(&list, rec.dptr, sizeof(void *));
340         list = talloc_get_type(list, struct dn_list);
341         if (list == NULL) {
342                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
343                                        "Bad type '%s' for idxptr",
344                                        talloc_get_name(list));
345                 return NULL;
346         }
347         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
348                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
349                                        "Bad parent '%s' for idxptr",
350                                        talloc_get_name(talloc_parent(list->dn)));
351                 return NULL;
352         }
353         return list;
354 }
355
356 /*
357   return the @IDX list in an index entry for a dn as a
358   struct dn_list
359  */
360 static int ldb_kv_dn_list_load(struct ldb_module *module,
361                                struct ldb_kv_private *ldb_kv,
362                                struct ldb_dn *dn,
363                                struct dn_list *list)
364 {
365         struct ldb_message *msg;
366         int ret, version;
367         struct ldb_message_element *el;
368         TDB_DATA rec;
369         struct dn_list *list2;
370         TDB_DATA key;
371
372         list->dn = NULL;
373         list->count = 0;
374
375         /* see if we have any in-memory index entries */
376         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
377                 goto normal_index;
378         }
379
380         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
381         key.dsize = strlen((char *)key.dptr);
382
383         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
384         if (rec.dptr == NULL) {
385                 goto normal_index;
386         }
387
388         /* we've found an in-memory index entry */
389         list2 = ldb_kv_index_idxptr(module, rec, true);
390         if (list2 == NULL) {
391                 free(rec.dptr);
392                 return LDB_ERR_OPERATIONS_ERROR;
393         }
394         free(rec.dptr);
395
396         *list = *list2;
397         return LDB_SUCCESS;
398
399 normal_index:
400         msg = ldb_msg_new(list);
401         if (msg == NULL) {
402                 return LDB_ERR_OPERATIONS_ERROR;
403         }
404
405         ret = ldb_kv_search_dn1(module,
406                                 dn,
407                                 msg,
408                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
409                                     LDB_UNPACK_DATA_FLAG_NO_DN);
410         if (ret != LDB_SUCCESS) {
411                 talloc_free(msg);
412                 return ret;
413         }
414
415         el = ldb_msg_find_element(msg, LDB_KV_IDX);
416         if (!el) {
417                 talloc_free(msg);
418                 return LDB_SUCCESS;
419         }
420
421         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
422
423         /*
424          * we avoid copying the strings by stealing the list.  We have
425          * to steal msg onto el->values (which looks odd) because we
426          * asked for the memory to be allocated on msg, not on each
427          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
428          */
429         if (ldb_kv->cache->GUID_index_attribute == NULL) {
430                 /* check indexing version number */
431                 if (version != LDB_KV_INDEXING_VERSION) {
432                         ldb_debug_set(ldb_module_get_ctx(module),
433                                       LDB_DEBUG_ERROR,
434                                       "Wrong DN index version %d "
435                                       "expected %d for %s",
436                                       version, LDB_KV_INDEXING_VERSION,
437                                       ldb_dn_get_linearized(dn));
438                         talloc_free(msg);
439                         return LDB_ERR_OPERATIONS_ERROR;
440                 }
441
442                 talloc_steal(el->values, msg);
443                 list->dn = talloc_steal(list, el->values);
444                 list->count = el->num_values;
445         } else {
446                 unsigned int i;
447                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
448                         /* This is quite likely during the DB startup
449                            on first upgrade to using a GUID index */
450                         ldb_debug_set(ldb_module_get_ctx(module),
451                                       LDB_DEBUG_ERROR,
452                                       "Wrong GUID index version %d "
453                                       "expected %d for %s",
454                                       version, LDB_KV_GUID_INDEXING_VERSION,
455                                       ldb_dn_get_linearized(dn));
456                         talloc_free(msg);
457                         return LDB_ERR_OPERATIONS_ERROR;
458                 }
459
460                 if (el->num_values == 0) {
461                         talloc_free(msg);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464
465                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
466                         talloc_free(msg);
467                         return LDB_ERR_OPERATIONS_ERROR;
468                 }
469
470                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
471                 list->dn = talloc_array(list, struct ldb_val, list->count);
472                 if (list->dn == NULL) {
473                         talloc_free(msg);
474                         return LDB_ERR_OPERATIONS_ERROR;
475                 }
476
477                 /*
478                  * The actual data is on msg, due to
479                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
480                  */
481                 talloc_steal(list->dn, msg);
482                 for (i = 0; i < list->count; i++) {
483                         list->dn[i].data
484                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
485                         list->dn[i].length = LDB_KV_GUID_SIZE;
486                 }
487         }
488
489         /* We don't need msg->elements any more */
490         talloc_free(msg->elements);
491         return LDB_SUCCESS;
492 }
493
494 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
495                            struct ldb_kv_private *ldb_kv,
496                            TALLOC_CTX *mem_ctx,
497                            struct ldb_dn *dn,
498                            struct ldb_val *ldb_key)
499 {
500         struct ldb_context *ldb = ldb_module_get_ctx(module);
501         int ret;
502         int index = 0;
503         enum key_truncation truncation = KEY_NOT_TRUNCATED;
504         struct dn_list *list = talloc(mem_ctx, struct dn_list);
505         if (list == NULL) {
506                 ldb_oom(ldb);
507                 return LDB_ERR_OPERATIONS_ERROR;
508         }
509
510         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
511         if (ret != LDB_SUCCESS) {
512                 TALLOC_FREE(list);
513                 return ret;
514         }
515
516         if (list->count == 0) {
517                 TALLOC_FREE(list);
518                 return LDB_ERR_NO_SUCH_OBJECT;
519         }
520
521         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
522                 const char *dn_str = ldb_dn_get_linearized(dn);
523                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
524                                        __location__
525                                        ": Failed to read DN index "
526                                        "against %s for %s: too many "
527                                        "values (%u > 1)",
528                                        ldb_kv->cache->GUID_index_attribute,
529                                        dn_str,
530                                        list->count);
531                 TALLOC_FREE(list);
532                 return LDB_ERR_CONSTRAINT_VIOLATION;
533         }
534
535         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
536                 /*
537                  * DN key has been truncated, need to inspect the actual
538                  * records to locate the actual DN
539                  */
540                 int i;
541                 index = -1;
542                 for (i=0; i < list->count; i++) {
543                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
544                         struct ldb_val key = {
545                                 .data = guid_key,
546                                 .length = sizeof(guid_key)
547                         };
548                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
549                         struct ldb_message *rec = ldb_msg_new(ldb);
550                         if (rec == NULL) {
551                                 TALLOC_FREE(list);
552                                 return LDB_ERR_OPERATIONS_ERROR;
553                         }
554
555                         ret = ldb_kv_idx_to_key(
556                             module, ldb_kv, ldb, &list->dn[i], &key);
557                         if (ret != LDB_SUCCESS) {
558                                 TALLOC_FREE(list);
559                                 TALLOC_FREE(rec);
560                                 return ret;
561                         }
562
563                         ret =
564                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
565                         if (key.data != guid_key) {
566                                 TALLOC_FREE(key.data);
567                         }
568                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
569                                 /*
570                                  * the record has disappeared?
571                                  * yes, this can happen
572                                  */
573                                 TALLOC_FREE(rec);
574                                 continue;
575                         }
576
577                         if (ret != LDB_SUCCESS) {
578                                 /* an internal error */
579                                 TALLOC_FREE(rec);
580                                 TALLOC_FREE(list);
581                                 return LDB_ERR_OPERATIONS_ERROR;
582                         }
583
584                         /*
585                          * We found the actual DN that we wanted from in the
586                          * multiple values that matched the index
587                          * (due to truncation), so return that.
588                          *
589                          */
590                         if (ldb_dn_compare(dn, rec->dn) == 0) {
591                                 index = i;
592                                 TALLOC_FREE(rec);
593                                 break;
594                         }
595                 }
596
597                 /*
598                  * We matched the index but the actual DN we wanted
599                  * was not here.
600                  */
601                 if (index == -1) {
602                         TALLOC_FREE(list);
603                         return LDB_ERR_NO_SUCH_OBJECT;
604                 }
605         }
606
607         /* The ldb_key memory is allocated by the caller */
608         ret = ldb_kv_guid_to_key(module, ldb_kv, &list->dn[index], ldb_key);
609         TALLOC_FREE(list);
610
611         if (ret != LDB_SUCCESS) {
612                 return LDB_ERR_OPERATIONS_ERROR;
613         }
614
615         return LDB_SUCCESS;
616 }
617
618
619
620 /*
621   save a dn_list into a full @IDX style record
622  */
623 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
624                                      struct ldb_kv_private *ldb_kv,
625                                      struct ldb_dn *dn,
626                                      struct dn_list *list)
627 {
628         struct ldb_message *msg;
629         int ret;
630
631         msg = ldb_msg_new(module);
632         if (!msg) {
633                 return ldb_module_oom(module);
634         }
635
636         msg->dn = dn;
637
638         if (list->count == 0) {
639                 ret = ldb_kv_delete_noindex(module, msg);
640                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
641                         ret = LDB_SUCCESS;
642                 }
643                 talloc_free(msg);
644                 return ret;
645         }
646
647         if (ldb_kv->cache->GUID_index_attribute == NULL) {
648                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
649                                       LDB_KV_INDEXING_VERSION);
650                 if (ret != LDB_SUCCESS) {
651                         talloc_free(msg);
652                         return ldb_module_oom(module);
653                 }
654         } else {
655                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
656                                       LDB_KV_GUID_INDEXING_VERSION);
657                 if (ret != LDB_SUCCESS) {
658                         talloc_free(msg);
659                         return ldb_module_oom(module);
660                 }
661         }
662
663         if (list->count > 0) {
664                 struct ldb_message_element *el;
665
666                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
667                 if (ret != LDB_SUCCESS) {
668                         talloc_free(msg);
669                         return ldb_module_oom(module);
670                 }
671
672                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
673                         el->values = list->dn;
674                         el->num_values = list->count;
675                 } else {
676                         struct ldb_val v;
677                         unsigned int i;
678                         el->values = talloc_array(msg,
679                                                   struct ldb_val, 1);
680                         if (el->values == NULL) {
681                                 talloc_free(msg);
682                                 return ldb_module_oom(module);
683                         }
684
685                         v.data = talloc_array_size(el->values,
686                                                    list->count,
687                                                    LDB_KV_GUID_SIZE);
688                         if (v.data == NULL) {
689                                 talloc_free(msg);
690                                 return ldb_module_oom(module);
691                         }
692
693                         v.length = talloc_get_size(v.data);
694
695                         for (i = 0; i < list->count; i++) {
696                                 if (list->dn[i].length !=
697                                     LDB_KV_GUID_SIZE) {
698                                         talloc_free(msg);
699                                         return ldb_module_operr(module);
700                                 }
701                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
702                                        list->dn[i].data,
703                                        LDB_KV_GUID_SIZE);
704                         }
705                         el->values[0] = v;
706                         el->num_values = 1;
707                 }
708         }
709
710         ret = ldb_kv_store(module, msg, TDB_REPLACE);
711         talloc_free(msg);
712         return ret;
713 }
714
715 /*
716   save a dn_list into the database, in either @IDX or internal format
717  */
718 static int ldb_kv_dn_list_store(struct ldb_module *module,
719                                 struct ldb_dn *dn,
720                                 struct dn_list *list)
721 {
722         struct ldb_kv_private *ldb_kv = talloc_get_type(
723             ldb_module_get_private(module), struct ldb_kv_private);
724         TDB_DATA rec, key;
725         int ret;
726         struct dn_list *list2;
727
728         if (ldb_kv->idxptr == NULL) {
729                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
730         }
731
732         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
733         if (key.dptr == NULL) {
734                 return LDB_ERR_OPERATIONS_ERROR;
735         }
736         key.dsize = strlen((char *)key.dptr);
737
738         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
739         if (rec.dptr != NULL) {
740                 list2 = ldb_kv_index_idxptr(module, rec, false);
741                 if (list2 == NULL) {
742                         free(rec.dptr);
743                         return LDB_ERR_OPERATIONS_ERROR;
744                 }
745                 free(rec.dptr);
746                 list2->dn = talloc_steal(list2, list->dn);
747                 list2->count = list->count;
748                 return LDB_SUCCESS;
749         }
750
751         list2 = talloc(ldb_kv->idxptr, struct dn_list);
752         if (list2 == NULL) {
753                 return LDB_ERR_OPERATIONS_ERROR;
754         }
755         list2->dn = talloc_steal(list2, list->dn);
756         list2->count = list->count;
757
758         rec.dptr = (uint8_t *)&list2;
759         rec.dsize = sizeof(void *);
760
761
762         /*
763          * This is not a store into the main DB, but into an in-memory
764          * TDB, so we don't need a guard on ltdb->read_only
765          */
766         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
767         if (ret != 0) {
768                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
769         }
770         return LDB_SUCCESS;
771 }
772
773 /*
774   traverse function for storing the in-memory index entries on disk
775  */
776 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
777                                        TDB_DATA key,
778                                        TDB_DATA data,
779                                        void *state)
780 {
781         struct ldb_module *module = state;
782         struct ldb_kv_private *ldb_kv = talloc_get_type(
783             ldb_module_get_private(module), struct ldb_kv_private);
784         struct ldb_dn *dn;
785         struct ldb_context *ldb = ldb_module_get_ctx(module);
786         struct ldb_val v;
787         struct dn_list *list;
788
789         list = ldb_kv_index_idxptr(module, data, true);
790         if (list == NULL) {
791                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
792                 return -1;
793         }
794
795         v.data = key.dptr;
796         v.length = strnlen((char *)key.dptr, key.dsize);
797
798         dn = ldb_dn_from_ldb_val(module, ldb, &v);
799         if (dn == NULL) {
800                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
801                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
802                 return -1;
803         }
804
805         ldb_kv->idxptr->error =
806             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
807         talloc_free(dn);
808         if (ldb_kv->idxptr->error != 0) {
809                 return -1;
810         }
811         return 0;
812 }
813
814 /* cleanup the idxptr mode when transaction commits */
815 int ldb_kv_index_transaction_commit(struct ldb_module *module)
816 {
817         struct ldb_kv_private *ldb_kv = talloc_get_type(
818             ldb_module_get_private(module), struct ldb_kv_private);
819         int ret;
820
821         struct ldb_context *ldb = ldb_module_get_ctx(module);
822
823         ldb_reset_err_string(ldb);
824
825         if (ldb_kv->idxptr->itdb) {
826                 tdb_traverse(
827                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
828                 tdb_close(ldb_kv->idxptr->itdb);
829         }
830
831         ret = ldb_kv->idxptr->error;
832         if (ret != LDB_SUCCESS) {
833                 if (!ldb_errstring(ldb)) {
834                         ldb_set_errstring(ldb, ldb_strerror(ret));
835                 }
836                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
837         }
838
839         talloc_free(ldb_kv->idxptr);
840         ldb_kv->idxptr = NULL;
841         return ret;
842 }
843
844 /* cleanup the idxptr mode when transaction cancels */
845 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
846 {
847         struct ldb_kv_private *ldb_kv = talloc_get_type(
848             ldb_module_get_private(module), struct ldb_kv_private);
849         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
850                 tdb_close(ldb_kv->idxptr->itdb);
851         }
852         talloc_free(ldb_kv->idxptr);
853         ldb_kv->idxptr = NULL;
854         return LDB_SUCCESS;
855 }
856
857
858 /*
859   return the dn key to be used for an index
860   the caller is responsible for freeing
861 */
862 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
863                                        struct ldb_kv_private *ldb_kv,
864                                        const char *attr,
865                                        const struct ldb_val *value,
866                                        const struct ldb_schema_attribute **ap,
867                                        enum key_truncation *truncation)
868 {
869         struct ldb_dn *ret;
870         struct ldb_val v;
871         const struct ldb_schema_attribute *a = NULL;
872         char *attr_folded = NULL;
873         const char *attr_for_dn = NULL;
874         int r;
875         bool should_b64_encode;
876
877         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
878         size_t key_len = 0;
879         size_t attr_len = 0;
880         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
881         unsigned frmt_len = 0;
882         const size_t additional_key_length = 4;
883         unsigned int num_separators = 3; /* Estimate for overflow check */
884         const size_t min_data = 1;
885         const size_t min_key_length = additional_key_length
886                 + indx_len + num_separators + min_data;
887         struct ldb_val empty;
888
889         /*
890          * Accept a NULL value as a request for a key with no value.  This is
891          * different from passing an empty value, which might be given
892          * significance by some canonicalise functions.
893          */
894         bool empty_val = value == NULL;
895         if (empty_val) {
896                 empty.length = 0;
897                 empty.data = discard_const_p(unsigned char, "");
898                 value = &empty;
899         }
900
901         if (attr[0] == '@') {
902                 attr_for_dn = attr;
903                 v = *value;
904                 if (ap != NULL) {
905                         *ap = NULL;
906                 }
907         } else {
908                 attr_folded = ldb_attr_casefold(ldb, attr);
909                 if (!attr_folded) {
910                         return NULL;
911                 }
912
913                 attr_for_dn = attr_folded;
914
915                 a = ldb_schema_attribute_by_name(ldb, attr);
916                 if (ap) {
917                         *ap = a;
918                 }
919
920                 if (empty_val) {
921                         v = *value;
922                 } else {
923                         ldb_attr_handler_t fn;
924                         if (a->syntax->index_format_fn) {
925                                 fn = a->syntax->index_format_fn;
926                         } else {
927                                 fn = a->syntax->canonicalise_fn;
928                         }
929                         r = fn(ldb, ldb, value, &v);
930                         if (r != LDB_SUCCESS) {
931                                 const char *errstr = ldb_errstring(ldb);
932                                 /* canonicalisation can be refused. For
933                                    example, a attribute that takes wildcards
934                                    will refuse to canonicalise if the value
935                                    contains a wildcard */
936                                 ldb_asprintf_errstring(ldb,
937                                                        "Failed to create "
938                                                        "index key for "
939                                                        "attribute '%s':%s%s%s",
940                                                        attr, ldb_strerror(r),
941                                                        (errstr?":":""),
942                                                        (errstr?errstr:""));
943                                 talloc_free(attr_folded);
944                                 return NULL;
945                         }
946                 }
947         }
948         attr_len = strlen(attr_for_dn);
949
950         /*
951          * Check if there is any hope this will fit into the DB.
952          * Overflow here is not actually critical the code below
953          * checks again to make the printf and the DB does another
954          * check for too long keys
955          */
956         if (max_key_length - attr_len < min_key_length) {
957                 ldb_asprintf_errstring(
958                         ldb,
959                         __location__ ": max_key_length "
960                         "is too small (%u) < (%u)",
961                         max_key_length,
962                         (unsigned)(min_key_length + attr_len));
963                 talloc_free(attr_folded);
964                 return NULL;
965         }
966
967         /*
968          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
969          * "DN=" and a trailing string terminator
970          */
971         max_key_length -= additional_key_length;
972
973         /*
974          * We do not base 64 encode a DN in a key, it has already been
975          * casefold and lineraized, that is good enough.  That already
976          * avoids embedded NUL etc.
977          */
978         if (ldb_kv->cache->GUID_index_attribute != NULL) {
979                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
980                         should_b64_encode = false;
981                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
982                         /*
983                          * We can only change the behaviour for IDXONE
984                          * when the GUID index is enabled
985                          */
986                         should_b64_encode = false;
987                 } else {
988                         should_b64_encode
989                                 = ldb_should_b64_encode(ldb, &v);
990                 }
991         } else {
992                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
993         }
994
995         if (should_b64_encode) {
996                 size_t vstr_len = 0;
997                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
998                 if (!vstr) {
999                         talloc_free(attr_folded);
1000                         return NULL;
1001                 }
1002                 vstr_len = strlen(vstr);
1003                 /*
1004                  * Overflow here is not critical as we only use this
1005                  * to choose the printf truncation
1006                  */
1007                 key_len = num_separators + indx_len + attr_len + vstr_len;
1008                 if (key_len > max_key_length) {
1009                         size_t excess = key_len - max_key_length;
1010                         frmt_len = vstr_len - excess;
1011                         *truncation = KEY_TRUNCATED;
1012                         /*
1013                         * Truncated keys are placed in a separate key space
1014                         * from the non truncated keys
1015                         * Note: the double hash "##" is not a typo and
1016                         * indicates that the following value is base64 encoded
1017                         */
1018                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
1019                                              LDB_KV_INDEX, attr_for_dn,
1020                                              frmt_len, vstr);
1021                 } else {
1022                         frmt_len = vstr_len;
1023                         *truncation = KEY_NOT_TRUNCATED;
1024                         /*
1025                          * Note: the double colon "::" is not a typo and
1026                          * indicates that the following value is base64 encoded
1027                          */
1028                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1029                                              LDB_KV_INDEX, attr_for_dn,
1030                                              frmt_len, vstr);
1031                 }
1032                 talloc_free(vstr);
1033         } else {
1034                 /* Only need two seperators */
1035                 num_separators = 2;
1036
1037                 /*
1038                  * Overflow here is not critical as we only use this
1039                  * to choose the printf truncation
1040                  */
1041                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1042                 if (key_len > max_key_length) {
1043                         size_t excess = key_len - max_key_length;
1044                         frmt_len = v.length - excess;
1045                         *truncation = KEY_TRUNCATED;
1046                         /*
1047                          * Truncated keys are placed in a separate key space
1048                          * from the non truncated keys
1049                          */
1050                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1051                                              LDB_KV_INDEX, attr_for_dn,
1052                                              frmt_len, (char *)v.data);
1053                 } else {
1054                         frmt_len = v.length;
1055                         *truncation = KEY_NOT_TRUNCATED;
1056                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1057                                              LDB_KV_INDEX, attr_for_dn,
1058                                              frmt_len, (char *)v.data);
1059                 }
1060         }
1061
1062         if (v.data != value->data && !empty_val) {
1063                 talloc_free(v.data);
1064         }
1065         talloc_free(attr_folded);
1066
1067         return ret;
1068 }
1069
1070 /*
1071   see if a attribute value is in the list of indexed attributes
1072 */
1073 static bool ldb_kv_is_indexed(struct ldb_module *module,
1074                               struct ldb_kv_private *ldb_kv,
1075                               const char *attr)
1076 {
1077         struct ldb_context *ldb = ldb_module_get_ctx(module);
1078         unsigned int i;
1079         struct ldb_message_element *el;
1080
1081         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1082             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1083                 /* Implicity covered, this is the index key */
1084                 return false;
1085         }
1086         if (ldb->schema.index_handler_override) {
1087                 const struct ldb_schema_attribute *a
1088                         = ldb_schema_attribute_by_name(ldb, attr);
1089
1090                 if (a == NULL) {
1091                         return false;
1092                 }
1093
1094                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1095                         return true;
1096                 } else {
1097                         return false;
1098                 }
1099         }
1100
1101         if (!ldb_kv->cache->attribute_indexes) {
1102                 return false;
1103         }
1104
1105         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1106         if (el == NULL) {
1107                 return false;
1108         }
1109
1110         /* TODO: this is too expensive! At least use a binary search */
1111         for (i=0; i<el->num_values; i++) {
1112                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1113                         return true;
1114                 }
1115         }
1116         return false;
1117 }
1118
1119 /*
1120   in the following logic functions, the return value is treated as
1121   follows:
1122
1123      LDB_SUCCESS: we found some matching index values
1124
1125      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1126
1127      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1128                                we'll need a full search
1129  */
1130
1131 /*
1132   return a list of dn's that might match a simple indexed search (an
1133   equality search only)
1134  */
1135 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1136                                   struct ldb_kv_private *ldb_kv,
1137                                   const struct ldb_parse_tree *tree,
1138                                   struct dn_list *list)
1139 {
1140         struct ldb_context *ldb;
1141         struct ldb_dn *dn;
1142         int ret;
1143         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1144
1145         ldb = ldb_module_get_ctx(module);
1146
1147         list->count = 0;
1148         list->dn = NULL;
1149
1150         /* if the attribute isn't in the list of indexed attributes then
1151            this node needs a full search */
1152         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1153                 return LDB_ERR_OPERATIONS_ERROR;
1154         }
1155
1156         /* the attribute is indexed. Pull the list of DNs that match the
1157            search criterion */
1158         dn = ldb_kv_index_key(ldb,
1159                               ldb_kv,
1160                               tree->u.equality.attr,
1161                               &tree->u.equality.value,
1162                               NULL,
1163                               &truncation);
1164         /*
1165          * We ignore truncation here and allow multi-valued matches
1166          * as ltdb_search_indexed will filter out the wrong one in
1167          * ltdb_index_filter() which calls ldb_match_message().
1168          */
1169         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1170
1171         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1172         talloc_free(dn);
1173         return ret;
1174 }
1175
1176 static bool list_union(struct ldb_context *ldb,
1177                        struct ldb_kv_private *ldb_kv,
1178                        struct dn_list *list,
1179                        struct dn_list *list2);
1180
1181 /*
1182   return a list of dn's that might match a leaf indexed search
1183  */
1184 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1185                                 struct ldb_kv_private *ldb_kv,
1186                                 const struct ldb_parse_tree *tree,
1187                                 struct dn_list *list)
1188 {
1189         if (ldb_kv->disallow_dn_filter &&
1190             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1191                 /* in AD mode we do not support "(dn=...)" search filters */
1192                 list->dn = NULL;
1193                 list->count = 0;
1194                 return LDB_SUCCESS;
1195         }
1196         if (tree->u.equality.attr[0] == '@') {
1197                 /* Do not allow a indexed search against an @ */
1198                 list->dn = NULL;
1199                 list->count = 0;
1200                 return LDB_SUCCESS;
1201         }
1202         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1203                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1204                 bool valid_dn = false;
1205                 struct ldb_dn *dn
1206                         = ldb_dn_from_ldb_val(list,
1207                                               ldb_module_get_ctx(module),
1208                                               &tree->u.equality.value);
1209                 if (dn == NULL) {
1210                         /* If we can't parse it, no match */
1211                         list->dn = NULL;
1212                         list->count = 0;
1213                         return LDB_SUCCESS;
1214                 }
1215
1216                 valid_dn = ldb_dn_validate(dn);
1217                 if (valid_dn == false) {
1218                         /* If we can't parse it, no match */
1219                         list->dn = NULL;
1220                         list->count = 0;
1221                         return LDB_SUCCESS;
1222                 }
1223
1224                 /*
1225                  * Re-use the same code we use for a SCOPE_BASE
1226                  * search
1227                  *
1228                  * We can't call TALLOC_FREE(dn) as this must belong
1229                  * to list for the memory to remain valid.
1230                  */
1231                 return ldb_kv_index_dn_base_dn(
1232                     module, ldb_kv, dn, list, &truncation);
1233                 /*
1234                  * We ignore truncation here and allow multi-valued matches
1235                  * as ltdb_search_indexed will filter out the wrong one in
1236                  * ltdb_index_filter() which calls ldb_match_message().
1237                  */
1238
1239         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1240                    (ldb_attr_cmp(tree->u.equality.attr,
1241                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1242                 int ret;
1243                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1244                 list->dn = talloc_array(list, struct ldb_val, 1);
1245                 if (list->dn == NULL) {
1246                         ldb_module_oom(module);
1247                         return LDB_ERR_OPERATIONS_ERROR;
1248                 }
1249                 /*
1250                  * We need to go via the canonicalise_fn() to
1251                  * ensure we get the index in binary, rather
1252                  * than a string
1253                  */
1254                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1255                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1256                 if (ret != LDB_SUCCESS) {
1257                         return LDB_ERR_OPERATIONS_ERROR;
1258                 }
1259                 list->count = 1;
1260                 return LDB_SUCCESS;
1261         }
1262
1263         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1264 }
1265
1266
1267 /*
1268   list intersection
1269   list = list & list2
1270 */
1271 static bool list_intersect(struct ldb_context *ldb,
1272                            struct ldb_kv_private *ldb_kv,
1273                            struct dn_list *list,
1274                            const struct dn_list *list2)
1275 {
1276         const struct dn_list *short_list, *long_list;
1277         struct dn_list *list3;
1278         unsigned int i;
1279
1280         if (list->count == 0) {
1281                 /* 0 & X == 0 */
1282                 return true;
1283         }
1284         if (list2->count == 0) {
1285                 /* X & 0 == 0 */
1286                 list->count = 0;
1287                 list->dn = NULL;
1288                 return true;
1289         }
1290
1291         /*
1292          * In both of the below we check for strict and in that
1293          * case do not optimise the intersection of this list,
1294          * we must never return an entry not in this
1295          * list.  This allows the index for
1296          * SCOPE_ONELEVEL to be trusted.
1297          */
1298
1299         /* the indexing code is allowed to return a longer list than
1300            what really matches, as all results are filtered by the
1301            full expression at the end - this shortcut avoids a lot of
1302            work in some cases */
1303         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1304                 return true;
1305         }
1306         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1307                 list->count = list2->count;
1308                 list->dn = list2->dn;
1309                 /* note that list2 may not be the parent of list2->dn,
1310                    as list2->dn may be owned by ltdb->idxptr. In that
1311                    case we expect this reparent call to fail, which is
1312                    OK */
1313                 talloc_reparent(list2, list, list2->dn);
1314                 return true;
1315         }
1316
1317         if (list->count > list2->count) {
1318                 short_list = list2;
1319                 long_list = list;
1320         } else {
1321                 short_list = list;
1322                 long_list = list2;
1323         }
1324
1325         list3 = talloc_zero(list, struct dn_list);
1326         if (list3 == NULL) {
1327                 return false;
1328         }
1329
1330         list3->dn = talloc_array(list3, struct ldb_val,
1331                                  MIN(list->count, list2->count));
1332         if (!list3->dn) {
1333                 talloc_free(list3);
1334                 return false;
1335         }
1336         list3->count = 0;
1337
1338         for (i=0;i<short_list->count;i++) {
1339                 /* For the GUID index case, this is a binary search */
1340                 if (ldb_kv_dn_list_find_val(
1341                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1342                         list3->dn[list3->count] = short_list->dn[i];
1343                         list3->count++;
1344                 }
1345         }
1346
1347         list->strict |= list2->strict;
1348         list->dn = talloc_steal(list, list3->dn);
1349         list->count = list3->count;
1350         talloc_free(list3);
1351
1352         return true;
1353 }
1354
1355
1356 /*
1357   list union
1358   list = list | list2
1359 */
1360 static bool list_union(struct ldb_context *ldb,
1361                        struct ldb_kv_private *ldb_kv,
1362                        struct dn_list *list,
1363                        struct dn_list *list2)
1364 {
1365         struct ldb_val *dn3;
1366         unsigned int i = 0, j = 0, k = 0;
1367
1368         if (list2->count == 0) {
1369                 /* X | 0 == X */
1370                 return true;
1371         }
1372
1373         if (list->count == 0) {
1374                 /* 0 | X == X */
1375                 list->count = list2->count;
1376                 list->dn = list2->dn;
1377                 /* note that list2 may not be the parent of list2->dn,
1378                    as list2->dn may be owned by ltdb->idxptr. In that
1379                    case we expect this reparent call to fail, which is
1380                    OK */
1381                 talloc_reparent(list2, list, list2->dn);
1382                 return true;
1383         }
1384
1385         /*
1386          * Sort the lists (if not in GUID DN mode) so we can do
1387          * the de-duplication during the merge
1388          *
1389          * NOTE: This can sort the in-memory index values, as list or
1390          * list2 might not be a copy!
1391          */
1392         ldb_kv_dn_list_sort(ldb_kv, list);
1393         ldb_kv_dn_list_sort(ldb_kv, list2);
1394
1395         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1396         if (!dn3) {
1397                 ldb_oom(ldb);
1398                 return false;
1399         }
1400
1401         while (i < list->count || j < list2->count) {
1402                 int cmp;
1403                 if (i >= list->count) {
1404                         cmp = 1;
1405                 } else if (j >= list2->count) {
1406                         cmp = -1;
1407                 } else {
1408                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1409                                                           &list2->dn[j]);
1410                 }
1411
1412                 if (cmp < 0) {
1413                         /* Take list */
1414                         dn3[k] = list->dn[i];
1415                         i++;
1416                         k++;
1417                 } else if (cmp > 0) {
1418                         /* Take list2 */
1419                         dn3[k] = list2->dn[j];
1420                         j++;
1421                         k++;
1422                 } else {
1423                         /* Equal, take list */
1424                         dn3[k] = list->dn[i];
1425                         i++;
1426                         j++;
1427                         k++;
1428                 }
1429         }
1430
1431         list->dn = dn3;
1432         list->count = k;
1433
1434         return true;
1435 }
1436
1437 static int ldb_kv_index_dn(struct ldb_module *module,
1438                            struct ldb_kv_private *ldb_kv,
1439                            const struct ldb_parse_tree *tree,
1440                            struct dn_list *list);
1441
1442 /*
1443   process an OR list (a union)
1444  */
1445 static int ldb_kv_index_dn_or(struct ldb_module *module,
1446                               struct ldb_kv_private *ldb_kv,
1447                               const struct ldb_parse_tree *tree,
1448                               struct dn_list *list)
1449 {
1450         struct ldb_context *ldb;
1451         unsigned int i;
1452
1453         ldb = ldb_module_get_ctx(module);
1454
1455         list->dn = NULL;
1456         list->count = 0;
1457
1458         for (i=0; i<tree->u.list.num_elements; i++) {
1459                 struct dn_list *list2;
1460                 int ret;
1461
1462                 list2 = talloc_zero(list, struct dn_list);
1463                 if (list2 == NULL) {
1464                         return LDB_ERR_OPERATIONS_ERROR;
1465                 }
1466
1467                 ret = ldb_kv_index_dn(
1468                     module, ldb_kv, tree->u.list.elements[i], list2);
1469
1470                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1471                         /* X || 0 == X */
1472                         talloc_free(list2);
1473                         continue;
1474                 }
1475
1476                 if (ret != LDB_SUCCESS) {
1477                         /* X || * == * */
1478                         talloc_free(list2);
1479                         return ret;
1480                 }
1481
1482                 if (!list_union(ldb, ldb_kv, list, list2)) {
1483                         talloc_free(list2);
1484                         return LDB_ERR_OPERATIONS_ERROR;
1485                 }
1486         }
1487
1488         if (list->count == 0) {
1489                 return LDB_ERR_NO_SUCH_OBJECT;
1490         }
1491
1492         return LDB_SUCCESS;
1493 }
1494
1495
1496 /*
1497   NOT an index results
1498  */
1499 static int ldb_kv_index_dn_not(struct ldb_module *module,
1500                                struct ldb_kv_private *ldb_kv,
1501                                const struct ldb_parse_tree *tree,
1502                                struct dn_list *list)
1503 {
1504         /* the only way to do an indexed not would be if we could
1505            negate the not via another not or if we knew the total
1506            number of database elements so we could know that the
1507            existing expression covered the whole database.
1508
1509            instead, we just give up, and rely on a full index scan
1510            (unless an outer & manages to reduce the list)
1511         */
1512         return LDB_ERR_OPERATIONS_ERROR;
1513 }
1514
1515 /*
1516  * These things are unique, so avoid a full scan if this is a search
1517  * by GUID, DN or a unique attribute
1518  */
1519 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1520                                 struct ldb_kv_private *ldb_kv,
1521                                 const char *attr)
1522 {
1523         const struct ldb_schema_attribute *a;
1524         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1525                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1526                     0) {
1527                         return true;
1528                 }
1529         }
1530         if (ldb_attr_dn(attr) == 0) {
1531                 return true;
1532         }
1533
1534         a = ldb_schema_attribute_by_name(ldb, attr);
1535         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1536                 return true;
1537         }
1538         return false;
1539 }
1540
1541 /*
1542   process an AND expression (intersection)
1543  */
1544 static int ldb_kv_index_dn_and(struct ldb_module *module,
1545                                struct ldb_kv_private *ldb_kv,
1546                                const struct ldb_parse_tree *tree,
1547                                struct dn_list *list)
1548 {
1549         struct ldb_context *ldb;
1550         unsigned int i;
1551         bool found;
1552
1553         ldb = ldb_module_get_ctx(module);
1554
1555         list->dn = NULL;
1556         list->count = 0;
1557
1558         /* in the first pass we only look for unique simple
1559            equality tests, in the hope of avoiding having to look
1560            at any others */
1561         for (i=0; i<tree->u.list.num_elements; i++) {
1562                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1563                 int ret;
1564
1565                 if (subtree->operation != LDB_OP_EQUALITY ||
1566                     !ldb_kv_index_unique(
1567                         ldb, ldb_kv, subtree->u.equality.attr)) {
1568                         continue;
1569                 }
1570
1571                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1572                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1573                         /* 0 && X == 0 */
1574                         return LDB_ERR_NO_SUCH_OBJECT;
1575                 }
1576                 if (ret == LDB_SUCCESS) {
1577                         /* a unique index match means we can
1578                          * stop. Note that we don't care if we return
1579                          * a few too many objects, due to later
1580                          * filtering */
1581                         return LDB_SUCCESS;
1582                 }
1583         }
1584
1585         /* now do a full intersection */
1586         found = false;
1587
1588         for (i=0; i<tree->u.list.num_elements; i++) {
1589                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1590                 struct dn_list *list2;
1591                 int ret;
1592
1593                 list2 = talloc_zero(list, struct dn_list);
1594                 if (list2 == NULL) {
1595                         return ldb_module_oom(module);
1596                 }
1597
1598                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1599
1600                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1601                         /* X && 0 == 0 */
1602                         list->dn = NULL;
1603                         list->count = 0;
1604                         talloc_free(list2);
1605                         return LDB_ERR_NO_SUCH_OBJECT;
1606                 }
1607
1608                 if (ret != LDB_SUCCESS) {
1609                         /* this didn't adding anything */
1610                         talloc_free(list2);
1611                         continue;
1612                 }
1613
1614                 if (!found) {
1615                         talloc_reparent(list2, list, list->dn);
1616                         list->dn = list2->dn;
1617                         list->count = list2->count;
1618                         found = true;
1619                 } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
1620                         talloc_free(list2);
1621                         return LDB_ERR_OPERATIONS_ERROR;
1622                 }
1623
1624                 if (list->count == 0) {
1625                         list->dn = NULL;
1626                         return LDB_ERR_NO_SUCH_OBJECT;
1627                 }
1628
1629                 if (list->count < 2) {
1630                         /* it isn't worth loading the next part of the tree */
1631                         return LDB_SUCCESS;
1632                 }
1633         }
1634
1635         if (!found) {
1636                 /* none of the attributes were indexed */
1637                 return LDB_ERR_OPERATIONS_ERROR;
1638         }
1639
1640         return LDB_SUCCESS;
1641 }
1642
1643 struct ldb_kv_ordered_index_context {
1644         struct ldb_module *module;
1645         int error;
1646         struct dn_list *dn_list;
1647 };
1648
1649 static int traverse_range_index(struct ldb_kv_private *ldb_kv,
1650                                 struct ldb_val key,
1651                                 struct ldb_val data,
1652                                 void *state)
1653 {
1654
1655         struct ldb_context *ldb;
1656         struct ldb_kv_ordered_index_context *ctx =
1657             (struct ldb_kv_ordered_index_context *)state;
1658         struct ldb_module *module = ctx->module;
1659         struct ldb_message_element *el = NULL;
1660         struct ldb_message *msg = NULL;
1661         int version;
1662         size_t dn_array_size, additional_length;
1663         unsigned int i;
1664
1665         ldb = ldb_module_get_ctx(module);
1666
1667         msg = ldb_msg_new(module);
1668
1669         ctx->error = ldb_unpack_data_only_attr_list_flags(ldb, &data,
1670                                                           msg,
1671                                                           NULL, 0,
1672                                                           LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
1673                                                           LDB_UNPACK_DATA_FLAG_NO_DN,
1674                                                           NULL);
1675
1676         if (ctx->error != LDB_SUCCESS) {
1677                 talloc_free(msg);
1678                 return ctx->error;
1679         }
1680
1681         el = ldb_msg_find_element(msg, LDB_KV_IDX);
1682         if (!el) {
1683                 talloc_free(msg);
1684                 return LDB_SUCCESS;
1685         }
1686
1687         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
1688
1689         /*
1690          * we avoid copying the strings by stealing the list.  We have
1691          * to steal msg onto el->values (which looks odd) because we
1692          * asked for the memory to be allocated on msg, not on each
1693          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
1694          */
1695         if (version != LDB_KV_GUID_INDEXING_VERSION) {
1696                 /* This is quite likely during the DB startup
1697                    on first upgrade to using a GUID index */
1698                 ldb_debug_set(ldb_module_get_ctx(module),
1699                               LDB_DEBUG_ERROR, __location__
1700                               ": Wrong GUID index version %d expected %d",
1701                               version, LDB_KV_GUID_INDEXING_VERSION);
1702                 talloc_free(msg);
1703                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1704                 return ctx->error;
1705         }
1706
1707         if (el->num_values == 0) {
1708                 talloc_free(msg);
1709                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1710                 return ctx->error;
1711         }
1712
1713         if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
1714             || el->values[0].length == 0) {
1715                 talloc_free(msg);
1716                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1717                 return ctx->error;
1718         }
1719
1720         dn_array_size = talloc_array_length(ctx->dn_list->dn);
1721
1722         additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
1723
1724         if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
1725                 talloc_free(msg);
1726                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1727                 return ctx->error;
1728         }
1729
1730         if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
1731                 size_t new_array_length;
1732
1733                 if (dn_array_size * 2 < dn_array_size) {
1734                         talloc_free(msg);
1735                         ctx->error = LDB_ERR_OPERATIONS_ERROR;
1736                         return ctx->error;
1737                 }
1738
1739                 new_array_length = MAX(ctx->dn_list->count + additional_length,
1740                                        dn_array_size * 2);
1741
1742                 ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
1743                                                   ctx->dn_list->dn,
1744                                                   struct ldb_val,
1745                                                   new_array_length);
1746         }
1747
1748         if (ctx->dn_list->dn == NULL) {
1749                 talloc_free(msg);
1750                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1751                 return ctx->error;
1752         }
1753
1754         /*
1755          * The actual data is on msg, due to
1756          * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
1757          */
1758         talloc_steal(ctx->dn_list->dn, msg);
1759         for (i = 0; i < additional_length; i++) {
1760                 ctx->dn_list->dn[i + ctx->dn_list->count].data
1761                         = &el->values[0].data[i * LDB_KV_GUID_SIZE];
1762                 ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
1763
1764         }
1765
1766         ctx->dn_list->count += additional_length;
1767
1768         talloc_free(msg->elements);
1769
1770         return LDB_SUCCESS;
1771 }
1772
1773 /*
1774  * >= and <= indexing implemented using lexicographically sorted keys
1775  *
1776  * We only run this in GUID indexing mode and when there is no write
1777  * transaction (only implicit read locks are being held). Otherwise, we would
1778  * have to deal with the in-memory index cache.
1779  *
1780  * We rely on the implementation of index_format_fn on a schema syntax which
1781  * will can help us to construct keys which can be ordered correctly, and we
1782  * terminate using schema agnostic start and end keys.
1783  *
1784  * index_format_fn must output values which can be memcmp-able to produce the
1785  * correct ordering as defined by the schema syntax class.
1786  */
1787 static int ldb_kv_index_dn_ordered(struct ldb_module *module,
1788                                    struct ldb_kv_private *ldb_kv,
1789                                    const struct ldb_parse_tree *tree,
1790                                    struct dn_list *list, bool ascending)
1791 {
1792         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1793         struct ldb_context *ldb = ldb_module_get_ctx(module);
1794
1795         struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
1796         struct ldb_val start_key, end_key;
1797         struct ldb_dn *key_dn = NULL;
1798         const struct ldb_schema_attribute *a = NULL;
1799
1800         struct ldb_kv_ordered_index_context ctx;
1801         int ret;
1802
1803         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1804
1805         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
1806                 return LDB_ERR_OPERATIONS_ERROR;
1807         }
1808
1809         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1810                 return LDB_ERR_OPERATIONS_ERROR;
1811         }
1812
1813         /* bail out if we're in a transaction, full search instead. */
1814         if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1815                 return LDB_ERR_OPERATIONS_ERROR;
1816         }
1817
1818         if (ldb_kv->disallow_dn_filter &&
1819             (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
1820                 /* in AD mode we do not support "(dn=...)" search filters */
1821                 list->dn = NULL;
1822                 list->count = 0;
1823                 return LDB_SUCCESS;
1824         }
1825         if (tree->u.comparison.attr[0] == '@') {
1826                 /* Do not allow a indexed search against an @ */
1827                 list->dn = NULL;
1828                 list->count = 0;
1829                 return LDB_SUCCESS;
1830         }
1831
1832         a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
1833
1834         /*
1835          * If there's no index format function defined for this attr, then
1836          * the lexicographic order in the database doesn't correspond to the
1837          * attr's ordering, so we can't use the iterate_range op.
1838          */
1839         if (a->syntax->index_format_fn == NULL) {
1840                 return LDB_ERR_OPERATIONS_ERROR;
1841         }
1842
1843         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1844                                   &tree->u.comparison.value,
1845                                   NULL, &truncation);
1846         if (!key_dn) {
1847                 return LDB_ERR_OPERATIONS_ERROR;
1848         } else if (truncation == KEY_TRUNCATED) {
1849                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1850                           __location__
1851                           ": ordered index violation: key dn truncated: %s\n",
1852                           ldb_dn_get_linearized(key_dn));
1853                 return LDB_ERR_OPERATIONS_ERROR;
1854         }
1855         ldb_key = ldb_kv_key_dn(module, tmp_ctx, key_dn);
1856         talloc_free(key_dn);
1857         if (ldb_key.data == NULL) {
1858                 return LDB_ERR_OPERATIONS_ERROR;
1859         }
1860
1861         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1862                                   NULL, NULL, &truncation);
1863         if (!key_dn) {
1864                 return LDB_ERR_OPERATIONS_ERROR;
1865         } else if (truncation == KEY_TRUNCATED) {
1866                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1867                           __location__
1868                           ": ordered index violation: key dn truncated: %s\n",
1869                           ldb_dn_get_linearized(key_dn));
1870                 return LDB_ERR_OPERATIONS_ERROR;
1871         }
1872
1873         ldb_key2 = ldb_kv_key_dn(module, tmp_ctx, key_dn);
1874         talloc_free(key_dn);
1875         if (ldb_key2.data == NULL) {
1876                 return LDB_ERR_OPERATIONS_ERROR;
1877         }
1878
1879         /*
1880          * In order to avoid defining a start and end key for the search, we
1881          * notice that each index key is of the form:
1882          *
1883          *     DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
1884          *
1885          * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
1886          * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
1887          * particular attribute.
1888          *
1889          * Our LMDB backend uses the default memcmp for key comparison.
1890          */
1891
1892         /* Eliminate NUL byte at the end of the empty key */
1893         ldb_key2.length--;
1894
1895         if (ascending) {
1896                 /* : becomes ; for pseudo end-key */
1897                 ldb_key2.data[ldb_key2.length-1]++;
1898                 start_key = ldb_key;
1899                 end_key = ldb_key2;
1900         } else {
1901                 start_key = ldb_key2;
1902                 end_key = ldb_key;
1903         }
1904
1905         ctx.module = module;
1906         ctx.error = 0;
1907         ctx.dn_list = list;
1908         ctx.dn_list->count = 0;
1909         ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
1910
1911         ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
1912                                             traverse_range_index, &ctx);
1913
1914         if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
1915                 return LDB_ERR_OPERATIONS_ERROR;
1916         }
1917
1918         TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
1919                        ldb_val_equal_exact_for_qsort);
1920
1921         talloc_free(tmp_ctx);
1922
1923         return LDB_SUCCESS;
1924 }
1925
1926 static int ldb_kv_index_dn_greater(struct ldb_module *module,
1927                                    struct ldb_kv_private *ldb_kv,
1928                                    const struct ldb_parse_tree *tree,
1929                                    struct dn_list *list)
1930 {
1931         return ldb_kv_index_dn_ordered(module,
1932                                        ldb_kv,
1933                                        tree,
1934                                        list, true);
1935 }
1936
1937 static int ldb_kv_index_dn_less(struct ldb_module *module,
1938                                    struct ldb_kv_private *ldb_kv,
1939                                    const struct ldb_parse_tree *tree,
1940                                    struct dn_list *list)
1941 {
1942         return ldb_kv_index_dn_ordered(module,
1943                                        ldb_kv,
1944                                        tree,
1945                                        list, false);
1946 }
1947
1948 /*
1949   return a list of matching objects using a one-level index
1950  */
1951 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1952                                 struct ldb_kv_private *ldb_kv,
1953                                 const char *attr,
1954                                 struct ldb_dn *dn,
1955                                 struct dn_list *list,
1956                                 enum key_truncation *truncation)
1957 {
1958         struct ldb_context *ldb;
1959         struct ldb_dn *key;
1960         struct ldb_val val;
1961         int ret;
1962
1963         ldb = ldb_module_get_ctx(module);
1964
1965         /* work out the index key from the parent DN */
1966         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1967         if (val.data == NULL) {
1968                 const char *dn_str = ldb_dn_get_linearized(dn);
1969                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1970                                        __location__
1971                                        ": Failed to get casefold DN "
1972                                        "from: %s",
1973                                        dn_str);
1974                 return LDB_ERR_OPERATIONS_ERROR;
1975         }
1976         val.length = strlen((char *)val.data);
1977         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1978         if (!key) {
1979                 ldb_oom(ldb);
1980                 return LDB_ERR_OPERATIONS_ERROR;
1981         }
1982
1983         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1984         talloc_free(key);
1985         if (ret != LDB_SUCCESS) {
1986                 return ret;
1987         }
1988
1989         if (list->count == 0) {
1990                 return LDB_ERR_NO_SUCH_OBJECT;
1991         }
1992
1993         return LDB_SUCCESS;
1994 }
1995
1996 /*
1997   return a list of matching objects using a one-level index
1998  */
1999 static int ldb_kv_index_dn_one(struct ldb_module *module,
2000                                struct ldb_kv_private *ldb_kv,
2001                                struct ldb_dn *parent_dn,
2002                                struct dn_list *list,
2003                                enum key_truncation *truncation)
2004 {
2005         /*
2006          * Ensure we do not shortcut on intersection for this list.
2007          * We must never be lazy and return an entry not in this
2008          * list.  This allows the index for
2009          * SCOPE_ONELEVEL to be trusted.
2010          */
2011
2012         list->strict = true;
2013         return ldb_kv_index_dn_attr(
2014             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
2015 }
2016
2017 /*
2018   return a list of matching objects using the DN index
2019  */
2020 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
2021                                    struct ldb_kv_private *ldb_kv,
2022                                    struct ldb_dn *base_dn,
2023                                    struct dn_list *dn_list,
2024                                    enum key_truncation *truncation)
2025 {
2026         const struct ldb_val *guid_val = NULL;
2027         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2028                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2029                 if (dn_list->dn == NULL) {
2030                         return ldb_module_oom(module);
2031                 }
2032                 dn_list->dn[0].data = discard_const_p(unsigned char,
2033                                                       ldb_dn_get_linearized(base_dn));
2034                 if (dn_list->dn[0].data == NULL) {
2035                         talloc_free(dn_list->dn);
2036                         return ldb_module_oom(module);
2037                 }
2038                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
2039                 dn_list->count = 1;
2040
2041                 return LDB_SUCCESS;
2042         }
2043
2044         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
2045                 guid_val = ldb_dn_get_extended_component(
2046                     base_dn, ldb_kv->cache->GUID_index_dn_component);
2047         }
2048
2049         if (guid_val != NULL) {
2050                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2051                 if (dn_list->dn == NULL) {
2052                         return ldb_module_oom(module);
2053                 }
2054                 dn_list->dn[0].data = guid_val->data;
2055                 dn_list->dn[0].length = guid_val->length;
2056                 dn_list->count = 1;
2057
2058                 return LDB_SUCCESS;
2059         }
2060
2061         return ldb_kv_index_dn_attr(
2062             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
2063 }
2064
2065 /*
2066   return a list of dn's that might match a indexed search or
2067   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
2068  */
2069 static int ldb_kv_index_dn(struct ldb_module *module,
2070                            struct ldb_kv_private *ldb_kv,
2071                            const struct ldb_parse_tree *tree,
2072                            struct dn_list *list)
2073 {
2074         int ret = LDB_ERR_OPERATIONS_ERROR;
2075
2076         switch (tree->operation) {
2077         case LDB_OP_AND:
2078                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
2079                 break;
2080
2081         case LDB_OP_OR:
2082                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
2083                 break;
2084
2085         case LDB_OP_NOT:
2086                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
2087                 break;
2088
2089         case LDB_OP_EQUALITY:
2090                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
2091                 break;
2092
2093         case LDB_OP_GREATER:
2094                 ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
2095                 break;
2096
2097         case LDB_OP_LESS:
2098                 ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
2099                 break;
2100
2101         case LDB_OP_SUBSTRING:
2102         case LDB_OP_PRESENT:
2103         case LDB_OP_APPROX:
2104         case LDB_OP_EXTENDED:
2105                 /* we can't index with fancy bitops yet */
2106                 ret = LDB_ERR_OPERATIONS_ERROR;
2107                 break;
2108         }
2109
2110         return ret;
2111 }
2112
2113 /*
2114   filter a candidate dn_list from an indexed search into a set of results
2115   extracting just the given attributes
2116 */
2117 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
2118                                const struct dn_list *dn_list,
2119                                struct ldb_kv_context *ac,
2120                                uint32_t *match_count,
2121                                enum key_truncation scope_one_truncation)
2122 {
2123         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2124         struct ldb_message *msg;
2125         struct ldb_message *filtered_msg;
2126         unsigned int i;
2127         unsigned int num_keys = 0;
2128         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
2129         struct ldb_val *keys = NULL;
2130
2131         /*
2132          * We have to allocate the key list (rather than just walk the
2133          * caller supplied list) as the callback could change the list
2134          * (by modifying an indexed attribute hosted in the in-memory
2135          * index cache!)
2136          */
2137         keys = talloc_array(ac, struct ldb_val, dn_list->count);
2138         if (keys == NULL) {
2139                 return ldb_module_oom(ac->module);
2140         }
2141
2142         if (ldb_kv->cache->GUID_index_attribute != NULL) {
2143                 /*
2144                  * We speculate that the keys will be GUID based and so
2145                  * pre-fill in enough space for a GUID (avoiding a pile of
2146                  * small allocations)
2147                  */
2148                 struct guid_tdb_key {
2149                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2150                 } *key_values = NULL;
2151
2152                 key_values = talloc_array(keys,
2153                                           struct guid_tdb_key,
2154                                           dn_list->count);
2155
2156                 if (key_values == NULL) {
2157                         talloc_free(keys);
2158                         return ldb_module_oom(ac->module);
2159                 }
2160                 for (i = 0; i < dn_list->count; i++) {
2161                         keys[i].data = key_values[i].guid_key;
2162                         keys[i].length = sizeof(key_values[i].guid_key);
2163                 }
2164         } else {
2165                 for (i = 0; i < dn_list->count; i++) {
2166                         keys[i].data = NULL;
2167                         keys[i].length = 0;
2168                 }
2169         }
2170
2171         for (i = 0; i < dn_list->count; i++) {
2172                 int ret;
2173
2174                 ret = ldb_kv_idx_to_key(
2175                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
2176                 if (ret != LDB_SUCCESS) {
2177                         talloc_free(keys);
2178                         return ret;
2179                 }
2180
2181                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2182                         /*
2183                          * If we are in GUID index mode, then the dn_list is
2184                          * sorted.  If we got a duplicate, forget about it, as
2185                          * otherwise we would send the same entry back more
2186                          * than once.
2187                          *
2188                          * This is needed in the truncated DN case, or if a
2189                          * duplicate was forced in via
2190                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2191                          */
2192
2193                         if (memcmp(previous_guid_key,
2194                                    keys[num_keys].data,
2195                                    sizeof(previous_guid_key)) == 0) {
2196                                 continue;
2197                         }
2198
2199                         memcpy(previous_guid_key,
2200                                keys[num_keys].data,
2201                                sizeof(previous_guid_key));
2202                 }
2203                 num_keys++;
2204         }
2205
2206
2207         /*
2208          * Now that the list is a safe copy, send the callbacks
2209          */
2210         for (i = 0; i < num_keys; i++) {
2211                 int ret;
2212                 bool matched;
2213                 msg = ldb_msg_new(ac);
2214                 if (!msg) {
2215                         talloc_free(keys);
2216                         return LDB_ERR_OPERATIONS_ERROR;
2217                 }
2218
2219                 ret =
2220                     ldb_kv_search_key(ac->module,
2221                                       ldb_kv,
2222                                       keys[i],
2223                                       msg,
2224                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
2225                                           LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
2226                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2227                         /*
2228                          * the record has disappeared? yes, this can
2229                          * happen if the entry is deleted by something
2230                          * operating in the callback (not another
2231                          * process, as we have a read lock)
2232                          */
2233                         talloc_free(msg);
2234                         continue;
2235                 }
2236
2237                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2238                         /* an internal error */
2239                         talloc_free(keys);
2240                         talloc_free(msg);
2241                         return LDB_ERR_OPERATIONS_ERROR;
2242                 }
2243
2244                 /*
2245                  * We trust the index for LDB_SCOPE_ONELEVEL
2246                  * unless the index key has been truncated.
2247                  *
2248                  * LDB_SCOPE_BASE is not passed in by our only caller.
2249                  */
2250                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2251                     ldb_kv->cache->one_level_indexes &&
2252                     scope_one_truncation == KEY_NOT_TRUNCATED) {
2253                         ret = ldb_match_message(ldb, msg, ac->tree,
2254                                                 ac->scope, &matched);
2255                 } else {
2256                         ret = ldb_match_msg_error(ldb, msg,
2257                                                   ac->tree, ac->base,
2258                                                   ac->scope, &matched);
2259                 }
2260
2261                 if (ret != LDB_SUCCESS) {
2262                         talloc_free(keys);
2263                         talloc_free(msg);
2264                         return ret;
2265                 }
2266                 if (!matched) {
2267                         talloc_free(msg);
2268                         continue;
2269                 }
2270
2271                 /* filter the attributes that the user wants */
2272                 ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
2273
2274                 talloc_free(msg);
2275
2276                 if (ret == -1) {
2277                         talloc_free(keys);
2278                         return LDB_ERR_OPERATIONS_ERROR;
2279                 }
2280
2281                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
2282                 if (ret != LDB_SUCCESS) {
2283                         /* Regardless of success or failure, the msg
2284                          * is the callbacks responsiblity, and should
2285                          * not be talloc_free()'ed */
2286                         ac->request_terminated = true;
2287                         talloc_free(keys);
2288                         return ret;
2289                 }
2290
2291                 (*match_count)++;
2292         }
2293
2294         TALLOC_FREE(keys);
2295         return LDB_SUCCESS;
2296 }
2297
2298 /*
2299   sort a DN list
2300  */
2301 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
2302                                 struct dn_list *list)
2303 {
2304         if (list->count < 2) {
2305                 return;
2306         }
2307
2308         /* We know the list is sorted when using the GUID index */
2309         if (ltdb->cache->GUID_index_attribute != NULL) {
2310                 return;
2311         }
2312
2313         TYPESAFE_QSORT(list->dn, list->count,
2314                        ldb_val_equal_exact_for_qsort);
2315 }
2316
2317 /*
2318   search the database with a LDAP-like expression using indexes
2319   returns -1 if an indexed search is not possible, in which
2320   case the caller should call ltdb_search_full()
2321 */
2322 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
2323 {
2324         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2325         struct ldb_kv_private *ldb_kv = talloc_get_type(
2326             ldb_module_get_private(ac->module), struct ldb_kv_private);
2327         struct dn_list *dn_list;
2328         int ret;
2329         enum ldb_scope index_scope;
2330         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
2331
2332         /* see if indexing is enabled */
2333         if (!ldb_kv->cache->attribute_indexes &&
2334             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
2335                 /* fallback to a full search */
2336                 return LDB_ERR_OPERATIONS_ERROR;
2337         }
2338
2339         dn_list = talloc_zero(ac, struct dn_list);
2340         if (dn_list == NULL) {
2341                 return ldb_module_oom(ac->module);
2342         }
2343
2344         /*
2345          * For the purposes of selecting the switch arm below, if we
2346          * don't have a one-level index then treat it like a subtree
2347          * search
2348          */
2349         if (ac->scope == LDB_SCOPE_ONELEVEL &&
2350             !ldb_kv->cache->one_level_indexes) {
2351                 index_scope = LDB_SCOPE_SUBTREE;
2352         } else {
2353                 index_scope = ac->scope;
2354         }
2355
2356         switch (index_scope) {
2357         case LDB_SCOPE_BASE:
2358                 /*
2359                  * The only caller will have filtered the operation out
2360                  * so we should never get here
2361                  */
2362                 return ldb_operr(ldb);
2363
2364         case LDB_SCOPE_ONELEVEL:
2365
2366                 /*
2367                  * First, load all the one-level child objects (regardless of
2368                  * whether they match the search filter or not). The database
2369                  * maintains a one-level index, so retrieving this is quick.
2370                  */
2371                 ret = ldb_kv_index_dn_one(ac->module,
2372                                           ldb_kv,
2373                                           ac->base,
2374                                           dn_list,
2375                                           &scope_one_truncation);
2376                 if (ret != LDB_SUCCESS) {
2377                         talloc_free(dn_list);
2378                         return ret;
2379                 }
2380
2381                 /*
2382                  * If we have too many children, running ldb_kv_index_filter()
2383                  * over all the child objects can be quite expensive. So next
2384                  * we do a separate indexed query using the search filter.
2385                  *
2386                  * This should be quick, but it may return objects that are not
2387                  * the direct one-level child objects we're interested in.
2388                  *
2389                  * We only do this in the GUID index mode, which is
2390                  * O(n*log(m)) otherwise the intersection below will
2391                  * be too costly at O(n*m).
2392                  *
2393                  * We don't set a heuristic for 'too many' but instead
2394                  * do it always and rely on the index lookup being
2395                  * fast enough in the small case.
2396                  */
2397                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2398                         struct dn_list *indexed_search_result
2399                                 = talloc_zero(ac, struct dn_list);
2400                         if (indexed_search_result == NULL) {
2401                                 talloc_free(dn_list);
2402                                 return ldb_module_oom(ac->module);
2403                         }
2404
2405                         if (!ldb_kv->cache->attribute_indexes) {
2406                                 talloc_free(indexed_search_result);
2407                                 talloc_free(dn_list);
2408                                 return LDB_ERR_OPERATIONS_ERROR;
2409                         }
2410
2411                         /*
2412                          * Try to do an indexed database search
2413                          */
2414                         ret = ldb_kv_index_dn(
2415                             ac->module, ldb_kv, ac->tree,
2416                             indexed_search_result);
2417
2418                         /*
2419                          * We can stop if we're sure the object doesn't exist
2420                          */
2421                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2422                                 talloc_free(indexed_search_result);
2423                                 talloc_free(dn_list);
2424                                 return LDB_ERR_NO_SUCH_OBJECT;
2425                         }
2426
2427                         /*
2428                          * Once we have a successful search result, we
2429                          * intersect it with the one-level children (dn_list).
2430                          * This should give us exactly the result we're after
2431                          * (we still need to run ldb_kv_index_filter() to
2432                          * handle potential index truncation cases).
2433                          *
2434                          * The indexed search may fail because we don't support
2435                          * indexing on that type of search operation, e.g.
2436                          * matching against '*'. In which case we fall through
2437                          * and run ldb_kv_index_filter() over all the one-level
2438                          * children (which is still better than bailing out here
2439                          * and falling back to a full DB scan).
2440                          */
2441                         if (ret == LDB_SUCCESS) {
2442                                 if (!list_intersect(ldb,
2443                                                     ldb_kv,
2444                                                     dn_list,
2445                                                     indexed_search_result)) {
2446                                         talloc_free(indexed_search_result);
2447                                         talloc_free(dn_list);
2448                                         return LDB_ERR_OPERATIONS_ERROR;
2449                                 }
2450                         }
2451                 }
2452                 break;
2453
2454         case LDB_SCOPE_SUBTREE:
2455         case LDB_SCOPE_DEFAULT:
2456                 if (!ldb_kv->cache->attribute_indexes) {
2457                         talloc_free(dn_list);
2458                         return LDB_ERR_OPERATIONS_ERROR;
2459                 }
2460                 /*
2461                  * Here we load the index for the tree.  We have no
2462                  * index for the subtree.
2463                  */
2464                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2465                 if (ret != LDB_SUCCESS) {
2466                         talloc_free(dn_list);
2467                         return ret;
2468                 }
2469                 break;
2470         }
2471
2472         /*
2473          * It is critical that this function do the re-filter even
2474          * on things found by the index as the index can over-match
2475          * in cases of truncation (as well as when it decides it is
2476          * not worth further filtering)
2477          *
2478          * If this changes, then the index code above would need to
2479          * pass up a flag to say if any index was truncated during
2480          * processing as the truncation here refers only to the
2481          * SCOPE_ONELEVEL index.
2482          */
2483         ret = ldb_kv_index_filter(
2484             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2485         talloc_free(dn_list);
2486         return ret;
2487 }
2488
2489 /**
2490  * @brief Add a DN in the index list of a given attribute name/value pair
2491  *
2492  * This function will add the DN in the index list for the index for
2493  * the given attribute name and value.
2494  *
2495  * @param[in]  module       A ldb_module structure
2496  *
2497  * @param[in]  dn           The string representation of the DN as it
2498  *                          will be stored in the index entry
2499  *
2500  * @param[in]  el           A ldb_message_element array, one of the entry
2501  *                          referred by the v_idx is the attribute name and
2502  *                          value pair which will be used to construct the
2503  *                          index name
2504  *
2505  * @param[in]  v_idx        The index of element in the el array to use
2506  *
2507  * @return                  An ldb error code
2508  */
2509 static int ldb_kv_index_add1(struct ldb_module *module,
2510                              struct ldb_kv_private *ldb_kv,
2511                              const struct ldb_message *msg,
2512                              struct ldb_message_element *el,
2513                              int v_idx)
2514 {
2515         struct ldb_context *ldb;
2516         struct ldb_dn *dn_key;
2517         int ret;
2518         const struct ldb_schema_attribute *a;
2519         struct dn_list *list;
2520         unsigned alloc_len;
2521         enum key_truncation truncation = KEY_TRUNCATED;
2522
2523
2524         ldb = ldb_module_get_ctx(module);
2525
2526         list = talloc_zero(module, struct dn_list);
2527         if (list == NULL) {
2528                 return LDB_ERR_OPERATIONS_ERROR;
2529         }
2530
2531         dn_key = ldb_kv_index_key(
2532             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2533         if (!dn_key) {
2534                 talloc_free(list);
2535                 return LDB_ERR_OPERATIONS_ERROR;
2536         }
2537         /*
2538          * Samba only maintains unique indexes on the objectSID and objectGUID
2539          * so if a unique index key exceeds the maximum length there is a
2540          * problem.
2541          */
2542         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2543                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2544                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2545
2546                 ldb_asprintf_errstring(
2547                     ldb,
2548                     __location__ ": unique index key on %s in %s, "
2549                                  "exceeds maximum key length of %u (encoded).",
2550                     el->name,
2551                     ldb_dn_get_linearized(msg->dn),
2552                     ldb_kv->max_key_length);
2553                 talloc_free(list);
2554                 return LDB_ERR_CONSTRAINT_VIOLATION;
2555         }
2556         talloc_steal(list, dn_key);
2557
2558         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2559         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2560                 talloc_free(list);
2561                 return ret;
2562         }
2563
2564         /*
2565          * Check for duplicates in the @IDXDN DN -> GUID record
2566          *
2567          * This is very normal, it just means a duplicate DN creation
2568          * was attempted, so don't set the error string or print scary
2569          * messages.
2570          */
2571         if (list->count > 0 &&
2572             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2573             truncation == KEY_NOT_TRUNCATED) {
2574
2575                 talloc_free(list);
2576                 return LDB_ERR_CONSTRAINT_VIOLATION;
2577
2578         } else if (list->count > 0
2579                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2580
2581                 /*
2582                  * At least one existing entry in the DN->GUID index, which
2583                  * arises when the DN indexes have been truncated
2584                  *
2585                  * So need to pull the DN's to check if it's really a duplicate
2586                  */
2587                 int i;
2588                 for (i=0; i < list->count; i++) {
2589                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2590                         struct ldb_val key = {
2591                                 .data = guid_key,
2592                                 .length = sizeof(guid_key)
2593                         };
2594                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2595                         struct ldb_message *rec = ldb_msg_new(ldb);
2596                         if (rec == NULL) {
2597                                 return LDB_ERR_OPERATIONS_ERROR;
2598                         }
2599
2600                         ret = ldb_kv_idx_to_key(
2601                             module, ldb_kv, ldb, &list->dn[i], &key);
2602                         if (ret != LDB_SUCCESS) {
2603                                 TALLOC_FREE(list);
2604                                 TALLOC_FREE(rec);
2605                                 return ret;
2606                         }
2607
2608                         ret =
2609                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2610                         if (key.data != guid_key) {
2611                                 TALLOC_FREE(key.data);
2612                         }
2613                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2614                                 /*
2615                                  * the record has disappeared?
2616                                  * yes, this can happen
2617                                  */
2618                                 talloc_free(rec);
2619                                 continue;
2620                         }
2621
2622                         if (ret != LDB_SUCCESS) {
2623                                 /* an internal error */
2624                                 TALLOC_FREE(rec);
2625                                 TALLOC_FREE(list);
2626                                 return LDB_ERR_OPERATIONS_ERROR;
2627                         }
2628                         /*
2629                          * The DN we are trying to add to the DB and index
2630                          * is already here, so we must deny the addition
2631                          */
2632                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2633                                 TALLOC_FREE(rec);
2634                                 TALLOC_FREE(list);
2635                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2636                         }
2637                 }
2638         }
2639
2640         /*
2641          * Check for duplicates in unique indexes
2642          *
2643          * We don't need to do a loop test like the @IDXDN case
2644          * above as we have a ban on long unique index values
2645          * at the start of this function.
2646          */
2647         if (list->count > 0 &&
2648             ((a != NULL
2649               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2650                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2651                 /*
2652                  * We do not want to print info about a possibly
2653                  * confidential DN that the conflict was with in the
2654                  * user-visible error string
2655                  */
2656
2657                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2658                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2659                                   __location__
2660                                   ": unique index violation on %s in %s, "
2661                                   "conflicts with %*.*s in %s",
2662                                   el->name, ldb_dn_get_linearized(msg->dn),
2663                                   (int)list->dn[0].length,
2664                                   (int)list->dn[0].length,
2665                                   list->dn[0].data,
2666                                   ldb_dn_get_linearized(dn_key));
2667                 } else {
2668                         /* This can't fail, gives a default at worst */
2669                         const struct ldb_schema_attribute *attr =
2670                             ldb_schema_attribute_by_name(
2671                                 ldb, ldb_kv->cache->GUID_index_attribute);
2672                         struct ldb_val v;
2673                         ret = attr->syntax->ldif_write_fn(ldb, list,
2674                                                           &list->dn[0], &v);
2675                         if (ret == LDB_SUCCESS) {
2676                                 ldb_debug(ldb,
2677                                           LDB_DEBUG_WARNING,
2678                                           __location__
2679                                           ": unique index violation on %s in "
2680                                           "%s, conflicts with %s %*.*s in %s",
2681                                           el->name,
2682                                           ldb_dn_get_linearized(msg->dn),
2683                                           ldb_kv->cache->GUID_index_attribute,
2684                                           (int)v.length,
2685                                           (int)v.length,
2686                                           v.data,
2687                                           ldb_dn_get_linearized(dn_key));
2688                         }
2689                 }
2690                 ldb_asprintf_errstring(ldb,
2691                                        __location__ ": unique index violation "
2692                                        "on %s in %s",
2693                                        el->name,
2694                                        ldb_dn_get_linearized(msg->dn));
2695                 talloc_free(list);
2696                 return LDB_ERR_CONSTRAINT_VIOLATION;
2697         }
2698
2699         /* overallocate the list a bit, to reduce the number of
2700          * realloc trigered copies */
2701         alloc_len = ((list->count+1)+7) & ~7;
2702         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2703         if (list->dn == NULL) {
2704                 talloc_free(list);
2705                 return LDB_ERR_OPERATIONS_ERROR;
2706         }
2707
2708         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2709                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2710                 list->dn[list->count].data
2711                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2712                 if (list->dn[list->count].data == NULL) {
2713                         talloc_free(list);
2714                         return LDB_ERR_OPERATIONS_ERROR;
2715                 }
2716                 list->dn[list->count].length = strlen(dn_str);
2717         } else {
2718                 const struct ldb_val *key_val;
2719                 struct ldb_val *exact = NULL, *next = NULL;
2720                 key_val = ldb_msg_find_ldb_val(
2721                     msg, ldb_kv->cache->GUID_index_attribute);
2722                 if (key_val == NULL) {
2723                         talloc_free(list);
2724                         return ldb_module_operr(module);
2725                 }
2726
2727                 if (key_val->length != LDB_KV_GUID_SIZE) {
2728                         talloc_free(list);
2729                         return ldb_module_operr(module);
2730                 }
2731
2732                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2733                                         *key_val, ldb_val_equal_exact_ordered,
2734                                         exact, next);
2735
2736                 /*
2737                  * Give a warning rather than fail, this could be a
2738                  * duplicate value in the record allowed by a caller
2739                  * forcing in the value with
2740                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2741                  */
2742                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2743                         /* This can't fail, gives a default at worst */
2744                         const struct ldb_schema_attribute *attr =
2745                             ldb_schema_attribute_by_name(
2746                                 ldb, ldb_kv->cache->GUID_index_attribute);
2747                         struct ldb_val v;
2748                         ret = attr->syntax->ldif_write_fn(ldb, list,
2749                                                           exact, &v);
2750                         if (ret == LDB_SUCCESS) {
2751                                 ldb_debug(ldb,
2752                                           LDB_DEBUG_WARNING,
2753                                           __location__
2754                                           ": duplicate attribute value in %s "
2755                                           "for index on %s, "
2756                                           "duplicate of %s %*.*s in %s",
2757                                           ldb_dn_get_linearized(msg->dn),
2758                                           el->name,
2759                                           ldb_kv->cache->GUID_index_attribute,
2760                                           (int)v.length,
2761                                           (int)v.length,
2762                                           v.data,
2763                                           ldb_dn_get_linearized(dn_key));
2764                         }
2765                 }
2766
2767                 if (next == NULL) {
2768                         next = &list->dn[list->count];
2769                 } else {
2770                         memmove(&next[1], next,
2771                                 sizeof(*next) * (list->count - (next - list->dn)));
2772                 }
2773                 *next = ldb_val_dup(list->dn, key_val);
2774                 if (next->data == NULL) {
2775                         talloc_free(list);
2776                         return ldb_module_operr(module);
2777                 }
2778         }
2779         list->count++;
2780
2781         ret = ldb_kv_dn_list_store(module, dn_key, list);
2782
2783         talloc_free(list);
2784
2785         return ret;
2786 }
2787
2788 /*
2789   add index entries for one elements in a message
2790  */
2791 static int ldb_kv_index_add_el(struct ldb_module *module,
2792                                struct ldb_kv_private *ldb_kv,
2793                                const struct ldb_message *msg,
2794                                struct ldb_message_element *el)
2795 {
2796         unsigned int i;
2797         for (i = 0; i < el->num_values; i++) {
2798                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2799                 if (ret != LDB_SUCCESS) {
2800                         return ret;
2801                 }
2802         }
2803
2804         return LDB_SUCCESS;
2805 }
2806
2807 /*
2808   add index entries for all elements in a message
2809  */
2810 static int ldb_kv_index_add_all(struct ldb_module *module,
2811                                 struct ldb_kv_private *ldb_kv,
2812                                 const struct ldb_message *msg)
2813 {
2814         struct ldb_message_element *elements = msg->elements;
2815         unsigned int i;
2816         const char *dn_str;
2817         int ret;
2818
2819         if (ldb_dn_is_special(msg->dn)) {
2820                 return LDB_SUCCESS;
2821         }
2822
2823         dn_str = ldb_dn_get_linearized(msg->dn);
2824         if (dn_str == NULL) {
2825                 return LDB_ERR_OPERATIONS_ERROR;
2826         }
2827
2828         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2829         if (ret != LDB_SUCCESS) {
2830                 return ret;
2831         }
2832
2833         if (!ldb_kv->cache->attribute_indexes) {
2834                 /* no indexed fields */
2835                 return LDB_SUCCESS;
2836         }
2837
2838         for (i = 0; i < msg->num_elements; i++) {
2839                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2840                         continue;
2841                 }
2842                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2843                 if (ret != LDB_SUCCESS) {
2844                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2845                         ldb_asprintf_errstring(ldb,
2846                                                __location__ ": Failed to re-index %s in %s - %s",
2847                                                elements[i].name, dn_str,
2848                                                ldb_errstring(ldb));
2849                         return ret;
2850                 }
2851         }
2852
2853         return LDB_SUCCESS;
2854 }
2855
2856
2857 /*
2858   insert a DN index for a message
2859 */
2860 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2861                                   struct ldb_kv_private *ldb_kv,
2862                                   const struct ldb_message *msg,
2863                                   struct ldb_dn *dn,
2864                                   const char *index,
2865                                   int add)
2866 {
2867         struct ldb_message_element el;
2868         struct ldb_val val;
2869         int ret;
2870
2871         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2872         if (val.data == NULL) {
2873                 const char *dn_str = ldb_dn_get_linearized(dn);
2874                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2875                                        __location__ ": Failed to modify %s "
2876                                                     "against %s in %s: failed "
2877                                                     "to get casefold DN",
2878                                        index,
2879                                        ldb_kv->cache->GUID_index_attribute,
2880                                        dn_str);
2881                 return LDB_ERR_OPERATIONS_ERROR;
2882         }
2883
2884         val.length = strlen((char *)val.data);
2885         el.name = index;
2886         el.values = &val;
2887         el.num_values = 1;
2888
2889         if (add) {
2890                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2891         } else { /* delete */
2892                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2893         }
2894
2895         if (ret != LDB_SUCCESS) {
2896                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2897                 const char *dn_str = ldb_dn_get_linearized(dn);
2898                 ldb_asprintf_errstring(ldb,
2899                                        __location__ ": Failed to modify %s "
2900                                                     "against %s in %s - %s",
2901                                        index,
2902                                        ldb_kv->cache->GUID_index_attribute,
2903                                        dn_str,
2904                                        ldb_errstring(ldb));
2905                 return ret;
2906         }
2907         return ret;
2908 }
2909
2910 /*
2911   insert a one level index for a message
2912 */
2913 static int ldb_kv_index_onelevel(struct ldb_module *module,
2914                                  const struct ldb_message *msg,
2915                                  int add)
2916 {
2917         struct ldb_kv_private *ldb_kv = talloc_get_type(
2918             ldb_module_get_private(module), struct ldb_kv_private);
2919         struct ldb_dn *pdn;
2920         int ret;
2921
2922         /* We index for ONE Level only if requested */
2923         if (!ldb_kv->cache->one_level_indexes) {
2924                 return LDB_SUCCESS;
2925         }
2926
2927         pdn = ldb_dn_get_parent(module, msg->dn);
2928         if (pdn == NULL) {
2929                 return LDB_ERR_OPERATIONS_ERROR;
2930         }
2931         ret =
2932             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2933
2934         talloc_free(pdn);
2935
2936         return ret;
2937 }
2938
2939 /*
2940   insert a one level index for a message
2941 */
2942 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2943                                       const struct ldb_message *msg,
2944                                       int add)
2945 {
2946         int ret;
2947         struct ldb_kv_private *ldb_kv = talloc_get_type(
2948             ldb_module_get_private(module), struct ldb_kv_private);
2949
2950         /* We index for DN only if using a GUID index */
2951         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2952                 return LDB_SUCCESS;
2953         }
2954
2955         ret = ldb_kv_modify_index_dn(
2956             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2957
2958         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2959                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2960                                        "Entry %s already exists",
2961                                        ldb_dn_get_linearized(msg->dn));
2962                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2963         }
2964         return ret;
2965 }
2966
2967 /*
2968   add the index entries for a new element in a record
2969   The caller guarantees that these element values are not yet indexed
2970 */
2971 int ldb_kv_index_add_element(struct ldb_module *module,
2972                              struct ldb_kv_private *ldb_kv,
2973                              const struct ldb_message *msg,
2974                              struct ldb_message_element *el)
2975 {
2976         if (ldb_dn_is_special(msg->dn)) {
2977                 return LDB_SUCCESS;
2978         }
2979         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2980                 return LDB_SUCCESS;
2981         }
2982         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2983 }
2984
2985 /*
2986   add the index entries for a new record
2987 */
2988 int ldb_kv_index_add_new(struct ldb_module *module,
2989                          struct ldb_kv_private *ldb_kv,
2990                          const struct ldb_message *msg)
2991 {
2992         int ret;
2993
2994         if (ldb_dn_is_special(msg->dn)) {
2995                 return LDB_SUCCESS;
2996         }
2997
2998         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2999         if (ret != LDB_SUCCESS) {
3000                 /*
3001                  * Because we can't trust the caller to be doing
3002                  * transactions properly, clean up any index for this
3003                  * entry rather than relying on a transaction
3004                  * cleanup
3005                  */
3006
3007                 ldb_kv_index_delete(module, msg);
3008                 return ret;
3009         }
3010
3011         ret = ldb_kv_index_onelevel(module, msg, 1);
3012         if (ret != LDB_SUCCESS) {
3013                 /*
3014                  * Because we can't trust the caller to be doing
3015                  * transactions properly, clean up any index for this
3016                  * entry rather than relying on a transaction
3017                  * cleanup
3018                  */
3019                 ldb_kv_index_delete(module, msg);
3020                 return ret;
3021         }
3022         return ret;
3023 }
3024
3025
3026 /*
3027   delete an index entry for one message element
3028 */
3029 int ldb_kv_index_del_value(struct ldb_module *module,
3030                            struct ldb_kv_private *ldb_kv,
3031                            const struct ldb_message *msg,
3032                            struct ldb_message_element *el,
3033                            unsigned int v_idx)
3034 {
3035         struct ldb_context *ldb;
3036         struct ldb_dn *dn_key;
3037         const char *dn_str;
3038         int ret, i;
3039         unsigned int j;
3040         struct dn_list *list;
3041         struct ldb_dn *dn = msg->dn;
3042         enum key_truncation truncation = KEY_NOT_TRUNCATED;
3043
3044         ldb = ldb_module_get_ctx(module);
3045
3046         dn_str = ldb_dn_get_linearized(dn);
3047         if (dn_str == NULL) {
3048                 return LDB_ERR_OPERATIONS_ERROR;
3049         }
3050
3051         if (dn_str[0] == '@') {
3052                 return LDB_SUCCESS;
3053         }
3054
3055         dn_key = ldb_kv_index_key(
3056             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
3057         /*
3058          * We ignore key truncation in ltdb_index_add1() so
3059          * match that by ignoring it here as well
3060          *
3061          * Multiple values are legitimate and accepted
3062          */
3063         if (!dn_key) {
3064                 return LDB_ERR_OPERATIONS_ERROR;
3065         }
3066
3067         list = talloc_zero(dn_key, struct dn_list);
3068         if (list == NULL) {
3069                 talloc_free(dn_key);
3070                 return LDB_ERR_OPERATIONS_ERROR;
3071         }
3072
3073         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
3074         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
3075                 /* it wasn't indexed. Did we have an earlier error? If we did then
3076                    its gone now */
3077                 talloc_free(dn_key);
3078                 return LDB_SUCCESS;
3079         }
3080
3081         if (ret != LDB_SUCCESS) {
3082                 talloc_free(dn_key);
3083                 return ret;
3084         }
3085
3086         /*
3087          * Find one of the values matching this message to remove
3088          */
3089         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
3090         if (i == -1) {
3091                 /* nothing to delete */
3092                 talloc_free(dn_key);
3093                 return LDB_SUCCESS;
3094         }
3095
3096         j = (unsigned int) i;
3097         if (j != list->count - 1) {
3098                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
3099         }
3100         list->count--;
3101         if (list->count == 0) {
3102                 talloc_free(list->dn);
3103                 list->dn = NULL;
3104         } else {
3105                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
3106         }
3107
3108         ret = ldb_kv_dn_list_store(module, dn_key, list);
3109
3110         talloc_free(dn_key);
3111
3112         return ret;
3113 }
3114
3115 /*
3116   delete the index entries for a element
3117   return -1 on failure
3118 */
3119 int ldb_kv_index_del_element(struct ldb_module *module,
3120                              struct ldb_kv_private *ldb_kv,
3121                              const struct ldb_message *msg,
3122                              struct ldb_message_element *el)
3123 {
3124         const char *dn_str;
3125         int ret;
3126         unsigned int i;
3127
3128         if (!ldb_kv->cache->attribute_indexes) {
3129                 /* no indexed fields */
3130                 return LDB_SUCCESS;
3131         }
3132
3133         dn_str = ldb_dn_get_linearized(msg->dn);
3134         if (dn_str == NULL) {
3135                 return LDB_ERR_OPERATIONS_ERROR;
3136         }
3137
3138         if (dn_str[0] == '@') {
3139                 return LDB_SUCCESS;
3140         }
3141
3142         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3143                 return LDB_SUCCESS;
3144         }
3145         for (i = 0; i < el->num_values; i++) {
3146                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
3147                 if (ret != LDB_SUCCESS) {
3148                         return ret;
3149                 }
3150         }
3151
3152         return LDB_SUCCESS;
3153 }
3154
3155 /*
3156   delete the index entries for a record
3157   return -1 on failure
3158 */
3159 int ldb_kv_index_delete(struct ldb_module *module,
3160                         const struct ldb_message *msg)
3161 {
3162         struct ldb_kv_private *ldb_kv = talloc_get_type(
3163             ldb_module_get_private(module), struct ldb_kv_private);
3164         int ret;
3165         unsigned int i;
3166
3167         if (ldb_dn_is_special(msg->dn)) {
3168                 return LDB_SUCCESS;
3169         }
3170
3171         ret = ldb_kv_index_onelevel(module, msg, 0);
3172         if (ret != LDB_SUCCESS) {
3173                 return ret;
3174         }
3175
3176         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
3177         if (ret != LDB_SUCCESS) {
3178                 return ret;
3179         }
3180
3181         if (!ldb_kv->cache->attribute_indexes) {
3182                 /* no indexed fields */
3183                 return LDB_SUCCESS;
3184         }
3185
3186         for (i = 0; i < msg->num_elements; i++) {
3187                 ret = ldb_kv_index_del_element(
3188                     module, ldb_kv, msg, &msg->elements[i]);
3189                 if (ret != LDB_SUCCESS) {
3190                         return ret;
3191                 }
3192         }
3193
3194         return LDB_SUCCESS;
3195 }
3196
3197
3198 /*
3199   traversal function that deletes all @INDEX records in the in-memory
3200   TDB.
3201
3202   This does not touch the actual DB, that is done at transaction
3203   commit, which in turn greatly reduces DB churn as we will likely
3204   be able to do a direct update into the old record.
3205 */
3206 static int delete_index(struct ldb_kv_private *ldb_kv,
3207                         struct ldb_val key,
3208                         struct ldb_val data,
3209                         void *state)
3210 {
3211         struct ldb_module *module = state;
3212         const char *dnstr = "DN=" LDB_KV_INDEX ":";
3213         struct dn_list list;
3214         struct ldb_dn *dn;
3215         struct ldb_val v;
3216         int ret;
3217
3218         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
3219                 return 0;
3220         }
3221         /* we need to put a empty list in the internal tdb for this
3222          * index entry */
3223         list.dn = NULL;
3224         list.count = 0;
3225
3226         /* the offset of 3 is to remove the DN= prefix. */
3227         v.data = key.data + 3;
3228         v.length = strnlen((char *)key.data, key.length) - 3;
3229
3230         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
3231
3232         /*
3233          * This does not actually touch the DB quite yet, just
3234          * the in-memory index cache
3235          */
3236         ret = ldb_kv_dn_list_store(module, dn, &list);
3237         if (ret != LDB_SUCCESS) {
3238                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3239                                        "Unable to store null index for %s\n",
3240                                                 ldb_dn_get_linearized(dn));
3241                 talloc_free(dn);
3242                 return -1;
3243         }
3244         talloc_free(dn);
3245         return 0;
3246 }
3247
3248 /*
3249   traversal function that adds @INDEX records during a re index TODO wrong comment
3250 */
3251 static int re_key(struct ldb_kv_private *ldb_kv,
3252                   struct ldb_val key,
3253                   struct ldb_val val,
3254                   void *state)
3255 {
3256         struct ldb_context *ldb;
3257         struct ldb_kv_reindex_context *ctx =
3258             (struct ldb_kv_reindex_context *)state;
3259         struct ldb_module *module = ctx->module;
3260         struct ldb_message *msg;
3261         unsigned int nb_elements_in_db;
3262         int ret;
3263         struct ldb_val key2;
3264         bool is_record;
3265
3266         ldb = ldb_module_get_ctx(module);
3267
3268         if (key.length > 4 &&
3269             memcmp(key.data, "DN=@", 4) == 0) {
3270                 return 0;
3271         }
3272
3273         is_record = ldb_kv_key_is_record(key);
3274         if (is_record == false) {
3275                 return 0;
3276         }
3277
3278         msg = ldb_msg_new(module);
3279         if (msg == NULL) {
3280                 return -1;
3281         }
3282
3283         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
3284                                                    msg,
3285                                                    NULL, 0,
3286                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
3287                                                    &nb_elements_in_db);
3288         if (ret != 0) {
3289                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3290                                                 ldb_dn_get_linearized(msg->dn));
3291                 ctx->error = ret;
3292                 talloc_free(msg);
3293                 return -1;
3294         }
3295
3296         if (msg->dn == NULL) {
3297                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3298                           "Refusing to re-index as GUID "
3299                           "key %*.*s with no DN\n",
3300                           (int)key.length, (int)key.length,
3301                           (char *)key.data);
3302                 talloc_free(msg);
3303                 return -1;
3304         }
3305
3306         /* check if the DN key has changed, perhaps due to the case
3307            insensitivity of an element changing, or a change from DN
3308            to GUID keys */
3309         key2 = ldb_kv_key_msg(module, msg, msg);
3310         if (key2.data == NULL) {
3311                 /* probably a corrupt record ... darn */
3312                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
3313                                                 ldb_dn_get_linearized(msg->dn));
3314                 talloc_free(msg);
3315                 return 0;
3316         }
3317         if (key.length != key2.length ||
3318             (memcmp(key.data, key2.data, key.length) != 0)) {
3319                 ldb_kv->kv_ops->update_in_iterate(
3320                     ldb_kv, key, key2, val, ctx);
3321         }
3322         talloc_free(key2.data);
3323
3324         talloc_free(msg);
3325
3326         ctx->count++;
3327         if (ctx->count % 10000 == 0) {
3328                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3329                           "Reindexing: re-keyed %u records so far",
3330                           ctx->count);
3331         }
3332
3333         return 0;
3334 }
3335
3336 /*
3337   traversal function that adds @INDEX records during a re index
3338 */
3339 static int re_index(struct ldb_kv_private *ldb_kv,
3340                     struct ldb_val key,
3341                     struct ldb_val val,
3342                     void *state)
3343 {
3344         struct ldb_context *ldb;
3345         struct ldb_kv_reindex_context *ctx =
3346             (struct ldb_kv_reindex_context *)state;
3347         struct ldb_module *module = ctx->module;
3348         struct ldb_message *msg;
3349         unsigned int nb_elements_in_db;
3350         int ret;
3351         bool is_record;
3352
3353         ldb = ldb_module_get_ctx(module);
3354
3355         if (key.length > 4 &&
3356             memcmp(key.data, "DN=@", 4) == 0) {
3357                 return 0;
3358         }
3359
3360         is_record = ldb_kv_key_is_record(key);
3361         if (is_record == false) {
3362                 return 0;
3363         }
3364
3365         msg = ldb_msg_new(module);
3366         if (msg == NULL) {
3367                 return -1;
3368         }
3369
3370         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
3371                                                    msg,
3372                                                    NULL, 0,
3373                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
3374                                                    &nb_elements_in_db);
3375         if (ret != 0) {
3376                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3377                                                 ldb_dn_get_linearized(msg->dn));
3378                 ctx->error = ret;
3379                 talloc_free(msg);
3380                 return -1;
3381         }
3382
3383         if (msg->dn == NULL) {
3384                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3385                           "Refusing to re-index as GUID "
3386                           "key %*.*s with no DN\n",
3387                           (int)key.length, (int)key.length,
3388                           (char *)key.data);
3389                 talloc_free(msg);
3390                 return -1;
3391         }
3392
3393         ret = ldb_kv_index_onelevel(module, msg, 1);
3394         if (ret != LDB_SUCCESS) {
3395                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3396                           "Adding special ONE LEVEL index failed (%s)!",
3397                                                 ldb_dn_get_linearized(msg->dn));
3398                 talloc_free(msg);
3399                 return -1;
3400         }
3401
3402         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3403
3404         if (ret != LDB_SUCCESS) {
3405                 ctx->error = ret;
3406                 talloc_free(msg);
3407                 return -1;
3408         }
3409
3410         talloc_free(msg);
3411
3412         ctx->count++;
3413         if (ctx->count % 10000 == 0) {
3414                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3415                           "Reindexing: re-indexed %u records so far",
3416                           ctx->count);
3417         }
3418
3419         return 0;
3420 }
3421
3422 /*
3423   force a complete reindex of the database
3424 */
3425 int ldb_kv_reindex(struct ldb_module *module)
3426 {
3427         struct ldb_kv_private *ldb_kv = talloc_get_type(
3428             ldb_module_get_private(module), struct ldb_kv_private);
3429         int ret;
3430         struct ldb_kv_reindex_context ctx;
3431         size_t index_cache_size = 0;
3432
3433         /*
3434          * Only triggered after a modification, but make clear we do
3435          * not re-index a read-only DB
3436          */
3437         if (ldb_kv->read_only) {
3438                 return LDB_ERR_UNWILLING_TO_PERFORM;
3439         }
3440
3441         if (ldb_kv_cache_reload(module) != 0) {
3442                 return LDB_ERR_OPERATIONS_ERROR;
3443         }
3444
3445         /*
3446          * Ensure we read (and so remove) the entries from the real
3447          * DB, no values stored so far are any use as we want to do a
3448          * re-index
3449          */
3450         ldb_kv_index_transaction_cancel(module);
3451
3452         /*
3453          * Calculate the size of the index cache that we'll need for
3454          * the re-index
3455          */
3456         index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
3457         if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
3458                 index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
3459         }
3460
3461         ret = ldb_kv_index_transaction_start(module, index_cache_size);
3462         if (ret != LDB_SUCCESS) {
3463                 return ret;
3464         }
3465
3466         /* first traverse the database deleting any @INDEX records by
3467          * putting NULL entries in the in-memory tdb
3468          */
3469         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3470         if (ret < 0) {
3471                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3472                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3473                                        ldb_errstring(ldb));
3474                 return LDB_ERR_OPERATIONS_ERROR;
3475         }
3476
3477         ctx.module = module;
3478         ctx.error = 0;
3479         ctx.count = 0;
3480
3481         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3482         if (ret < 0) {
3483                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3484                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3485                                        ldb_errstring(ldb));
3486                 return LDB_ERR_OPERATIONS_ERROR;
3487         }
3488
3489         if (ctx.error != LDB_SUCCESS) {
3490                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3491                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3492                 return ctx.error;
3493         }
3494
3495         ctx.error = 0;
3496         ctx.count = 0;
3497
3498         /* now traverse adding any indexes for normal LDB records */
3499         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3500         if (ret < 0) {
3501                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3502                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3503                                        ldb_errstring(ldb));
3504                 return LDB_ERR_OPERATIONS_ERROR;
3505         }
3506
3507         if (ctx.error != LDB_SUCCESS) {
3508                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3509                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3510                 return ctx.error;
3511         }
3512
3513         if (ctx.count > 10000) {
3514                 ldb_debug(ldb_module_get_ctx(module),
3515                           LDB_DEBUG_WARNING,
3516                           "Reindexing: re_index successful on %s, "
3517                           "final index write-out will be in transaction commit",
3518                           ldb_kv->kv_ops->name(ldb_kv));
3519         }
3520         return LDB_SUCCESS;
3521 }