lib ldb ldb_key_value: csbuild fix unused parm data
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151 #include "lib/util/attr.h"
152
153 struct dn_list {
154         unsigned int count;
155         struct ldb_val *dn;
156         /*
157          * Do not optimise the intersection of this list,
158          * we must never return an entry not in this
159          * list.  This allows the index for
160          * SCOPE_ONELEVEL to be trusted.
161          */
162         bool strict;
163 };
164
165 struct ldb_kv_idxptr {
166         struct tdb_context *itdb;
167         int error;
168 };
169
170 enum key_truncation {
171         KEY_NOT_TRUNCATED,
172         KEY_TRUNCATED,
173 };
174
175 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
176                                       const struct ldb_message *msg,
177                                       int add);
178 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
179                                    struct ldb_kv_private *ldb_kv,
180                                    struct ldb_dn *base_dn,
181                                    struct dn_list *dn_list,
182                                    enum key_truncation *truncation);
183
184 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
185                                 struct dn_list *list);
186
187 /* we put a @IDXVERSION attribute on index entries. This
188    allows us to tell if it was written by an older version
189 */
190 #define LDB_KV_INDEXING_VERSION 2
191
192 #define LDB_KV_GUID_INDEXING_VERSION 3
193
194 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
195 {
196         if (ldb_kv->max_key_length == 0) {
197                 return UINT_MAX;
198         }
199         return ldb_kv->max_key_length;
200 }
201
202 /* enable the idxptr mode when transactions start */
203 int ldb_kv_index_transaction_start(
204         struct ldb_module *module,
205         size_t cache_size)
206 {
207         struct ldb_kv_private *ldb_kv = talloc_get_type(
208             ldb_module_get_private(module), struct ldb_kv_private);
209         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
210         if (ldb_kv->idxptr == NULL) {
211                 return ldb_oom(ldb_module_get_ctx(module));
212         }
213
214         ldb_kv->idxptr->itdb = tdb_open(
215                 NULL,
216                 cache_size,
217                 TDB_INTERNAL,
218                 O_RDWR,
219                 0);
220         if (ldb_kv->idxptr->itdb == NULL) {
221                 return LDB_ERR_OPERATIONS_ERROR;
222         }
223
224         return LDB_SUCCESS;
225 }
226
227 /*
228   see if two ldb_val structures contain exactly the same data
229   return -1 or 1 for a mismatch, 0 for match
230 */
231 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
232                                          const struct ldb_val *v2)
233 {
234         if (v1->length > v2->length) {
235                 return -1;
236         }
237         if (v1->length < v2->length) {
238                 return 1;
239         }
240         return memcmp(v1->data, v2->data, v1->length);
241 }
242
243 /*
244   see if two ldb_val structures contain exactly the same data
245   return -1 or 1 for a mismatch, 0 for match
246 */
247 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
248                                        const struct ldb_val *v2)
249 {
250         if (v1.length > v2->length) {
251                 return -1;
252         }
253         if (v1.length < v2->length) {
254                 return 1;
255         }
256         return memcmp(v1.data, v2->data, v1.length);
257 }
258
259
260 /*
261   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
262   binary-safe comparison for the 'dn' returns -1 if not found
263
264   This is therefore safe when the value is a GUID in the future
265  */
266 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
267                                    const struct dn_list *list,
268                                    const struct ldb_val *v)
269 {
270         unsigned int i;
271         struct ldb_val *exact = NULL, *next = NULL;
272
273         if (ldb_kv->cache->GUID_index_attribute == NULL) {
274                 for (i=0; i<list->count; i++) {
275                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
276                                 return i;
277                         }
278                 }
279                 return -1;
280         }
281
282         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
283                                 *v, ldb_val_equal_exact_ordered,
284                                 exact, next);
285         if (exact == NULL) {
286                 return -1;
287         }
288         /* Not required, but keeps the compiler quiet */
289         if (next != NULL) {
290                 return -1;
291         }
292
293         i = exact - list->dn;
294         return i;
295 }
296
297 /*
298   find a entry in a dn_list. Uses a case sensitive comparison with the dn
299   returns -1 if not found
300  */
301 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
302                                    struct dn_list *list,
303                                    const struct ldb_message *msg)
304 {
305         struct ldb_val v;
306         const struct ldb_val *key_val;
307         if (ldb_kv->cache->GUID_index_attribute == NULL) {
308                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
309                 v.data = discard_const_p(unsigned char, dn_str);
310                 v.length = strlen(dn_str);
311         } else {
312                 key_val = ldb_msg_find_ldb_val(
313                     msg, ldb_kv->cache->GUID_index_attribute);
314                 if (key_val == NULL) {
315                         return -1;
316                 }
317                 v = *key_val;
318         }
319         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
320 }
321
322 /*
323   this is effectively a cast function, but with lots of paranoia
324   checks and also copes with CPUs that are fussy about pointer
325   alignment
326  */
327 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
328                                            TDB_DATA rec,
329                                            bool check_parent)
330 {
331         struct dn_list *list;
332         if (rec.dsize != sizeof(void *)) {
333                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
334                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
335                 return NULL;
336         }
337         /* note that we can't just use a cast here, as rec.dptr may
338            not be aligned sufficiently for a pointer. A cast would cause
339            platforms like some ARM CPUs to crash */
340         memcpy(&list, rec.dptr, sizeof(void *));
341         list = talloc_get_type(list, struct dn_list);
342         if (list == NULL) {
343                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
344                                        "Bad type '%s' for idxptr",
345                                        talloc_get_name(list));
346                 return NULL;
347         }
348         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
349                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
350                                        "Bad parent '%s' for idxptr",
351                                        talloc_get_name(talloc_parent(list->dn)));
352                 return NULL;
353         }
354         return list;
355 }
356
357 /*
358   return the @IDX list in an index entry for a dn as a
359   struct dn_list
360  */
361 static int ldb_kv_dn_list_load(struct ldb_module *module,
362                                struct ldb_kv_private *ldb_kv,
363                                struct ldb_dn *dn,
364                                struct dn_list *list)
365 {
366         struct ldb_message *msg;
367         int ret, version;
368         struct ldb_message_element *el;
369         TDB_DATA rec;
370         struct dn_list *list2;
371         TDB_DATA key;
372
373         list->dn = NULL;
374         list->count = 0;
375
376         /* see if we have any in-memory index entries */
377         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
378                 goto normal_index;
379         }
380
381         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
382         key.dsize = strlen((char *)key.dptr);
383
384         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
385         if (rec.dptr == NULL) {
386                 goto normal_index;
387         }
388
389         /* we've found an in-memory index entry */
390         list2 = ldb_kv_index_idxptr(module, rec, true);
391         if (list2 == NULL) {
392                 free(rec.dptr);
393                 return LDB_ERR_OPERATIONS_ERROR;
394         }
395         free(rec.dptr);
396
397         *list = *list2;
398         return LDB_SUCCESS;
399
400 normal_index:
401         msg = ldb_msg_new(list);
402         if (msg == NULL) {
403                 return LDB_ERR_OPERATIONS_ERROR;
404         }
405
406         ret = ldb_kv_search_dn1(module,
407                                 dn,
408                                 msg,
409                                 LDB_UNPACK_DATA_FLAG_NO_DN |
410                                 /*
411                                  * The entry point ldb_kv_search_indexed is
412                                  * only called from the read-locked
413                                  * ldb_kv_search.
414                                  */
415                                 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
416         if (ret != LDB_SUCCESS) {
417                 talloc_free(msg);
418                 return ret;
419         }
420
421         el = ldb_msg_find_element(msg, LDB_KV_IDX);
422         if (!el) {
423                 talloc_free(msg);
424                 return LDB_SUCCESS;
425         }
426
427         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
428
429         /*
430          * we avoid copying the strings by stealing the list.  We have
431          * to steal msg onto el->values (which looks odd) because
432          * the memory is allocated on msg, not on each value.
433          */
434         if (ldb_kv->cache->GUID_index_attribute == NULL) {
435                 /* check indexing version number */
436                 if (version != LDB_KV_INDEXING_VERSION) {
437                         ldb_debug_set(ldb_module_get_ctx(module),
438                                       LDB_DEBUG_ERROR,
439                                       "Wrong DN index version %d "
440                                       "expected %d for %s",
441                                       version, LDB_KV_INDEXING_VERSION,
442                                       ldb_dn_get_linearized(dn));
443                         talloc_free(msg);
444                         return LDB_ERR_OPERATIONS_ERROR;
445                 }
446
447                 talloc_steal(el->values, msg);
448                 list->dn = talloc_steal(list, el->values);
449                 list->count = el->num_values;
450         } else {
451                 unsigned int i;
452                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
453                         /* This is quite likely during the DB startup
454                            on first upgrade to using a GUID index */
455                         ldb_debug_set(ldb_module_get_ctx(module),
456                                       LDB_DEBUG_ERROR,
457                                       "Wrong GUID index version %d "
458                                       "expected %d for %s",
459                                       version, LDB_KV_GUID_INDEXING_VERSION,
460                                       ldb_dn_get_linearized(dn));
461                         talloc_free(msg);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464
465                 if (el->num_values == 0) {
466                         talloc_free(msg);
467                         return LDB_ERR_OPERATIONS_ERROR;
468                 }
469
470                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
471                         talloc_free(msg);
472                         return LDB_ERR_OPERATIONS_ERROR;
473                 }
474
475                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
476                 list->dn = talloc_array(list, struct ldb_val, list->count);
477                 if (list->dn == NULL) {
478                         talloc_free(msg);
479                         return LDB_ERR_OPERATIONS_ERROR;
480                 }
481
482                 /*
483                  * The actual data is on msg.
484                  */
485                 talloc_steal(list->dn, msg);
486                 for (i = 0; i < list->count; i++) {
487                         list->dn[i].data
488                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
489                         list->dn[i].length = LDB_KV_GUID_SIZE;
490                 }
491         }
492
493         /* We don't need msg->elements any more */
494         talloc_free(msg->elements);
495         return LDB_SUCCESS;
496 }
497
498 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
499                            struct ldb_kv_private *ldb_kv,
500                            TALLOC_CTX *mem_ctx,
501                            struct ldb_dn *dn,
502                            struct ldb_val *ldb_key)
503 {
504         struct ldb_context *ldb = ldb_module_get_ctx(module);
505         int ret;
506         int index = 0;
507         enum key_truncation truncation = KEY_NOT_TRUNCATED;
508         struct dn_list *list = talloc(mem_ctx, struct dn_list);
509         if (list == NULL) {
510                 ldb_oom(ldb);
511                 return LDB_ERR_OPERATIONS_ERROR;
512         }
513
514         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
515         if (ret != LDB_SUCCESS) {
516                 TALLOC_FREE(list);
517                 return ret;
518         }
519
520         if (list->count == 0) {
521                 TALLOC_FREE(list);
522                 return LDB_ERR_NO_SUCH_OBJECT;
523         }
524
525         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
526                 const char *dn_str = ldb_dn_get_linearized(dn);
527                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
528                                        __location__
529                                        ": Failed to read DN index "
530                                        "against %s for %s: too many "
531                                        "values (%u > 1)",
532                                        ldb_kv->cache->GUID_index_attribute,
533                                        dn_str,
534                                        list->count);
535                 TALLOC_FREE(list);
536                 return LDB_ERR_CONSTRAINT_VIOLATION;
537         }
538
539         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
540                 /*
541                  * DN key has been truncated, need to inspect the actual
542                  * records to locate the actual DN
543                  */
544                 unsigned int i;
545                 index = -1;
546                 for (i=0; i < list->count; i++) {
547                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
548                         struct ldb_val key = {
549                                 .data = guid_key,
550                                 .length = sizeof(guid_key)
551                         };
552                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
553                         struct ldb_message *rec = ldb_msg_new(ldb);
554                         if (rec == NULL) {
555                                 TALLOC_FREE(list);
556                                 return LDB_ERR_OPERATIONS_ERROR;
557                         }
558
559                         ret = ldb_kv_idx_to_key(
560                             module, ldb_kv, ldb, &list->dn[i], &key);
561                         if (ret != LDB_SUCCESS) {
562                                 TALLOC_FREE(list);
563                                 TALLOC_FREE(rec);
564                                 return ret;
565                         }
566
567                         ret =
568                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
569                         if (key.data != guid_key) {
570                                 TALLOC_FREE(key.data);
571                         }
572                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
573                                 /*
574                                  * the record has disappeared?
575                                  * yes, this can happen
576                                  */
577                                 TALLOC_FREE(rec);
578                                 continue;
579                         }
580
581                         if (ret != LDB_SUCCESS) {
582                                 /* an internal error */
583                                 TALLOC_FREE(rec);
584                                 TALLOC_FREE(list);
585                                 return LDB_ERR_OPERATIONS_ERROR;
586                         }
587
588                         /*
589                          * We found the actual DN that we wanted from in the
590                          * multiple values that matched the index
591                          * (due to truncation), so return that.
592                          *
593                          */
594                         if (ldb_dn_compare(dn, rec->dn) == 0) {
595                                 index = i;
596                                 TALLOC_FREE(rec);
597                                 break;
598                         }
599                 }
600
601                 /*
602                  * We matched the index but the actual DN we wanted
603                  * was not here.
604                  */
605                 if (index == -1) {
606                         TALLOC_FREE(list);
607                         return LDB_ERR_NO_SUCH_OBJECT;
608                 }
609         }
610
611         /* The ldb_key memory is allocated by the caller */
612         ret = ldb_kv_guid_to_key(&list->dn[index], ldb_key);
613         TALLOC_FREE(list);
614
615         if (ret != LDB_SUCCESS) {
616                 return LDB_ERR_OPERATIONS_ERROR;
617         }
618
619         return LDB_SUCCESS;
620 }
621
622
623
624 /*
625   save a dn_list into a full @IDX style record
626  */
627 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
628                                      struct ldb_kv_private *ldb_kv,
629                                      struct ldb_dn *dn,
630                                      struct dn_list *list)
631 {
632         struct ldb_message *msg;
633         int ret;
634
635         msg = ldb_msg_new(module);
636         if (!msg) {
637                 return ldb_module_oom(module);
638         }
639
640         msg->dn = dn;
641
642         if (list->count == 0) {
643                 ret = ldb_kv_delete_noindex(module, msg);
644                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
645                         ret = LDB_SUCCESS;
646                 }
647                 talloc_free(msg);
648                 return ret;
649         }
650
651         if (ldb_kv->cache->GUID_index_attribute == NULL) {
652                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
653                                       LDB_KV_INDEXING_VERSION);
654                 if (ret != LDB_SUCCESS) {
655                         talloc_free(msg);
656                         return ldb_module_oom(module);
657                 }
658         } else {
659                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
660                                       LDB_KV_GUID_INDEXING_VERSION);
661                 if (ret != LDB_SUCCESS) {
662                         talloc_free(msg);
663                         return ldb_module_oom(module);
664                 }
665         }
666
667         if (list->count > 0) {
668                 struct ldb_message_element *el;
669
670                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
671                 if (ret != LDB_SUCCESS) {
672                         talloc_free(msg);
673                         return ldb_module_oom(module);
674                 }
675
676                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
677                         el->values = list->dn;
678                         el->num_values = list->count;
679                 } else {
680                         struct ldb_val v;
681                         unsigned int i;
682                         el->values = talloc_array(msg,
683                                                   struct ldb_val, 1);
684                         if (el->values == NULL) {
685                                 talloc_free(msg);
686                                 return ldb_module_oom(module);
687                         }
688
689                         v.data = talloc_array_size(el->values,
690                                                    list->count,
691                                                    LDB_KV_GUID_SIZE);
692                         if (v.data == NULL) {
693                                 talloc_free(msg);
694                                 return ldb_module_oom(module);
695                         }
696
697                         v.length = talloc_get_size(v.data);
698
699                         for (i = 0; i < list->count; i++) {
700                                 if (list->dn[i].length !=
701                                     LDB_KV_GUID_SIZE) {
702                                         talloc_free(msg);
703                                         return ldb_module_operr(module);
704                                 }
705                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
706                                        list->dn[i].data,
707                                        LDB_KV_GUID_SIZE);
708                         }
709                         el->values[0] = v;
710                         el->num_values = 1;
711                 }
712         }
713
714         ret = ldb_kv_store(module, msg, TDB_REPLACE);
715         talloc_free(msg);
716         return ret;
717 }
718
719 /*
720   save a dn_list into the database, in either @IDX or internal format
721  */
722 static int ldb_kv_dn_list_store(struct ldb_module *module,
723                                 struct ldb_dn *dn,
724                                 struct dn_list *list)
725 {
726         struct ldb_kv_private *ldb_kv = talloc_get_type(
727             ldb_module_get_private(module), struct ldb_kv_private);
728         TDB_DATA rec, key;
729         int ret;
730         struct dn_list *list2;
731
732         if (ldb_kv->idxptr == NULL) {
733                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
734         }
735
736         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
737         if (key.dptr == NULL) {
738                 return LDB_ERR_OPERATIONS_ERROR;
739         }
740         key.dsize = strlen((char *)key.dptr);
741
742         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
743         if (rec.dptr != NULL) {
744                 list2 = ldb_kv_index_idxptr(module, rec, false);
745                 if (list2 == NULL) {
746                         free(rec.dptr);
747                         return LDB_ERR_OPERATIONS_ERROR;
748                 }
749                 free(rec.dptr);
750                 list2->dn = talloc_steal(list2, list->dn);
751                 list2->count = list->count;
752                 return LDB_SUCCESS;
753         }
754
755         list2 = talloc(ldb_kv->idxptr, struct dn_list);
756         if (list2 == NULL) {
757                 return LDB_ERR_OPERATIONS_ERROR;
758         }
759         list2->dn = talloc_steal(list2, list->dn);
760         list2->count = list->count;
761
762         rec.dptr = (uint8_t *)&list2;
763         rec.dsize = sizeof(void *);
764
765
766         /*
767          * This is not a store into the main DB, but into an in-memory
768          * TDB, so we don't need a guard on ltdb->read_only
769          */
770         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
771         if (ret != 0) {
772                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
773         }
774         return LDB_SUCCESS;
775 }
776
777 /*
778   traverse function for storing the in-memory index entries on disk
779  */
780 static int ldb_kv_index_traverse_store(_UNUSED_ struct tdb_context *tdb,
781                                        TDB_DATA key,
782                                        TDB_DATA data,
783                                        void *state)
784 {
785         struct ldb_module *module = state;
786         struct ldb_kv_private *ldb_kv = talloc_get_type(
787             ldb_module_get_private(module), struct ldb_kv_private);
788         struct ldb_dn *dn;
789         struct ldb_context *ldb = ldb_module_get_ctx(module);
790         struct ldb_val v;
791         struct dn_list *list;
792
793         list = ldb_kv_index_idxptr(module, data, true);
794         if (list == NULL) {
795                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
796                 return -1;
797         }
798
799         v.data = key.dptr;
800         v.length = strnlen((char *)key.dptr, key.dsize);
801
802         dn = ldb_dn_from_ldb_val(module, ldb, &v);
803         if (dn == NULL) {
804                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
805                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
806                 return -1;
807         }
808
809         ldb_kv->idxptr->error =
810             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
811         talloc_free(dn);
812         if (ldb_kv->idxptr->error != 0) {
813                 return -1;
814         }
815         return 0;
816 }
817
818 /* cleanup the idxptr mode when transaction commits */
819 int ldb_kv_index_transaction_commit(struct ldb_module *module)
820 {
821         struct ldb_kv_private *ldb_kv = talloc_get_type(
822             ldb_module_get_private(module), struct ldb_kv_private);
823         int ret;
824
825         struct ldb_context *ldb = ldb_module_get_ctx(module);
826
827         ldb_reset_err_string(ldb);
828
829         if (ldb_kv->idxptr->itdb) {
830                 tdb_traverse(
831                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
832                 tdb_close(ldb_kv->idxptr->itdb);
833         }
834
835         ret = ldb_kv->idxptr->error;
836         if (ret != LDB_SUCCESS) {
837                 if (!ldb_errstring(ldb)) {
838                         ldb_set_errstring(ldb, ldb_strerror(ret));
839                 }
840                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
841         }
842
843         talloc_free(ldb_kv->idxptr);
844         ldb_kv->idxptr = NULL;
845         return ret;
846 }
847
848 /* cleanup the idxptr mode when transaction cancels */
849 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
850 {
851         struct ldb_kv_private *ldb_kv = talloc_get_type(
852             ldb_module_get_private(module), struct ldb_kv_private);
853         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
854                 tdb_close(ldb_kv->idxptr->itdb);
855         }
856         talloc_free(ldb_kv->idxptr);
857         ldb_kv->idxptr = NULL;
858         return LDB_SUCCESS;
859 }
860
861
862 /*
863   return the dn key to be used for an index
864   the caller is responsible for freeing
865 */
866 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
867                                        struct ldb_kv_private *ldb_kv,
868                                        const char *attr,
869                                        const struct ldb_val *value,
870                                        const struct ldb_schema_attribute **ap,
871                                        enum key_truncation *truncation)
872 {
873         struct ldb_dn *ret;
874         struct ldb_val v;
875         const struct ldb_schema_attribute *a = NULL;
876         char *attr_folded = NULL;
877         const char *attr_for_dn = NULL;
878         int r;
879         bool should_b64_encode;
880
881         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
882         size_t key_len = 0;
883         size_t attr_len = 0;
884         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
885         unsigned frmt_len = 0;
886         const size_t additional_key_length = 4;
887         unsigned int num_separators = 3; /* Estimate for overflow check */
888         const size_t min_data = 1;
889         const size_t min_key_length = additional_key_length
890                 + indx_len + num_separators + min_data;
891         struct ldb_val empty;
892
893         /*
894          * Accept a NULL value as a request for a key with no value.  This is
895          * different from passing an empty value, which might be given
896          * significance by some canonicalise functions.
897          */
898         bool empty_val = value == NULL;
899         if (empty_val) {
900                 empty.length = 0;
901                 empty.data = discard_const_p(unsigned char, "");
902                 value = &empty;
903         }
904
905         if (attr[0] == '@') {
906                 attr_for_dn = attr;
907                 v = *value;
908                 if (ap != NULL) {
909                         *ap = NULL;
910                 }
911         } else {
912                 attr_folded = ldb_attr_casefold(ldb, attr);
913                 if (!attr_folded) {
914                         return NULL;
915                 }
916
917                 attr_for_dn = attr_folded;
918
919                 a = ldb_schema_attribute_by_name(ldb, attr);
920                 if (ap) {
921                         *ap = a;
922                 }
923
924                 if (empty_val) {
925                         v = *value;
926                 } else {
927                         ldb_attr_handler_t fn;
928                         if (a->syntax->index_format_fn &&
929                             ldb_kv->cache->GUID_index_attribute != NULL) {
930                                 fn = a->syntax->index_format_fn;
931                         } else {
932                                 fn = a->syntax->canonicalise_fn;
933                         }
934                         r = fn(ldb, ldb, value, &v);
935                         if (r != LDB_SUCCESS) {
936                                 const char *errstr = ldb_errstring(ldb);
937                                 /* canonicalisation can be refused. For
938                                    example, a attribute that takes wildcards
939                                    will refuse to canonicalise if the value
940                                    contains a wildcard */
941                                 ldb_asprintf_errstring(ldb,
942                                                        "Failed to create "
943                                                        "index key for "
944                                                        "attribute '%s':%s%s%s",
945                                                        attr, ldb_strerror(r),
946                                                        (errstr?":":""),
947                                                        (errstr?errstr:""));
948                                 talloc_free(attr_folded);
949                                 return NULL;
950                         }
951                 }
952         }
953         attr_len = strlen(attr_for_dn);
954
955         /*
956          * Check if there is any hope this will fit into the DB.
957          * Overflow here is not actually critical the code below
958          * checks again to make the printf and the DB does another
959          * check for too long keys
960          */
961         if (max_key_length - attr_len < min_key_length) {
962                 ldb_asprintf_errstring(
963                         ldb,
964                         __location__ ": max_key_length "
965                         "is too small (%u) < (%u)",
966                         max_key_length,
967                         (unsigned)(min_key_length + attr_len));
968                 talloc_free(attr_folded);
969                 return NULL;
970         }
971
972         /*
973          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
974          * "DN=" and a trailing string terminator
975          */
976         max_key_length -= additional_key_length;
977
978         /*
979          * We do not base 64 encode a DN in a key, it has already been
980          * casefold and lineraized, that is good enough.  That already
981          * avoids embedded NUL etc.
982          */
983         if (ldb_kv->cache->GUID_index_attribute != NULL) {
984                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
985                         should_b64_encode = false;
986                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
987                         /*
988                          * We can only change the behaviour for IDXONE
989                          * when the GUID index is enabled
990                          */
991                         should_b64_encode = false;
992                 } else {
993                         should_b64_encode
994                                 = ldb_should_b64_encode(ldb, &v);
995                 }
996         } else {
997                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
998         }
999
1000         if (should_b64_encode) {
1001                 size_t vstr_len = 0;
1002                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
1003                 if (!vstr) {
1004                         talloc_free(attr_folded);
1005                         return NULL;
1006                 }
1007                 vstr_len = strlen(vstr);
1008                 /*
1009                  * Overflow here is not critical as we only use this
1010                  * to choose the printf truncation
1011                  */
1012                 key_len = num_separators + indx_len + attr_len + vstr_len;
1013                 if (key_len > max_key_length) {
1014                         size_t excess = key_len - max_key_length;
1015                         frmt_len = vstr_len - excess;
1016                         *truncation = KEY_TRUNCATED;
1017                         /*
1018                         * Truncated keys are placed in a separate key space
1019                         * from the non truncated keys
1020                         * Note: the double hash "##" is not a typo and
1021                         * indicates that the following value is base64 encoded
1022                         */
1023                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
1024                                              LDB_KV_INDEX, attr_for_dn,
1025                                              frmt_len, vstr);
1026                 } else {
1027                         frmt_len = vstr_len;
1028                         *truncation = KEY_NOT_TRUNCATED;
1029                         /*
1030                          * Note: the double colon "::" is not a typo and
1031                          * indicates that the following value is base64 encoded
1032                          */
1033                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1034                                              LDB_KV_INDEX, attr_for_dn,
1035                                              frmt_len, vstr);
1036                 }
1037                 talloc_free(vstr);
1038         } else {
1039                 /* Only need two seperators */
1040                 num_separators = 2;
1041
1042                 /*
1043                  * Overflow here is not critical as we only use this
1044                  * to choose the printf truncation
1045                  */
1046                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1047                 if (key_len > max_key_length) {
1048                         size_t excess = key_len - max_key_length;
1049                         frmt_len = v.length - excess;
1050                         *truncation = KEY_TRUNCATED;
1051                         /*
1052                          * Truncated keys are placed in a separate key space
1053                          * from the non truncated keys
1054                          */
1055                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1056                                              LDB_KV_INDEX, attr_for_dn,
1057                                              frmt_len, (char *)v.data);
1058                 } else {
1059                         frmt_len = v.length;
1060                         *truncation = KEY_NOT_TRUNCATED;
1061                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1062                                              LDB_KV_INDEX, attr_for_dn,
1063                                              frmt_len, (char *)v.data);
1064                 }
1065         }
1066
1067         if (v.data != value->data && !empty_val) {
1068                 talloc_free(v.data);
1069         }
1070         talloc_free(attr_folded);
1071
1072         return ret;
1073 }
1074
1075 /*
1076   see if a attribute value is in the list of indexed attributes
1077 */
1078 static bool ldb_kv_is_indexed(struct ldb_module *module,
1079                               struct ldb_kv_private *ldb_kv,
1080                               const char *attr)
1081 {
1082         struct ldb_context *ldb = ldb_module_get_ctx(module);
1083         unsigned int i;
1084         struct ldb_message_element *el;
1085
1086         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1087             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1088                 /* Implicity covered, this is the index key */
1089                 return false;
1090         }
1091         if (ldb->schema.index_handler_override) {
1092                 const struct ldb_schema_attribute *a
1093                         = ldb_schema_attribute_by_name(ldb, attr);
1094
1095                 if (a == NULL) {
1096                         return false;
1097                 }
1098
1099                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1100                         return true;
1101                 } else {
1102                         return false;
1103                 }
1104         }
1105
1106         if (!ldb_kv->cache->attribute_indexes) {
1107                 return false;
1108         }
1109
1110         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1111         if (el == NULL) {
1112                 return false;
1113         }
1114
1115         /* TODO: this is too expensive! At least use a binary search */
1116         for (i=0; i<el->num_values; i++) {
1117                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1118                         return true;
1119                 }
1120         }
1121         return false;
1122 }
1123
1124 /*
1125   in the following logic functions, the return value is treated as
1126   follows:
1127
1128      LDB_SUCCESS: we found some matching index values
1129
1130      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1131
1132      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1133                                we'll need a full search
1134  */
1135
1136 /*
1137   return a list of dn's that might match a simple indexed search (an
1138   equality search only)
1139  */
1140 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1141                                   struct ldb_kv_private *ldb_kv,
1142                                   const struct ldb_parse_tree *tree,
1143                                   struct dn_list *list)
1144 {
1145         struct ldb_context *ldb;
1146         struct ldb_dn *dn;
1147         int ret;
1148         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1149
1150         ldb = ldb_module_get_ctx(module);
1151
1152         list->count = 0;
1153         list->dn = NULL;
1154
1155         /* if the attribute isn't in the list of indexed attributes then
1156            this node needs a full search */
1157         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1158                 return LDB_ERR_OPERATIONS_ERROR;
1159         }
1160
1161         /* the attribute is indexed. Pull the list of DNs that match the
1162            search criterion */
1163         dn = ldb_kv_index_key(ldb,
1164                               ldb_kv,
1165                               tree->u.equality.attr,
1166                               &tree->u.equality.value,
1167                               NULL,
1168                               &truncation);
1169         /*
1170          * We ignore truncation here and allow multi-valued matches
1171          * as ltdb_search_indexed will filter out the wrong one in
1172          * ltdb_index_filter() which calls ldb_match_message().
1173          */
1174         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1175
1176         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1177         talloc_free(dn);
1178         return ret;
1179 }
1180
1181 static bool list_union(struct ldb_context *ldb,
1182                        struct ldb_kv_private *ldb_kv,
1183                        struct dn_list *list,
1184                        struct dn_list *list2);
1185
1186 /*
1187   return a list of dn's that might match a leaf indexed search
1188  */
1189 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1190                                 struct ldb_kv_private *ldb_kv,
1191                                 const struct ldb_parse_tree *tree,
1192                                 struct dn_list *list)
1193 {
1194         if (ldb_kv->disallow_dn_filter &&
1195             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1196                 /* in AD mode we do not support "(dn=...)" search filters */
1197                 list->dn = NULL;
1198                 list->count = 0;
1199                 return LDB_SUCCESS;
1200         }
1201         if (tree->u.equality.attr[0] == '@') {
1202                 /* Do not allow a indexed search against an @ */
1203                 list->dn = NULL;
1204                 list->count = 0;
1205                 return LDB_SUCCESS;
1206         }
1207         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1208                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1209                 bool valid_dn = false;
1210                 struct ldb_dn *dn
1211                         = ldb_dn_from_ldb_val(list,
1212                                               ldb_module_get_ctx(module),
1213                                               &tree->u.equality.value);
1214                 if (dn == NULL) {
1215                         /* If we can't parse it, no match */
1216                         list->dn = NULL;
1217                         list->count = 0;
1218                         return LDB_SUCCESS;
1219                 }
1220
1221                 valid_dn = ldb_dn_validate(dn);
1222                 if (valid_dn == false) {
1223                         /* If we can't parse it, no match */
1224                         list->dn = NULL;
1225                         list->count = 0;
1226                         return LDB_SUCCESS;
1227                 }
1228
1229                 /*
1230                  * Re-use the same code we use for a SCOPE_BASE
1231                  * search
1232                  *
1233                  * We can't call TALLOC_FREE(dn) as this must belong
1234                  * to list for the memory to remain valid.
1235                  */
1236                 return ldb_kv_index_dn_base_dn(
1237                     module, ldb_kv, dn, list, &truncation);
1238                 /*
1239                  * We ignore truncation here and allow multi-valued matches
1240                  * as ltdb_search_indexed will filter out the wrong one in
1241                  * ltdb_index_filter() which calls ldb_match_message().
1242                  */
1243
1244         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1245                    (ldb_attr_cmp(tree->u.equality.attr,
1246                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1247                 int ret;
1248                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1249                 list->dn = talloc_array(list, struct ldb_val, 1);
1250                 if (list->dn == NULL) {
1251                         ldb_module_oom(module);
1252                         return LDB_ERR_OPERATIONS_ERROR;
1253                 }
1254                 /*
1255                  * We need to go via the canonicalise_fn() to
1256                  * ensure we get the index in binary, rather
1257                  * than a string
1258                  */
1259                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1260                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1261                 if (ret != LDB_SUCCESS) {
1262                         return LDB_ERR_OPERATIONS_ERROR;
1263                 }
1264                 list->count = 1;
1265                 return LDB_SUCCESS;
1266         }
1267
1268         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1269 }
1270
1271
1272 /*
1273   list intersection
1274   list = list & list2
1275 */
1276 static bool list_intersect(struct ldb_kv_private *ldb_kv,
1277                            struct dn_list *list,
1278                            const struct dn_list *list2)
1279 {
1280         const struct dn_list *short_list, *long_list;
1281         struct dn_list *list3;
1282         unsigned int i;
1283
1284         if (list->count == 0) {
1285                 /* 0 & X == 0 */
1286                 return true;
1287         }
1288         if (list2->count == 0) {
1289                 /* X & 0 == 0 */
1290                 list->count = 0;
1291                 list->dn = NULL;
1292                 return true;
1293         }
1294
1295         /*
1296          * In both of the below we check for strict and in that
1297          * case do not optimise the intersection of this list,
1298          * we must never return an entry not in this
1299          * list.  This allows the index for
1300          * SCOPE_ONELEVEL to be trusted.
1301          */
1302
1303         /* the indexing code is allowed to return a longer list than
1304            what really matches, as all results are filtered by the
1305            full expression at the end - this shortcut avoids a lot of
1306            work in some cases */
1307         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1308                 return true;
1309         }
1310         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1311                 list->count = list2->count;
1312                 list->dn = list2->dn;
1313                 /* note that list2 may not be the parent of list2->dn,
1314                    as list2->dn may be owned by ltdb->idxptr. In that
1315                    case we expect this reparent call to fail, which is
1316                    OK */
1317                 talloc_reparent(list2, list, list2->dn);
1318                 return true;
1319         }
1320
1321         if (list->count > list2->count) {
1322                 short_list = list2;
1323                 long_list = list;
1324         } else {
1325                 short_list = list;
1326                 long_list = list2;
1327         }
1328
1329         list3 = talloc_zero(list, struct dn_list);
1330         if (list3 == NULL) {
1331                 return false;
1332         }
1333
1334         list3->dn = talloc_array(list3, struct ldb_val,
1335                                  MIN(list->count, list2->count));
1336         if (!list3->dn) {
1337                 talloc_free(list3);
1338                 return false;
1339         }
1340         list3->count = 0;
1341
1342         for (i=0;i<short_list->count;i++) {
1343                 /* For the GUID index case, this is a binary search */
1344                 if (ldb_kv_dn_list_find_val(
1345                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1346                         list3->dn[list3->count] = short_list->dn[i];
1347                         list3->count++;
1348                 }
1349         }
1350
1351         list->strict |= list2->strict;
1352         list->dn = talloc_steal(list, list3->dn);
1353         list->count = list3->count;
1354         talloc_free(list3);
1355
1356         return true;
1357 }
1358
1359
1360 /*
1361   list union
1362   list = list | list2
1363 */
1364 static bool list_union(struct ldb_context *ldb,
1365                        struct ldb_kv_private *ldb_kv,
1366                        struct dn_list *list,
1367                        struct dn_list *list2)
1368 {
1369         struct ldb_val *dn3;
1370         unsigned int i = 0, j = 0, k = 0;
1371
1372         if (list2->count == 0) {
1373                 /* X | 0 == X */
1374                 return true;
1375         }
1376
1377         if (list->count == 0) {
1378                 /* 0 | X == X */
1379                 list->count = list2->count;
1380                 list->dn = list2->dn;
1381                 /* note that list2 may not be the parent of list2->dn,
1382                    as list2->dn may be owned by ltdb->idxptr. In that
1383                    case we expect this reparent call to fail, which is
1384                    OK */
1385                 talloc_reparent(list2, list, list2->dn);
1386                 return true;
1387         }
1388
1389         /*
1390          * Sort the lists (if not in GUID DN mode) so we can do
1391          * the de-duplication during the merge
1392          *
1393          * NOTE: This can sort the in-memory index values, as list or
1394          * list2 might not be a copy!
1395          */
1396         ldb_kv_dn_list_sort(ldb_kv, list);
1397         ldb_kv_dn_list_sort(ldb_kv, list2);
1398
1399         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1400         if (!dn3) {
1401                 ldb_oom(ldb);
1402                 return false;
1403         }
1404
1405         while (i < list->count || j < list2->count) {
1406                 int cmp;
1407                 if (i >= list->count) {
1408                         cmp = 1;
1409                 } else if (j >= list2->count) {
1410                         cmp = -1;
1411                 } else {
1412                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1413                                                           &list2->dn[j]);
1414                 }
1415
1416                 if (cmp < 0) {
1417                         /* Take list */
1418                         dn3[k] = list->dn[i];
1419                         i++;
1420                         k++;
1421                 } else if (cmp > 0) {
1422                         /* Take list2 */
1423                         dn3[k] = list2->dn[j];
1424                         j++;
1425                         k++;
1426                 } else {
1427                         /* Equal, take list */
1428                         dn3[k] = list->dn[i];
1429                         i++;
1430                         j++;
1431                         k++;
1432                 }
1433         }
1434
1435         list->dn = dn3;
1436         list->count = k;
1437
1438         return true;
1439 }
1440
1441 static int ldb_kv_index_dn(struct ldb_module *module,
1442                            struct ldb_kv_private *ldb_kv,
1443                            const struct ldb_parse_tree *tree,
1444                            struct dn_list *list);
1445
1446 /*
1447   process an OR list (a union)
1448  */
1449 static int ldb_kv_index_dn_or(struct ldb_module *module,
1450                               struct ldb_kv_private *ldb_kv,
1451                               const struct ldb_parse_tree *tree,
1452                               struct dn_list *list)
1453 {
1454         struct ldb_context *ldb;
1455         unsigned int i;
1456
1457         ldb = ldb_module_get_ctx(module);
1458
1459         list->dn = NULL;
1460         list->count = 0;
1461
1462         for (i=0; i<tree->u.list.num_elements; i++) {
1463                 struct dn_list *list2;
1464                 int ret;
1465
1466                 list2 = talloc_zero(list, struct dn_list);
1467                 if (list2 == NULL) {
1468                         return LDB_ERR_OPERATIONS_ERROR;
1469                 }
1470
1471                 ret = ldb_kv_index_dn(
1472                     module, ldb_kv, tree->u.list.elements[i], list2);
1473
1474                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1475                         /* X || 0 == X */
1476                         talloc_free(list2);
1477                         continue;
1478                 }
1479
1480                 if (ret != LDB_SUCCESS) {
1481                         /* X || * == * */
1482                         talloc_free(list2);
1483                         return ret;
1484                 }
1485
1486                 if (!list_union(ldb, ldb_kv, list, list2)) {
1487                         talloc_free(list2);
1488                         return LDB_ERR_OPERATIONS_ERROR;
1489                 }
1490         }
1491
1492         if (list->count == 0) {
1493                 return LDB_ERR_NO_SUCH_OBJECT;
1494         }
1495
1496         return LDB_SUCCESS;
1497 }
1498
1499
1500 /*
1501   NOT an index results
1502  */
1503 static int ldb_kv_index_dn_not(_UNUSED_ struct ldb_module *module,
1504                                _UNUSED_ struct ldb_kv_private *ldb_kv,
1505                                _UNUSED_ const struct ldb_parse_tree *tree,
1506                                _UNUSED_ struct dn_list *list)
1507 {
1508         /* the only way to do an indexed not would be if we could
1509            negate the not via another not or if we knew the total
1510            number of database elements so we could know that the
1511            existing expression covered the whole database.
1512
1513            instead, we just give up, and rely on a full index scan
1514            (unless an outer & manages to reduce the list)
1515         */
1516         return LDB_ERR_OPERATIONS_ERROR;
1517 }
1518
1519 /*
1520  * These things are unique, so avoid a full scan if this is a search
1521  * by GUID, DN or a unique attribute
1522  */
1523 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1524                                 struct ldb_kv_private *ldb_kv,
1525                                 const char *attr)
1526 {
1527         const struct ldb_schema_attribute *a;
1528         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1529                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1530                     0) {
1531                         return true;
1532                 }
1533         }
1534         if (ldb_attr_dn(attr) == 0) {
1535                 return true;
1536         }
1537
1538         a = ldb_schema_attribute_by_name(ldb, attr);
1539         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1540                 return true;
1541         }
1542         return false;
1543 }
1544
1545 /*
1546   process an AND expression (intersection)
1547  */
1548 static int ldb_kv_index_dn_and(struct ldb_module *module,
1549                                struct ldb_kv_private *ldb_kv,
1550                                const struct ldb_parse_tree *tree,
1551                                struct dn_list *list)
1552 {
1553         struct ldb_context *ldb;
1554         unsigned int i;
1555         bool found;
1556
1557         ldb = ldb_module_get_ctx(module);
1558
1559         list->dn = NULL;
1560         list->count = 0;
1561
1562         /* in the first pass we only look for unique simple
1563            equality tests, in the hope of avoiding having to look
1564            at any others */
1565         for (i=0; i<tree->u.list.num_elements; i++) {
1566                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1567                 int ret;
1568
1569                 if (subtree->operation != LDB_OP_EQUALITY ||
1570                     !ldb_kv_index_unique(
1571                         ldb, ldb_kv, subtree->u.equality.attr)) {
1572                         continue;
1573                 }
1574
1575                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1576                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1577                         /* 0 && X == 0 */
1578                         return LDB_ERR_NO_SUCH_OBJECT;
1579                 }
1580                 if (ret == LDB_SUCCESS) {
1581                         /* a unique index match means we can
1582                          * stop. Note that we don't care if we return
1583                          * a few too many objects, due to later
1584                          * filtering */
1585                         return LDB_SUCCESS;
1586                 }
1587         }
1588
1589         /* now do a full intersection */
1590         found = false;
1591
1592         for (i=0; i<tree->u.list.num_elements; i++) {
1593                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1594                 struct dn_list *list2;
1595                 int ret;
1596
1597                 list2 = talloc_zero(list, struct dn_list);
1598                 if (list2 == NULL) {
1599                         return ldb_module_oom(module);
1600                 }
1601
1602                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1603
1604                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1605                         /* X && 0 == 0 */
1606                         list->dn = NULL;
1607                         list->count = 0;
1608                         talloc_free(list2);
1609                         return LDB_ERR_NO_SUCH_OBJECT;
1610                 }
1611
1612                 if (ret != LDB_SUCCESS) {
1613                         /* this didn't adding anything */
1614                         talloc_free(list2);
1615                         continue;
1616                 }
1617
1618                 if (!found) {
1619                         talloc_reparent(list2, list, list->dn);
1620                         list->dn = list2->dn;
1621                         list->count = list2->count;
1622                         found = true;
1623                 } else if (!list_intersect(ldb_kv, list, list2)) {
1624                         talloc_free(list2);
1625                         return LDB_ERR_OPERATIONS_ERROR;
1626                 }
1627
1628                 if (list->count == 0) {
1629                         list->dn = NULL;
1630                         return LDB_ERR_NO_SUCH_OBJECT;
1631                 }
1632
1633                 if (list->count < 2) {
1634                         /* it isn't worth loading the next part of the tree */
1635                         return LDB_SUCCESS;
1636                 }
1637         }
1638
1639         if (!found) {
1640                 /* none of the attributes were indexed */
1641                 return LDB_ERR_OPERATIONS_ERROR;
1642         }
1643
1644         return LDB_SUCCESS;
1645 }
1646
1647 struct ldb_kv_ordered_index_context {
1648         struct ldb_module *module;
1649         int error;
1650         struct dn_list *dn_list;
1651 };
1652
1653 static int traverse_range_index(_UNUSED_ struct ldb_kv_private *ldb_kv,
1654                                 _UNUSED_ struct ldb_val key,
1655                                 struct ldb_val data,
1656                                 void *state)
1657 {
1658
1659         struct ldb_context *ldb;
1660         struct ldb_kv_ordered_index_context *ctx =
1661             (struct ldb_kv_ordered_index_context *)state;
1662         struct ldb_module *module = ctx->module;
1663         struct ldb_message_element *el = NULL;
1664         struct ldb_message *msg = NULL;
1665         int version;
1666         size_t dn_array_size, additional_length;
1667         unsigned int i;
1668
1669         ldb = ldb_module_get_ctx(module);
1670
1671         msg = ldb_msg_new(module);
1672
1673         ctx->error = ldb_unpack_data_flags(ldb, &data, msg,
1674                                            LDB_UNPACK_DATA_FLAG_NO_DN);
1675
1676         if (ctx->error != LDB_SUCCESS) {
1677                 talloc_free(msg);
1678                 return ctx->error;
1679         }
1680
1681         el = ldb_msg_find_element(msg, LDB_KV_IDX);
1682         if (!el) {
1683                 talloc_free(msg);
1684                 return LDB_SUCCESS;
1685         }
1686
1687         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
1688
1689         /*
1690          * we avoid copying the strings by stealing the list.  We have
1691          * to steal msg onto el->values (which looks odd) because
1692          * the memory is allocated on msg, not on each value.
1693          */
1694         if (version != LDB_KV_GUID_INDEXING_VERSION) {
1695                 /* This is quite likely during the DB startup
1696                    on first upgrade to using a GUID index */
1697                 ldb_debug_set(ldb_module_get_ctx(module),
1698                               LDB_DEBUG_ERROR, __location__
1699                               ": Wrong GUID index version %d expected %d",
1700                               version, LDB_KV_GUID_INDEXING_VERSION);
1701                 talloc_free(msg);
1702                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1703                 return ctx->error;
1704         }
1705
1706         if (el->num_values == 0) {
1707                 talloc_free(msg);
1708                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1709                 return ctx->error;
1710         }
1711
1712         if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
1713             || el->values[0].length == 0) {
1714                 talloc_free(msg);
1715                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1716                 return ctx->error;
1717         }
1718
1719         dn_array_size = talloc_array_length(ctx->dn_list->dn);
1720
1721         additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
1722
1723         if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
1724                 talloc_free(msg);
1725                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1726                 return ctx->error;
1727         }
1728
1729         if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
1730                 size_t new_array_length;
1731
1732                 if (dn_array_size * 2 < dn_array_size) {
1733                         talloc_free(msg);
1734                         ctx->error = LDB_ERR_OPERATIONS_ERROR;
1735                         return ctx->error;
1736                 }
1737
1738                 new_array_length = MAX(ctx->dn_list->count + additional_length,
1739                                        dn_array_size * 2);
1740
1741                 ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
1742                                                   ctx->dn_list->dn,
1743                                                   struct ldb_val,
1744                                                   new_array_length);
1745         }
1746
1747         if (ctx->dn_list->dn == NULL) {
1748                 talloc_free(msg);
1749                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1750                 return ctx->error;
1751         }
1752
1753         /*
1754          * The actual data is on msg.
1755          */
1756         talloc_steal(ctx->dn_list->dn, msg);
1757         for (i = 0; i < additional_length; i++) {
1758                 ctx->dn_list->dn[i + ctx->dn_list->count].data
1759                         = &el->values[0].data[i * LDB_KV_GUID_SIZE];
1760                 ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
1761
1762         }
1763
1764         ctx->dn_list->count += additional_length;
1765
1766         talloc_free(msg->elements);
1767
1768         return LDB_SUCCESS;
1769 }
1770
1771 /*
1772  * >= and <= indexing implemented using lexicographically sorted keys
1773  *
1774  * We only run this in GUID indexing mode and when there is no write
1775  * transaction (only implicit read locks are being held). Otherwise, we would
1776  * have to deal with the in-memory index cache.
1777  *
1778  * We rely on the implementation of index_format_fn on a schema syntax which
1779  * will can help us to construct keys which can be ordered correctly, and we
1780  * terminate using schema agnostic start and end keys.
1781  *
1782  * index_format_fn must output values which can be memcmp-able to produce the
1783  * correct ordering as defined by the schema syntax class.
1784  */
1785 static int ldb_kv_index_dn_ordered(struct ldb_module *module,
1786                                    struct ldb_kv_private *ldb_kv,
1787                                    const struct ldb_parse_tree *tree,
1788                                    struct dn_list *list, bool ascending)
1789 {
1790         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1791         struct ldb_context *ldb = ldb_module_get_ctx(module);
1792
1793         struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
1794         struct ldb_val start_key, end_key;
1795         struct ldb_dn *key_dn = NULL;
1796         const struct ldb_schema_attribute *a = NULL;
1797
1798         struct ldb_kv_ordered_index_context ctx;
1799         int ret;
1800
1801         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1802
1803         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
1804                 return LDB_ERR_OPERATIONS_ERROR;
1805         }
1806
1807         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1808                 return LDB_ERR_OPERATIONS_ERROR;
1809         }
1810
1811         /* bail out if we're in a transaction, full search instead. */
1812         if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1813                 return LDB_ERR_OPERATIONS_ERROR;
1814         }
1815
1816         if (ldb_kv->disallow_dn_filter &&
1817             (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
1818                 /* in AD mode we do not support "(dn=...)" search filters */
1819                 list->dn = NULL;
1820                 list->count = 0;
1821                 return LDB_SUCCESS;
1822         }
1823         if (tree->u.comparison.attr[0] == '@') {
1824                 /* Do not allow a indexed search against an @ */
1825                 list->dn = NULL;
1826                 list->count = 0;
1827                 return LDB_SUCCESS;
1828         }
1829
1830         a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
1831
1832         /*
1833          * If there's no index format function defined for this attr, then
1834          * the lexicographic order in the database doesn't correspond to the
1835          * attr's ordering, so we can't use the iterate_range op.
1836          */
1837         if (a->syntax->index_format_fn == NULL) {
1838                 return LDB_ERR_OPERATIONS_ERROR;
1839         }
1840
1841         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1842                                   &tree->u.comparison.value,
1843                                   NULL, &truncation);
1844         if (!key_dn) {
1845                 return LDB_ERR_OPERATIONS_ERROR;
1846         } else if (truncation == KEY_TRUNCATED) {
1847                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1848                           __location__
1849                           ": ordered index violation: key dn truncated: %s\n",
1850                           ldb_dn_get_linearized(key_dn));
1851                 return LDB_ERR_OPERATIONS_ERROR;
1852         }
1853         ldb_key = ldb_kv_key_dn(tmp_ctx, key_dn);
1854         talloc_free(key_dn);
1855         if (ldb_key.data == NULL) {
1856                 return LDB_ERR_OPERATIONS_ERROR;
1857         }
1858
1859         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1860                                   NULL, NULL, &truncation);
1861         if (!key_dn) {
1862                 return LDB_ERR_OPERATIONS_ERROR;
1863         } else if (truncation == KEY_TRUNCATED) {
1864                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1865                           __location__
1866                           ": ordered index violation: key dn truncated: %s\n",
1867                           ldb_dn_get_linearized(key_dn));
1868                 return LDB_ERR_OPERATIONS_ERROR;
1869         }
1870
1871         ldb_key2 = ldb_kv_key_dn(tmp_ctx, key_dn);
1872         talloc_free(key_dn);
1873         if (ldb_key2.data == NULL) {
1874                 return LDB_ERR_OPERATIONS_ERROR;
1875         }
1876
1877         /*
1878          * In order to avoid defining a start and end key for the search, we
1879          * notice that each index key is of the form:
1880          *
1881          *     DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
1882          *
1883          * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
1884          * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
1885          * particular attribute.
1886          *
1887          * Our LMDB backend uses the default memcmp for key comparison.
1888          */
1889
1890         /* Eliminate NUL byte at the end of the empty key */
1891         ldb_key2.length--;
1892
1893         if (ascending) {
1894                 /* : becomes ; for pseudo end-key */
1895                 ldb_key2.data[ldb_key2.length-1]++;
1896                 start_key = ldb_key;
1897                 end_key = ldb_key2;
1898         } else {
1899                 start_key = ldb_key2;
1900                 end_key = ldb_key;
1901         }
1902
1903         ctx.module = module;
1904         ctx.error = 0;
1905         ctx.dn_list = list;
1906         ctx.dn_list->count = 0;
1907         ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
1908
1909         ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
1910                                             traverse_range_index, &ctx);
1911
1912         if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
1913                 return LDB_ERR_OPERATIONS_ERROR;
1914         }
1915
1916         TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
1917                        ldb_val_equal_exact_for_qsort);
1918
1919         talloc_free(tmp_ctx);
1920
1921         return LDB_SUCCESS;
1922 }
1923
1924 static int ldb_kv_index_dn_greater(struct ldb_module *module,
1925                                    struct ldb_kv_private *ldb_kv,
1926                                    const struct ldb_parse_tree *tree,
1927                                    struct dn_list *list)
1928 {
1929         return ldb_kv_index_dn_ordered(module,
1930                                        ldb_kv,
1931                                        tree,
1932                                        list, true);
1933 }
1934
1935 static int ldb_kv_index_dn_less(struct ldb_module *module,
1936                                    struct ldb_kv_private *ldb_kv,
1937                                    const struct ldb_parse_tree *tree,
1938                                    struct dn_list *list)
1939 {
1940         return ldb_kv_index_dn_ordered(module,
1941                                        ldb_kv,
1942                                        tree,
1943                                        list, false);
1944 }
1945
1946 /*
1947   return a list of matching objects using a one-level index
1948  */
1949 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1950                                 struct ldb_kv_private *ldb_kv,
1951                                 const char *attr,
1952                                 struct ldb_dn *dn,
1953                                 struct dn_list *list,
1954                                 enum key_truncation *truncation)
1955 {
1956         struct ldb_context *ldb;
1957         struct ldb_dn *key;
1958         struct ldb_val val;
1959         int ret;
1960
1961         ldb = ldb_module_get_ctx(module);
1962
1963         /* work out the index key from the parent DN */
1964         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1965         if (val.data == NULL) {
1966                 const char *dn_str = ldb_dn_get_linearized(dn);
1967                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1968                                        __location__
1969                                        ": Failed to get casefold DN "
1970                                        "from: %s",
1971                                        dn_str);
1972                 return LDB_ERR_OPERATIONS_ERROR;
1973         }
1974         val.length = strlen((char *)val.data);
1975         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1976         if (!key) {
1977                 ldb_oom(ldb);
1978                 return LDB_ERR_OPERATIONS_ERROR;
1979         }
1980
1981         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1982         talloc_free(key);
1983         if (ret != LDB_SUCCESS) {
1984                 return ret;
1985         }
1986
1987         if (list->count == 0) {
1988                 return LDB_ERR_NO_SUCH_OBJECT;
1989         }
1990
1991         return LDB_SUCCESS;
1992 }
1993
1994 /*
1995   return a list of matching objects using a one-level index
1996  */
1997 static int ldb_kv_index_dn_one(struct ldb_module *module,
1998                                struct ldb_kv_private *ldb_kv,
1999                                struct ldb_dn *parent_dn,
2000                                struct dn_list *list,
2001                                enum key_truncation *truncation)
2002 {
2003         /*
2004          * Ensure we do not shortcut on intersection for this list.
2005          * We must never be lazy and return an entry not in this
2006          * list.  This allows the index for
2007          * SCOPE_ONELEVEL to be trusted.
2008          */
2009
2010         list->strict = true;
2011         return ldb_kv_index_dn_attr(
2012             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
2013 }
2014
2015 /*
2016   return a list of matching objects using the DN index
2017  */
2018 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
2019                                    struct ldb_kv_private *ldb_kv,
2020                                    struct ldb_dn *base_dn,
2021                                    struct dn_list *dn_list,
2022                                    enum key_truncation *truncation)
2023 {
2024         const struct ldb_val *guid_val = NULL;
2025         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2026                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2027                 if (dn_list->dn == NULL) {
2028                         return ldb_module_oom(module);
2029                 }
2030                 dn_list->dn[0].data = discard_const_p(unsigned char,
2031                                                       ldb_dn_get_linearized(base_dn));
2032                 if (dn_list->dn[0].data == NULL) {
2033                         talloc_free(dn_list->dn);
2034                         return ldb_module_oom(module);
2035                 }
2036                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
2037                 dn_list->count = 1;
2038
2039                 return LDB_SUCCESS;
2040         }
2041
2042         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
2043                 guid_val = ldb_dn_get_extended_component(
2044                     base_dn, ldb_kv->cache->GUID_index_dn_component);
2045         }
2046
2047         if (guid_val != NULL) {
2048                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2049                 if (dn_list->dn == NULL) {
2050                         return ldb_module_oom(module);
2051                 }
2052                 dn_list->dn[0].data = guid_val->data;
2053                 dn_list->dn[0].length = guid_val->length;
2054                 dn_list->count = 1;
2055
2056                 return LDB_SUCCESS;
2057         }
2058
2059         return ldb_kv_index_dn_attr(
2060             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
2061 }
2062
2063 /*
2064   return a list of dn's that might match a indexed search or
2065   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
2066  */
2067 static int ldb_kv_index_dn(struct ldb_module *module,
2068                            struct ldb_kv_private *ldb_kv,
2069                            const struct ldb_parse_tree *tree,
2070                            struct dn_list *list)
2071 {
2072         int ret = LDB_ERR_OPERATIONS_ERROR;
2073
2074         switch (tree->operation) {
2075         case LDB_OP_AND:
2076                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
2077                 break;
2078
2079         case LDB_OP_OR:
2080                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
2081                 break;
2082
2083         case LDB_OP_NOT:
2084                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
2085                 break;
2086
2087         case LDB_OP_EQUALITY:
2088                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
2089                 break;
2090
2091         case LDB_OP_GREATER:
2092                 ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
2093                 break;
2094
2095         case LDB_OP_LESS:
2096                 ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
2097                 break;
2098
2099         case LDB_OP_SUBSTRING:
2100         case LDB_OP_PRESENT:
2101         case LDB_OP_APPROX:
2102         case LDB_OP_EXTENDED:
2103                 /* we can't index with fancy bitops yet */
2104                 ret = LDB_ERR_OPERATIONS_ERROR;
2105                 break;
2106         }
2107
2108         return ret;
2109 }
2110
2111 /*
2112   filter a candidate dn_list from an indexed search into a set of results
2113   extracting just the given attributes
2114 */
2115 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
2116                                const struct dn_list *dn_list,
2117                                struct ldb_kv_context *ac,
2118                                uint32_t *match_count,
2119                                enum key_truncation scope_one_truncation)
2120 {
2121         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2122         struct ldb_message *msg;
2123         struct ldb_message *filtered_msg;
2124         unsigned int i;
2125         unsigned int num_keys = 0;
2126         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
2127         struct ldb_val *keys = NULL;
2128
2129         /*
2130          * We have to allocate the key list (rather than just walk the
2131          * caller supplied list) as the callback could change the list
2132          * (by modifying an indexed attribute hosted in the in-memory
2133          * index cache!)
2134          */
2135         keys = talloc_array(ac, struct ldb_val, dn_list->count);
2136         if (keys == NULL) {
2137                 return ldb_module_oom(ac->module);
2138         }
2139
2140         if (ldb_kv->cache->GUID_index_attribute != NULL) {
2141                 /*
2142                  * We speculate that the keys will be GUID based and so
2143                  * pre-fill in enough space for a GUID (avoiding a pile of
2144                  * small allocations)
2145                  */
2146                 struct guid_tdb_key {
2147                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2148                 } *key_values = NULL;
2149
2150                 key_values = talloc_array(keys,
2151                                           struct guid_tdb_key,
2152                                           dn_list->count);
2153
2154                 if (key_values == NULL) {
2155                         talloc_free(keys);
2156                         return ldb_module_oom(ac->module);
2157                 }
2158                 for (i = 0; i < dn_list->count; i++) {
2159                         keys[i].data = key_values[i].guid_key;
2160                         keys[i].length = sizeof(key_values[i].guid_key);
2161                 }
2162         } else {
2163                 for (i = 0; i < dn_list->count; i++) {
2164                         keys[i].data = NULL;
2165                         keys[i].length = 0;
2166                 }
2167         }
2168
2169         for (i = 0; i < dn_list->count; i++) {
2170                 int ret;
2171
2172                 ret = ldb_kv_idx_to_key(
2173                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
2174                 if (ret != LDB_SUCCESS) {
2175                         talloc_free(keys);
2176                         return ret;
2177                 }
2178
2179                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2180                         /*
2181                          * If we are in GUID index mode, then the dn_list is
2182                          * sorted.  If we got a duplicate, forget about it, as
2183                          * otherwise we would send the same entry back more
2184                          * than once.
2185                          *
2186                          * This is needed in the truncated DN case, or if a
2187                          * duplicate was forced in via
2188                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2189                          */
2190
2191                         if (memcmp(previous_guid_key,
2192                                    keys[num_keys].data,
2193                                    sizeof(previous_guid_key)) == 0) {
2194                                 continue;
2195                         }
2196
2197                         memcpy(previous_guid_key,
2198                                keys[num_keys].data,
2199                                sizeof(previous_guid_key));
2200                 }
2201                 num_keys++;
2202         }
2203
2204
2205         /*
2206          * Now that the list is a safe copy, send the callbacks
2207          */
2208         for (i = 0; i < num_keys; i++) {
2209                 int ret;
2210                 bool matched;
2211                 msg = ldb_msg_new(ac);
2212                 if (!msg) {
2213                         talloc_free(keys);
2214                         return LDB_ERR_OPERATIONS_ERROR;
2215                 }
2216
2217                 ret =
2218                     ldb_kv_search_key(ac->module,
2219                                       ldb_kv,
2220                                       keys[i],
2221                                       msg,
2222                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
2223                                       /*
2224                                        * The entry point ldb_kv_search_indexed is
2225                                        * only called from the read-locked
2226                                        * ldb_kv_search.
2227                                        */
2228                                       LDB_UNPACK_DATA_FLAG_READ_LOCKED);
2229                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2230                         /*
2231                          * the record has disappeared? yes, this can
2232                          * happen if the entry is deleted by something
2233                          * operating in the callback (not another
2234                          * process, as we have a read lock)
2235                          */
2236                         talloc_free(msg);
2237                         continue;
2238                 }
2239
2240                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2241                         /* an internal error */
2242                         talloc_free(keys);
2243                         talloc_free(msg);
2244                         return LDB_ERR_OPERATIONS_ERROR;
2245                 }
2246
2247                 /*
2248                  * We trust the index for LDB_SCOPE_ONELEVEL
2249                  * unless the index key has been truncated.
2250                  *
2251                  * LDB_SCOPE_BASE is not passed in by our only caller.
2252                  */
2253                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2254                     ldb_kv->cache->one_level_indexes &&
2255                     scope_one_truncation == KEY_NOT_TRUNCATED) {
2256                         ret = ldb_match_message(ldb, msg, ac->tree,
2257                                                 ac->scope, &matched);
2258                 } else {
2259                         ret = ldb_match_msg_error(ldb, msg,
2260                                                   ac->tree, ac->base,
2261                                                   ac->scope, &matched);
2262                 }
2263
2264                 if (ret != LDB_SUCCESS) {
2265                         talloc_free(keys);
2266                         talloc_free(msg);
2267                         return ret;
2268                 }
2269                 if (!matched) {
2270                         talloc_free(msg);
2271                         continue;
2272                 }
2273
2274                 filtered_msg = ldb_msg_new(ac);
2275                 if (filtered_msg == NULL) {
2276                         TALLOC_FREE(keys);
2277                         TALLOC_FREE(msg);
2278                         return LDB_ERR_OPERATIONS_ERROR;
2279                 }
2280
2281                 filtered_msg->dn = talloc_steal(filtered_msg, msg->dn);
2282
2283                 /* filter the attributes that the user wants */
2284                 ret = ldb_kv_filter_attrs(ldb, msg, ac->attrs, filtered_msg);
2285
2286                 talloc_free(msg);
2287
2288                 if (ret == -1) {
2289                         TALLOC_FREE(filtered_msg);
2290                         talloc_free(keys);
2291                         return LDB_ERR_OPERATIONS_ERROR;
2292                 }
2293
2294                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
2295                 if (ret != LDB_SUCCESS) {
2296                         /* Regardless of success or failure, the msg
2297                          * is the callbacks responsiblity, and should
2298                          * not be talloc_free()'ed */
2299                         ac->request_terminated = true;
2300                         talloc_free(keys);
2301                         return ret;
2302                 }
2303
2304                 (*match_count)++;
2305         }
2306
2307         TALLOC_FREE(keys);
2308         return LDB_SUCCESS;
2309 }
2310
2311 /*
2312   sort a DN list
2313  */
2314 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
2315                                 struct dn_list *list)
2316 {
2317         if (list->count < 2) {
2318                 return;
2319         }
2320
2321         /* We know the list is sorted when using the GUID index */
2322         if (ltdb->cache->GUID_index_attribute != NULL) {
2323                 return;
2324         }
2325
2326         TYPESAFE_QSORT(list->dn, list->count,
2327                        ldb_val_equal_exact_for_qsort);
2328 }
2329
2330 /*
2331   search the database with a LDAP-like expression using indexes
2332   returns -1 if an indexed search is not possible, in which
2333   case the caller should call ltdb_search_full()
2334 */
2335 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
2336 {
2337         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2338         struct ldb_kv_private *ldb_kv = talloc_get_type(
2339             ldb_module_get_private(ac->module), struct ldb_kv_private);
2340         struct dn_list *dn_list;
2341         int ret;
2342         enum ldb_scope index_scope;
2343         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
2344
2345         /* see if indexing is enabled */
2346         if (!ldb_kv->cache->attribute_indexes &&
2347             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
2348                 /* fallback to a full search */
2349                 return LDB_ERR_OPERATIONS_ERROR;
2350         }
2351
2352         dn_list = talloc_zero(ac, struct dn_list);
2353         if (dn_list == NULL) {
2354                 return ldb_module_oom(ac->module);
2355         }
2356
2357         /*
2358          * For the purposes of selecting the switch arm below, if we
2359          * don't have a one-level index then treat it like a subtree
2360          * search
2361          */
2362         if (ac->scope == LDB_SCOPE_ONELEVEL &&
2363             !ldb_kv->cache->one_level_indexes) {
2364                 index_scope = LDB_SCOPE_SUBTREE;
2365         } else {
2366                 index_scope = ac->scope;
2367         }
2368
2369         switch (index_scope) {
2370         case LDB_SCOPE_BASE:
2371                 /*
2372                  * The only caller will have filtered the operation out
2373                  * so we should never get here
2374                  */
2375                 return ldb_operr(ldb);
2376
2377         case LDB_SCOPE_ONELEVEL:
2378
2379                 /*
2380                  * First, load all the one-level child objects (regardless of
2381                  * whether they match the search filter or not). The database
2382                  * maintains a one-level index, so retrieving this is quick.
2383                  */
2384                 ret = ldb_kv_index_dn_one(ac->module,
2385                                           ldb_kv,
2386                                           ac->base,
2387                                           dn_list,
2388                                           &scope_one_truncation);
2389                 if (ret != LDB_SUCCESS) {
2390                         talloc_free(dn_list);
2391                         return ret;
2392                 }
2393
2394                 /*
2395                  * If we have too many children, running ldb_kv_index_filter()
2396                  * over all the child objects can be quite expensive. So next
2397                  * we do a separate indexed query using the search filter.
2398                  *
2399                  * This should be quick, but it may return objects that are not
2400                  * the direct one-level child objects we're interested in.
2401                  *
2402                  * We only do this in the GUID index mode, which is
2403                  * O(n*log(m)) otherwise the intersection below will
2404                  * be too costly at O(n*m).
2405                  *
2406                  * We don't set a heuristic for 'too many' but instead
2407                  * do it always and rely on the index lookup being
2408                  * fast enough in the small case.
2409                  */
2410                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2411                         struct dn_list *indexed_search_result
2412                                 = talloc_zero(ac, struct dn_list);
2413                         if (indexed_search_result == NULL) {
2414                                 talloc_free(dn_list);
2415                                 return ldb_module_oom(ac->module);
2416                         }
2417
2418                         if (!ldb_kv->cache->attribute_indexes) {
2419                                 talloc_free(indexed_search_result);
2420                                 talloc_free(dn_list);
2421                                 return LDB_ERR_OPERATIONS_ERROR;
2422                         }
2423
2424                         /*
2425                          * Try to do an indexed database search
2426                          */
2427                         ret = ldb_kv_index_dn(
2428                             ac->module, ldb_kv, ac->tree,
2429                             indexed_search_result);
2430
2431                         /*
2432                          * We can stop if we're sure the object doesn't exist
2433                          */
2434                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2435                                 talloc_free(indexed_search_result);
2436                                 talloc_free(dn_list);
2437                                 return LDB_ERR_NO_SUCH_OBJECT;
2438                         }
2439
2440                         /*
2441                          * Once we have a successful search result, we
2442                          * intersect it with the one-level children (dn_list).
2443                          * This should give us exactly the result we're after
2444                          * (we still need to run ldb_kv_index_filter() to
2445                          * handle potential index truncation cases).
2446                          *
2447                          * The indexed search may fail because we don't support
2448                          * indexing on that type of search operation, e.g.
2449                          * matching against '*'. In which case we fall through
2450                          * and run ldb_kv_index_filter() over all the one-level
2451                          * children (which is still better than bailing out here
2452                          * and falling back to a full DB scan).
2453                          */
2454                         if (ret == LDB_SUCCESS) {
2455                                 if (!list_intersect(ldb_kv,
2456                                                     dn_list,
2457                                                     indexed_search_result)) {
2458                                         talloc_free(indexed_search_result);
2459                                         talloc_free(dn_list);
2460                                         return LDB_ERR_OPERATIONS_ERROR;
2461                                 }
2462                         }
2463                 }
2464                 break;
2465
2466         case LDB_SCOPE_SUBTREE:
2467         case LDB_SCOPE_DEFAULT:
2468                 if (!ldb_kv->cache->attribute_indexes) {
2469                         talloc_free(dn_list);
2470                         return LDB_ERR_OPERATIONS_ERROR;
2471                 }
2472                 /*
2473                  * Here we load the index for the tree.  We have no
2474                  * index for the subtree.
2475                  */
2476                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2477                 if (ret != LDB_SUCCESS) {
2478                         talloc_free(dn_list);
2479                         return ret;
2480                 }
2481                 break;
2482         }
2483
2484         /*
2485          * It is critical that this function do the re-filter even
2486          * on things found by the index as the index can over-match
2487          * in cases of truncation (as well as when it decides it is
2488          * not worth further filtering)
2489          *
2490          * If this changes, then the index code above would need to
2491          * pass up a flag to say if any index was truncated during
2492          * processing as the truncation here refers only to the
2493          * SCOPE_ONELEVEL index.
2494          */
2495         ret = ldb_kv_index_filter(
2496             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2497         talloc_free(dn_list);
2498         return ret;
2499 }
2500
2501 /**
2502  * @brief Add a DN in the index list of a given attribute name/value pair
2503  *
2504  * This function will add the DN in the index list for the index for
2505  * the given attribute name and value.
2506  *
2507  * @param[in]  module       A ldb_module structure
2508  *
2509  * @param[in]  dn           The string representation of the DN as it
2510  *                          will be stored in the index entry
2511  *
2512  * @param[in]  el           A ldb_message_element array, one of the entry
2513  *                          referred by the v_idx is the attribute name and
2514  *                          value pair which will be used to construct the
2515  *                          index name
2516  *
2517  * @param[in]  v_idx        The index of element in the el array to use
2518  *
2519  * @return                  An ldb error code
2520  */
2521 static int ldb_kv_index_add1(struct ldb_module *module,
2522                              struct ldb_kv_private *ldb_kv,
2523                              const struct ldb_message *msg,
2524                              struct ldb_message_element *el,
2525                              int v_idx)
2526 {
2527         struct ldb_context *ldb;
2528         struct ldb_dn *dn_key;
2529         int ret;
2530         const struct ldb_schema_attribute *a;
2531         struct dn_list *list;
2532         unsigned alloc_len;
2533         enum key_truncation truncation = KEY_TRUNCATED;
2534
2535
2536         ldb = ldb_module_get_ctx(module);
2537
2538         list = talloc_zero(module, struct dn_list);
2539         if (list == NULL) {
2540                 return LDB_ERR_OPERATIONS_ERROR;
2541         }
2542
2543         dn_key = ldb_kv_index_key(
2544             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2545         if (!dn_key) {
2546                 talloc_free(list);
2547                 return LDB_ERR_OPERATIONS_ERROR;
2548         }
2549         /*
2550          * Samba only maintains unique indexes on the objectSID and objectGUID
2551          * so if a unique index key exceeds the maximum length there is a
2552          * problem.
2553          */
2554         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2555                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2556                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2557
2558                 ldb_asprintf_errstring(
2559                     ldb,
2560                     __location__ ": unique index key on %s in %s, "
2561                                  "exceeds maximum key length of %u (encoded).",
2562                     el->name,
2563                     ldb_dn_get_linearized(msg->dn),
2564                     ldb_kv->max_key_length);
2565                 talloc_free(list);
2566                 return LDB_ERR_CONSTRAINT_VIOLATION;
2567         }
2568         talloc_steal(list, dn_key);
2569
2570         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2571         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2572                 talloc_free(list);
2573                 return ret;
2574         }
2575
2576         /*
2577          * Check for duplicates in the @IDXDN DN -> GUID record
2578          *
2579          * This is very normal, it just means a duplicate DN creation
2580          * was attempted, so don't set the error string or print scary
2581          * messages.
2582          */
2583         if (list->count > 0 &&
2584             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2585             truncation == KEY_NOT_TRUNCATED) {
2586
2587                 talloc_free(list);
2588                 return LDB_ERR_CONSTRAINT_VIOLATION;
2589
2590         } else if (list->count > 0
2591                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2592
2593                 /*
2594                  * At least one existing entry in the DN->GUID index, which
2595                  * arises when the DN indexes have been truncated
2596                  *
2597                  * So need to pull the DN's to check if it's really a duplicate
2598                  */
2599                 unsigned int i;
2600                 for (i=0; i < list->count; i++) {
2601                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2602                         struct ldb_val key = {
2603                                 .data = guid_key,
2604                                 .length = sizeof(guid_key)
2605                         };
2606                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2607                         struct ldb_message *rec = ldb_msg_new(ldb);
2608                         if (rec == NULL) {
2609                                 return LDB_ERR_OPERATIONS_ERROR;
2610                         }
2611
2612                         ret = ldb_kv_idx_to_key(
2613                             module, ldb_kv, ldb, &list->dn[i], &key);
2614                         if (ret != LDB_SUCCESS) {
2615                                 TALLOC_FREE(list);
2616                                 TALLOC_FREE(rec);
2617                                 return ret;
2618                         }
2619
2620                         ret =
2621                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2622                         if (key.data != guid_key) {
2623                                 TALLOC_FREE(key.data);
2624                         }
2625                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2626                                 /*
2627                                  * the record has disappeared?
2628                                  * yes, this can happen
2629                                  */
2630                                 talloc_free(rec);
2631                                 continue;
2632                         }
2633
2634                         if (ret != LDB_SUCCESS) {
2635                                 /* an internal error */
2636                                 TALLOC_FREE(rec);
2637                                 TALLOC_FREE(list);
2638                                 return LDB_ERR_OPERATIONS_ERROR;
2639                         }
2640                         /*
2641                          * The DN we are trying to add to the DB and index
2642                          * is already here, so we must deny the addition
2643                          */
2644                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2645                                 TALLOC_FREE(rec);
2646                                 TALLOC_FREE(list);
2647                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2648                         }
2649                 }
2650         }
2651
2652         /*
2653          * Check for duplicates in unique indexes
2654          *
2655          * We don't need to do a loop test like the @IDXDN case
2656          * above as we have a ban on long unique index values
2657          * at the start of this function.
2658          */
2659         if (list->count > 0 &&
2660             ((a != NULL
2661               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2662                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2663                 /*
2664                  * We do not want to print info about a possibly
2665                  * confidential DN that the conflict was with in the
2666                  * user-visible error string
2667                  */
2668
2669                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2670                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2671                                   __location__
2672                                   ": unique index violation on %s in %s, "
2673                                   "conflicts with %*.*s in %s",
2674                                   el->name, ldb_dn_get_linearized(msg->dn),
2675                                   (int)list->dn[0].length,
2676                                   (int)list->dn[0].length,
2677                                   list->dn[0].data,
2678                                   ldb_dn_get_linearized(dn_key));
2679                 } else {
2680                         /* This can't fail, gives a default at worst */
2681                         const struct ldb_schema_attribute *attr =
2682                             ldb_schema_attribute_by_name(
2683                                 ldb, ldb_kv->cache->GUID_index_attribute);
2684                         struct ldb_val v;
2685                         ret = attr->syntax->ldif_write_fn(ldb, list,
2686                                                           &list->dn[0], &v);
2687                         if (ret == LDB_SUCCESS) {
2688                                 ldb_debug(ldb,
2689                                           LDB_DEBUG_WARNING,
2690                                           __location__
2691                                           ": unique index violation on %s in "
2692                                           "%s, conflicts with %s %*.*s in %s",
2693                                           el->name,
2694                                           ldb_dn_get_linearized(msg->dn),
2695                                           ldb_kv->cache->GUID_index_attribute,
2696                                           (int)v.length,
2697                                           (int)v.length,
2698                                           v.data,
2699                                           ldb_dn_get_linearized(dn_key));
2700                         }
2701                 }
2702                 ldb_asprintf_errstring(ldb,
2703                                        __location__ ": unique index violation "
2704                                        "on %s in %s",
2705                                        el->name,
2706                                        ldb_dn_get_linearized(msg->dn));
2707                 talloc_free(list);
2708                 return LDB_ERR_CONSTRAINT_VIOLATION;
2709         }
2710
2711         /* overallocate the list a bit, to reduce the number of
2712          * realloc trigered copies */
2713         alloc_len = ((list->count+1)+7) & ~7;
2714         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2715         if (list->dn == NULL) {
2716                 talloc_free(list);
2717                 return LDB_ERR_OPERATIONS_ERROR;
2718         }
2719
2720         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2721                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2722                 list->dn[list->count].data
2723                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2724                 if (list->dn[list->count].data == NULL) {
2725                         talloc_free(list);
2726                         return LDB_ERR_OPERATIONS_ERROR;
2727                 }
2728                 list->dn[list->count].length = strlen(dn_str);
2729         } else {
2730                 const struct ldb_val *key_val;
2731                 struct ldb_val *exact = NULL, *next = NULL;
2732                 key_val = ldb_msg_find_ldb_val(
2733                     msg, ldb_kv->cache->GUID_index_attribute);
2734                 if (key_val == NULL) {
2735                         talloc_free(list);
2736                         return ldb_module_operr(module);
2737                 }
2738
2739                 if (key_val->length != LDB_KV_GUID_SIZE) {
2740                         talloc_free(list);
2741                         return ldb_module_operr(module);
2742                 }
2743
2744                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2745                                         *key_val, ldb_val_equal_exact_ordered,
2746                                         exact, next);
2747
2748                 /*
2749                  * Give a warning rather than fail, this could be a
2750                  * duplicate value in the record allowed by a caller
2751                  * forcing in the value with
2752                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2753                  */
2754                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2755                         /* This can't fail, gives a default at worst */
2756                         const struct ldb_schema_attribute *attr =
2757                             ldb_schema_attribute_by_name(
2758                                 ldb, ldb_kv->cache->GUID_index_attribute);
2759                         struct ldb_val v;
2760                         ret = attr->syntax->ldif_write_fn(ldb, list,
2761                                                           exact, &v);
2762                         if (ret == LDB_SUCCESS) {
2763                                 ldb_debug(ldb,
2764                                           LDB_DEBUG_WARNING,
2765                                           __location__
2766                                           ": duplicate attribute value in %s "
2767                                           "for index on %s, "
2768                                           "duplicate of %s %*.*s in %s",
2769                                           ldb_dn_get_linearized(msg->dn),
2770                                           el->name,
2771                                           ldb_kv->cache->GUID_index_attribute,
2772                                           (int)v.length,
2773                                           (int)v.length,
2774                                           v.data,
2775                                           ldb_dn_get_linearized(dn_key));
2776                         }
2777                 }
2778
2779                 if (next == NULL) {
2780                         next = &list->dn[list->count];
2781                 } else {
2782                         memmove(&next[1], next,
2783                                 sizeof(*next) * (list->count - (next - list->dn)));
2784                 }
2785                 *next = ldb_val_dup(list->dn, key_val);
2786                 if (next->data == NULL) {
2787                         talloc_free(list);
2788                         return ldb_module_operr(module);
2789                 }
2790         }
2791         list->count++;
2792
2793         ret = ldb_kv_dn_list_store(module, dn_key, list);
2794
2795         talloc_free(list);
2796
2797         return ret;
2798 }
2799
2800 /*
2801   add index entries for one elements in a message
2802  */
2803 static int ldb_kv_index_add_el(struct ldb_module *module,
2804                                struct ldb_kv_private *ldb_kv,
2805                                const struct ldb_message *msg,
2806                                struct ldb_message_element *el)
2807 {
2808         unsigned int i;
2809         for (i = 0; i < el->num_values; i++) {
2810                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2811                 if (ret != LDB_SUCCESS) {
2812                         return ret;
2813                 }
2814         }
2815
2816         return LDB_SUCCESS;
2817 }
2818
2819 /*
2820   add index entries for all elements in a message
2821  */
2822 static int ldb_kv_index_add_all(struct ldb_module *module,
2823                                 struct ldb_kv_private *ldb_kv,
2824                                 const struct ldb_message *msg)
2825 {
2826         struct ldb_message_element *elements = msg->elements;
2827         unsigned int i;
2828         const char *dn_str;
2829         int ret;
2830
2831         if (ldb_dn_is_special(msg->dn)) {
2832                 return LDB_SUCCESS;
2833         }
2834
2835         dn_str = ldb_dn_get_linearized(msg->dn);
2836         if (dn_str == NULL) {
2837                 return LDB_ERR_OPERATIONS_ERROR;
2838         }
2839
2840         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2841         if (ret != LDB_SUCCESS) {
2842                 return ret;
2843         }
2844
2845         if (!ldb_kv->cache->attribute_indexes) {
2846                 /* no indexed fields */
2847                 return LDB_SUCCESS;
2848         }
2849
2850         for (i = 0; i < msg->num_elements; i++) {
2851                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2852                         continue;
2853                 }
2854                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2855                 if (ret != LDB_SUCCESS) {
2856                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2857                         ldb_asprintf_errstring(ldb,
2858                                                __location__ ": Failed to re-index %s in %s - %s",
2859                                                elements[i].name, dn_str,
2860                                                ldb_errstring(ldb));
2861                         return ret;
2862                 }
2863         }
2864
2865         return LDB_SUCCESS;
2866 }
2867
2868
2869 /*
2870   insert a DN index for a message
2871 */
2872 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2873                                   struct ldb_kv_private *ldb_kv,
2874                                   const struct ldb_message *msg,
2875                                   struct ldb_dn *dn,
2876                                   const char *index,
2877                                   int add)
2878 {
2879         struct ldb_message_element el;
2880         struct ldb_val val;
2881         int ret;
2882
2883         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2884         if (val.data == NULL) {
2885                 const char *dn_str = ldb_dn_get_linearized(dn);
2886                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2887                                        __location__ ": Failed to modify %s "
2888                                                     "against %s in %s: failed "
2889                                                     "to get casefold DN",
2890                                        index,
2891                                        ldb_kv->cache->GUID_index_attribute,
2892                                        dn_str);
2893                 return LDB_ERR_OPERATIONS_ERROR;
2894         }
2895
2896         val.length = strlen((char *)val.data);
2897         el.name = index;
2898         el.values = &val;
2899         el.num_values = 1;
2900
2901         if (add) {
2902                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2903         } else { /* delete */
2904                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2905         }
2906
2907         if (ret != LDB_SUCCESS) {
2908                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2909                 const char *dn_str = ldb_dn_get_linearized(dn);
2910                 ldb_asprintf_errstring(ldb,
2911                                        __location__ ": Failed to modify %s "
2912                                                     "against %s in %s - %s",
2913                                        index,
2914                                        ldb_kv->cache->GUID_index_attribute,
2915                                        dn_str,
2916                                        ldb_errstring(ldb));
2917                 return ret;
2918         }
2919         return ret;
2920 }
2921
2922 /*
2923   insert a one level index for a message
2924 */
2925 static int ldb_kv_index_onelevel(struct ldb_module *module,
2926                                  const struct ldb_message *msg,
2927                                  int add)
2928 {
2929         struct ldb_kv_private *ldb_kv = talloc_get_type(
2930             ldb_module_get_private(module), struct ldb_kv_private);
2931         struct ldb_dn *pdn;
2932         int ret;
2933
2934         /* We index for ONE Level only if requested */
2935         if (!ldb_kv->cache->one_level_indexes) {
2936                 return LDB_SUCCESS;
2937         }
2938
2939         pdn = ldb_dn_get_parent(module, msg->dn);
2940         if (pdn == NULL) {
2941                 return LDB_ERR_OPERATIONS_ERROR;
2942         }
2943         ret =
2944             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2945
2946         talloc_free(pdn);
2947
2948         return ret;
2949 }
2950
2951 /*
2952   insert a one level index for a message
2953 */
2954 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2955                                       const struct ldb_message *msg,
2956                                       int add)
2957 {
2958         int ret;
2959         struct ldb_kv_private *ldb_kv = talloc_get_type(
2960             ldb_module_get_private(module), struct ldb_kv_private);
2961
2962         /* We index for DN only if using a GUID index */
2963         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2964                 return LDB_SUCCESS;
2965         }
2966
2967         ret = ldb_kv_modify_index_dn(
2968             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2969
2970         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2971                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2972                                        "Entry %s already exists",
2973                                        ldb_dn_get_linearized(msg->dn));
2974                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2975         }
2976         return ret;
2977 }
2978
2979 /*
2980   add the index entries for a new element in a record
2981   The caller guarantees that these element values are not yet indexed
2982 */
2983 int ldb_kv_index_add_element(struct ldb_module *module,
2984                              struct ldb_kv_private *ldb_kv,
2985                              const struct ldb_message *msg,
2986                              struct ldb_message_element *el)
2987 {
2988         if (ldb_dn_is_special(msg->dn)) {
2989                 return LDB_SUCCESS;
2990         }
2991         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2992                 return LDB_SUCCESS;
2993         }
2994         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2995 }
2996
2997 /*
2998   add the index entries for a new record
2999 */
3000 int ldb_kv_index_add_new(struct ldb_module *module,
3001                          struct ldb_kv_private *ldb_kv,
3002                          const struct ldb_message *msg)
3003 {
3004         int ret;
3005
3006         if (ldb_dn_is_special(msg->dn)) {
3007                 return LDB_SUCCESS;
3008         }
3009
3010         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3011         if (ret != LDB_SUCCESS) {
3012                 /*
3013                  * Because we can't trust the caller to be doing
3014                  * transactions properly, clean up any index for this
3015                  * entry rather than relying on a transaction
3016                  * cleanup
3017                  */
3018
3019                 ldb_kv_index_delete(module, msg);
3020                 return ret;
3021         }
3022
3023         ret = ldb_kv_index_onelevel(module, msg, 1);
3024         if (ret != LDB_SUCCESS) {
3025                 /*
3026                  * Because we can't trust the caller to be doing
3027                  * transactions properly, clean up any index for this
3028                  * entry rather than relying on a transaction
3029                  * cleanup
3030                  */
3031                 ldb_kv_index_delete(module, msg);
3032                 return ret;
3033         }
3034         return ret;
3035 }
3036
3037
3038 /*
3039   delete an index entry for one message element
3040 */
3041 int ldb_kv_index_del_value(struct ldb_module *module,
3042                            struct ldb_kv_private *ldb_kv,
3043                            const struct ldb_message *msg,
3044                            struct ldb_message_element *el,
3045                            unsigned int v_idx)
3046 {
3047         struct ldb_context *ldb;
3048         struct ldb_dn *dn_key;
3049         const char *dn_str;
3050         int ret, i;
3051         unsigned int j;
3052         struct dn_list *list;
3053         struct ldb_dn *dn = msg->dn;
3054         enum key_truncation truncation = KEY_NOT_TRUNCATED;
3055
3056         ldb = ldb_module_get_ctx(module);
3057
3058         dn_str = ldb_dn_get_linearized(dn);
3059         if (dn_str == NULL) {
3060                 return LDB_ERR_OPERATIONS_ERROR;
3061         }
3062
3063         if (dn_str[0] == '@') {
3064                 return LDB_SUCCESS;
3065         }
3066
3067         dn_key = ldb_kv_index_key(
3068             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
3069         /*
3070          * We ignore key truncation in ltdb_index_add1() so
3071          * match that by ignoring it here as well
3072          *
3073          * Multiple values are legitimate and accepted
3074          */
3075         if (!dn_key) {
3076                 return LDB_ERR_OPERATIONS_ERROR;
3077         }
3078
3079         list = talloc_zero(dn_key, struct dn_list);
3080         if (list == NULL) {
3081                 talloc_free(dn_key);
3082                 return LDB_ERR_OPERATIONS_ERROR;
3083         }
3084
3085         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
3086         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
3087                 /* it wasn't indexed. Did we have an earlier error? If we did then
3088                    its gone now */
3089                 talloc_free(dn_key);
3090                 return LDB_SUCCESS;
3091         }
3092
3093         if (ret != LDB_SUCCESS) {
3094                 talloc_free(dn_key);
3095                 return ret;
3096         }
3097
3098         /*
3099          * Find one of the values matching this message to remove
3100          */
3101         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
3102         if (i == -1) {
3103                 /* nothing to delete */
3104                 talloc_free(dn_key);
3105                 return LDB_SUCCESS;
3106         }
3107
3108         j = (unsigned int) i;
3109         if (j != list->count - 1) {
3110                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
3111         }
3112         list->count--;
3113         if (list->count == 0) {
3114                 talloc_free(list->dn);
3115                 list->dn = NULL;
3116         } else {
3117                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
3118         }
3119
3120         ret = ldb_kv_dn_list_store(module, dn_key, list);
3121
3122         talloc_free(dn_key);
3123
3124         return ret;
3125 }
3126
3127 /*
3128   delete the index entries for a element
3129   return -1 on failure
3130 */
3131 int ldb_kv_index_del_element(struct ldb_module *module,
3132                              struct ldb_kv_private *ldb_kv,
3133                              const struct ldb_message *msg,
3134                              struct ldb_message_element *el)
3135 {
3136         const char *dn_str;
3137         int ret;
3138         unsigned int i;
3139
3140         if (!ldb_kv->cache->attribute_indexes) {
3141                 /* no indexed fields */
3142                 return LDB_SUCCESS;
3143         }
3144
3145         dn_str = ldb_dn_get_linearized(msg->dn);
3146         if (dn_str == NULL) {
3147                 return LDB_ERR_OPERATIONS_ERROR;
3148         }
3149
3150         if (dn_str[0] == '@') {
3151                 return LDB_SUCCESS;
3152         }
3153
3154         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3155                 return LDB_SUCCESS;
3156         }
3157         for (i = 0; i < el->num_values; i++) {
3158                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
3159                 if (ret != LDB_SUCCESS) {
3160                         return ret;
3161                 }
3162         }
3163
3164         return LDB_SUCCESS;
3165 }
3166
3167 /*
3168   delete the index entries for a record
3169   return -1 on failure
3170 */
3171 int ldb_kv_index_delete(struct ldb_module *module,
3172                         const struct ldb_message *msg)
3173 {
3174         struct ldb_kv_private *ldb_kv = talloc_get_type(
3175             ldb_module_get_private(module), struct ldb_kv_private);
3176         int ret;
3177         unsigned int i;
3178
3179         if (ldb_dn_is_special(msg->dn)) {
3180                 return LDB_SUCCESS;
3181         }
3182
3183         ret = ldb_kv_index_onelevel(module, msg, 0);
3184         if (ret != LDB_SUCCESS) {
3185                 return ret;
3186         }
3187
3188         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
3189         if (ret != LDB_SUCCESS) {
3190                 return ret;
3191         }
3192
3193         if (!ldb_kv->cache->attribute_indexes) {
3194                 /* no indexed fields */
3195                 return LDB_SUCCESS;
3196         }
3197
3198         for (i = 0; i < msg->num_elements; i++) {
3199                 ret = ldb_kv_index_del_element(
3200                     module, ldb_kv, msg, &msg->elements[i]);
3201                 if (ret != LDB_SUCCESS) {
3202                         return ret;
3203                 }
3204         }
3205
3206         return LDB_SUCCESS;
3207 }
3208
3209
3210 /*
3211   traversal function that deletes all @INDEX records in the in-memory
3212   TDB.
3213
3214   This does not touch the actual DB, that is done at transaction
3215   commit, which in turn greatly reduces DB churn as we will likely
3216   be able to do a direct update into the old record.
3217 */
3218 static int delete_index(struct ldb_kv_private *ldb_kv,
3219                         struct ldb_val key,
3220                         _UNUSED_ struct ldb_val data,
3221                         void *state)
3222 {
3223         struct ldb_module *module = state;
3224         const char *dnstr = "DN=" LDB_KV_INDEX ":";
3225         struct dn_list list;
3226         struct ldb_dn *dn;
3227         struct ldb_val v;
3228         int ret;
3229
3230         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
3231                 return 0;
3232         }
3233         /* we need to put a empty list in the internal tdb for this
3234          * index entry */
3235         list.dn = NULL;
3236         list.count = 0;
3237
3238         /* the offset of 3 is to remove the DN= prefix. */
3239         v.data = key.data + 3;
3240         v.length = strnlen((char *)key.data, key.length) - 3;
3241
3242         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
3243
3244         /*
3245          * This does not actually touch the DB quite yet, just
3246          * the in-memory index cache
3247          */
3248         ret = ldb_kv_dn_list_store(module, dn, &list);
3249         if (ret != LDB_SUCCESS) {
3250                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3251                                        "Unable to store null index for %s\n",
3252                                                 ldb_dn_get_linearized(dn));
3253                 talloc_free(dn);
3254                 return -1;
3255         }
3256         talloc_free(dn);
3257         return 0;
3258 }
3259
3260 /*
3261   traversal function that adds @INDEX records during a re index TODO wrong comment
3262 */
3263 static int re_key(struct ldb_kv_private *ldb_kv,
3264                   struct ldb_val key,
3265                   struct ldb_val val,
3266                   void *state)
3267 {
3268         struct ldb_context *ldb;
3269         struct ldb_kv_reindex_context *ctx =
3270             (struct ldb_kv_reindex_context *)state;
3271         struct ldb_module *module = ldb_kv->module;
3272         struct ldb_message *msg;
3273         int ret;
3274         struct ldb_val key2;
3275         bool is_record;
3276
3277         ldb = ldb_module_get_ctx(module);
3278
3279         is_record = ldb_kv_key_is_normal_record(key);
3280         if (is_record == false) {
3281                 return 0;
3282         }
3283
3284         msg = ldb_msg_new(module);
3285         if (msg == NULL) {
3286                 return -1;
3287         }
3288
3289         ret = ldb_unpack_data(ldb, &val, msg);
3290         if (ret != 0) {
3291                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3292                                                 ldb_dn_get_linearized(msg->dn));
3293                 ctx->error = ret;
3294                 talloc_free(msg);
3295                 return -1;
3296         }
3297
3298         if (msg->dn == NULL) {
3299                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3300                           "Refusing to re-index as GUID "
3301                           "key %*.*s with no DN\n",
3302                           (int)key.length, (int)key.length,
3303                           (char *)key.data);
3304                 talloc_free(msg);
3305                 return -1;
3306         }
3307
3308         /* check if the DN key has changed, perhaps due to the case
3309            insensitivity of an element changing, or a change from DN
3310            to GUID keys */
3311         key2 = ldb_kv_key_msg(module, msg, msg);
3312         if (key2.data == NULL) {
3313                 /* probably a corrupt record ... darn */
3314                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
3315                                                 ldb_dn_get_linearized(msg->dn));
3316                 talloc_free(msg);
3317                 return 0;
3318         }
3319         if (key.length != key2.length ||
3320             (memcmp(key.data, key2.data, key.length) != 0)) {
3321                 ldb_kv->kv_ops->update_in_iterate(
3322                     ldb_kv, key, key2, val, ctx);
3323         }
3324         talloc_free(key2.data);
3325
3326         talloc_free(msg);
3327
3328         ctx->count++;
3329         if (ctx->count % 10000 == 0) {
3330                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3331                           "Reindexing: re-keyed %u records so far",
3332                           ctx->count);
3333         }
3334
3335         return 0;
3336 }
3337
3338 /*
3339   traversal function that adds @INDEX records during a re index
3340 */
3341 static int re_index(struct ldb_kv_private *ldb_kv,
3342                     struct ldb_val key,
3343                     struct ldb_val val,
3344                     void *state)
3345 {
3346         struct ldb_context *ldb;
3347         struct ldb_kv_reindex_context *ctx =
3348             (struct ldb_kv_reindex_context *)state;
3349         struct ldb_module *module = ldb_kv->module;
3350         struct ldb_message *msg;
3351         int ret;
3352         bool is_record;
3353
3354         ldb = ldb_module_get_ctx(module);
3355
3356         is_record = ldb_kv_key_is_normal_record(key);
3357         if (is_record == false) {
3358                 return 0;
3359         }
3360
3361         msg = ldb_msg_new(module);
3362         if (msg == NULL) {
3363                 return -1;
3364         }
3365
3366         ret = ldb_unpack_data(ldb, &val, msg);
3367         if (ret != 0) {
3368                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3369                                                 ldb_dn_get_linearized(msg->dn));
3370                 ctx->error = ret;
3371                 talloc_free(msg);
3372                 return -1;
3373         }
3374
3375         if (msg->dn == NULL) {
3376                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3377                           "Refusing to re-index as GUID "
3378                           "key %*.*s with no DN\n",
3379                           (int)key.length, (int)key.length,
3380                           (char *)key.data);
3381                 talloc_free(msg);
3382                 return -1;
3383         }
3384
3385         ret = ldb_kv_index_onelevel(module, msg, 1);
3386         if (ret != LDB_SUCCESS) {
3387                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3388                           "Adding special ONE LEVEL index failed (%s)!",
3389                                                 ldb_dn_get_linearized(msg->dn));
3390                 talloc_free(msg);
3391                 return -1;
3392         }
3393
3394         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3395
3396         if (ret != LDB_SUCCESS) {
3397                 ctx->error = ret;
3398                 talloc_free(msg);
3399                 return -1;
3400         }
3401
3402         talloc_free(msg);
3403
3404         ctx->count++;
3405         if (ctx->count % 10000 == 0) {
3406                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3407                           "Reindexing: re-indexed %u records so far",
3408                           ctx->count);
3409         }
3410
3411         return 0;
3412 }
3413
3414 static int re_pack(struct ldb_kv_private *ldb_kv,
3415                    struct ldb_val key,
3416                    struct ldb_val val,
3417                    void *state)
3418 {
3419         struct ldb_context *ldb;
3420         struct ldb_message *msg;
3421         struct ldb_module *module = ldb_kv->module;
3422         struct ldb_kv_repack_context *ctx =
3423             (struct ldb_kv_repack_context *)state;
3424         int ret;
3425
3426         ldb = ldb_module_get_ctx(module);
3427
3428         msg = ldb_msg_new(module);
3429         if (msg == NULL) {
3430                 return -1;
3431         }
3432
3433         ret = ldb_unpack_data(ldb, &val, msg);
3434         if (ret != 0) {
3435                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: unpack failed: %s\n",
3436                           ldb_dn_get_linearized(msg->dn));
3437                 ctx->error = ret;
3438                 talloc_free(msg);
3439                 return -1;
3440         }
3441
3442         ret = ldb_kv_store(module, msg, TDB_MODIFY);
3443         if (ret != LDB_SUCCESS) {
3444                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: store failed: %s\n",
3445                           ldb_dn_get_linearized(msg->dn));
3446                 ctx->error = ret;
3447                 talloc_free(msg);
3448                 return -1;
3449         }
3450
3451         /*
3452          * Warn the user that we're repacking the first time we see a normal
3453          * record. This means we never warn if we're repacking a database with
3454          * only @ records. This is because during database initialisation,
3455          * we might repack as initial settings are written out, and we don't
3456          * want to spam the log.
3457          */
3458         if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
3459                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3460                           "Repacking database with format %#010x",
3461                           ldb_kv->pack_format_version);
3462                 ctx->normal_record_seen = true;
3463         }
3464
3465         ctx->count++;
3466         if (ctx->count % 10000 == 0) {
3467                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3468                           "Repack: re-packed %u records so far",
3469                           ctx->count);
3470         }
3471
3472         return 0;
3473 }
3474
3475 int ldb_kv_repack(struct ldb_module *module)
3476 {
3477         struct ldb_kv_private *ldb_kv = talloc_get_type(
3478             ldb_module_get_private(module), struct ldb_kv_private);
3479         struct ldb_context *ldb = ldb_module_get_ctx(module);
3480         struct ldb_kv_repack_context ctx;
3481         int ret;
3482
3483         ctx.count = 0;
3484         ctx.error = LDB_SUCCESS;
3485         ctx.normal_record_seen = false;
3486
3487         /* Iterate all database records and repack them in the new format */
3488         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
3489         if (ret < 0) {
3490                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack traverse failed: %s",
3491                           ldb_errstring(ldb));
3492                 return LDB_ERR_OPERATIONS_ERROR;
3493         }
3494
3495         if (ctx.error != LDB_SUCCESS) {
3496                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack failed: %s",
3497                           ldb_errstring(ldb));
3498                 return ctx.error;
3499         }
3500
3501         return LDB_SUCCESS;
3502 }
3503
3504 /*
3505   force a complete reindex of the database
3506 */
3507 int ldb_kv_reindex(struct ldb_module *module)
3508 {
3509         struct ldb_kv_private *ldb_kv = talloc_get_type(
3510             ldb_module_get_private(module), struct ldb_kv_private);
3511         int ret;
3512         struct ldb_kv_reindex_context ctx;
3513         size_t index_cache_size = 0;
3514
3515         /*
3516          * Only triggered after a modification, but make clear we do
3517          * not re-index a read-only DB
3518          */
3519         if (ldb_kv->read_only) {
3520                 return LDB_ERR_UNWILLING_TO_PERFORM;
3521         }
3522
3523         if (ldb_kv_cache_reload(module) != 0) {
3524                 return LDB_ERR_OPERATIONS_ERROR;
3525         }
3526
3527         /*
3528          * Ensure we read (and so remove) the entries from the real
3529          * DB, no values stored so far are any use as we want to do a
3530          * re-index
3531          */
3532         ldb_kv_index_transaction_cancel(module);
3533
3534         /*
3535          * Calculate the size of the index cache that we'll need for
3536          * the re-index
3537          */
3538         index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
3539         if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
3540                 index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
3541         }
3542
3543         ret = ldb_kv_index_transaction_start(module, index_cache_size);
3544         if (ret != LDB_SUCCESS) {
3545                 return ret;
3546         }
3547
3548         /* first traverse the database deleting any @INDEX records by
3549          * putting NULL entries in the in-memory tdb
3550          */
3551         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3552         if (ret < 0) {
3553                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3554                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3555                                        ldb_errstring(ldb));
3556                 return LDB_ERR_OPERATIONS_ERROR;
3557         }
3558
3559         ctx.error = 0;
3560         ctx.count = 0;
3561
3562         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3563         if (ret < 0) {
3564                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3565                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3566                                        ldb_errstring(ldb));
3567                 return LDB_ERR_OPERATIONS_ERROR;
3568         }
3569
3570         if (ctx.error != LDB_SUCCESS) {
3571                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3572                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3573                 return ctx.error;
3574         }
3575
3576         ctx.error = 0;
3577         ctx.count = 0;
3578
3579         /* now traverse adding any indexes for normal LDB records */
3580         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3581         if (ret < 0) {
3582                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3583                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3584                                        ldb_errstring(ldb));
3585                 return LDB_ERR_OPERATIONS_ERROR;
3586         }
3587
3588         if (ctx.error != LDB_SUCCESS) {
3589                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3590                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3591                 return ctx.error;
3592         }
3593
3594         if (ctx.count > 10000) {
3595                 ldb_debug(ldb_module_get_ctx(module),
3596                           LDB_DEBUG_WARNING,
3597                           "Reindexing: re_index successful on %s, "
3598                           "final index write-out will be in transaction commit",
3599                           ldb_kv->kv_ops->name(ldb_kv));
3600         }
3601         return LDB_SUCCESS;
3602 }