lib ldb ldb_key_value: csbuild fix integer comparison
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151
152 struct dn_list {
153         unsigned int count;
154         struct ldb_val *dn;
155         /*
156          * Do not optimise the intersection of this list,
157          * we must never return an entry not in this
158          * list.  This allows the index for
159          * SCOPE_ONELEVEL to be trusted.
160          */
161         bool strict;
162 };
163
164 struct ldb_kv_idxptr {
165         struct tdb_context *itdb;
166         int error;
167 };
168
169 enum key_truncation {
170         KEY_NOT_TRUNCATED,
171         KEY_TRUNCATED,
172 };
173
174 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
175                                       const struct ldb_message *msg,
176                                       int add);
177 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
178                                    struct ldb_kv_private *ldb_kv,
179                                    struct ldb_dn *base_dn,
180                                    struct dn_list *dn_list,
181                                    enum key_truncation *truncation);
182
183 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
184                                 struct dn_list *list);
185
186 /* we put a @IDXVERSION attribute on index entries. This
187    allows us to tell if it was written by an older version
188 */
189 #define LDB_KV_INDEXING_VERSION 2
190
191 #define LDB_KV_GUID_INDEXING_VERSION 3
192
193 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
194 {
195         if (ldb_kv->max_key_length == 0) {
196                 return UINT_MAX;
197         }
198         return ldb_kv->max_key_length;
199 }
200
201 /* enable the idxptr mode when transactions start */
202 int ldb_kv_index_transaction_start(
203         struct ldb_module *module,
204         size_t cache_size)
205 {
206         struct ldb_kv_private *ldb_kv = talloc_get_type(
207             ldb_module_get_private(module), struct ldb_kv_private);
208         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
209         if (ldb_kv->idxptr == NULL) {
210                 return ldb_oom(ldb_module_get_ctx(module));
211         }
212
213         ldb_kv->idxptr->itdb = tdb_open(
214                 NULL,
215                 cache_size,
216                 TDB_INTERNAL,
217                 O_RDWR,
218                 0);
219         if (ldb_kv->idxptr->itdb == NULL) {
220                 return LDB_ERR_OPERATIONS_ERROR;
221         }
222
223         return LDB_SUCCESS;
224 }
225
226 /*
227   see if two ldb_val structures contain exactly the same data
228   return -1 or 1 for a mismatch, 0 for match
229 */
230 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
231                                          const struct ldb_val *v2)
232 {
233         if (v1->length > v2->length) {
234                 return -1;
235         }
236         if (v1->length < v2->length) {
237                 return 1;
238         }
239         return memcmp(v1->data, v2->data, v1->length);
240 }
241
242 /*
243   see if two ldb_val structures contain exactly the same data
244   return -1 or 1 for a mismatch, 0 for match
245 */
246 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
247                                        const struct ldb_val *v2)
248 {
249         if (v1.length > v2->length) {
250                 return -1;
251         }
252         if (v1.length < v2->length) {
253                 return 1;
254         }
255         return memcmp(v1.data, v2->data, v1.length);
256 }
257
258
259 /*
260   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
261   binary-safe comparison for the 'dn' returns -1 if not found
262
263   This is therefore safe when the value is a GUID in the future
264  */
265 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
266                                    const struct dn_list *list,
267                                    const struct ldb_val *v)
268 {
269         unsigned int i;
270         struct ldb_val *exact = NULL, *next = NULL;
271
272         if (ldb_kv->cache->GUID_index_attribute == NULL) {
273                 for (i=0; i<list->count; i++) {
274                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
275                                 return i;
276                         }
277                 }
278                 return -1;
279         }
280
281         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
282                                 *v, ldb_val_equal_exact_ordered,
283                                 exact, next);
284         if (exact == NULL) {
285                 return -1;
286         }
287         /* Not required, but keeps the compiler quiet */
288         if (next != NULL) {
289                 return -1;
290         }
291
292         i = exact - list->dn;
293         return i;
294 }
295
296 /*
297   find a entry in a dn_list. Uses a case sensitive comparison with the dn
298   returns -1 if not found
299  */
300 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
301                                    struct dn_list *list,
302                                    const struct ldb_message *msg)
303 {
304         struct ldb_val v;
305         const struct ldb_val *key_val;
306         if (ldb_kv->cache->GUID_index_attribute == NULL) {
307                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
308                 v.data = discard_const_p(unsigned char, dn_str);
309                 v.length = strlen(dn_str);
310         } else {
311                 key_val = ldb_msg_find_ldb_val(
312                     msg, ldb_kv->cache->GUID_index_attribute);
313                 if (key_val == NULL) {
314                         return -1;
315                 }
316                 v = *key_val;
317         }
318         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
319 }
320
321 /*
322   this is effectively a cast function, but with lots of paranoia
323   checks and also copes with CPUs that are fussy about pointer
324   alignment
325  */
326 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
327                                            TDB_DATA rec,
328                                            bool check_parent)
329 {
330         struct dn_list *list;
331         if (rec.dsize != sizeof(void *)) {
332                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
333                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
334                 return NULL;
335         }
336         /* note that we can't just use a cast here, as rec.dptr may
337            not be aligned sufficiently for a pointer. A cast would cause
338            platforms like some ARM CPUs to crash */
339         memcpy(&list, rec.dptr, sizeof(void *));
340         list = talloc_get_type(list, struct dn_list);
341         if (list == NULL) {
342                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
343                                        "Bad type '%s' for idxptr",
344                                        talloc_get_name(list));
345                 return NULL;
346         }
347         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
348                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
349                                        "Bad parent '%s' for idxptr",
350                                        talloc_get_name(talloc_parent(list->dn)));
351                 return NULL;
352         }
353         return list;
354 }
355
356 /*
357   return the @IDX list in an index entry for a dn as a
358   struct dn_list
359  */
360 static int ldb_kv_dn_list_load(struct ldb_module *module,
361                                struct ldb_kv_private *ldb_kv,
362                                struct ldb_dn *dn,
363                                struct dn_list *list)
364 {
365         struct ldb_message *msg;
366         int ret, version;
367         struct ldb_message_element *el;
368         TDB_DATA rec;
369         struct dn_list *list2;
370         TDB_DATA key;
371
372         list->dn = NULL;
373         list->count = 0;
374
375         /* see if we have any in-memory index entries */
376         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
377                 goto normal_index;
378         }
379
380         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
381         key.dsize = strlen((char *)key.dptr);
382
383         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
384         if (rec.dptr == NULL) {
385                 goto normal_index;
386         }
387
388         /* we've found an in-memory index entry */
389         list2 = ldb_kv_index_idxptr(module, rec, true);
390         if (list2 == NULL) {
391                 free(rec.dptr);
392                 return LDB_ERR_OPERATIONS_ERROR;
393         }
394         free(rec.dptr);
395
396         *list = *list2;
397         return LDB_SUCCESS;
398
399 normal_index:
400         msg = ldb_msg_new(list);
401         if (msg == NULL) {
402                 return LDB_ERR_OPERATIONS_ERROR;
403         }
404
405         ret = ldb_kv_search_dn1(module,
406                                 dn,
407                                 msg,
408                                 LDB_UNPACK_DATA_FLAG_NO_DN |
409                                 /*
410                                  * The entry point ldb_kv_search_indexed is
411                                  * only called from the read-locked
412                                  * ldb_kv_search.
413                                  */
414                                 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
415         if (ret != LDB_SUCCESS) {
416                 talloc_free(msg);
417                 return ret;
418         }
419
420         el = ldb_msg_find_element(msg, LDB_KV_IDX);
421         if (!el) {
422                 talloc_free(msg);
423                 return LDB_SUCCESS;
424         }
425
426         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
427
428         /*
429          * we avoid copying the strings by stealing the list.  We have
430          * to steal msg onto el->values (which looks odd) because
431          * the memory is allocated on msg, not on each value.
432          */
433         if (ldb_kv->cache->GUID_index_attribute == NULL) {
434                 /* check indexing version number */
435                 if (version != LDB_KV_INDEXING_VERSION) {
436                         ldb_debug_set(ldb_module_get_ctx(module),
437                                       LDB_DEBUG_ERROR,
438                                       "Wrong DN index version %d "
439                                       "expected %d for %s",
440                                       version, LDB_KV_INDEXING_VERSION,
441                                       ldb_dn_get_linearized(dn));
442                         talloc_free(msg);
443                         return LDB_ERR_OPERATIONS_ERROR;
444                 }
445
446                 talloc_steal(el->values, msg);
447                 list->dn = talloc_steal(list, el->values);
448                 list->count = el->num_values;
449         } else {
450                 unsigned int i;
451                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
452                         /* This is quite likely during the DB startup
453                            on first upgrade to using a GUID index */
454                         ldb_debug_set(ldb_module_get_ctx(module),
455                                       LDB_DEBUG_ERROR,
456                                       "Wrong GUID index version %d "
457                                       "expected %d for %s",
458                                       version, LDB_KV_GUID_INDEXING_VERSION,
459                                       ldb_dn_get_linearized(dn));
460                         talloc_free(msg);
461                         return LDB_ERR_OPERATIONS_ERROR;
462                 }
463
464                 if (el->num_values == 0) {
465                         talloc_free(msg);
466                         return LDB_ERR_OPERATIONS_ERROR;
467                 }
468
469                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
470                         talloc_free(msg);
471                         return LDB_ERR_OPERATIONS_ERROR;
472                 }
473
474                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
475                 list->dn = talloc_array(list, struct ldb_val, list->count);
476                 if (list->dn == NULL) {
477                         talloc_free(msg);
478                         return LDB_ERR_OPERATIONS_ERROR;
479                 }
480
481                 /*
482                  * The actual data is on msg.
483                  */
484                 talloc_steal(list->dn, msg);
485                 for (i = 0; i < list->count; i++) {
486                         list->dn[i].data
487                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
488                         list->dn[i].length = LDB_KV_GUID_SIZE;
489                 }
490         }
491
492         /* We don't need msg->elements any more */
493         talloc_free(msg->elements);
494         return LDB_SUCCESS;
495 }
496
497 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
498                            struct ldb_kv_private *ldb_kv,
499                            TALLOC_CTX *mem_ctx,
500                            struct ldb_dn *dn,
501                            struct ldb_val *ldb_key)
502 {
503         struct ldb_context *ldb = ldb_module_get_ctx(module);
504         int ret;
505         int index = 0;
506         enum key_truncation truncation = KEY_NOT_TRUNCATED;
507         struct dn_list *list = talloc(mem_ctx, struct dn_list);
508         if (list == NULL) {
509                 ldb_oom(ldb);
510                 return LDB_ERR_OPERATIONS_ERROR;
511         }
512
513         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
514         if (ret != LDB_SUCCESS) {
515                 TALLOC_FREE(list);
516                 return ret;
517         }
518
519         if (list->count == 0) {
520                 TALLOC_FREE(list);
521                 return LDB_ERR_NO_SUCH_OBJECT;
522         }
523
524         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
525                 const char *dn_str = ldb_dn_get_linearized(dn);
526                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
527                                        __location__
528                                        ": Failed to read DN index "
529                                        "against %s for %s: too many "
530                                        "values (%u > 1)",
531                                        ldb_kv->cache->GUID_index_attribute,
532                                        dn_str,
533                                        list->count);
534                 TALLOC_FREE(list);
535                 return LDB_ERR_CONSTRAINT_VIOLATION;
536         }
537
538         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
539                 /*
540                  * DN key has been truncated, need to inspect the actual
541                  * records to locate the actual DN
542                  */
543                 unsigned int i;
544                 index = -1;
545                 for (i=0; i < list->count; i++) {
546                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
547                         struct ldb_val key = {
548                                 .data = guid_key,
549                                 .length = sizeof(guid_key)
550                         };
551                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
552                         struct ldb_message *rec = ldb_msg_new(ldb);
553                         if (rec == NULL) {
554                                 TALLOC_FREE(list);
555                                 return LDB_ERR_OPERATIONS_ERROR;
556                         }
557
558                         ret = ldb_kv_idx_to_key(
559                             module, ldb_kv, ldb, &list->dn[i], &key);
560                         if (ret != LDB_SUCCESS) {
561                                 TALLOC_FREE(list);
562                                 TALLOC_FREE(rec);
563                                 return ret;
564                         }
565
566                         ret =
567                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
568                         if (key.data != guid_key) {
569                                 TALLOC_FREE(key.data);
570                         }
571                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
572                                 /*
573                                  * the record has disappeared?
574                                  * yes, this can happen
575                                  */
576                                 TALLOC_FREE(rec);
577                                 continue;
578                         }
579
580                         if (ret != LDB_SUCCESS) {
581                                 /* an internal error */
582                                 TALLOC_FREE(rec);
583                                 TALLOC_FREE(list);
584                                 return LDB_ERR_OPERATIONS_ERROR;
585                         }
586
587                         /*
588                          * We found the actual DN that we wanted from in the
589                          * multiple values that matched the index
590                          * (due to truncation), so return that.
591                          *
592                          */
593                         if (ldb_dn_compare(dn, rec->dn) == 0) {
594                                 index = i;
595                                 TALLOC_FREE(rec);
596                                 break;
597                         }
598                 }
599
600                 /*
601                  * We matched the index but the actual DN we wanted
602                  * was not here.
603                  */
604                 if (index == -1) {
605                         TALLOC_FREE(list);
606                         return LDB_ERR_NO_SUCH_OBJECT;
607                 }
608         }
609
610         /* The ldb_key memory is allocated by the caller */
611         ret = ldb_kv_guid_to_key(&list->dn[index], ldb_key);
612         TALLOC_FREE(list);
613
614         if (ret != LDB_SUCCESS) {
615                 return LDB_ERR_OPERATIONS_ERROR;
616         }
617
618         return LDB_SUCCESS;
619 }
620
621
622
623 /*
624   save a dn_list into a full @IDX style record
625  */
626 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
627                                      struct ldb_kv_private *ldb_kv,
628                                      struct ldb_dn *dn,
629                                      struct dn_list *list)
630 {
631         struct ldb_message *msg;
632         int ret;
633
634         msg = ldb_msg_new(module);
635         if (!msg) {
636                 return ldb_module_oom(module);
637         }
638
639         msg->dn = dn;
640
641         if (list->count == 0) {
642                 ret = ldb_kv_delete_noindex(module, msg);
643                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
644                         ret = LDB_SUCCESS;
645                 }
646                 talloc_free(msg);
647                 return ret;
648         }
649
650         if (ldb_kv->cache->GUID_index_attribute == NULL) {
651                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
652                                       LDB_KV_INDEXING_VERSION);
653                 if (ret != LDB_SUCCESS) {
654                         talloc_free(msg);
655                         return ldb_module_oom(module);
656                 }
657         } else {
658                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
659                                       LDB_KV_GUID_INDEXING_VERSION);
660                 if (ret != LDB_SUCCESS) {
661                         talloc_free(msg);
662                         return ldb_module_oom(module);
663                 }
664         }
665
666         if (list->count > 0) {
667                 struct ldb_message_element *el;
668
669                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
670                 if (ret != LDB_SUCCESS) {
671                         talloc_free(msg);
672                         return ldb_module_oom(module);
673                 }
674
675                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
676                         el->values = list->dn;
677                         el->num_values = list->count;
678                 } else {
679                         struct ldb_val v;
680                         unsigned int i;
681                         el->values = talloc_array(msg,
682                                                   struct ldb_val, 1);
683                         if (el->values == NULL) {
684                                 talloc_free(msg);
685                                 return ldb_module_oom(module);
686                         }
687
688                         v.data = talloc_array_size(el->values,
689                                                    list->count,
690                                                    LDB_KV_GUID_SIZE);
691                         if (v.data == NULL) {
692                                 talloc_free(msg);
693                                 return ldb_module_oom(module);
694                         }
695
696                         v.length = talloc_get_size(v.data);
697
698                         for (i = 0; i < list->count; i++) {
699                                 if (list->dn[i].length !=
700                                     LDB_KV_GUID_SIZE) {
701                                         talloc_free(msg);
702                                         return ldb_module_operr(module);
703                                 }
704                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
705                                        list->dn[i].data,
706                                        LDB_KV_GUID_SIZE);
707                         }
708                         el->values[0] = v;
709                         el->num_values = 1;
710                 }
711         }
712
713         ret = ldb_kv_store(module, msg, TDB_REPLACE);
714         talloc_free(msg);
715         return ret;
716 }
717
718 /*
719   save a dn_list into the database, in either @IDX or internal format
720  */
721 static int ldb_kv_dn_list_store(struct ldb_module *module,
722                                 struct ldb_dn *dn,
723                                 struct dn_list *list)
724 {
725         struct ldb_kv_private *ldb_kv = talloc_get_type(
726             ldb_module_get_private(module), struct ldb_kv_private);
727         TDB_DATA rec, key;
728         int ret;
729         struct dn_list *list2;
730
731         if (ldb_kv->idxptr == NULL) {
732                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
733         }
734
735         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
736         if (key.dptr == NULL) {
737                 return LDB_ERR_OPERATIONS_ERROR;
738         }
739         key.dsize = strlen((char *)key.dptr);
740
741         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
742         if (rec.dptr != NULL) {
743                 list2 = ldb_kv_index_idxptr(module, rec, false);
744                 if (list2 == NULL) {
745                         free(rec.dptr);
746                         return LDB_ERR_OPERATIONS_ERROR;
747                 }
748                 free(rec.dptr);
749                 list2->dn = talloc_steal(list2, list->dn);
750                 list2->count = list->count;
751                 return LDB_SUCCESS;
752         }
753
754         list2 = talloc(ldb_kv->idxptr, struct dn_list);
755         if (list2 == NULL) {
756                 return LDB_ERR_OPERATIONS_ERROR;
757         }
758         list2->dn = talloc_steal(list2, list->dn);
759         list2->count = list->count;
760
761         rec.dptr = (uint8_t *)&list2;
762         rec.dsize = sizeof(void *);
763
764
765         /*
766          * This is not a store into the main DB, but into an in-memory
767          * TDB, so we don't need a guard on ltdb->read_only
768          */
769         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
770         if (ret != 0) {
771                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
772         }
773         return LDB_SUCCESS;
774 }
775
776 /*
777   traverse function for storing the in-memory index entries on disk
778  */
779 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
780                                        TDB_DATA key,
781                                        TDB_DATA data,
782                                        void *state)
783 {
784         struct ldb_module *module = state;
785         struct ldb_kv_private *ldb_kv = talloc_get_type(
786             ldb_module_get_private(module), struct ldb_kv_private);
787         struct ldb_dn *dn;
788         struct ldb_context *ldb = ldb_module_get_ctx(module);
789         struct ldb_val v;
790         struct dn_list *list;
791
792         list = ldb_kv_index_idxptr(module, data, true);
793         if (list == NULL) {
794                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
795                 return -1;
796         }
797
798         v.data = key.dptr;
799         v.length = strnlen((char *)key.dptr, key.dsize);
800
801         dn = ldb_dn_from_ldb_val(module, ldb, &v);
802         if (dn == NULL) {
803                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
804                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
805                 return -1;
806         }
807
808         ldb_kv->idxptr->error =
809             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
810         talloc_free(dn);
811         if (ldb_kv->idxptr->error != 0) {
812                 return -1;
813         }
814         return 0;
815 }
816
817 /* cleanup the idxptr mode when transaction commits */
818 int ldb_kv_index_transaction_commit(struct ldb_module *module)
819 {
820         struct ldb_kv_private *ldb_kv = talloc_get_type(
821             ldb_module_get_private(module), struct ldb_kv_private);
822         int ret;
823
824         struct ldb_context *ldb = ldb_module_get_ctx(module);
825
826         ldb_reset_err_string(ldb);
827
828         if (ldb_kv->idxptr->itdb) {
829                 tdb_traverse(
830                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
831                 tdb_close(ldb_kv->idxptr->itdb);
832         }
833
834         ret = ldb_kv->idxptr->error;
835         if (ret != LDB_SUCCESS) {
836                 if (!ldb_errstring(ldb)) {
837                         ldb_set_errstring(ldb, ldb_strerror(ret));
838                 }
839                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
840         }
841
842         talloc_free(ldb_kv->idxptr);
843         ldb_kv->idxptr = NULL;
844         return ret;
845 }
846
847 /* cleanup the idxptr mode when transaction cancels */
848 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
849 {
850         struct ldb_kv_private *ldb_kv = talloc_get_type(
851             ldb_module_get_private(module), struct ldb_kv_private);
852         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
853                 tdb_close(ldb_kv->idxptr->itdb);
854         }
855         talloc_free(ldb_kv->idxptr);
856         ldb_kv->idxptr = NULL;
857         return LDB_SUCCESS;
858 }
859
860
861 /*
862   return the dn key to be used for an index
863   the caller is responsible for freeing
864 */
865 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
866                                        struct ldb_kv_private *ldb_kv,
867                                        const char *attr,
868                                        const struct ldb_val *value,
869                                        const struct ldb_schema_attribute **ap,
870                                        enum key_truncation *truncation)
871 {
872         struct ldb_dn *ret;
873         struct ldb_val v;
874         const struct ldb_schema_attribute *a = NULL;
875         char *attr_folded = NULL;
876         const char *attr_for_dn = NULL;
877         int r;
878         bool should_b64_encode;
879
880         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
881         size_t key_len = 0;
882         size_t attr_len = 0;
883         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
884         unsigned frmt_len = 0;
885         const size_t additional_key_length = 4;
886         unsigned int num_separators = 3; /* Estimate for overflow check */
887         const size_t min_data = 1;
888         const size_t min_key_length = additional_key_length
889                 + indx_len + num_separators + min_data;
890         struct ldb_val empty;
891
892         /*
893          * Accept a NULL value as a request for a key with no value.  This is
894          * different from passing an empty value, which might be given
895          * significance by some canonicalise functions.
896          */
897         bool empty_val = value == NULL;
898         if (empty_val) {
899                 empty.length = 0;
900                 empty.data = discard_const_p(unsigned char, "");
901                 value = &empty;
902         }
903
904         if (attr[0] == '@') {
905                 attr_for_dn = attr;
906                 v = *value;
907                 if (ap != NULL) {
908                         *ap = NULL;
909                 }
910         } else {
911                 attr_folded = ldb_attr_casefold(ldb, attr);
912                 if (!attr_folded) {
913                         return NULL;
914                 }
915
916                 attr_for_dn = attr_folded;
917
918                 a = ldb_schema_attribute_by_name(ldb, attr);
919                 if (ap) {
920                         *ap = a;
921                 }
922
923                 if (empty_val) {
924                         v = *value;
925                 } else {
926                         ldb_attr_handler_t fn;
927                         if (a->syntax->index_format_fn &&
928                             ldb_kv->cache->GUID_index_attribute != NULL) {
929                                 fn = a->syntax->index_format_fn;
930                         } else {
931                                 fn = a->syntax->canonicalise_fn;
932                         }
933                         r = fn(ldb, ldb, value, &v);
934                         if (r != LDB_SUCCESS) {
935                                 const char *errstr = ldb_errstring(ldb);
936                                 /* canonicalisation can be refused. For
937                                    example, a attribute that takes wildcards
938                                    will refuse to canonicalise if the value
939                                    contains a wildcard */
940                                 ldb_asprintf_errstring(ldb,
941                                                        "Failed to create "
942                                                        "index key for "
943                                                        "attribute '%s':%s%s%s",
944                                                        attr, ldb_strerror(r),
945                                                        (errstr?":":""),
946                                                        (errstr?errstr:""));
947                                 talloc_free(attr_folded);
948                                 return NULL;
949                         }
950                 }
951         }
952         attr_len = strlen(attr_for_dn);
953
954         /*
955          * Check if there is any hope this will fit into the DB.
956          * Overflow here is not actually critical the code below
957          * checks again to make the printf and the DB does another
958          * check for too long keys
959          */
960         if (max_key_length - attr_len < min_key_length) {
961                 ldb_asprintf_errstring(
962                         ldb,
963                         __location__ ": max_key_length "
964                         "is too small (%u) < (%u)",
965                         max_key_length,
966                         (unsigned)(min_key_length + attr_len));
967                 talloc_free(attr_folded);
968                 return NULL;
969         }
970
971         /*
972          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
973          * "DN=" and a trailing string terminator
974          */
975         max_key_length -= additional_key_length;
976
977         /*
978          * We do not base 64 encode a DN in a key, it has already been
979          * casefold and lineraized, that is good enough.  That already
980          * avoids embedded NUL etc.
981          */
982         if (ldb_kv->cache->GUID_index_attribute != NULL) {
983                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
984                         should_b64_encode = false;
985                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
986                         /*
987                          * We can only change the behaviour for IDXONE
988                          * when the GUID index is enabled
989                          */
990                         should_b64_encode = false;
991                 } else {
992                         should_b64_encode
993                                 = ldb_should_b64_encode(ldb, &v);
994                 }
995         } else {
996                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
997         }
998
999         if (should_b64_encode) {
1000                 size_t vstr_len = 0;
1001                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
1002                 if (!vstr) {
1003                         talloc_free(attr_folded);
1004                         return NULL;
1005                 }
1006                 vstr_len = strlen(vstr);
1007                 /*
1008                  * Overflow here is not critical as we only use this
1009                  * to choose the printf truncation
1010                  */
1011                 key_len = num_separators + indx_len + attr_len + vstr_len;
1012                 if (key_len > max_key_length) {
1013                         size_t excess = key_len - max_key_length;
1014                         frmt_len = vstr_len - excess;
1015                         *truncation = KEY_TRUNCATED;
1016                         /*
1017                         * Truncated keys are placed in a separate key space
1018                         * from the non truncated keys
1019                         * Note: the double hash "##" is not a typo and
1020                         * indicates that the following value is base64 encoded
1021                         */
1022                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
1023                                              LDB_KV_INDEX, attr_for_dn,
1024                                              frmt_len, vstr);
1025                 } else {
1026                         frmt_len = vstr_len;
1027                         *truncation = KEY_NOT_TRUNCATED;
1028                         /*
1029                          * Note: the double colon "::" is not a typo and
1030                          * indicates that the following value is base64 encoded
1031                          */
1032                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1033                                              LDB_KV_INDEX, attr_for_dn,
1034                                              frmt_len, vstr);
1035                 }
1036                 talloc_free(vstr);
1037         } else {
1038                 /* Only need two seperators */
1039                 num_separators = 2;
1040
1041                 /*
1042                  * Overflow here is not critical as we only use this
1043                  * to choose the printf truncation
1044                  */
1045                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1046                 if (key_len > max_key_length) {
1047                         size_t excess = key_len - max_key_length;
1048                         frmt_len = v.length - excess;
1049                         *truncation = KEY_TRUNCATED;
1050                         /*
1051                          * Truncated keys are placed in a separate key space
1052                          * from the non truncated keys
1053                          */
1054                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1055                                              LDB_KV_INDEX, attr_for_dn,
1056                                              frmt_len, (char *)v.data);
1057                 } else {
1058                         frmt_len = v.length;
1059                         *truncation = KEY_NOT_TRUNCATED;
1060                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1061                                              LDB_KV_INDEX, attr_for_dn,
1062                                              frmt_len, (char *)v.data);
1063                 }
1064         }
1065
1066         if (v.data != value->data && !empty_val) {
1067                 talloc_free(v.data);
1068         }
1069         talloc_free(attr_folded);
1070
1071         return ret;
1072 }
1073
1074 /*
1075   see if a attribute value is in the list of indexed attributes
1076 */
1077 static bool ldb_kv_is_indexed(struct ldb_module *module,
1078                               struct ldb_kv_private *ldb_kv,
1079                               const char *attr)
1080 {
1081         struct ldb_context *ldb = ldb_module_get_ctx(module);
1082         unsigned int i;
1083         struct ldb_message_element *el;
1084
1085         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1086             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1087                 /* Implicity covered, this is the index key */
1088                 return false;
1089         }
1090         if (ldb->schema.index_handler_override) {
1091                 const struct ldb_schema_attribute *a
1092                         = ldb_schema_attribute_by_name(ldb, attr);
1093
1094                 if (a == NULL) {
1095                         return false;
1096                 }
1097
1098                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1099                         return true;
1100                 } else {
1101                         return false;
1102                 }
1103         }
1104
1105         if (!ldb_kv->cache->attribute_indexes) {
1106                 return false;
1107         }
1108
1109         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1110         if (el == NULL) {
1111                 return false;
1112         }
1113
1114         /* TODO: this is too expensive! At least use a binary search */
1115         for (i=0; i<el->num_values; i++) {
1116                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1117                         return true;
1118                 }
1119         }
1120         return false;
1121 }
1122
1123 /*
1124   in the following logic functions, the return value is treated as
1125   follows:
1126
1127      LDB_SUCCESS: we found some matching index values
1128
1129      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1130
1131      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1132                                we'll need a full search
1133  */
1134
1135 /*
1136   return a list of dn's that might match a simple indexed search (an
1137   equality search only)
1138  */
1139 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1140                                   struct ldb_kv_private *ldb_kv,
1141                                   const struct ldb_parse_tree *tree,
1142                                   struct dn_list *list)
1143 {
1144         struct ldb_context *ldb;
1145         struct ldb_dn *dn;
1146         int ret;
1147         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1148
1149         ldb = ldb_module_get_ctx(module);
1150
1151         list->count = 0;
1152         list->dn = NULL;
1153
1154         /* if the attribute isn't in the list of indexed attributes then
1155            this node needs a full search */
1156         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1157                 return LDB_ERR_OPERATIONS_ERROR;
1158         }
1159
1160         /* the attribute is indexed. Pull the list of DNs that match the
1161            search criterion */
1162         dn = ldb_kv_index_key(ldb,
1163                               ldb_kv,
1164                               tree->u.equality.attr,
1165                               &tree->u.equality.value,
1166                               NULL,
1167                               &truncation);
1168         /*
1169          * We ignore truncation here and allow multi-valued matches
1170          * as ltdb_search_indexed will filter out the wrong one in
1171          * ltdb_index_filter() which calls ldb_match_message().
1172          */
1173         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1174
1175         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1176         talloc_free(dn);
1177         return ret;
1178 }
1179
1180 static bool list_union(struct ldb_context *ldb,
1181                        struct ldb_kv_private *ldb_kv,
1182                        struct dn_list *list,
1183                        struct dn_list *list2);
1184
1185 /*
1186   return a list of dn's that might match a leaf indexed search
1187  */
1188 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1189                                 struct ldb_kv_private *ldb_kv,
1190                                 const struct ldb_parse_tree *tree,
1191                                 struct dn_list *list)
1192 {
1193         if (ldb_kv->disallow_dn_filter &&
1194             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1195                 /* in AD mode we do not support "(dn=...)" search filters */
1196                 list->dn = NULL;
1197                 list->count = 0;
1198                 return LDB_SUCCESS;
1199         }
1200         if (tree->u.equality.attr[0] == '@') {
1201                 /* Do not allow a indexed search against an @ */
1202                 list->dn = NULL;
1203                 list->count = 0;
1204                 return LDB_SUCCESS;
1205         }
1206         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1207                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1208                 bool valid_dn = false;
1209                 struct ldb_dn *dn
1210                         = ldb_dn_from_ldb_val(list,
1211                                               ldb_module_get_ctx(module),
1212                                               &tree->u.equality.value);
1213                 if (dn == NULL) {
1214                         /* If we can't parse it, no match */
1215                         list->dn = NULL;
1216                         list->count = 0;
1217                         return LDB_SUCCESS;
1218                 }
1219
1220                 valid_dn = ldb_dn_validate(dn);
1221                 if (valid_dn == false) {
1222                         /* If we can't parse it, no match */
1223                         list->dn = NULL;
1224                         list->count = 0;
1225                         return LDB_SUCCESS;
1226                 }
1227
1228                 /*
1229                  * Re-use the same code we use for a SCOPE_BASE
1230                  * search
1231                  *
1232                  * We can't call TALLOC_FREE(dn) as this must belong
1233                  * to list for the memory to remain valid.
1234                  */
1235                 return ldb_kv_index_dn_base_dn(
1236                     module, ldb_kv, dn, list, &truncation);
1237                 /*
1238                  * We ignore truncation here and allow multi-valued matches
1239                  * as ltdb_search_indexed will filter out the wrong one in
1240                  * ltdb_index_filter() which calls ldb_match_message().
1241                  */
1242
1243         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1244                    (ldb_attr_cmp(tree->u.equality.attr,
1245                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1246                 int ret;
1247                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1248                 list->dn = talloc_array(list, struct ldb_val, 1);
1249                 if (list->dn == NULL) {
1250                         ldb_module_oom(module);
1251                         return LDB_ERR_OPERATIONS_ERROR;
1252                 }
1253                 /*
1254                  * We need to go via the canonicalise_fn() to
1255                  * ensure we get the index in binary, rather
1256                  * than a string
1257                  */
1258                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1259                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1260                 if (ret != LDB_SUCCESS) {
1261                         return LDB_ERR_OPERATIONS_ERROR;
1262                 }
1263                 list->count = 1;
1264                 return LDB_SUCCESS;
1265         }
1266
1267         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1268 }
1269
1270
1271 /*
1272   list intersection
1273   list = list & list2
1274 */
1275 static bool list_intersect(struct ldb_context *ldb,
1276                            struct ldb_kv_private *ldb_kv,
1277                            struct dn_list *list,
1278                            const struct dn_list *list2)
1279 {
1280         const struct dn_list *short_list, *long_list;
1281         struct dn_list *list3;
1282         unsigned int i;
1283
1284         if (list->count == 0) {
1285                 /* 0 & X == 0 */
1286                 return true;
1287         }
1288         if (list2->count == 0) {
1289                 /* X & 0 == 0 */
1290                 list->count = 0;
1291                 list->dn = NULL;
1292                 return true;
1293         }
1294
1295         /*
1296          * In both of the below we check for strict and in that
1297          * case do not optimise the intersection of this list,
1298          * we must never return an entry not in this
1299          * list.  This allows the index for
1300          * SCOPE_ONELEVEL to be trusted.
1301          */
1302
1303         /* the indexing code is allowed to return a longer list than
1304            what really matches, as all results are filtered by the
1305            full expression at the end - this shortcut avoids a lot of
1306            work in some cases */
1307         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1308                 return true;
1309         }
1310         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1311                 list->count = list2->count;
1312                 list->dn = list2->dn;
1313                 /* note that list2 may not be the parent of list2->dn,
1314                    as list2->dn may be owned by ltdb->idxptr. In that
1315                    case we expect this reparent call to fail, which is
1316                    OK */
1317                 talloc_reparent(list2, list, list2->dn);
1318                 return true;
1319         }
1320
1321         if (list->count > list2->count) {
1322                 short_list = list2;
1323                 long_list = list;
1324         } else {
1325                 short_list = list;
1326                 long_list = list2;
1327         }
1328
1329         list3 = talloc_zero(list, struct dn_list);
1330         if (list3 == NULL) {
1331                 return false;
1332         }
1333
1334         list3->dn = talloc_array(list3, struct ldb_val,
1335                                  MIN(list->count, list2->count));
1336         if (!list3->dn) {
1337                 talloc_free(list3);
1338                 return false;
1339         }
1340         list3->count = 0;
1341
1342         for (i=0;i<short_list->count;i++) {
1343                 /* For the GUID index case, this is a binary search */
1344                 if (ldb_kv_dn_list_find_val(
1345                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1346                         list3->dn[list3->count] = short_list->dn[i];
1347                         list3->count++;
1348                 }
1349         }
1350
1351         list->strict |= list2->strict;
1352         list->dn = talloc_steal(list, list3->dn);
1353         list->count = list3->count;
1354         talloc_free(list3);
1355
1356         return true;
1357 }
1358
1359
1360 /*
1361   list union
1362   list = list | list2
1363 */
1364 static bool list_union(struct ldb_context *ldb,
1365                        struct ldb_kv_private *ldb_kv,
1366                        struct dn_list *list,
1367                        struct dn_list *list2)
1368 {
1369         struct ldb_val *dn3;
1370         unsigned int i = 0, j = 0, k = 0;
1371
1372         if (list2->count == 0) {
1373                 /* X | 0 == X */
1374                 return true;
1375         }
1376
1377         if (list->count == 0) {
1378                 /* 0 | X == X */
1379                 list->count = list2->count;
1380                 list->dn = list2->dn;
1381                 /* note that list2 may not be the parent of list2->dn,
1382                    as list2->dn may be owned by ltdb->idxptr. In that
1383                    case we expect this reparent call to fail, which is
1384                    OK */
1385                 talloc_reparent(list2, list, list2->dn);
1386                 return true;
1387         }
1388
1389         /*
1390          * Sort the lists (if not in GUID DN mode) so we can do
1391          * the de-duplication during the merge
1392          *
1393          * NOTE: This can sort the in-memory index values, as list or
1394          * list2 might not be a copy!
1395          */
1396         ldb_kv_dn_list_sort(ldb_kv, list);
1397         ldb_kv_dn_list_sort(ldb_kv, list2);
1398
1399         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1400         if (!dn3) {
1401                 ldb_oom(ldb);
1402                 return false;
1403         }
1404
1405         while (i < list->count || j < list2->count) {
1406                 int cmp;
1407                 if (i >= list->count) {
1408                         cmp = 1;
1409                 } else if (j >= list2->count) {
1410                         cmp = -1;
1411                 } else {
1412                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1413                                                           &list2->dn[j]);
1414                 }
1415
1416                 if (cmp < 0) {
1417                         /* Take list */
1418                         dn3[k] = list->dn[i];
1419                         i++;
1420                         k++;
1421                 } else if (cmp > 0) {
1422                         /* Take list2 */
1423                         dn3[k] = list2->dn[j];
1424                         j++;
1425                         k++;
1426                 } else {
1427                         /* Equal, take list */
1428                         dn3[k] = list->dn[i];
1429                         i++;
1430                         j++;
1431                         k++;
1432                 }
1433         }
1434
1435         list->dn = dn3;
1436         list->count = k;
1437
1438         return true;
1439 }
1440
1441 static int ldb_kv_index_dn(struct ldb_module *module,
1442                            struct ldb_kv_private *ldb_kv,
1443                            const struct ldb_parse_tree *tree,
1444                            struct dn_list *list);
1445
1446 /*
1447   process an OR list (a union)
1448  */
1449 static int ldb_kv_index_dn_or(struct ldb_module *module,
1450                               struct ldb_kv_private *ldb_kv,
1451                               const struct ldb_parse_tree *tree,
1452                               struct dn_list *list)
1453 {
1454         struct ldb_context *ldb;
1455         unsigned int i;
1456
1457         ldb = ldb_module_get_ctx(module);
1458
1459         list->dn = NULL;
1460         list->count = 0;
1461
1462         for (i=0; i<tree->u.list.num_elements; i++) {
1463                 struct dn_list *list2;
1464                 int ret;
1465
1466                 list2 = talloc_zero(list, struct dn_list);
1467                 if (list2 == NULL) {
1468                         return LDB_ERR_OPERATIONS_ERROR;
1469                 }
1470
1471                 ret = ldb_kv_index_dn(
1472                     module, ldb_kv, tree->u.list.elements[i], list2);
1473
1474                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1475                         /* X || 0 == X */
1476                         talloc_free(list2);
1477                         continue;
1478                 }
1479
1480                 if (ret != LDB_SUCCESS) {
1481                         /* X || * == * */
1482                         talloc_free(list2);
1483                         return ret;
1484                 }
1485
1486                 if (!list_union(ldb, ldb_kv, list, list2)) {
1487                         talloc_free(list2);
1488                         return LDB_ERR_OPERATIONS_ERROR;
1489                 }
1490         }
1491
1492         if (list->count == 0) {
1493                 return LDB_ERR_NO_SUCH_OBJECT;
1494         }
1495
1496         return LDB_SUCCESS;
1497 }
1498
1499
1500 /*
1501   NOT an index results
1502  */
1503 static int ldb_kv_index_dn_not(struct ldb_module *module,
1504                                struct ldb_kv_private *ldb_kv,
1505                                const struct ldb_parse_tree *tree,
1506                                struct dn_list *list)
1507 {
1508         /* the only way to do an indexed not would be if we could
1509            negate the not via another not or if we knew the total
1510            number of database elements so we could know that the
1511            existing expression covered the whole database.
1512
1513            instead, we just give up, and rely on a full index scan
1514            (unless an outer & manages to reduce the list)
1515         */
1516         return LDB_ERR_OPERATIONS_ERROR;
1517 }
1518
1519 /*
1520  * These things are unique, so avoid a full scan if this is a search
1521  * by GUID, DN or a unique attribute
1522  */
1523 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1524                                 struct ldb_kv_private *ldb_kv,
1525                                 const char *attr)
1526 {
1527         const struct ldb_schema_attribute *a;
1528         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1529                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1530                     0) {
1531                         return true;
1532                 }
1533         }
1534         if (ldb_attr_dn(attr) == 0) {
1535                 return true;
1536         }
1537
1538         a = ldb_schema_attribute_by_name(ldb, attr);
1539         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1540                 return true;
1541         }
1542         return false;
1543 }
1544
1545 /*
1546   process an AND expression (intersection)
1547  */
1548 static int ldb_kv_index_dn_and(struct ldb_module *module,
1549                                struct ldb_kv_private *ldb_kv,
1550                                const struct ldb_parse_tree *tree,
1551                                struct dn_list *list)
1552 {
1553         struct ldb_context *ldb;
1554         unsigned int i;
1555         bool found;
1556
1557         ldb = ldb_module_get_ctx(module);
1558
1559         list->dn = NULL;
1560         list->count = 0;
1561
1562         /* in the first pass we only look for unique simple
1563            equality tests, in the hope of avoiding having to look
1564            at any others */
1565         for (i=0; i<tree->u.list.num_elements; i++) {
1566                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1567                 int ret;
1568
1569                 if (subtree->operation != LDB_OP_EQUALITY ||
1570                     !ldb_kv_index_unique(
1571                         ldb, ldb_kv, subtree->u.equality.attr)) {
1572                         continue;
1573                 }
1574
1575                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1576                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1577                         /* 0 && X == 0 */
1578                         return LDB_ERR_NO_SUCH_OBJECT;
1579                 }
1580                 if (ret == LDB_SUCCESS) {
1581                         /* a unique index match means we can
1582                          * stop. Note that we don't care if we return
1583                          * a few too many objects, due to later
1584                          * filtering */
1585                         return LDB_SUCCESS;
1586                 }
1587         }
1588
1589         /* now do a full intersection */
1590         found = false;
1591
1592         for (i=0; i<tree->u.list.num_elements; i++) {
1593                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1594                 struct dn_list *list2;
1595                 int ret;
1596
1597                 list2 = talloc_zero(list, struct dn_list);
1598                 if (list2 == NULL) {
1599                         return ldb_module_oom(module);
1600                 }
1601
1602                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1603
1604                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1605                         /* X && 0 == 0 */
1606                         list->dn = NULL;
1607                         list->count = 0;
1608                         talloc_free(list2);
1609                         return LDB_ERR_NO_SUCH_OBJECT;
1610                 }
1611
1612                 if (ret != LDB_SUCCESS) {
1613                         /* this didn't adding anything */
1614                         talloc_free(list2);
1615                         continue;
1616                 }
1617
1618                 if (!found) {
1619                         talloc_reparent(list2, list, list->dn);
1620                         list->dn = list2->dn;
1621                         list->count = list2->count;
1622                         found = true;
1623                 } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
1624                         talloc_free(list2);
1625                         return LDB_ERR_OPERATIONS_ERROR;
1626                 }
1627
1628                 if (list->count == 0) {
1629                         list->dn = NULL;
1630                         return LDB_ERR_NO_SUCH_OBJECT;
1631                 }
1632
1633                 if (list->count < 2) {
1634                         /* it isn't worth loading the next part of the tree */
1635                         return LDB_SUCCESS;
1636                 }
1637         }
1638
1639         if (!found) {
1640                 /* none of the attributes were indexed */
1641                 return LDB_ERR_OPERATIONS_ERROR;
1642         }
1643
1644         return LDB_SUCCESS;
1645 }
1646
1647 struct ldb_kv_ordered_index_context {
1648         struct ldb_module *module;
1649         int error;
1650         struct dn_list *dn_list;
1651 };
1652
1653 static int traverse_range_index(struct ldb_kv_private *ldb_kv,
1654                                 struct ldb_val key,
1655                                 struct ldb_val data,
1656                                 void *state)
1657 {
1658
1659         struct ldb_context *ldb;
1660         struct ldb_kv_ordered_index_context *ctx =
1661             (struct ldb_kv_ordered_index_context *)state;
1662         struct ldb_module *module = ctx->module;
1663         struct ldb_message_element *el = NULL;
1664         struct ldb_message *msg = NULL;
1665         int version;
1666         size_t dn_array_size, additional_length;
1667         unsigned int i;
1668
1669         ldb = ldb_module_get_ctx(module);
1670
1671         msg = ldb_msg_new(module);
1672
1673         ctx->error = ldb_unpack_data_flags(ldb, &data, msg,
1674                                            LDB_UNPACK_DATA_FLAG_NO_DN);
1675
1676         if (ctx->error != LDB_SUCCESS) {
1677                 talloc_free(msg);
1678                 return ctx->error;
1679         }
1680
1681         el = ldb_msg_find_element(msg, LDB_KV_IDX);
1682         if (!el) {
1683                 talloc_free(msg);
1684                 return LDB_SUCCESS;
1685         }
1686
1687         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
1688
1689         /*
1690          * we avoid copying the strings by stealing the list.  We have
1691          * to steal msg onto el->values (which looks odd) because
1692          * the memory is allocated on msg, not on each value.
1693          */
1694         if (version != LDB_KV_GUID_INDEXING_VERSION) {
1695                 /* This is quite likely during the DB startup
1696                    on first upgrade to using a GUID index */
1697                 ldb_debug_set(ldb_module_get_ctx(module),
1698                               LDB_DEBUG_ERROR, __location__
1699                               ": Wrong GUID index version %d expected %d",
1700                               version, LDB_KV_GUID_INDEXING_VERSION);
1701                 talloc_free(msg);
1702                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1703                 return ctx->error;
1704         }
1705
1706         if (el->num_values == 0) {
1707                 talloc_free(msg);
1708                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1709                 return ctx->error;
1710         }
1711
1712         if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
1713             || el->values[0].length == 0) {
1714                 talloc_free(msg);
1715                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1716                 return ctx->error;
1717         }
1718
1719         dn_array_size = talloc_array_length(ctx->dn_list->dn);
1720
1721         additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
1722
1723         if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
1724                 talloc_free(msg);
1725                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1726                 return ctx->error;
1727         }
1728
1729         if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
1730                 size_t new_array_length;
1731
1732                 if (dn_array_size * 2 < dn_array_size) {
1733                         talloc_free(msg);
1734                         ctx->error = LDB_ERR_OPERATIONS_ERROR;
1735                         return ctx->error;
1736                 }
1737
1738                 new_array_length = MAX(ctx->dn_list->count + additional_length,
1739                                        dn_array_size * 2);
1740
1741                 ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
1742                                                   ctx->dn_list->dn,
1743                                                   struct ldb_val,
1744                                                   new_array_length);
1745         }
1746
1747         if (ctx->dn_list->dn == NULL) {
1748                 talloc_free(msg);
1749                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1750                 return ctx->error;
1751         }
1752
1753         /*
1754          * The actual data is on msg.
1755          */
1756         talloc_steal(ctx->dn_list->dn, msg);
1757         for (i = 0; i < additional_length; i++) {
1758                 ctx->dn_list->dn[i + ctx->dn_list->count].data
1759                         = &el->values[0].data[i * LDB_KV_GUID_SIZE];
1760                 ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
1761
1762         }
1763
1764         ctx->dn_list->count += additional_length;
1765
1766         talloc_free(msg->elements);
1767
1768         return LDB_SUCCESS;
1769 }
1770
1771 /*
1772  * >= and <= indexing implemented using lexicographically sorted keys
1773  *
1774  * We only run this in GUID indexing mode and when there is no write
1775  * transaction (only implicit read locks are being held). Otherwise, we would
1776  * have to deal with the in-memory index cache.
1777  *
1778  * We rely on the implementation of index_format_fn on a schema syntax which
1779  * will can help us to construct keys which can be ordered correctly, and we
1780  * terminate using schema agnostic start and end keys.
1781  *
1782  * index_format_fn must output values which can be memcmp-able to produce the
1783  * correct ordering as defined by the schema syntax class.
1784  */
1785 static int ldb_kv_index_dn_ordered(struct ldb_module *module,
1786                                    struct ldb_kv_private *ldb_kv,
1787                                    const struct ldb_parse_tree *tree,
1788                                    struct dn_list *list, bool ascending)
1789 {
1790         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1791         struct ldb_context *ldb = ldb_module_get_ctx(module);
1792
1793         struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
1794         struct ldb_val start_key, end_key;
1795         struct ldb_dn *key_dn = NULL;
1796         const struct ldb_schema_attribute *a = NULL;
1797
1798         struct ldb_kv_ordered_index_context ctx;
1799         int ret;
1800
1801         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1802
1803         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
1804                 return LDB_ERR_OPERATIONS_ERROR;
1805         }
1806
1807         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1808                 return LDB_ERR_OPERATIONS_ERROR;
1809         }
1810
1811         /* bail out if we're in a transaction, full search instead. */
1812         if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1813                 return LDB_ERR_OPERATIONS_ERROR;
1814         }
1815
1816         if (ldb_kv->disallow_dn_filter &&
1817             (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
1818                 /* in AD mode we do not support "(dn=...)" search filters */
1819                 list->dn = NULL;
1820                 list->count = 0;
1821                 return LDB_SUCCESS;
1822         }
1823         if (tree->u.comparison.attr[0] == '@') {
1824                 /* Do not allow a indexed search against an @ */
1825                 list->dn = NULL;
1826                 list->count = 0;
1827                 return LDB_SUCCESS;
1828         }
1829
1830         a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
1831
1832         /*
1833          * If there's no index format function defined for this attr, then
1834          * the lexicographic order in the database doesn't correspond to the
1835          * attr's ordering, so we can't use the iterate_range op.
1836          */
1837         if (a->syntax->index_format_fn == NULL) {
1838                 return LDB_ERR_OPERATIONS_ERROR;
1839         }
1840
1841         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1842                                   &tree->u.comparison.value,
1843                                   NULL, &truncation);
1844         if (!key_dn) {
1845                 return LDB_ERR_OPERATIONS_ERROR;
1846         } else if (truncation == KEY_TRUNCATED) {
1847                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1848                           __location__
1849                           ": ordered index violation: key dn truncated: %s\n",
1850                           ldb_dn_get_linearized(key_dn));
1851                 return LDB_ERR_OPERATIONS_ERROR;
1852         }
1853         ldb_key = ldb_kv_key_dn(tmp_ctx, key_dn);
1854         talloc_free(key_dn);
1855         if (ldb_key.data == NULL) {
1856                 return LDB_ERR_OPERATIONS_ERROR;
1857         }
1858
1859         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1860                                   NULL, NULL, &truncation);
1861         if (!key_dn) {
1862                 return LDB_ERR_OPERATIONS_ERROR;
1863         } else if (truncation == KEY_TRUNCATED) {
1864                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1865                           __location__
1866                           ": ordered index violation: key dn truncated: %s\n",
1867                           ldb_dn_get_linearized(key_dn));
1868                 return LDB_ERR_OPERATIONS_ERROR;
1869         }
1870
1871         ldb_key2 = ldb_kv_key_dn(tmp_ctx, key_dn);
1872         talloc_free(key_dn);
1873         if (ldb_key2.data == NULL) {
1874                 return LDB_ERR_OPERATIONS_ERROR;
1875         }
1876
1877         /*
1878          * In order to avoid defining a start and end key for the search, we
1879          * notice that each index key is of the form:
1880          *
1881          *     DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
1882          *
1883          * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
1884          * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
1885          * particular attribute.
1886          *
1887          * Our LMDB backend uses the default memcmp for key comparison.
1888          */
1889
1890         /* Eliminate NUL byte at the end of the empty key */
1891         ldb_key2.length--;
1892
1893         if (ascending) {
1894                 /* : becomes ; for pseudo end-key */
1895                 ldb_key2.data[ldb_key2.length-1]++;
1896                 start_key = ldb_key;
1897                 end_key = ldb_key2;
1898         } else {
1899                 start_key = ldb_key2;
1900                 end_key = ldb_key;
1901         }
1902
1903         ctx.module = module;
1904         ctx.error = 0;
1905         ctx.dn_list = list;
1906         ctx.dn_list->count = 0;
1907         ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
1908
1909         ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
1910                                             traverse_range_index, &ctx);
1911
1912         if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
1913                 return LDB_ERR_OPERATIONS_ERROR;
1914         }
1915
1916         TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
1917                        ldb_val_equal_exact_for_qsort);
1918
1919         talloc_free(tmp_ctx);
1920
1921         return LDB_SUCCESS;
1922 }
1923
1924 static int ldb_kv_index_dn_greater(struct ldb_module *module,
1925                                    struct ldb_kv_private *ldb_kv,
1926                                    const struct ldb_parse_tree *tree,
1927                                    struct dn_list *list)
1928 {
1929         return ldb_kv_index_dn_ordered(module,
1930                                        ldb_kv,
1931                                        tree,
1932                                        list, true);
1933 }
1934
1935 static int ldb_kv_index_dn_less(struct ldb_module *module,
1936                                    struct ldb_kv_private *ldb_kv,
1937                                    const struct ldb_parse_tree *tree,
1938                                    struct dn_list *list)
1939 {
1940         return ldb_kv_index_dn_ordered(module,
1941                                        ldb_kv,
1942                                        tree,
1943                                        list, false);
1944 }
1945
1946 /*
1947   return a list of matching objects using a one-level index
1948  */
1949 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1950                                 struct ldb_kv_private *ldb_kv,
1951                                 const char *attr,
1952                                 struct ldb_dn *dn,
1953                                 struct dn_list *list,
1954                                 enum key_truncation *truncation)
1955 {
1956         struct ldb_context *ldb;
1957         struct ldb_dn *key;
1958         struct ldb_val val;
1959         int ret;
1960
1961         ldb = ldb_module_get_ctx(module);
1962
1963         /* work out the index key from the parent DN */
1964         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1965         if (val.data == NULL) {
1966                 const char *dn_str = ldb_dn_get_linearized(dn);
1967                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1968                                        __location__
1969                                        ": Failed to get casefold DN "
1970                                        "from: %s",
1971                                        dn_str);
1972                 return LDB_ERR_OPERATIONS_ERROR;
1973         }
1974         val.length = strlen((char *)val.data);
1975         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1976         if (!key) {
1977                 ldb_oom(ldb);
1978                 return LDB_ERR_OPERATIONS_ERROR;
1979         }
1980
1981         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1982         talloc_free(key);
1983         if (ret != LDB_SUCCESS) {
1984                 return ret;
1985         }
1986
1987         if (list->count == 0) {
1988                 return LDB_ERR_NO_SUCH_OBJECT;
1989         }
1990
1991         return LDB_SUCCESS;
1992 }
1993
1994 /*
1995   return a list of matching objects using a one-level index
1996  */
1997 static int ldb_kv_index_dn_one(struct ldb_module *module,
1998                                struct ldb_kv_private *ldb_kv,
1999                                struct ldb_dn *parent_dn,
2000                                struct dn_list *list,
2001                                enum key_truncation *truncation)
2002 {
2003         /*
2004          * Ensure we do not shortcut on intersection for this list.
2005          * We must never be lazy and return an entry not in this
2006          * list.  This allows the index for
2007          * SCOPE_ONELEVEL to be trusted.
2008          */
2009
2010         list->strict = true;
2011         return ldb_kv_index_dn_attr(
2012             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
2013 }
2014
2015 /*
2016   return a list of matching objects using the DN index
2017  */
2018 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
2019                                    struct ldb_kv_private *ldb_kv,
2020                                    struct ldb_dn *base_dn,
2021                                    struct dn_list *dn_list,
2022                                    enum key_truncation *truncation)
2023 {
2024         const struct ldb_val *guid_val = NULL;
2025         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2026                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2027                 if (dn_list->dn == NULL) {
2028                         return ldb_module_oom(module);
2029                 }
2030                 dn_list->dn[0].data = discard_const_p(unsigned char,
2031                                                       ldb_dn_get_linearized(base_dn));
2032                 if (dn_list->dn[0].data == NULL) {
2033                         talloc_free(dn_list->dn);
2034                         return ldb_module_oom(module);
2035                 }
2036                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
2037                 dn_list->count = 1;
2038
2039                 return LDB_SUCCESS;
2040         }
2041
2042         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
2043                 guid_val = ldb_dn_get_extended_component(
2044                     base_dn, ldb_kv->cache->GUID_index_dn_component);
2045         }
2046
2047         if (guid_val != NULL) {
2048                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2049                 if (dn_list->dn == NULL) {
2050                         return ldb_module_oom(module);
2051                 }
2052                 dn_list->dn[0].data = guid_val->data;
2053                 dn_list->dn[0].length = guid_val->length;
2054                 dn_list->count = 1;
2055
2056                 return LDB_SUCCESS;
2057         }
2058
2059         return ldb_kv_index_dn_attr(
2060             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
2061 }
2062
2063 /*
2064   return a list of dn's that might match a indexed search or
2065   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
2066  */
2067 static int ldb_kv_index_dn(struct ldb_module *module,
2068                            struct ldb_kv_private *ldb_kv,
2069                            const struct ldb_parse_tree *tree,
2070                            struct dn_list *list)
2071 {
2072         int ret = LDB_ERR_OPERATIONS_ERROR;
2073
2074         switch (tree->operation) {
2075         case LDB_OP_AND:
2076                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
2077                 break;
2078
2079         case LDB_OP_OR:
2080                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
2081                 break;
2082
2083         case LDB_OP_NOT:
2084                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
2085                 break;
2086
2087         case LDB_OP_EQUALITY:
2088                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
2089                 break;
2090
2091         case LDB_OP_GREATER:
2092                 ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
2093                 break;
2094
2095         case LDB_OP_LESS:
2096                 ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
2097                 break;
2098
2099         case LDB_OP_SUBSTRING:
2100         case LDB_OP_PRESENT:
2101         case LDB_OP_APPROX:
2102         case LDB_OP_EXTENDED:
2103                 /* we can't index with fancy bitops yet */
2104                 ret = LDB_ERR_OPERATIONS_ERROR;
2105                 break;
2106         }
2107
2108         return ret;
2109 }
2110
2111 /*
2112   filter a candidate dn_list from an indexed search into a set of results
2113   extracting just the given attributes
2114 */
2115 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
2116                                const struct dn_list *dn_list,
2117                                struct ldb_kv_context *ac,
2118                                uint32_t *match_count,
2119                                enum key_truncation scope_one_truncation)
2120 {
2121         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2122         struct ldb_message *msg;
2123         struct ldb_message *filtered_msg;
2124         unsigned int i;
2125         unsigned int num_keys = 0;
2126         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
2127         struct ldb_val *keys = NULL;
2128
2129         /*
2130          * We have to allocate the key list (rather than just walk the
2131          * caller supplied list) as the callback could change the list
2132          * (by modifying an indexed attribute hosted in the in-memory
2133          * index cache!)
2134          */
2135         keys = talloc_array(ac, struct ldb_val, dn_list->count);
2136         if (keys == NULL) {
2137                 return ldb_module_oom(ac->module);
2138         }
2139
2140         if (ldb_kv->cache->GUID_index_attribute != NULL) {
2141                 /*
2142                  * We speculate that the keys will be GUID based and so
2143                  * pre-fill in enough space for a GUID (avoiding a pile of
2144                  * small allocations)
2145                  */
2146                 struct guid_tdb_key {
2147                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2148                 } *key_values = NULL;
2149
2150                 key_values = talloc_array(keys,
2151                                           struct guid_tdb_key,
2152                                           dn_list->count);
2153
2154                 if (key_values == NULL) {
2155                         talloc_free(keys);
2156                         return ldb_module_oom(ac->module);
2157                 }
2158                 for (i = 0; i < dn_list->count; i++) {
2159                         keys[i].data = key_values[i].guid_key;
2160                         keys[i].length = sizeof(key_values[i].guid_key);
2161                 }
2162         } else {
2163                 for (i = 0; i < dn_list->count; i++) {
2164                         keys[i].data = NULL;
2165                         keys[i].length = 0;
2166                 }
2167         }
2168
2169         for (i = 0; i < dn_list->count; i++) {
2170                 int ret;
2171
2172                 ret = ldb_kv_idx_to_key(
2173                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
2174                 if (ret != LDB_SUCCESS) {
2175                         talloc_free(keys);
2176                         return ret;
2177                 }
2178
2179                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2180                         /*
2181                          * If we are in GUID index mode, then the dn_list is
2182                          * sorted.  If we got a duplicate, forget about it, as
2183                          * otherwise we would send the same entry back more
2184                          * than once.
2185                          *
2186                          * This is needed in the truncated DN case, or if a
2187                          * duplicate was forced in via
2188                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2189                          */
2190
2191                         if (memcmp(previous_guid_key,
2192                                    keys[num_keys].data,
2193                                    sizeof(previous_guid_key)) == 0) {
2194                                 continue;
2195                         }
2196
2197                         memcpy(previous_guid_key,
2198                                keys[num_keys].data,
2199                                sizeof(previous_guid_key));
2200                 }
2201                 num_keys++;
2202         }
2203
2204
2205         /*
2206          * Now that the list is a safe copy, send the callbacks
2207          */
2208         for (i = 0; i < num_keys; i++) {
2209                 int ret;
2210                 bool matched;
2211                 msg = ldb_msg_new(ac);
2212                 if (!msg) {
2213                         talloc_free(keys);
2214                         return LDB_ERR_OPERATIONS_ERROR;
2215                 }
2216
2217                 ret =
2218                     ldb_kv_search_key(ac->module,
2219                                       ldb_kv,
2220                                       keys[i],
2221                                       msg,
2222                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
2223                                       /*
2224                                        * The entry point ldb_kv_search_indexed is
2225                                        * only called from the read-locked
2226                                        * ldb_kv_search.
2227                                        */
2228                                       LDB_UNPACK_DATA_FLAG_READ_LOCKED);
2229                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2230                         /*
2231                          * the record has disappeared? yes, this can
2232                          * happen if the entry is deleted by something
2233                          * operating in the callback (not another
2234                          * process, as we have a read lock)
2235                          */
2236                         talloc_free(msg);
2237                         continue;
2238                 }
2239
2240                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2241                         /* an internal error */
2242                         talloc_free(keys);
2243                         talloc_free(msg);
2244                         return LDB_ERR_OPERATIONS_ERROR;
2245                 }
2246
2247                 /*
2248                  * We trust the index for LDB_SCOPE_ONELEVEL
2249                  * unless the index key has been truncated.
2250                  *
2251                  * LDB_SCOPE_BASE is not passed in by our only caller.
2252                  */
2253                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2254                     ldb_kv->cache->one_level_indexes &&
2255                     scope_one_truncation == KEY_NOT_TRUNCATED) {
2256                         ret = ldb_match_message(ldb, msg, ac->tree,
2257                                                 ac->scope, &matched);
2258                 } else {
2259                         ret = ldb_match_msg_error(ldb, msg,
2260                                                   ac->tree, ac->base,
2261                                                   ac->scope, &matched);
2262                 }
2263
2264                 if (ret != LDB_SUCCESS) {
2265                         talloc_free(keys);
2266                         talloc_free(msg);
2267                         return ret;
2268                 }
2269                 if (!matched) {
2270                         talloc_free(msg);
2271                         continue;
2272                 }
2273
2274                 filtered_msg = ldb_msg_new(ac);
2275                 if (filtered_msg == NULL) {
2276                         TALLOC_FREE(keys);
2277                         TALLOC_FREE(msg);
2278                         return LDB_ERR_OPERATIONS_ERROR;
2279                 }
2280
2281                 filtered_msg->dn = talloc_steal(filtered_msg, msg->dn);
2282
2283                 /* filter the attributes that the user wants */
2284                 ret = ldb_kv_filter_attrs(ldb, msg, ac->attrs, filtered_msg);
2285
2286                 talloc_free(msg);
2287
2288                 if (ret == -1) {
2289                         TALLOC_FREE(filtered_msg);
2290                         talloc_free(keys);
2291                         return LDB_ERR_OPERATIONS_ERROR;
2292                 }
2293
2294                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
2295                 if (ret != LDB_SUCCESS) {
2296                         /* Regardless of success or failure, the msg
2297                          * is the callbacks responsiblity, and should
2298                          * not be talloc_free()'ed */
2299                         ac->request_terminated = true;
2300                         talloc_free(keys);
2301                         return ret;
2302                 }
2303
2304                 (*match_count)++;
2305         }
2306
2307         TALLOC_FREE(keys);
2308         return LDB_SUCCESS;
2309 }
2310
2311 /*
2312   sort a DN list
2313  */
2314 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
2315                                 struct dn_list *list)
2316 {
2317         if (list->count < 2) {
2318                 return;
2319         }
2320
2321         /* We know the list is sorted when using the GUID index */
2322         if (ltdb->cache->GUID_index_attribute != NULL) {
2323                 return;
2324         }
2325
2326         TYPESAFE_QSORT(list->dn, list->count,
2327                        ldb_val_equal_exact_for_qsort);
2328 }
2329
2330 /*
2331   search the database with a LDAP-like expression using indexes
2332   returns -1 if an indexed search is not possible, in which
2333   case the caller should call ltdb_search_full()
2334 */
2335 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
2336 {
2337         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2338         struct ldb_kv_private *ldb_kv = talloc_get_type(
2339             ldb_module_get_private(ac->module), struct ldb_kv_private);
2340         struct dn_list *dn_list;
2341         int ret;
2342         enum ldb_scope index_scope;
2343         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
2344
2345         /* see if indexing is enabled */
2346         if (!ldb_kv->cache->attribute_indexes &&
2347             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
2348                 /* fallback to a full search */
2349                 return LDB_ERR_OPERATIONS_ERROR;
2350         }
2351
2352         dn_list = talloc_zero(ac, struct dn_list);
2353         if (dn_list == NULL) {
2354                 return ldb_module_oom(ac->module);
2355         }
2356
2357         /*
2358          * For the purposes of selecting the switch arm below, if we
2359          * don't have a one-level index then treat it like a subtree
2360          * search
2361          */
2362         if (ac->scope == LDB_SCOPE_ONELEVEL &&
2363             !ldb_kv->cache->one_level_indexes) {
2364                 index_scope = LDB_SCOPE_SUBTREE;
2365         } else {
2366                 index_scope = ac->scope;
2367         }
2368
2369         switch (index_scope) {
2370         case LDB_SCOPE_BASE:
2371                 /*
2372                  * The only caller will have filtered the operation out
2373                  * so we should never get here
2374                  */
2375                 return ldb_operr(ldb);
2376
2377         case LDB_SCOPE_ONELEVEL:
2378
2379                 /*
2380                  * First, load all the one-level child objects (regardless of
2381                  * whether they match the search filter or not). The database
2382                  * maintains a one-level index, so retrieving this is quick.
2383                  */
2384                 ret = ldb_kv_index_dn_one(ac->module,
2385                                           ldb_kv,
2386                                           ac->base,
2387                                           dn_list,
2388                                           &scope_one_truncation);
2389                 if (ret != LDB_SUCCESS) {
2390                         talloc_free(dn_list);
2391                         return ret;
2392                 }
2393
2394                 /*
2395                  * If we have too many children, running ldb_kv_index_filter()
2396                  * over all the child objects can be quite expensive. So next
2397                  * we do a separate indexed query using the search filter.
2398                  *
2399                  * This should be quick, but it may return objects that are not
2400                  * the direct one-level child objects we're interested in.
2401                  *
2402                  * We only do this in the GUID index mode, which is
2403                  * O(n*log(m)) otherwise the intersection below will
2404                  * be too costly at O(n*m).
2405                  *
2406                  * We don't set a heuristic for 'too many' but instead
2407                  * do it always and rely on the index lookup being
2408                  * fast enough in the small case.
2409                  */
2410                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2411                         struct dn_list *indexed_search_result
2412                                 = talloc_zero(ac, struct dn_list);
2413                         if (indexed_search_result == NULL) {
2414                                 talloc_free(dn_list);
2415                                 return ldb_module_oom(ac->module);
2416                         }
2417
2418                         if (!ldb_kv->cache->attribute_indexes) {
2419                                 talloc_free(indexed_search_result);
2420                                 talloc_free(dn_list);
2421                                 return LDB_ERR_OPERATIONS_ERROR;
2422                         }
2423
2424                         /*
2425                          * Try to do an indexed database search
2426                          */
2427                         ret = ldb_kv_index_dn(
2428                             ac->module, ldb_kv, ac->tree,
2429                             indexed_search_result);
2430
2431                         /*
2432                          * We can stop if we're sure the object doesn't exist
2433                          */
2434                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2435                                 talloc_free(indexed_search_result);
2436                                 talloc_free(dn_list);
2437                                 return LDB_ERR_NO_SUCH_OBJECT;
2438                         }
2439
2440                         /*
2441                          * Once we have a successful search result, we
2442                          * intersect it with the one-level children (dn_list).
2443                          * This should give us exactly the result we're after
2444                          * (we still need to run ldb_kv_index_filter() to
2445                          * handle potential index truncation cases).
2446                          *
2447                          * The indexed search may fail because we don't support
2448                          * indexing on that type of search operation, e.g.
2449                          * matching against '*'. In which case we fall through
2450                          * and run ldb_kv_index_filter() over all the one-level
2451                          * children (which is still better than bailing out here
2452                          * and falling back to a full DB scan).
2453                          */
2454                         if (ret == LDB_SUCCESS) {
2455                                 if (!list_intersect(ldb,
2456                                                     ldb_kv,
2457                                                     dn_list,
2458                                                     indexed_search_result)) {
2459                                         talloc_free(indexed_search_result);
2460                                         talloc_free(dn_list);
2461                                         return LDB_ERR_OPERATIONS_ERROR;
2462                                 }
2463                         }
2464                 }
2465                 break;
2466
2467         case LDB_SCOPE_SUBTREE:
2468         case LDB_SCOPE_DEFAULT:
2469                 if (!ldb_kv->cache->attribute_indexes) {
2470                         talloc_free(dn_list);
2471                         return LDB_ERR_OPERATIONS_ERROR;
2472                 }
2473                 /*
2474                  * Here we load the index for the tree.  We have no
2475                  * index for the subtree.
2476                  */
2477                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2478                 if (ret != LDB_SUCCESS) {
2479                         talloc_free(dn_list);
2480                         return ret;
2481                 }
2482                 break;
2483         }
2484
2485         /*
2486          * It is critical that this function do the re-filter even
2487          * on things found by the index as the index can over-match
2488          * in cases of truncation (as well as when it decides it is
2489          * not worth further filtering)
2490          *
2491          * If this changes, then the index code above would need to
2492          * pass up a flag to say if any index was truncated during
2493          * processing as the truncation here refers only to the
2494          * SCOPE_ONELEVEL index.
2495          */
2496         ret = ldb_kv_index_filter(
2497             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2498         talloc_free(dn_list);
2499         return ret;
2500 }
2501
2502 /**
2503  * @brief Add a DN in the index list of a given attribute name/value pair
2504  *
2505  * This function will add the DN in the index list for the index for
2506  * the given attribute name and value.
2507  *
2508  * @param[in]  module       A ldb_module structure
2509  *
2510  * @param[in]  dn           The string representation of the DN as it
2511  *                          will be stored in the index entry
2512  *
2513  * @param[in]  el           A ldb_message_element array, one of the entry
2514  *                          referred by the v_idx is the attribute name and
2515  *                          value pair which will be used to construct the
2516  *                          index name
2517  *
2518  * @param[in]  v_idx        The index of element in the el array to use
2519  *
2520  * @return                  An ldb error code
2521  */
2522 static int ldb_kv_index_add1(struct ldb_module *module,
2523                              struct ldb_kv_private *ldb_kv,
2524                              const struct ldb_message *msg,
2525                              struct ldb_message_element *el,
2526                              int v_idx)
2527 {
2528         struct ldb_context *ldb;
2529         struct ldb_dn *dn_key;
2530         int ret;
2531         const struct ldb_schema_attribute *a;
2532         struct dn_list *list;
2533         unsigned alloc_len;
2534         enum key_truncation truncation = KEY_TRUNCATED;
2535
2536
2537         ldb = ldb_module_get_ctx(module);
2538
2539         list = talloc_zero(module, struct dn_list);
2540         if (list == NULL) {
2541                 return LDB_ERR_OPERATIONS_ERROR;
2542         }
2543
2544         dn_key = ldb_kv_index_key(
2545             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2546         if (!dn_key) {
2547                 talloc_free(list);
2548                 return LDB_ERR_OPERATIONS_ERROR;
2549         }
2550         /*
2551          * Samba only maintains unique indexes on the objectSID and objectGUID
2552          * so if a unique index key exceeds the maximum length there is a
2553          * problem.
2554          */
2555         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2556                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2557                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2558
2559                 ldb_asprintf_errstring(
2560                     ldb,
2561                     __location__ ": unique index key on %s in %s, "
2562                                  "exceeds maximum key length of %u (encoded).",
2563                     el->name,
2564                     ldb_dn_get_linearized(msg->dn),
2565                     ldb_kv->max_key_length);
2566                 talloc_free(list);
2567                 return LDB_ERR_CONSTRAINT_VIOLATION;
2568         }
2569         talloc_steal(list, dn_key);
2570
2571         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2572         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2573                 talloc_free(list);
2574                 return ret;
2575         }
2576
2577         /*
2578          * Check for duplicates in the @IDXDN DN -> GUID record
2579          *
2580          * This is very normal, it just means a duplicate DN creation
2581          * was attempted, so don't set the error string or print scary
2582          * messages.
2583          */
2584         if (list->count > 0 &&
2585             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2586             truncation == KEY_NOT_TRUNCATED) {
2587
2588                 talloc_free(list);
2589                 return LDB_ERR_CONSTRAINT_VIOLATION;
2590
2591         } else if (list->count > 0
2592                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2593
2594                 /*
2595                  * At least one existing entry in the DN->GUID index, which
2596                  * arises when the DN indexes have been truncated
2597                  *
2598                  * So need to pull the DN's to check if it's really a duplicate
2599                  */
2600                 int i;
2601                 for (i=0; i < list->count; i++) {
2602                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2603                         struct ldb_val key = {
2604                                 .data = guid_key,
2605                                 .length = sizeof(guid_key)
2606                         };
2607                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2608                         struct ldb_message *rec = ldb_msg_new(ldb);
2609                         if (rec == NULL) {
2610                                 return LDB_ERR_OPERATIONS_ERROR;
2611                         }
2612
2613                         ret = ldb_kv_idx_to_key(
2614                             module, ldb_kv, ldb, &list->dn[i], &key);
2615                         if (ret != LDB_SUCCESS) {
2616                                 TALLOC_FREE(list);
2617                                 TALLOC_FREE(rec);
2618                                 return ret;
2619                         }
2620
2621                         ret =
2622                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2623                         if (key.data != guid_key) {
2624                                 TALLOC_FREE(key.data);
2625                         }
2626                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2627                                 /*
2628                                  * the record has disappeared?
2629                                  * yes, this can happen
2630                                  */
2631                                 talloc_free(rec);
2632                                 continue;
2633                         }
2634
2635                         if (ret != LDB_SUCCESS) {
2636                                 /* an internal error */
2637                                 TALLOC_FREE(rec);
2638                                 TALLOC_FREE(list);
2639                                 return LDB_ERR_OPERATIONS_ERROR;
2640                         }
2641                         /*
2642                          * The DN we are trying to add to the DB and index
2643                          * is already here, so we must deny the addition
2644                          */
2645                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2646                                 TALLOC_FREE(rec);
2647                                 TALLOC_FREE(list);
2648                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2649                         }
2650                 }
2651         }
2652
2653         /*
2654          * Check for duplicates in unique indexes
2655          *
2656          * We don't need to do a loop test like the @IDXDN case
2657          * above as we have a ban on long unique index values
2658          * at the start of this function.
2659          */
2660         if (list->count > 0 &&
2661             ((a != NULL
2662               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2663                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2664                 /*
2665                  * We do not want to print info about a possibly
2666                  * confidential DN that the conflict was with in the
2667                  * user-visible error string
2668                  */
2669
2670                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2671                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2672                                   __location__
2673                                   ": unique index violation on %s in %s, "
2674                                   "conflicts with %*.*s in %s",
2675                                   el->name, ldb_dn_get_linearized(msg->dn),
2676                                   (int)list->dn[0].length,
2677                                   (int)list->dn[0].length,
2678                                   list->dn[0].data,
2679                                   ldb_dn_get_linearized(dn_key));
2680                 } else {
2681                         /* This can't fail, gives a default at worst */
2682                         const struct ldb_schema_attribute *attr =
2683                             ldb_schema_attribute_by_name(
2684                                 ldb, ldb_kv->cache->GUID_index_attribute);
2685                         struct ldb_val v;
2686                         ret = attr->syntax->ldif_write_fn(ldb, list,
2687                                                           &list->dn[0], &v);
2688                         if (ret == LDB_SUCCESS) {
2689                                 ldb_debug(ldb,
2690                                           LDB_DEBUG_WARNING,
2691                                           __location__
2692                                           ": unique index violation on %s in "
2693                                           "%s, conflicts with %s %*.*s in %s",
2694                                           el->name,
2695                                           ldb_dn_get_linearized(msg->dn),
2696                                           ldb_kv->cache->GUID_index_attribute,
2697                                           (int)v.length,
2698                                           (int)v.length,
2699                                           v.data,
2700                                           ldb_dn_get_linearized(dn_key));
2701                         }
2702                 }
2703                 ldb_asprintf_errstring(ldb,
2704                                        __location__ ": unique index violation "
2705                                        "on %s in %s",
2706                                        el->name,
2707                                        ldb_dn_get_linearized(msg->dn));
2708                 talloc_free(list);
2709                 return LDB_ERR_CONSTRAINT_VIOLATION;
2710         }
2711
2712         /* overallocate the list a bit, to reduce the number of
2713          * realloc trigered copies */
2714         alloc_len = ((list->count+1)+7) & ~7;
2715         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2716         if (list->dn == NULL) {
2717                 talloc_free(list);
2718                 return LDB_ERR_OPERATIONS_ERROR;
2719         }
2720
2721         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2722                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2723                 list->dn[list->count].data
2724                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2725                 if (list->dn[list->count].data == NULL) {
2726                         talloc_free(list);
2727                         return LDB_ERR_OPERATIONS_ERROR;
2728                 }
2729                 list->dn[list->count].length = strlen(dn_str);
2730         } else {
2731                 const struct ldb_val *key_val;
2732                 struct ldb_val *exact = NULL, *next = NULL;
2733                 key_val = ldb_msg_find_ldb_val(
2734                     msg, ldb_kv->cache->GUID_index_attribute);
2735                 if (key_val == NULL) {
2736                         talloc_free(list);
2737                         return ldb_module_operr(module);
2738                 }
2739
2740                 if (key_val->length != LDB_KV_GUID_SIZE) {
2741                         talloc_free(list);
2742                         return ldb_module_operr(module);
2743                 }
2744
2745                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2746                                         *key_val, ldb_val_equal_exact_ordered,
2747                                         exact, next);
2748
2749                 /*
2750                  * Give a warning rather than fail, this could be a
2751                  * duplicate value in the record allowed by a caller
2752                  * forcing in the value with
2753                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2754                  */
2755                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2756                         /* This can't fail, gives a default at worst */
2757                         const struct ldb_schema_attribute *attr =
2758                             ldb_schema_attribute_by_name(
2759                                 ldb, ldb_kv->cache->GUID_index_attribute);
2760                         struct ldb_val v;
2761                         ret = attr->syntax->ldif_write_fn(ldb, list,
2762                                                           exact, &v);
2763                         if (ret == LDB_SUCCESS) {
2764                                 ldb_debug(ldb,
2765                                           LDB_DEBUG_WARNING,
2766                                           __location__
2767                                           ": duplicate attribute value in %s "
2768                                           "for index on %s, "
2769                                           "duplicate of %s %*.*s in %s",
2770                                           ldb_dn_get_linearized(msg->dn),
2771                                           el->name,
2772                                           ldb_kv->cache->GUID_index_attribute,
2773                                           (int)v.length,
2774                                           (int)v.length,
2775                                           v.data,
2776                                           ldb_dn_get_linearized(dn_key));
2777                         }
2778                 }
2779
2780                 if (next == NULL) {
2781                         next = &list->dn[list->count];
2782                 } else {
2783                         memmove(&next[1], next,
2784                                 sizeof(*next) * (list->count - (next - list->dn)));
2785                 }
2786                 *next = ldb_val_dup(list->dn, key_val);
2787                 if (next->data == NULL) {
2788                         talloc_free(list);
2789                         return ldb_module_operr(module);
2790                 }
2791         }
2792         list->count++;
2793
2794         ret = ldb_kv_dn_list_store(module, dn_key, list);
2795
2796         talloc_free(list);
2797
2798         return ret;
2799 }
2800
2801 /*
2802   add index entries for one elements in a message
2803  */
2804 static int ldb_kv_index_add_el(struct ldb_module *module,
2805                                struct ldb_kv_private *ldb_kv,
2806                                const struct ldb_message *msg,
2807                                struct ldb_message_element *el)
2808 {
2809         unsigned int i;
2810         for (i = 0; i < el->num_values; i++) {
2811                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2812                 if (ret != LDB_SUCCESS) {
2813                         return ret;
2814                 }
2815         }
2816
2817         return LDB_SUCCESS;
2818 }
2819
2820 /*
2821   add index entries for all elements in a message
2822  */
2823 static int ldb_kv_index_add_all(struct ldb_module *module,
2824                                 struct ldb_kv_private *ldb_kv,
2825                                 const struct ldb_message *msg)
2826 {
2827         struct ldb_message_element *elements = msg->elements;
2828         unsigned int i;
2829         const char *dn_str;
2830         int ret;
2831
2832         if (ldb_dn_is_special(msg->dn)) {
2833                 return LDB_SUCCESS;
2834         }
2835
2836         dn_str = ldb_dn_get_linearized(msg->dn);
2837         if (dn_str == NULL) {
2838                 return LDB_ERR_OPERATIONS_ERROR;
2839         }
2840
2841         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2842         if (ret != LDB_SUCCESS) {
2843                 return ret;
2844         }
2845
2846         if (!ldb_kv->cache->attribute_indexes) {
2847                 /* no indexed fields */
2848                 return LDB_SUCCESS;
2849         }
2850
2851         for (i = 0; i < msg->num_elements; i++) {
2852                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2853                         continue;
2854                 }
2855                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2856                 if (ret != LDB_SUCCESS) {
2857                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2858                         ldb_asprintf_errstring(ldb,
2859                                                __location__ ": Failed to re-index %s in %s - %s",
2860                                                elements[i].name, dn_str,
2861                                                ldb_errstring(ldb));
2862                         return ret;
2863                 }
2864         }
2865
2866         return LDB_SUCCESS;
2867 }
2868
2869
2870 /*
2871   insert a DN index for a message
2872 */
2873 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2874                                   struct ldb_kv_private *ldb_kv,
2875                                   const struct ldb_message *msg,
2876                                   struct ldb_dn *dn,
2877                                   const char *index,
2878                                   int add)
2879 {
2880         struct ldb_message_element el;
2881         struct ldb_val val;
2882         int ret;
2883
2884         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2885         if (val.data == NULL) {
2886                 const char *dn_str = ldb_dn_get_linearized(dn);
2887                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2888                                        __location__ ": Failed to modify %s "
2889                                                     "against %s in %s: failed "
2890                                                     "to get casefold DN",
2891                                        index,
2892                                        ldb_kv->cache->GUID_index_attribute,
2893                                        dn_str);
2894                 return LDB_ERR_OPERATIONS_ERROR;
2895         }
2896
2897         val.length = strlen((char *)val.data);
2898         el.name = index;
2899         el.values = &val;
2900         el.num_values = 1;
2901
2902         if (add) {
2903                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2904         } else { /* delete */
2905                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2906         }
2907
2908         if (ret != LDB_SUCCESS) {
2909                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2910                 const char *dn_str = ldb_dn_get_linearized(dn);
2911                 ldb_asprintf_errstring(ldb,
2912                                        __location__ ": Failed to modify %s "
2913                                                     "against %s in %s - %s",
2914                                        index,
2915                                        ldb_kv->cache->GUID_index_attribute,
2916                                        dn_str,
2917                                        ldb_errstring(ldb));
2918                 return ret;
2919         }
2920         return ret;
2921 }
2922
2923 /*
2924   insert a one level index for a message
2925 */
2926 static int ldb_kv_index_onelevel(struct ldb_module *module,
2927                                  const struct ldb_message *msg,
2928                                  int add)
2929 {
2930         struct ldb_kv_private *ldb_kv = talloc_get_type(
2931             ldb_module_get_private(module), struct ldb_kv_private);
2932         struct ldb_dn *pdn;
2933         int ret;
2934
2935         /* We index for ONE Level only if requested */
2936         if (!ldb_kv->cache->one_level_indexes) {
2937                 return LDB_SUCCESS;
2938         }
2939
2940         pdn = ldb_dn_get_parent(module, msg->dn);
2941         if (pdn == NULL) {
2942                 return LDB_ERR_OPERATIONS_ERROR;
2943         }
2944         ret =
2945             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2946
2947         talloc_free(pdn);
2948
2949         return ret;
2950 }
2951
2952 /*
2953   insert a one level index for a message
2954 */
2955 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2956                                       const struct ldb_message *msg,
2957                                       int add)
2958 {
2959         int ret;
2960         struct ldb_kv_private *ldb_kv = talloc_get_type(
2961             ldb_module_get_private(module), struct ldb_kv_private);
2962
2963         /* We index for DN only if using a GUID index */
2964         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2965                 return LDB_SUCCESS;
2966         }
2967
2968         ret = ldb_kv_modify_index_dn(
2969             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2970
2971         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2972                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2973                                        "Entry %s already exists",
2974                                        ldb_dn_get_linearized(msg->dn));
2975                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2976         }
2977         return ret;
2978 }
2979
2980 /*
2981   add the index entries for a new element in a record
2982   The caller guarantees that these element values are not yet indexed
2983 */
2984 int ldb_kv_index_add_element(struct ldb_module *module,
2985                              struct ldb_kv_private *ldb_kv,
2986                              const struct ldb_message *msg,
2987                              struct ldb_message_element *el)
2988 {
2989         if (ldb_dn_is_special(msg->dn)) {
2990                 return LDB_SUCCESS;
2991         }
2992         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2993                 return LDB_SUCCESS;
2994         }
2995         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2996 }
2997
2998 /*
2999   add the index entries for a new record
3000 */
3001 int ldb_kv_index_add_new(struct ldb_module *module,
3002                          struct ldb_kv_private *ldb_kv,
3003                          const struct ldb_message *msg)
3004 {
3005         int ret;
3006
3007         if (ldb_dn_is_special(msg->dn)) {
3008                 return LDB_SUCCESS;
3009         }
3010
3011         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3012         if (ret != LDB_SUCCESS) {
3013                 /*
3014                  * Because we can't trust the caller to be doing
3015                  * transactions properly, clean up any index for this
3016                  * entry rather than relying on a transaction
3017                  * cleanup
3018                  */
3019
3020                 ldb_kv_index_delete(module, msg);
3021                 return ret;
3022         }
3023
3024         ret = ldb_kv_index_onelevel(module, msg, 1);
3025         if (ret != LDB_SUCCESS) {
3026                 /*
3027                  * Because we can't trust the caller to be doing
3028                  * transactions properly, clean up any index for this
3029                  * entry rather than relying on a transaction
3030                  * cleanup
3031                  */
3032                 ldb_kv_index_delete(module, msg);
3033                 return ret;
3034         }
3035         return ret;
3036 }
3037
3038
3039 /*
3040   delete an index entry for one message element
3041 */
3042 int ldb_kv_index_del_value(struct ldb_module *module,
3043                            struct ldb_kv_private *ldb_kv,
3044                            const struct ldb_message *msg,
3045                            struct ldb_message_element *el,
3046                            unsigned int v_idx)
3047 {
3048         struct ldb_context *ldb;
3049         struct ldb_dn *dn_key;
3050         const char *dn_str;
3051         int ret, i;
3052         unsigned int j;
3053         struct dn_list *list;
3054         struct ldb_dn *dn = msg->dn;
3055         enum key_truncation truncation = KEY_NOT_TRUNCATED;
3056
3057         ldb = ldb_module_get_ctx(module);
3058
3059         dn_str = ldb_dn_get_linearized(dn);
3060         if (dn_str == NULL) {
3061                 return LDB_ERR_OPERATIONS_ERROR;
3062         }
3063
3064         if (dn_str[0] == '@') {
3065                 return LDB_SUCCESS;
3066         }
3067
3068         dn_key = ldb_kv_index_key(
3069             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
3070         /*
3071          * We ignore key truncation in ltdb_index_add1() so
3072          * match that by ignoring it here as well
3073          *
3074          * Multiple values are legitimate and accepted
3075          */
3076         if (!dn_key) {
3077                 return LDB_ERR_OPERATIONS_ERROR;
3078         }
3079
3080         list = talloc_zero(dn_key, struct dn_list);
3081         if (list == NULL) {
3082                 talloc_free(dn_key);
3083                 return LDB_ERR_OPERATIONS_ERROR;
3084         }
3085
3086         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
3087         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
3088                 /* it wasn't indexed. Did we have an earlier error? If we did then
3089                    its gone now */
3090                 talloc_free(dn_key);
3091                 return LDB_SUCCESS;
3092         }
3093
3094         if (ret != LDB_SUCCESS) {
3095                 talloc_free(dn_key);
3096                 return ret;
3097         }
3098
3099         /*
3100          * Find one of the values matching this message to remove
3101          */
3102         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
3103         if (i == -1) {
3104                 /* nothing to delete */
3105                 talloc_free(dn_key);
3106                 return LDB_SUCCESS;
3107         }
3108
3109         j = (unsigned int) i;
3110         if (j != list->count - 1) {
3111                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
3112         }
3113         list->count--;
3114         if (list->count == 0) {
3115                 talloc_free(list->dn);
3116                 list->dn = NULL;
3117         } else {
3118                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
3119         }
3120
3121         ret = ldb_kv_dn_list_store(module, dn_key, list);
3122
3123         talloc_free(dn_key);
3124
3125         return ret;
3126 }
3127
3128 /*
3129   delete the index entries for a element
3130   return -1 on failure
3131 */
3132 int ldb_kv_index_del_element(struct ldb_module *module,
3133                              struct ldb_kv_private *ldb_kv,
3134                              const struct ldb_message *msg,
3135                              struct ldb_message_element *el)
3136 {
3137         const char *dn_str;
3138         int ret;
3139         unsigned int i;
3140
3141         if (!ldb_kv->cache->attribute_indexes) {
3142                 /* no indexed fields */
3143                 return LDB_SUCCESS;
3144         }
3145
3146         dn_str = ldb_dn_get_linearized(msg->dn);
3147         if (dn_str == NULL) {
3148                 return LDB_ERR_OPERATIONS_ERROR;
3149         }
3150
3151         if (dn_str[0] == '@') {
3152                 return LDB_SUCCESS;
3153         }
3154
3155         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3156                 return LDB_SUCCESS;
3157         }
3158         for (i = 0; i < el->num_values; i++) {
3159                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
3160                 if (ret != LDB_SUCCESS) {
3161                         return ret;
3162                 }
3163         }
3164
3165         return LDB_SUCCESS;
3166 }
3167
3168 /*
3169   delete the index entries for a record
3170   return -1 on failure
3171 */
3172 int ldb_kv_index_delete(struct ldb_module *module,
3173                         const struct ldb_message *msg)
3174 {
3175         struct ldb_kv_private *ldb_kv = talloc_get_type(
3176             ldb_module_get_private(module), struct ldb_kv_private);
3177         int ret;
3178         unsigned int i;
3179
3180         if (ldb_dn_is_special(msg->dn)) {
3181                 return LDB_SUCCESS;
3182         }
3183
3184         ret = ldb_kv_index_onelevel(module, msg, 0);
3185         if (ret != LDB_SUCCESS) {
3186                 return ret;
3187         }
3188
3189         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
3190         if (ret != LDB_SUCCESS) {
3191                 return ret;
3192         }
3193
3194         if (!ldb_kv->cache->attribute_indexes) {
3195                 /* no indexed fields */
3196                 return LDB_SUCCESS;
3197         }
3198
3199         for (i = 0; i < msg->num_elements; i++) {
3200                 ret = ldb_kv_index_del_element(
3201                     module, ldb_kv, msg, &msg->elements[i]);
3202                 if (ret != LDB_SUCCESS) {
3203                         return ret;
3204                 }
3205         }
3206
3207         return LDB_SUCCESS;
3208 }
3209
3210
3211 /*
3212   traversal function that deletes all @INDEX records in the in-memory
3213   TDB.
3214
3215   This does not touch the actual DB, that is done at transaction
3216   commit, which in turn greatly reduces DB churn as we will likely
3217   be able to do a direct update into the old record.
3218 */
3219 static int delete_index(struct ldb_kv_private *ldb_kv,
3220                         struct ldb_val key,
3221                         struct ldb_val data,
3222                         void *state)
3223 {
3224         struct ldb_module *module = state;
3225         const char *dnstr = "DN=" LDB_KV_INDEX ":";
3226         struct dn_list list;
3227         struct ldb_dn *dn;
3228         struct ldb_val v;
3229         int ret;
3230
3231         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
3232                 return 0;
3233         }
3234         /* we need to put a empty list in the internal tdb for this
3235          * index entry */
3236         list.dn = NULL;
3237         list.count = 0;
3238
3239         /* the offset of 3 is to remove the DN= prefix. */
3240         v.data = key.data + 3;
3241         v.length = strnlen((char *)key.data, key.length) - 3;
3242
3243         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
3244
3245         /*
3246          * This does not actually touch the DB quite yet, just
3247          * the in-memory index cache
3248          */
3249         ret = ldb_kv_dn_list_store(module, dn, &list);
3250         if (ret != LDB_SUCCESS) {
3251                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3252                                        "Unable to store null index for %s\n",
3253                                                 ldb_dn_get_linearized(dn));
3254                 talloc_free(dn);
3255                 return -1;
3256         }
3257         talloc_free(dn);
3258         return 0;
3259 }
3260
3261 /*
3262   traversal function that adds @INDEX records during a re index TODO wrong comment
3263 */
3264 static int re_key(struct ldb_kv_private *ldb_kv,
3265                   struct ldb_val key,
3266                   struct ldb_val val,
3267                   void *state)
3268 {
3269         struct ldb_context *ldb;
3270         struct ldb_kv_reindex_context *ctx =
3271             (struct ldb_kv_reindex_context *)state;
3272         struct ldb_module *module = ldb_kv->module;
3273         struct ldb_message *msg;
3274         int ret;
3275         struct ldb_val key2;
3276         bool is_record;
3277
3278         ldb = ldb_module_get_ctx(module);
3279
3280         is_record = ldb_kv_key_is_normal_record(key);
3281         if (is_record == false) {
3282                 return 0;
3283         }
3284
3285         msg = ldb_msg_new(module);
3286         if (msg == NULL) {
3287                 return -1;
3288         }
3289
3290         ret = ldb_unpack_data(ldb, &val, msg);
3291         if (ret != 0) {
3292                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3293                                                 ldb_dn_get_linearized(msg->dn));
3294                 ctx->error = ret;
3295                 talloc_free(msg);
3296                 return -1;
3297         }
3298
3299         if (msg->dn == NULL) {
3300                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3301                           "Refusing to re-index as GUID "
3302                           "key %*.*s with no DN\n",
3303                           (int)key.length, (int)key.length,
3304                           (char *)key.data);
3305                 talloc_free(msg);
3306                 return -1;
3307         }
3308
3309         /* check if the DN key has changed, perhaps due to the case
3310            insensitivity of an element changing, or a change from DN
3311            to GUID keys */
3312         key2 = ldb_kv_key_msg(module, msg, msg);
3313         if (key2.data == NULL) {
3314                 /* probably a corrupt record ... darn */
3315                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
3316                                                 ldb_dn_get_linearized(msg->dn));
3317                 talloc_free(msg);
3318                 return 0;
3319         }
3320         if (key.length != key2.length ||
3321             (memcmp(key.data, key2.data, key.length) != 0)) {
3322                 ldb_kv->kv_ops->update_in_iterate(
3323                     ldb_kv, key, key2, val, ctx);
3324         }
3325         talloc_free(key2.data);
3326
3327         talloc_free(msg);
3328
3329         ctx->count++;
3330         if (ctx->count % 10000 == 0) {
3331                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3332                           "Reindexing: re-keyed %u records so far",
3333                           ctx->count);
3334         }
3335
3336         return 0;
3337 }
3338
3339 /*
3340   traversal function that adds @INDEX records during a re index
3341 */
3342 static int re_index(struct ldb_kv_private *ldb_kv,
3343                     struct ldb_val key,
3344                     struct ldb_val val,
3345                     void *state)
3346 {
3347         struct ldb_context *ldb;
3348         struct ldb_kv_reindex_context *ctx =
3349             (struct ldb_kv_reindex_context *)state;
3350         struct ldb_module *module = ldb_kv->module;
3351         struct ldb_message *msg;
3352         int ret;
3353         bool is_record;
3354
3355         ldb = ldb_module_get_ctx(module);
3356
3357         is_record = ldb_kv_key_is_normal_record(key);
3358         if (is_record == false) {
3359                 return 0;
3360         }
3361
3362         msg = ldb_msg_new(module);
3363         if (msg == NULL) {
3364                 return -1;
3365         }
3366
3367         ret = ldb_unpack_data(ldb, &val, msg);
3368         if (ret != 0) {
3369                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3370                                                 ldb_dn_get_linearized(msg->dn));
3371                 ctx->error = ret;
3372                 talloc_free(msg);
3373                 return -1;
3374         }
3375
3376         if (msg->dn == NULL) {
3377                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3378                           "Refusing to re-index as GUID "
3379                           "key %*.*s with no DN\n",
3380                           (int)key.length, (int)key.length,
3381                           (char *)key.data);
3382                 talloc_free(msg);
3383                 return -1;
3384         }
3385
3386         ret = ldb_kv_index_onelevel(module, msg, 1);
3387         if (ret != LDB_SUCCESS) {
3388                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3389                           "Adding special ONE LEVEL index failed (%s)!",
3390                                                 ldb_dn_get_linearized(msg->dn));
3391                 talloc_free(msg);
3392                 return -1;
3393         }
3394
3395         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3396
3397         if (ret != LDB_SUCCESS) {
3398                 ctx->error = ret;
3399                 talloc_free(msg);
3400                 return -1;
3401         }
3402
3403         talloc_free(msg);
3404
3405         ctx->count++;
3406         if (ctx->count % 10000 == 0) {
3407                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3408                           "Reindexing: re-indexed %u records so far",
3409                           ctx->count);
3410         }
3411
3412         return 0;
3413 }
3414
3415 static int re_pack(struct ldb_kv_private *ldb_kv,
3416                    struct ldb_val key,
3417                    struct ldb_val val,
3418                    void *state)
3419 {
3420         struct ldb_context *ldb;
3421         struct ldb_message *msg;
3422         struct ldb_module *module = ldb_kv->module;
3423         struct ldb_kv_repack_context *ctx =
3424             (struct ldb_kv_repack_context *)state;
3425         int ret;
3426
3427         ldb = ldb_module_get_ctx(module);
3428
3429         msg = ldb_msg_new(module);
3430         if (msg == NULL) {
3431                 return -1;
3432         }
3433
3434         ret = ldb_unpack_data(ldb, &val, msg);
3435         if (ret != 0) {
3436                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: unpack failed: %s\n",
3437                           ldb_dn_get_linearized(msg->dn));
3438                 ctx->error = ret;
3439                 talloc_free(msg);
3440                 return -1;
3441         }
3442
3443         ret = ldb_kv_store(module, msg, TDB_MODIFY);
3444         if (ret != LDB_SUCCESS) {
3445                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: store failed: %s\n",
3446                           ldb_dn_get_linearized(msg->dn));
3447                 ctx->error = ret;
3448                 talloc_free(msg);
3449                 return -1;
3450         }
3451
3452         /*
3453          * Warn the user that we're repacking the first time we see a normal
3454          * record. This means we never warn if we're repacking a database with
3455          * only @ records. This is because during database initialisation,
3456          * we might repack as initial settings are written out, and we don't
3457          * want to spam the log.
3458          */
3459         if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
3460                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3461                           "Repacking database with format %#010x",
3462                           ldb_kv->pack_format_version);
3463                 ctx->normal_record_seen = true;
3464         }
3465
3466         ctx->count++;
3467         if (ctx->count % 10000 == 0) {
3468                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3469                           "Repack: re-packed %u records so far",
3470                           ctx->count);
3471         }
3472
3473         return 0;
3474 }
3475
3476 int ldb_kv_repack(struct ldb_module *module)
3477 {
3478         struct ldb_kv_private *ldb_kv = talloc_get_type(
3479             ldb_module_get_private(module), struct ldb_kv_private);
3480         struct ldb_context *ldb = ldb_module_get_ctx(module);
3481         struct ldb_kv_repack_context ctx;
3482         int ret;
3483
3484         ctx.count = 0;
3485         ctx.error = LDB_SUCCESS;
3486         ctx.normal_record_seen = false;
3487
3488         /* Iterate all database records and repack them in the new format */
3489         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
3490         if (ret < 0) {
3491                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack traverse failed: %s",
3492                           ldb_errstring(ldb));
3493                 return LDB_ERR_OPERATIONS_ERROR;
3494         }
3495
3496         if (ctx.error != LDB_SUCCESS) {
3497                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack failed: %s",
3498                           ldb_errstring(ldb));
3499                 return ctx.error;
3500         }
3501
3502         return LDB_SUCCESS;
3503 }
3504
3505 /*
3506   force a complete reindex of the database
3507 */
3508 int ldb_kv_reindex(struct ldb_module *module)
3509 {
3510         struct ldb_kv_private *ldb_kv = talloc_get_type(
3511             ldb_module_get_private(module), struct ldb_kv_private);
3512         int ret;
3513         struct ldb_kv_reindex_context ctx;
3514         size_t index_cache_size = 0;
3515
3516         /*
3517          * Only triggered after a modification, but make clear we do
3518          * not re-index a read-only DB
3519          */
3520         if (ldb_kv->read_only) {
3521                 return LDB_ERR_UNWILLING_TO_PERFORM;
3522         }
3523
3524         if (ldb_kv_cache_reload(module) != 0) {
3525                 return LDB_ERR_OPERATIONS_ERROR;
3526         }
3527
3528         /*
3529          * Ensure we read (and so remove) the entries from the real
3530          * DB, no values stored so far are any use as we want to do a
3531          * re-index
3532          */
3533         ldb_kv_index_transaction_cancel(module);
3534
3535         /*
3536          * Calculate the size of the index cache that we'll need for
3537          * the re-index
3538          */
3539         index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
3540         if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
3541                 index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
3542         }
3543
3544         ret = ldb_kv_index_transaction_start(module, index_cache_size);
3545         if (ret != LDB_SUCCESS) {
3546                 return ret;
3547         }
3548
3549         /* first traverse the database deleting any @INDEX records by
3550          * putting NULL entries in the in-memory tdb
3551          */
3552         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3553         if (ret < 0) {
3554                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3555                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3556                                        ldb_errstring(ldb));
3557                 return LDB_ERR_OPERATIONS_ERROR;
3558         }
3559
3560         ctx.error = 0;
3561         ctx.count = 0;
3562
3563         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3564         if (ret < 0) {
3565                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3566                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3567                                        ldb_errstring(ldb));
3568                 return LDB_ERR_OPERATIONS_ERROR;
3569         }
3570
3571         if (ctx.error != LDB_SUCCESS) {
3572                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3573                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3574                 return ctx.error;
3575         }
3576
3577         ctx.error = 0;
3578         ctx.count = 0;
3579
3580         /* now traverse adding any indexes for normal LDB records */
3581         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3582         if (ret < 0) {
3583                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3584                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3585                                        ldb_errstring(ldb));
3586                 return LDB_ERR_OPERATIONS_ERROR;
3587         }
3588
3589         if (ctx.error != LDB_SUCCESS) {
3590                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3591                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3592                 return ctx.error;
3593         }
3594
3595         if (ctx.count > 10000) {
3596                 ldb_debug(ldb_module_get_ctx(module),
3597                           LDB_DEBUG_WARNING,
3598                           "Reindexing: re_index successful on %s, "
3599                           "final index write-out will be in transaction commit",
3600                           ldb_kv->kv_ops->name(ldb_kv));
3601         }
3602         return LDB_SUCCESS;
3603 }