lib ldb key value: use TALLOC_FREE() per README.Coding
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151 #include "lib/util/attr.h"
152
153 struct dn_list {
154         unsigned int count;
155         struct ldb_val *dn;
156         /*
157          * Do not optimise the intersection of this list,
158          * we must never return an entry not in this
159          * list.  This allows the index for
160          * SCOPE_ONELEVEL to be trusted.
161          */
162         bool strict;
163 };
164
165 struct ldb_kv_idxptr {
166         /*
167          * In memory tdb to cache the index updates performed during a
168          * transaction.  This improves the performance of operations like
169          * re-index and join
170          */
171         struct tdb_context *itdb;
172         int error;
173 };
174
175 enum key_truncation {
176         KEY_NOT_TRUNCATED,
177         KEY_TRUNCATED,
178 };
179
180 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
181                                       const struct ldb_message *msg,
182                                       int add);
183 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
184                                    struct ldb_kv_private *ldb_kv,
185                                    struct ldb_dn *base_dn,
186                                    struct dn_list *dn_list,
187                                    enum key_truncation *truncation);
188
189 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
190                                 struct dn_list *list);
191
192 /* we put a @IDXVERSION attribute on index entries. This
193    allows us to tell if it was written by an older version
194 */
195 #define LDB_KV_INDEXING_VERSION 2
196
197 #define LDB_KV_GUID_INDEXING_VERSION 3
198
199 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
200 {
201         if (ldb_kv->max_key_length == 0) {
202                 return UINT_MAX;
203         }
204         return ldb_kv->max_key_length;
205 }
206
207 /* enable the idxptr mode when transactions start */
208 int ldb_kv_index_transaction_start(
209         struct ldb_module *module,
210         size_t cache_size)
211 {
212         struct ldb_kv_private *ldb_kv = talloc_get_type(
213             ldb_module_get_private(module), struct ldb_kv_private);
214         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
215         if (ldb_kv->idxptr == NULL) {
216                 return ldb_oom(ldb_module_get_ctx(module));
217         }
218
219         ldb_kv->idxptr->itdb = tdb_open(
220                 NULL,
221                 cache_size,
222                 TDB_INTERNAL,
223                 O_RDWR,
224                 0);
225         if (ldb_kv->idxptr->itdb == NULL) {
226                 return LDB_ERR_OPERATIONS_ERROR;
227         }
228
229         return LDB_SUCCESS;
230 }
231
232 /*
233   see if two ldb_val structures contain exactly the same data
234   return -1 or 1 for a mismatch, 0 for match
235 */
236 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
237                                          const struct ldb_val *v2)
238 {
239         if (v1->length > v2->length) {
240                 return -1;
241         }
242         if (v1->length < v2->length) {
243                 return 1;
244         }
245         return memcmp(v1->data, v2->data, v1->length);
246 }
247
248 /*
249   see if two ldb_val structures contain exactly the same data
250   return -1 or 1 for a mismatch, 0 for match
251 */
252 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
253                                        const struct ldb_val *v2)
254 {
255         if (v1.length > v2->length) {
256                 return -1;
257         }
258         if (v1.length < v2->length) {
259                 return 1;
260         }
261         return memcmp(v1.data, v2->data, v1.length);
262 }
263
264
265 /*
266   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
267   binary-safe comparison for the 'dn' returns -1 if not found
268
269   This is therefore safe when the value is a GUID in the future
270  */
271 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
272                                    const struct dn_list *list,
273                                    const struct ldb_val *v)
274 {
275         unsigned int i;
276         struct ldb_val *exact = NULL, *next = NULL;
277
278         if (ldb_kv->cache->GUID_index_attribute == NULL) {
279                 for (i=0; i<list->count; i++) {
280                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
281                                 return i;
282                         }
283                 }
284                 return -1;
285         }
286
287         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
288                                 *v, ldb_val_equal_exact_ordered,
289                                 exact, next);
290         if (exact == NULL) {
291                 return -1;
292         }
293         /* Not required, but keeps the compiler quiet */
294         if (next != NULL) {
295                 return -1;
296         }
297
298         i = exact - list->dn;
299         return i;
300 }
301
302 /*
303   find a entry in a dn_list. Uses a case sensitive comparison with the dn
304   returns -1 if not found
305  */
306 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
307                                    struct dn_list *list,
308                                    const struct ldb_message *msg)
309 {
310         struct ldb_val v;
311         const struct ldb_val *key_val;
312         if (ldb_kv->cache->GUID_index_attribute == NULL) {
313                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
314                 v.data = discard_const_p(unsigned char, dn_str);
315                 v.length = strlen(dn_str);
316         } else {
317                 key_val = ldb_msg_find_ldb_val(
318                     msg, ldb_kv->cache->GUID_index_attribute);
319                 if (key_val == NULL) {
320                         return -1;
321                 }
322                 v = *key_val;
323         }
324         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
325 }
326
327 /*
328   this is effectively a cast function, but with lots of paranoia
329   checks and also copes with CPUs that are fussy about pointer
330   alignment
331  */
332 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
333                                            TDB_DATA rec)
334 {
335         struct dn_list *list;
336         if (rec.dsize != sizeof(void *)) {
337                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
338                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
339                 return NULL;
340         }
341         /* note that we can't just use a cast here, as rec.dptr may
342            not be aligned sufficiently for a pointer. A cast would cause
343            platforms like some ARM CPUs to crash */
344         memcpy(&list, rec.dptr, sizeof(void *));
345         list = talloc_get_type(list, struct dn_list);
346         if (list == NULL) {
347                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
348                                        "Bad type '%s' for idxptr",
349                                        talloc_get_name(list));
350                 return NULL;
351         }
352         return list;
353 }
354
355 enum dn_list_will_be_read_only {
356         DN_LIST_MUTABLE = 0,
357         DN_LIST_WILL_BE_READ_ONLY = 1,
358 };
359
360 /*
361   return the @IDX list in an index entry for a dn as a
362   struct dn_list
363  */
364 static int ldb_kv_dn_list_load(struct ldb_module *module,
365                                struct ldb_kv_private *ldb_kv,
366                                struct ldb_dn *dn,
367                                struct dn_list *list,
368                                enum dn_list_will_be_read_only read_only)
369 {
370         struct ldb_message *msg;
371         int ret, version;
372         struct ldb_message_element *el;
373         TDB_DATA rec = {0};
374         struct dn_list *list2;
375         bool from_primary_cache = false;
376         TDB_DATA key = {0};
377
378         list->dn = NULL;
379         list->count = 0;
380         list->strict = false;
381
382         /*
383          * See if we have an in memory index cache
384          */
385         if (ldb_kv->idxptr == NULL) {
386                 goto normal_index;
387         }
388
389         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
390         key.dsize = strlen((char *)key.dptr);
391
392         /*
393          * Have we cached this index record?
394          * If we have a nested transaction cache try that first.
395          * then try the transaction cache.
396          * if the record is not cached it will need to be read from disk.
397          */
398         if (ldb_kv->nested_idx_ptr != NULL) {
399                 rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
400         }
401         if (rec.dptr == NULL) {
402                 from_primary_cache = true;
403                 rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
404         }
405         if (rec.dptr == NULL) {
406                 goto normal_index;
407         }
408
409         /* we've found an in-memory index entry */
410         list2 = ldb_kv_index_idxptr(module, rec);
411         if (list2 == NULL) {
412                 free(rec.dptr);
413                 return LDB_ERR_OPERATIONS_ERROR;
414         }
415         free(rec.dptr);
416
417         /*
418          * If this is a read only transaction the indexes will not be
419          * changed so we don't need a copy in the event of a rollback
420          *
421          * In this case make an early return
422          */
423         if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
424                 *list = *list2;
425                 return LDB_SUCCESS;
426         }
427
428         /*
429          * record was read from the sub transaction cache, so we have
430          * already copied the primary cache record
431          */
432         if (!from_primary_cache) {
433                 *list = *list2;
434                 return LDB_SUCCESS;
435         }
436
437         /*
438          * No index sub transaction active, so no need to cache a copy
439          */
440         if (ldb_kv->nested_idx_ptr == NULL) {
441                 *list = *list2;
442                 return LDB_SUCCESS;
443         }
444
445         /*
446          * There is an active index sub transaction, and the record was
447          * found in the primary index transaction cache.  A copy of the
448          * record needs be taken to prevent the original entry being
449          * altered, until the index sub transaction is committed.
450          */
451
452         {
453                 struct ldb_val *dns = NULL;
454                 size_t x = 0;
455
456                 dns = talloc_array(
457                         list,
458                         struct ldb_val,
459                         list2->count);
460                 if (dns == NULL) {
461                         return LDB_ERR_OPERATIONS_ERROR;
462                 }
463                 for (x = 0; x < list2->count; x++) {
464                         dns[x].length = list2->dn[x].length;
465                         dns[x].data = talloc_memdup(
466                                 dns,
467                                 list2->dn[x].data,
468                                 list2->dn[x].length);
469                         if (dns[x].data == NULL) {
470                                 TALLOC_FREE(dns);
471                                 return LDB_ERR_OPERATIONS_ERROR;
472                         }
473                 }
474                 list->dn = dns;
475                 list->count = list2->count;
476         }
477         return LDB_SUCCESS;
478
479         /*
480          * Index record not found in the caches, read it from the
481          * database.
482          */
483 normal_index:
484         msg = ldb_msg_new(list);
485         if (msg == NULL) {
486                 return LDB_ERR_OPERATIONS_ERROR;
487         }
488
489         ret = ldb_kv_search_dn1(module,
490                                 dn,
491                                 msg,
492                                 LDB_UNPACK_DATA_FLAG_NO_DN |
493                                 /*
494                                  * The entry point ldb_kv_search_indexed is
495                                  * only called from the read-locked
496                                  * ldb_kv_search.
497                                  */
498                                 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
499         if (ret != LDB_SUCCESS) {
500                 talloc_free(msg);
501                 return ret;
502         }
503
504         el = ldb_msg_find_element(msg, LDB_KV_IDX);
505         if (!el) {
506                 talloc_free(msg);
507                 return LDB_SUCCESS;
508         }
509
510         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
511
512         /*
513          * we avoid copying the strings by stealing the list.  We have
514          * to steal msg onto el->values (which looks odd) because
515          * the memory is allocated on msg, not on each value.
516          */
517         if (ldb_kv->cache->GUID_index_attribute == NULL) {
518                 /* check indexing version number */
519                 if (version != LDB_KV_INDEXING_VERSION) {
520                         ldb_debug_set(ldb_module_get_ctx(module),
521                                       LDB_DEBUG_ERROR,
522                                       "Wrong DN index version %d "
523                                       "expected %d for %s",
524                                       version, LDB_KV_INDEXING_VERSION,
525                                       ldb_dn_get_linearized(dn));
526                         talloc_free(msg);
527                         return LDB_ERR_OPERATIONS_ERROR;
528                 }
529
530                 talloc_steal(el->values, msg);
531                 list->dn = talloc_steal(list, el->values);
532                 list->count = el->num_values;
533         } else {
534                 unsigned int i;
535                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
536                         /* This is quite likely during the DB startup
537                            on first upgrade to using a GUID index */
538                         ldb_debug_set(ldb_module_get_ctx(module),
539                                       LDB_DEBUG_ERROR,
540                                       "Wrong GUID index version %d "
541                                       "expected %d for %s",
542                                       version, LDB_KV_GUID_INDEXING_VERSION,
543                                       ldb_dn_get_linearized(dn));
544                         talloc_free(msg);
545                         return LDB_ERR_OPERATIONS_ERROR;
546                 }
547
548                 if (el->num_values == 0) {
549                         talloc_free(msg);
550                         return LDB_ERR_OPERATIONS_ERROR;
551                 }
552
553                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
554                         talloc_free(msg);
555                         return LDB_ERR_OPERATIONS_ERROR;
556                 }
557
558                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
559                 list->dn = talloc_array(list, struct ldb_val, list->count);
560                 if (list->dn == NULL) {
561                         talloc_free(msg);
562                         return LDB_ERR_OPERATIONS_ERROR;
563                 }
564
565                 /*
566                  * The actual data is on msg.
567                  */
568                 talloc_steal(list->dn, msg);
569                 for (i = 0; i < list->count; i++) {
570                         list->dn[i].data
571                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
572                         list->dn[i].length = LDB_KV_GUID_SIZE;
573                 }
574         }
575
576         /* We don't need msg->elements any more */
577         talloc_free(msg->elements);
578         return LDB_SUCCESS;
579 }
580
581 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
582                            struct ldb_kv_private *ldb_kv,
583                            TALLOC_CTX *mem_ctx,
584                            struct ldb_dn *dn,
585                            struct ldb_val *ldb_key)
586 {
587         struct ldb_context *ldb = ldb_module_get_ctx(module);
588         int ret;
589         int index = 0;
590         enum key_truncation truncation = KEY_NOT_TRUNCATED;
591         struct dn_list *list = talloc(mem_ctx, struct dn_list);
592         if (list == NULL) {
593                 ldb_oom(ldb);
594                 return LDB_ERR_OPERATIONS_ERROR;
595         }
596
597         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
598         if (ret != LDB_SUCCESS) {
599                 TALLOC_FREE(list);
600                 return ret;
601         }
602
603         if (list->count == 0) {
604                 TALLOC_FREE(list);
605                 return LDB_ERR_NO_SUCH_OBJECT;
606         }
607
608         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
609                 const char *dn_str = ldb_dn_get_linearized(dn);
610                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
611                                        __location__
612                                        ": Failed to read DN index "
613                                        "against %s for %s: too many "
614                                        "values (%u > 1)",
615                                        ldb_kv->cache->GUID_index_attribute,
616                                        dn_str,
617                                        list->count);
618                 TALLOC_FREE(list);
619                 return LDB_ERR_CONSTRAINT_VIOLATION;
620         }
621
622         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
623                 /*
624                  * DN key has been truncated, need to inspect the actual
625                  * records to locate the actual DN
626                  */
627                 unsigned int i;
628                 index = -1;
629                 for (i=0; i < list->count; i++) {
630                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
631                         struct ldb_val key = {
632                                 .data = guid_key,
633                                 .length = sizeof(guid_key)
634                         };
635                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
636                         struct ldb_message *rec = ldb_msg_new(ldb);
637                         if (rec == NULL) {
638                                 TALLOC_FREE(list);
639                                 return LDB_ERR_OPERATIONS_ERROR;
640                         }
641
642                         ret = ldb_kv_idx_to_key(
643                             module, ldb_kv, ldb, &list->dn[i], &key);
644                         if (ret != LDB_SUCCESS) {
645                                 TALLOC_FREE(list);
646                                 TALLOC_FREE(rec);
647                                 return ret;
648                         }
649
650                         ret =
651                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
652                         if (key.data != guid_key) {
653                                 TALLOC_FREE(key.data);
654                         }
655                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
656                                 /*
657                                  * the record has disappeared?
658                                  * yes, this can happen
659                                  */
660                                 TALLOC_FREE(rec);
661                                 continue;
662                         }
663
664                         if (ret != LDB_SUCCESS) {
665                                 /* an internal error */
666                                 TALLOC_FREE(rec);
667                                 TALLOC_FREE(list);
668                                 return LDB_ERR_OPERATIONS_ERROR;
669                         }
670
671                         /*
672                          * We found the actual DN that we wanted from in the
673                          * multiple values that matched the index
674                          * (due to truncation), so return that.
675                          *
676                          */
677                         if (ldb_dn_compare(dn, rec->dn) == 0) {
678                                 index = i;
679                                 TALLOC_FREE(rec);
680                                 break;
681                         }
682                 }
683
684                 /*
685                  * We matched the index but the actual DN we wanted
686                  * was not here.
687                  */
688                 if (index == -1) {
689                         TALLOC_FREE(list);
690                         return LDB_ERR_NO_SUCH_OBJECT;
691                 }
692         }
693
694         /* The ldb_key memory is allocated by the caller */
695         ret = ldb_kv_guid_to_key(&list->dn[index], ldb_key);
696         TALLOC_FREE(list);
697
698         if (ret != LDB_SUCCESS) {
699                 return LDB_ERR_OPERATIONS_ERROR;
700         }
701
702         return LDB_SUCCESS;
703 }
704
705
706
707 /*
708   save a dn_list into a full @IDX style record
709  */
710 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
711                                      struct ldb_kv_private *ldb_kv,
712                                      struct ldb_dn *dn,
713                                      struct dn_list *list)
714 {
715         struct ldb_message *msg;
716         int ret;
717
718         msg = ldb_msg_new(module);
719         if (!msg) {
720                 return ldb_module_oom(module);
721         }
722
723         msg->dn = dn;
724
725         if (list->count == 0) {
726                 ret = ldb_kv_delete_noindex(module, msg);
727                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
728                         ret = LDB_SUCCESS;
729                 }
730                 TALLOC_FREE(msg);
731                 return ret;
732         }
733
734         if (ldb_kv->cache->GUID_index_attribute == NULL) {
735                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
736                                       LDB_KV_INDEXING_VERSION);
737                 if (ret != LDB_SUCCESS) {
738                         TALLOC_FREE(msg);
739                         return ldb_module_oom(module);
740                 }
741         } else {
742                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
743                                       LDB_KV_GUID_INDEXING_VERSION);
744                 if (ret != LDB_SUCCESS) {
745                         TALLOC_FREE(msg);
746                         return ldb_module_oom(module);
747                 }
748         }
749
750         if (list->count > 0) {
751                 struct ldb_message_element *el;
752
753                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
754                 if (ret != LDB_SUCCESS) {
755                         TALLOC_FREE(msg);
756                         return ldb_module_oom(module);
757                 }
758
759                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
760                         el->values = list->dn;
761                         el->num_values = list->count;
762                 } else {
763                         struct ldb_val v;
764                         unsigned int i;
765                         el->values = talloc_array(msg,
766                                                   struct ldb_val, 1);
767                         if (el->values == NULL) {
768                                 TALLOC_FREE(msg);
769                                 return ldb_module_oom(module);
770                         }
771
772                         v.data = talloc_array_size(el->values,
773                                                    list->count,
774                                                    LDB_KV_GUID_SIZE);
775                         if (v.data == NULL) {
776                                 TALLOC_FREE(msg);
777                                 return ldb_module_oom(module);
778                         }
779
780                         v.length = talloc_get_size(v.data);
781
782                         for (i = 0; i < list->count; i++) {
783                                 if (list->dn[i].length !=
784                                     LDB_KV_GUID_SIZE) {
785                                         TALLOC_FREE(msg);
786                                         return ldb_module_operr(module);
787                                 }
788                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
789                                        list->dn[i].data,
790                                        LDB_KV_GUID_SIZE);
791                         }
792                         el->values[0] = v;
793                         el->num_values = 1;
794                 }
795         }
796
797         ret = ldb_kv_store(module, msg, TDB_REPLACE);
798         TALLOC_FREE(msg);
799         return ret;
800 }
801
802 /*
803   save a dn_list into the database, in either @IDX or internal format
804  */
805 static int ldb_kv_dn_list_store(struct ldb_module *module,
806                                 struct ldb_dn *dn,
807                                 struct dn_list *list)
808 {
809         struct ldb_kv_private *ldb_kv = talloc_get_type(
810             ldb_module_get_private(module), struct ldb_kv_private);
811         TDB_DATA rec = {0};
812         TDB_DATA key = {0};
813
814         int ret = LDB_SUCCESS;
815         struct dn_list *list2 = NULL;
816         struct ldb_kv_idxptr *idxptr = NULL;
817
818         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
819         if (key.dptr == NULL) {
820                 return LDB_ERR_OPERATIONS_ERROR;
821         }
822         key.dsize = strlen((char *)key.dptr);
823
824         /*
825          * If there is an index sub transaction active, update the
826          * sub transaction index cache.  Otherwise update the
827          * primary index cache
828          */
829         if (ldb_kv->nested_idx_ptr != NULL) {
830                 idxptr = ldb_kv->nested_idx_ptr;
831         } else {
832                 idxptr = ldb_kv->idxptr;
833         }
834         /*
835          * Get the cache entry for the index
836          *
837          * As the value in the cache is a pointer to a dn_list we update
838          * the dn_list directly.
839          *
840          */
841         rec = tdb_fetch(idxptr->itdb, key);
842         if (rec.dptr != NULL) {
843                 list2 = ldb_kv_index_idxptr(module, rec);
844                 if (list2 == NULL) {
845                         free(rec.dptr);
846                         return LDB_ERR_OPERATIONS_ERROR;
847                 }
848                 free(rec.dptr);
849                 /* Now put the updated pointer back in the cache */
850                 if (list->dn == NULL) {
851                         list2->dn = NULL;
852                         list2->count = 0;
853                 } else {
854                         list2->dn = talloc_steal(list2, list->dn);
855                         list2->count = list->count;
856                 }
857                 return LDB_SUCCESS;
858         }
859
860         list2 = talloc(idxptr, struct dn_list);
861         if (list2 == NULL) {
862                 return LDB_ERR_OPERATIONS_ERROR;
863         }
864         list2->dn = talloc_steal(list2, list->dn);
865         list2->count = list->count;
866
867         rec.dptr = (uint8_t *)&list2;
868         rec.dsize = sizeof(void *);
869
870
871         /*
872          * This is not a store into the main DB, but into an in-memory
873          * TDB, so we don't need a guard on ltdb->read_only
874          *
875          * Also as we directly update the in memory dn_list for existing
876          * cache entries we must be adding a new entry to the cache.
877          */
878         ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
879         if (ret != 0) {
880                 return ltdb_err_map( tdb_error(idxptr->itdb));
881         }
882         return LDB_SUCCESS;
883 }
884
885 /*
886   traverse function for storing the in-memory index entries on disk
887  */
888 static int ldb_kv_index_traverse_store(_UNUSED_ struct tdb_context *tdb,
889                                        TDB_DATA key,
890                                        TDB_DATA data,
891                                        void *state)
892 {
893         struct ldb_module *module = state;
894         struct ldb_kv_private *ldb_kv = talloc_get_type(
895             ldb_module_get_private(module), struct ldb_kv_private);
896         struct ldb_dn *dn;
897         struct ldb_context *ldb = ldb_module_get_ctx(module);
898         struct ldb_val v;
899         struct dn_list *list;
900
901         list = ldb_kv_index_idxptr(module, data);
902         if (list == NULL) {
903                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
904                 return -1;
905         }
906
907         v.data = key.dptr;
908         v.length = strnlen((char *)key.dptr, key.dsize);
909
910         dn = ldb_dn_from_ldb_val(module, ldb, &v);
911         if (dn == NULL) {
912                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
913                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
914                 return -1;
915         }
916
917         ldb_kv->idxptr->error =
918             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
919         talloc_free(dn);
920         if (ldb_kv->idxptr->error != 0) {
921                 return -1;
922         }
923         return 0;
924 }
925
926 /* cleanup the idxptr mode when transaction commits */
927 int ldb_kv_index_transaction_commit(struct ldb_module *module)
928 {
929         struct ldb_kv_private *ldb_kv = talloc_get_type(
930             ldb_module_get_private(module), struct ldb_kv_private);
931         int ret;
932
933         struct ldb_context *ldb = ldb_module_get_ctx(module);
934
935         ldb_reset_err_string(ldb);
936
937         if (ldb_kv->idxptr->itdb) {
938                 tdb_traverse(
939                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
940                 tdb_close(ldb_kv->idxptr->itdb);
941         }
942
943         ret = ldb_kv->idxptr->error;
944         if (ret != LDB_SUCCESS) {
945                 if (!ldb_errstring(ldb)) {
946                         ldb_set_errstring(ldb, ldb_strerror(ret));
947                 }
948                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
949         }
950
951         talloc_free(ldb_kv->idxptr);
952         ldb_kv->idxptr = NULL;
953         return ret;
954 }
955
956 /* cleanup the idxptr mode when transaction cancels */
957 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
958 {
959         struct ldb_kv_private *ldb_kv = talloc_get_type(
960             ldb_module_get_private(module), struct ldb_kv_private);
961         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
962                 tdb_close(ldb_kv->idxptr->itdb);
963         }
964         TALLOC_FREE(ldb_kv->idxptr);
965         if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
966                 tdb_close(ldb_kv->nested_idx_ptr->itdb);
967         }
968         TALLOC_FREE(ldb_kv->nested_idx_ptr);
969         return LDB_SUCCESS;
970 }
971
972
973 /*
974   return the dn key to be used for an index
975   the caller is responsible for freeing
976 */
977 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
978                                        struct ldb_kv_private *ldb_kv,
979                                        const char *attr,
980                                        const struct ldb_val *value,
981                                        const struct ldb_schema_attribute **ap,
982                                        enum key_truncation *truncation)
983 {
984         struct ldb_dn *ret;
985         struct ldb_val v;
986         const struct ldb_schema_attribute *a = NULL;
987         char *attr_folded = NULL;
988         const char *attr_for_dn = NULL;
989         int r;
990         bool should_b64_encode;
991
992         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
993         size_t key_len = 0;
994         size_t attr_len = 0;
995         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
996         unsigned frmt_len = 0;
997         const size_t additional_key_length = 4;
998         unsigned int num_separators = 3; /* Estimate for overflow check */
999         const size_t min_data = 1;
1000         const size_t min_key_length = additional_key_length
1001                 + indx_len + num_separators + min_data;
1002         struct ldb_val empty;
1003
1004         /*
1005          * Accept a NULL value as a request for a key with no value.  This is
1006          * different from passing an empty value, which might be given
1007          * significance by some canonicalise functions.
1008          */
1009         bool empty_val = value == NULL;
1010         if (empty_val) {
1011                 empty.length = 0;
1012                 empty.data = discard_const_p(unsigned char, "");
1013                 value = &empty;
1014         }
1015
1016         if (attr[0] == '@') {
1017                 attr_for_dn = attr;
1018                 v = *value;
1019                 if (ap != NULL) {
1020                         *ap = NULL;
1021                 }
1022         } else {
1023                 attr_folded = ldb_attr_casefold(ldb, attr);
1024                 if (!attr_folded) {
1025                         return NULL;
1026                 }
1027
1028                 attr_for_dn = attr_folded;
1029
1030                 a = ldb_schema_attribute_by_name(ldb, attr);
1031                 if (ap) {
1032                         *ap = a;
1033                 }
1034
1035                 if (empty_val) {
1036                         v = *value;
1037                 } else {
1038                         ldb_attr_handler_t fn;
1039                         if (a->syntax->index_format_fn &&
1040                             ldb_kv->cache->GUID_index_attribute != NULL) {
1041                                 fn = a->syntax->index_format_fn;
1042                         } else {
1043                                 fn = a->syntax->canonicalise_fn;
1044                         }
1045                         r = fn(ldb, ldb, value, &v);
1046                         if (r != LDB_SUCCESS) {
1047                                 const char *errstr = ldb_errstring(ldb);
1048                                 /* canonicalisation can be refused. For
1049                                    example, a attribute that takes wildcards
1050                                    will refuse to canonicalise if the value
1051                                    contains a wildcard */
1052                                 ldb_asprintf_errstring(ldb,
1053                                                        "Failed to create "
1054                                                        "index key for "
1055                                                        "attribute '%s':%s%s%s",
1056                                                        attr, ldb_strerror(r),
1057                                                        (errstr?":":""),
1058                                                        (errstr?errstr:""));
1059                                 talloc_free(attr_folded);
1060                                 return NULL;
1061                         }
1062                 }
1063         }
1064         attr_len = strlen(attr_for_dn);
1065
1066         /*
1067          * Check if there is any hope this will fit into the DB.
1068          * Overflow here is not actually critical the code below
1069          * checks again to make the printf and the DB does another
1070          * check for too long keys
1071          */
1072         if (max_key_length - attr_len < min_key_length) {
1073                 ldb_asprintf_errstring(
1074                         ldb,
1075                         __location__ ": max_key_length "
1076                         "is too small (%u) < (%u)",
1077                         max_key_length,
1078                         (unsigned)(min_key_length + attr_len));
1079                 talloc_free(attr_folded);
1080                 return NULL;
1081         }
1082
1083         /*
1084          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
1085          * "DN=" and a trailing string terminator
1086          */
1087         max_key_length -= additional_key_length;
1088
1089         /*
1090          * We do not base 64 encode a DN in a key, it has already been
1091          * casefold and lineraized, that is good enough.  That already
1092          * avoids embedded NUL etc.
1093          */
1094         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1095                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
1096                         should_b64_encode = false;
1097                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
1098                         /*
1099                          * We can only change the behaviour for IDXONE
1100                          * when the GUID index is enabled
1101                          */
1102                         should_b64_encode = false;
1103                 } else {
1104                         should_b64_encode
1105                                 = ldb_should_b64_encode(ldb, &v);
1106                 }
1107         } else {
1108                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
1109         }
1110
1111         if (should_b64_encode) {
1112                 size_t vstr_len = 0;
1113                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
1114                 if (!vstr) {
1115                         talloc_free(attr_folded);
1116                         return NULL;
1117                 }
1118                 vstr_len = strlen(vstr);
1119                 /*
1120                  * Overflow here is not critical as we only use this
1121                  * to choose the printf truncation
1122                  */
1123                 key_len = num_separators + indx_len + attr_len + vstr_len;
1124                 if (key_len > max_key_length) {
1125                         size_t excess = key_len - max_key_length;
1126                         frmt_len = vstr_len - excess;
1127                         *truncation = KEY_TRUNCATED;
1128                         /*
1129                         * Truncated keys are placed in a separate key space
1130                         * from the non truncated keys
1131                         * Note: the double hash "##" is not a typo and
1132                         * indicates that the following value is base64 encoded
1133                         */
1134                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
1135                                              LDB_KV_INDEX, attr_for_dn,
1136                                              frmt_len, vstr);
1137                 } else {
1138                         frmt_len = vstr_len;
1139                         *truncation = KEY_NOT_TRUNCATED;
1140                         /*
1141                          * Note: the double colon "::" is not a typo and
1142                          * indicates that the following value is base64 encoded
1143                          */
1144                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1145                                              LDB_KV_INDEX, attr_for_dn,
1146                                              frmt_len, vstr);
1147                 }
1148                 talloc_free(vstr);
1149         } else {
1150                 /* Only need two seperators */
1151                 num_separators = 2;
1152
1153                 /*
1154                  * Overflow here is not critical as we only use this
1155                  * to choose the printf truncation
1156                  */
1157                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1158                 if (key_len > max_key_length) {
1159                         size_t excess = key_len - max_key_length;
1160                         frmt_len = v.length - excess;
1161                         *truncation = KEY_TRUNCATED;
1162                         /*
1163                          * Truncated keys are placed in a separate key space
1164                          * from the non truncated keys
1165                          */
1166                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1167                                              LDB_KV_INDEX, attr_for_dn,
1168                                              frmt_len, (char *)v.data);
1169                 } else {
1170                         frmt_len = v.length;
1171                         *truncation = KEY_NOT_TRUNCATED;
1172                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1173                                              LDB_KV_INDEX, attr_for_dn,
1174                                              frmt_len, (char *)v.data);
1175                 }
1176         }
1177
1178         if (v.data != value->data && !empty_val) {
1179                 talloc_free(v.data);
1180         }
1181         talloc_free(attr_folded);
1182
1183         return ret;
1184 }
1185
1186 /*
1187   see if a attribute value is in the list of indexed attributes
1188 */
1189 static bool ldb_kv_is_indexed(struct ldb_module *module,
1190                               struct ldb_kv_private *ldb_kv,
1191                               const char *attr)
1192 {
1193         struct ldb_context *ldb = ldb_module_get_ctx(module);
1194         unsigned int i;
1195         struct ldb_message_element *el;
1196
1197         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1198             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1199                 /* Implicity covered, this is the index key */
1200                 return false;
1201         }
1202         if (ldb->schema.index_handler_override) {
1203                 const struct ldb_schema_attribute *a
1204                         = ldb_schema_attribute_by_name(ldb, attr);
1205
1206                 if (a == NULL) {
1207                         return false;
1208                 }
1209
1210                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1211                         return true;
1212                 } else {
1213                         return false;
1214                 }
1215         }
1216
1217         if (!ldb_kv->cache->attribute_indexes) {
1218                 return false;
1219         }
1220
1221         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1222         if (el == NULL) {
1223                 return false;
1224         }
1225
1226         /* TODO: this is too expensive! At least use a binary search */
1227         for (i=0; i<el->num_values; i++) {
1228                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1229                         return true;
1230                 }
1231         }
1232         return false;
1233 }
1234
1235 /*
1236   in the following logic functions, the return value is treated as
1237   follows:
1238
1239      LDB_SUCCESS: we found some matching index values
1240
1241      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1242
1243      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1244                                we'll need a full search
1245  */
1246
1247 /*
1248   return a list of dn's that might match a simple indexed search (an
1249   equality search only)
1250  */
1251 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1252                                   struct ldb_kv_private *ldb_kv,
1253                                   const struct ldb_parse_tree *tree,
1254                                   struct dn_list *list)
1255 {
1256         struct ldb_context *ldb;
1257         struct ldb_dn *dn;
1258         int ret;
1259         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1260
1261         ldb = ldb_module_get_ctx(module);
1262
1263         list->count = 0;
1264         list->dn = NULL;
1265
1266         /* if the attribute isn't in the list of indexed attributes then
1267            this node needs a full search */
1268         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1269                 return LDB_ERR_OPERATIONS_ERROR;
1270         }
1271
1272         /* the attribute is indexed. Pull the list of DNs that match the
1273            search criterion */
1274         dn = ldb_kv_index_key(ldb,
1275                               ldb_kv,
1276                               tree->u.equality.attr,
1277                               &tree->u.equality.value,
1278                               NULL,
1279                               &truncation);
1280         /*
1281          * We ignore truncation here and allow multi-valued matches
1282          * as ltdb_search_indexed will filter out the wrong one in
1283          * ltdb_index_filter() which calls ldb_match_message().
1284          */
1285         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1286
1287         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
1288                                   DN_LIST_WILL_BE_READ_ONLY);
1289         talloc_free(dn);
1290         return ret;
1291 }
1292
1293 static bool list_union(struct ldb_context *ldb,
1294                        struct ldb_kv_private *ldb_kv,
1295                        struct dn_list *list,
1296                        struct dn_list *list2);
1297
1298 /*
1299   return a list of dn's that might match a leaf indexed search
1300  */
1301 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1302                                 struct ldb_kv_private *ldb_kv,
1303                                 const struct ldb_parse_tree *tree,
1304                                 struct dn_list *list)
1305 {
1306         if (ldb_kv->disallow_dn_filter &&
1307             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1308                 /* in AD mode we do not support "(dn=...)" search filters */
1309                 list->dn = NULL;
1310                 list->count = 0;
1311                 return LDB_SUCCESS;
1312         }
1313         if (tree->u.equality.attr[0] == '@') {
1314                 /* Do not allow a indexed search against an @ */
1315                 list->dn = NULL;
1316                 list->count = 0;
1317                 return LDB_SUCCESS;
1318         }
1319         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1320                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1321                 bool valid_dn = false;
1322                 struct ldb_dn *dn
1323                         = ldb_dn_from_ldb_val(list,
1324                                               ldb_module_get_ctx(module),
1325                                               &tree->u.equality.value);
1326                 if (dn == NULL) {
1327                         /* If we can't parse it, no match */
1328                         list->dn = NULL;
1329                         list->count = 0;
1330                         return LDB_SUCCESS;
1331                 }
1332
1333                 valid_dn = ldb_dn_validate(dn);
1334                 if (valid_dn == false) {
1335                         /* If we can't parse it, no match */
1336                         list->dn = NULL;
1337                         list->count = 0;
1338                         return LDB_SUCCESS;
1339                 }
1340
1341                 /*
1342                  * Re-use the same code we use for a SCOPE_BASE
1343                  * search
1344                  *
1345                  * We can't call TALLOC_FREE(dn) as this must belong
1346                  * to list for the memory to remain valid.
1347                  */
1348                 return ldb_kv_index_dn_base_dn(
1349                     module, ldb_kv, dn, list, &truncation);
1350                 /*
1351                  * We ignore truncation here and allow multi-valued matches
1352                  * as ltdb_search_indexed will filter out the wrong one in
1353                  * ltdb_index_filter() which calls ldb_match_message().
1354                  */
1355
1356         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1357                    (ldb_attr_cmp(tree->u.equality.attr,
1358                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1359                 int ret;
1360                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1361                 list->dn = talloc_array(list, struct ldb_val, 1);
1362                 if (list->dn == NULL) {
1363                         ldb_module_oom(module);
1364                         return LDB_ERR_OPERATIONS_ERROR;
1365                 }
1366                 /*
1367                  * We need to go via the canonicalise_fn() to
1368                  * ensure we get the index in binary, rather
1369                  * than a string
1370                  */
1371                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1372                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1373                 if (ret != LDB_SUCCESS) {
1374                         return LDB_ERR_OPERATIONS_ERROR;
1375                 }
1376                 list->count = 1;
1377                 return LDB_SUCCESS;
1378         }
1379
1380         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1381 }
1382
1383
1384 /*
1385   list intersection
1386   list = list & list2
1387 */
1388 static bool list_intersect(struct ldb_kv_private *ldb_kv,
1389                            struct dn_list *list,
1390                            const struct dn_list *list2)
1391 {
1392         const struct dn_list *short_list, *long_list;
1393         struct dn_list *list3;
1394         unsigned int i;
1395
1396         if (list->count == 0) {
1397                 /* 0 & X == 0 */
1398                 return true;
1399         }
1400         if (list2->count == 0) {
1401                 /* X & 0 == 0 */
1402                 list->count = 0;
1403                 list->dn = NULL;
1404                 return true;
1405         }
1406
1407         /*
1408          * In both of the below we check for strict and in that
1409          * case do not optimise the intersection of this list,
1410          * we must never return an entry not in this
1411          * list.  This allows the index for
1412          * SCOPE_ONELEVEL to be trusted.
1413          */
1414
1415         /* the indexing code is allowed to return a longer list than
1416            what really matches, as all results are filtered by the
1417            full expression at the end - this shortcut avoids a lot of
1418            work in some cases */
1419         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1420                 return true;
1421         }
1422         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1423                 list->count = list2->count;
1424                 list->dn = list2->dn;
1425                 /* note that list2 may not be the parent of list2->dn,
1426                    as list2->dn may be owned by ltdb->idxptr. In that
1427                    case we expect this reparent call to fail, which is
1428                    OK */
1429                 talloc_reparent(list2, list, list2->dn);
1430                 return true;
1431         }
1432
1433         if (list->count > list2->count) {
1434                 short_list = list2;
1435                 long_list = list;
1436         } else {
1437                 short_list = list;
1438                 long_list = list2;
1439         }
1440
1441         list3 = talloc_zero(list, struct dn_list);
1442         if (list3 == NULL) {
1443                 return false;
1444         }
1445
1446         list3->dn = talloc_array(list3, struct ldb_val,
1447                                  MIN(list->count, list2->count));
1448         if (!list3->dn) {
1449                 talloc_free(list3);
1450                 return false;
1451         }
1452         list3->count = 0;
1453
1454         for (i=0;i<short_list->count;i++) {
1455                 /* For the GUID index case, this is a binary search */
1456                 if (ldb_kv_dn_list_find_val(
1457                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1458                         list3->dn[list3->count] = short_list->dn[i];
1459                         list3->count++;
1460                 }
1461         }
1462
1463         list->strict |= list2->strict;
1464         list->dn = talloc_steal(list, list3->dn);
1465         list->count = list3->count;
1466         talloc_free(list3);
1467
1468         return true;
1469 }
1470
1471
1472 /*
1473   list union
1474   list = list | list2
1475 */
1476 static bool list_union(struct ldb_context *ldb,
1477                        struct ldb_kv_private *ldb_kv,
1478                        struct dn_list *list,
1479                        struct dn_list *list2)
1480 {
1481         struct ldb_val *dn3;
1482         unsigned int i = 0, j = 0, k = 0;
1483
1484         if (list2->count == 0) {
1485                 /* X | 0 == X */
1486                 return true;
1487         }
1488
1489         if (list->count == 0) {
1490                 /* 0 | X == X */
1491                 list->count = list2->count;
1492                 list->dn = list2->dn;
1493                 /* note that list2 may not be the parent of list2->dn,
1494                    as list2->dn may be owned by ltdb->idxptr. In that
1495                    case we expect this reparent call to fail, which is
1496                    OK */
1497                 talloc_reparent(list2, list, list2->dn);
1498                 return true;
1499         }
1500
1501         /*
1502          * Sort the lists (if not in GUID DN mode) so we can do
1503          * the de-duplication during the merge
1504          *
1505          * NOTE: This can sort the in-memory index values, as list or
1506          * list2 might not be a copy!
1507          */
1508         ldb_kv_dn_list_sort(ldb_kv, list);
1509         ldb_kv_dn_list_sort(ldb_kv, list2);
1510
1511         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1512         if (!dn3) {
1513                 ldb_oom(ldb);
1514                 return false;
1515         }
1516
1517         while (i < list->count || j < list2->count) {
1518                 int cmp;
1519                 if (i >= list->count) {
1520                         cmp = 1;
1521                 } else if (j >= list2->count) {
1522                         cmp = -1;
1523                 } else {
1524                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1525                                                           &list2->dn[j]);
1526                 }
1527
1528                 if (cmp < 0) {
1529                         /* Take list */
1530                         dn3[k] = list->dn[i];
1531                         i++;
1532                         k++;
1533                 } else if (cmp > 0) {
1534                         /* Take list2 */
1535                         dn3[k] = list2->dn[j];
1536                         j++;
1537                         k++;
1538                 } else {
1539                         /* Equal, take list */
1540                         dn3[k] = list->dn[i];
1541                         i++;
1542                         j++;
1543                         k++;
1544                 }
1545         }
1546
1547         list->dn = dn3;
1548         list->count = k;
1549
1550         return true;
1551 }
1552
1553 static int ldb_kv_index_dn(struct ldb_module *module,
1554                            struct ldb_kv_private *ldb_kv,
1555                            const struct ldb_parse_tree *tree,
1556                            struct dn_list *list);
1557
1558 /*
1559   process an OR list (a union)
1560  */
1561 static int ldb_kv_index_dn_or(struct ldb_module *module,
1562                               struct ldb_kv_private *ldb_kv,
1563                               const struct ldb_parse_tree *tree,
1564                               struct dn_list *list)
1565 {
1566         struct ldb_context *ldb;
1567         unsigned int i;
1568
1569         ldb = ldb_module_get_ctx(module);
1570
1571         list->dn = NULL;
1572         list->count = 0;
1573
1574         for (i=0; i<tree->u.list.num_elements; i++) {
1575                 struct dn_list *list2;
1576                 int ret;
1577
1578                 list2 = talloc_zero(list, struct dn_list);
1579                 if (list2 == NULL) {
1580                         return LDB_ERR_OPERATIONS_ERROR;
1581                 }
1582
1583                 ret = ldb_kv_index_dn(
1584                     module, ldb_kv, tree->u.list.elements[i], list2);
1585
1586                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1587                         /* X || 0 == X */
1588                         talloc_free(list2);
1589                         continue;
1590                 }
1591
1592                 if (ret != LDB_SUCCESS) {
1593                         /* X || * == * */
1594                         talloc_free(list2);
1595                         return ret;
1596                 }
1597
1598                 if (!list_union(ldb, ldb_kv, list, list2)) {
1599                         talloc_free(list2);
1600                         return LDB_ERR_OPERATIONS_ERROR;
1601                 }
1602         }
1603
1604         if (list->count == 0) {
1605                 return LDB_ERR_NO_SUCH_OBJECT;
1606         }
1607
1608         return LDB_SUCCESS;
1609 }
1610
1611
1612 /*
1613   NOT an index results
1614  */
1615 static int ldb_kv_index_dn_not(_UNUSED_ struct ldb_module *module,
1616                                _UNUSED_ struct ldb_kv_private *ldb_kv,
1617                                _UNUSED_ const struct ldb_parse_tree *tree,
1618                                _UNUSED_ struct dn_list *list)
1619 {
1620         /* the only way to do an indexed not would be if we could
1621            negate the not via another not or if we knew the total
1622            number of database elements so we could know that the
1623            existing expression covered the whole database.
1624
1625            instead, we just give up, and rely on a full index scan
1626            (unless an outer & manages to reduce the list)
1627         */
1628         return LDB_ERR_OPERATIONS_ERROR;
1629 }
1630
1631 /*
1632  * These things are unique, so avoid a full scan if this is a search
1633  * by GUID, DN or a unique attribute
1634  */
1635 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1636                                 struct ldb_kv_private *ldb_kv,
1637                                 const char *attr)
1638 {
1639         const struct ldb_schema_attribute *a;
1640         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1641                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1642                     0) {
1643                         return true;
1644                 }
1645         }
1646         if (ldb_attr_dn(attr) == 0) {
1647                 return true;
1648         }
1649
1650         a = ldb_schema_attribute_by_name(ldb, attr);
1651         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1652                 return true;
1653         }
1654         return false;
1655 }
1656
1657 /*
1658   process an AND expression (intersection)
1659  */
1660 static int ldb_kv_index_dn_and(struct ldb_module *module,
1661                                struct ldb_kv_private *ldb_kv,
1662                                const struct ldb_parse_tree *tree,
1663                                struct dn_list *list)
1664 {
1665         struct ldb_context *ldb;
1666         unsigned int i;
1667         bool found;
1668
1669         ldb = ldb_module_get_ctx(module);
1670
1671         list->dn = NULL;
1672         list->count = 0;
1673
1674         /* in the first pass we only look for unique simple
1675            equality tests, in the hope of avoiding having to look
1676            at any others */
1677         for (i=0; i<tree->u.list.num_elements; i++) {
1678                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1679                 int ret;
1680
1681                 if (subtree->operation != LDB_OP_EQUALITY ||
1682                     !ldb_kv_index_unique(
1683                         ldb, ldb_kv, subtree->u.equality.attr)) {
1684                         continue;
1685                 }
1686
1687                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1688                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1689                         /* 0 && X == 0 */
1690                         return LDB_ERR_NO_SUCH_OBJECT;
1691                 }
1692                 if (ret == LDB_SUCCESS) {
1693                         /* a unique index match means we can
1694                          * stop. Note that we don't care if we return
1695                          * a few too many objects, due to later
1696                          * filtering */
1697                         return LDB_SUCCESS;
1698                 }
1699         }
1700
1701         /* now do a full intersection */
1702         found = false;
1703
1704         for (i=0; i<tree->u.list.num_elements; i++) {
1705                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1706                 struct dn_list *list2;
1707                 int ret;
1708
1709                 list2 = talloc_zero(list, struct dn_list);
1710                 if (list2 == NULL) {
1711                         return ldb_module_oom(module);
1712                 }
1713
1714                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1715
1716                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1717                         /* X && 0 == 0 */
1718                         list->dn = NULL;
1719                         list->count = 0;
1720                         talloc_free(list2);
1721                         return LDB_ERR_NO_SUCH_OBJECT;
1722                 }
1723
1724                 if (ret != LDB_SUCCESS) {
1725                         /* this didn't adding anything */
1726                         talloc_free(list2);
1727                         continue;
1728                 }
1729
1730                 if (!found) {
1731                         talloc_reparent(list2, list, list->dn);
1732                         list->dn = list2->dn;
1733                         list->count = list2->count;
1734                         found = true;
1735                 } else if (!list_intersect(ldb_kv, list, list2)) {
1736                         talloc_free(list2);
1737                         return LDB_ERR_OPERATIONS_ERROR;
1738                 }
1739
1740                 if (list->count == 0) {
1741                         list->dn = NULL;
1742                         return LDB_ERR_NO_SUCH_OBJECT;
1743                 }
1744
1745                 if (list->count < 2) {
1746                         /* it isn't worth loading the next part of the tree */
1747                         return LDB_SUCCESS;
1748                 }
1749         }
1750
1751         if (!found) {
1752                 /* none of the attributes were indexed */
1753                 return LDB_ERR_OPERATIONS_ERROR;
1754         }
1755
1756         return LDB_SUCCESS;
1757 }
1758
1759 struct ldb_kv_ordered_index_context {
1760         struct ldb_module *module;
1761         int error;
1762         struct dn_list *dn_list;
1763 };
1764
1765 static int traverse_range_index(_UNUSED_ struct ldb_kv_private *ldb_kv,
1766                                 _UNUSED_ struct ldb_val key,
1767                                 struct ldb_val data,
1768                                 void *state)
1769 {
1770
1771         struct ldb_context *ldb;
1772         struct ldb_kv_ordered_index_context *ctx =
1773             (struct ldb_kv_ordered_index_context *)state;
1774         struct ldb_module *module = ctx->module;
1775         struct ldb_message_element *el = NULL;
1776         struct ldb_message *msg = NULL;
1777         int version;
1778         size_t dn_array_size, additional_length;
1779         unsigned int i;
1780
1781         ldb = ldb_module_get_ctx(module);
1782
1783         msg = ldb_msg_new(module);
1784
1785         ctx->error = ldb_unpack_data_flags(ldb, &data, msg,
1786                                            LDB_UNPACK_DATA_FLAG_NO_DN);
1787
1788         if (ctx->error != LDB_SUCCESS) {
1789                 talloc_free(msg);
1790                 return ctx->error;
1791         }
1792
1793         el = ldb_msg_find_element(msg, LDB_KV_IDX);
1794         if (!el) {
1795                 talloc_free(msg);
1796                 return LDB_SUCCESS;
1797         }
1798
1799         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
1800
1801         /*
1802          * we avoid copying the strings by stealing the list.  We have
1803          * to steal msg onto el->values (which looks odd) because
1804          * the memory is allocated on msg, not on each value.
1805          */
1806         if (version != LDB_KV_GUID_INDEXING_VERSION) {
1807                 /* This is quite likely during the DB startup
1808                    on first upgrade to using a GUID index */
1809                 ldb_debug_set(ldb_module_get_ctx(module),
1810                               LDB_DEBUG_ERROR, __location__
1811                               ": Wrong GUID index version %d expected %d",
1812                               version, LDB_KV_GUID_INDEXING_VERSION);
1813                 talloc_free(msg);
1814                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1815                 return ctx->error;
1816         }
1817
1818         if (el->num_values == 0) {
1819                 talloc_free(msg);
1820                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1821                 return ctx->error;
1822         }
1823
1824         if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
1825             || el->values[0].length == 0) {
1826                 talloc_free(msg);
1827                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1828                 return ctx->error;
1829         }
1830
1831         dn_array_size = talloc_array_length(ctx->dn_list->dn);
1832
1833         additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
1834
1835         if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
1836                 talloc_free(msg);
1837                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1838                 return ctx->error;
1839         }
1840
1841         if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
1842                 size_t new_array_length;
1843
1844                 if (dn_array_size * 2 < dn_array_size) {
1845                         talloc_free(msg);
1846                         ctx->error = LDB_ERR_OPERATIONS_ERROR;
1847                         return ctx->error;
1848                 }
1849
1850                 new_array_length = MAX(ctx->dn_list->count + additional_length,
1851                                        dn_array_size * 2);
1852
1853                 ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
1854                                                   ctx->dn_list->dn,
1855                                                   struct ldb_val,
1856                                                   new_array_length);
1857         }
1858
1859         if (ctx->dn_list->dn == NULL) {
1860                 talloc_free(msg);
1861                 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1862                 return ctx->error;
1863         }
1864
1865         /*
1866          * The actual data is on msg.
1867          */
1868         talloc_steal(ctx->dn_list->dn, msg);
1869         for (i = 0; i < additional_length; i++) {
1870                 ctx->dn_list->dn[i + ctx->dn_list->count].data
1871                         = &el->values[0].data[i * LDB_KV_GUID_SIZE];
1872                 ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
1873
1874         }
1875
1876         ctx->dn_list->count += additional_length;
1877
1878         talloc_free(msg->elements);
1879
1880         return LDB_SUCCESS;
1881 }
1882
1883 /*
1884  * >= and <= indexing implemented using lexicographically sorted keys
1885  *
1886  * We only run this in GUID indexing mode and when there is no write
1887  * transaction (only implicit read locks are being held). Otherwise, we would
1888  * have to deal with the in-memory index cache.
1889  *
1890  * We rely on the implementation of index_format_fn on a schema syntax which
1891  * will can help us to construct keys which can be ordered correctly, and we
1892  * terminate using schema agnostic start and end keys.
1893  *
1894  * index_format_fn must output values which can be memcmp-able to produce the
1895  * correct ordering as defined by the schema syntax class.
1896  */
1897 static int ldb_kv_index_dn_ordered(struct ldb_module *module,
1898                                    struct ldb_kv_private *ldb_kv,
1899                                    const struct ldb_parse_tree *tree,
1900                                    struct dn_list *list, bool ascending)
1901 {
1902         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1903         struct ldb_context *ldb = ldb_module_get_ctx(module);
1904
1905         struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
1906         struct ldb_val start_key, end_key;
1907         struct ldb_dn *key_dn = NULL;
1908         const struct ldb_schema_attribute *a = NULL;
1909
1910         struct ldb_kv_ordered_index_context ctx;
1911         int ret;
1912
1913         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1914
1915         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
1916                 return LDB_ERR_OPERATIONS_ERROR;
1917         }
1918
1919         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1920                 return LDB_ERR_OPERATIONS_ERROR;
1921         }
1922
1923         /* bail out if we're in a transaction, full search instead. */
1924         if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1925                 return LDB_ERR_OPERATIONS_ERROR;
1926         }
1927
1928         if (ldb_kv->disallow_dn_filter &&
1929             (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
1930                 /* in AD mode we do not support "(dn=...)" search filters */
1931                 list->dn = NULL;
1932                 list->count = 0;
1933                 return LDB_SUCCESS;
1934         }
1935         if (tree->u.comparison.attr[0] == '@') {
1936                 /* Do not allow a indexed search against an @ */
1937                 list->dn = NULL;
1938                 list->count = 0;
1939                 return LDB_SUCCESS;
1940         }
1941
1942         a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
1943
1944         /*
1945          * If there's no index format function defined for this attr, then
1946          * the lexicographic order in the database doesn't correspond to the
1947          * attr's ordering, so we can't use the iterate_range op.
1948          */
1949         if (a->syntax->index_format_fn == NULL) {
1950                 return LDB_ERR_OPERATIONS_ERROR;
1951         }
1952
1953         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1954                                   &tree->u.comparison.value,
1955                                   NULL, &truncation);
1956         if (!key_dn) {
1957                 return LDB_ERR_OPERATIONS_ERROR;
1958         } else if (truncation == KEY_TRUNCATED) {
1959                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1960                           __location__
1961                           ": ordered index violation: key dn truncated: %s\n",
1962                           ldb_dn_get_linearized(key_dn));
1963                 return LDB_ERR_OPERATIONS_ERROR;
1964         }
1965         ldb_key = ldb_kv_key_dn(tmp_ctx, key_dn);
1966         talloc_free(key_dn);
1967         if (ldb_key.data == NULL) {
1968                 return LDB_ERR_OPERATIONS_ERROR;
1969         }
1970
1971         key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1972                                   NULL, NULL, &truncation);
1973         if (!key_dn) {
1974                 return LDB_ERR_OPERATIONS_ERROR;
1975         } else if (truncation == KEY_TRUNCATED) {
1976                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1977                           __location__
1978                           ": ordered index violation: key dn truncated: %s\n",
1979                           ldb_dn_get_linearized(key_dn));
1980                 return LDB_ERR_OPERATIONS_ERROR;
1981         }
1982
1983         ldb_key2 = ldb_kv_key_dn(tmp_ctx, key_dn);
1984         talloc_free(key_dn);
1985         if (ldb_key2.data == NULL) {
1986                 return LDB_ERR_OPERATIONS_ERROR;
1987         }
1988
1989         /*
1990          * In order to avoid defining a start and end key for the search, we
1991          * notice that each index key is of the form:
1992          *
1993          *     DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
1994          *
1995          * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
1996          * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
1997          * particular attribute.
1998          *
1999          * Our LMDB backend uses the default memcmp for key comparison.
2000          */
2001
2002         /* Eliminate NUL byte at the end of the empty key */
2003         ldb_key2.length--;
2004
2005         if (ascending) {
2006                 /* : becomes ; for pseudo end-key */
2007                 ldb_key2.data[ldb_key2.length-1]++;
2008                 start_key = ldb_key;
2009                 end_key = ldb_key2;
2010         } else {
2011                 start_key = ldb_key2;
2012                 end_key = ldb_key;
2013         }
2014
2015         ctx.module = module;
2016         ctx.error = 0;
2017         ctx.dn_list = list;
2018         ctx.dn_list->count = 0;
2019         ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
2020
2021         ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
2022                                             traverse_range_index, &ctx);
2023
2024         if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
2025                 return LDB_ERR_OPERATIONS_ERROR;
2026         }
2027
2028         TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
2029                        ldb_val_equal_exact_for_qsort);
2030
2031         talloc_free(tmp_ctx);
2032
2033         return LDB_SUCCESS;
2034 }
2035
2036 static int ldb_kv_index_dn_greater(struct ldb_module *module,
2037                                    struct ldb_kv_private *ldb_kv,
2038                                    const struct ldb_parse_tree *tree,
2039                                    struct dn_list *list)
2040 {
2041         return ldb_kv_index_dn_ordered(module,
2042                                        ldb_kv,
2043                                        tree,
2044                                        list, true);
2045 }
2046
2047 static int ldb_kv_index_dn_less(struct ldb_module *module,
2048                                    struct ldb_kv_private *ldb_kv,
2049                                    const struct ldb_parse_tree *tree,
2050                                    struct dn_list *list)
2051 {
2052         return ldb_kv_index_dn_ordered(module,
2053                                        ldb_kv,
2054                                        tree,
2055                                        list, false);
2056 }
2057
2058 /*
2059   return a list of matching objects using a one-level index
2060  */
2061 static int ldb_kv_index_dn_attr(struct ldb_module *module,
2062                                 struct ldb_kv_private *ldb_kv,
2063                                 const char *attr,
2064                                 struct ldb_dn *dn,
2065                                 struct dn_list *list,
2066                                 enum key_truncation *truncation)
2067 {
2068         struct ldb_context *ldb;
2069         struct ldb_dn *key;
2070         struct ldb_val val;
2071         int ret;
2072
2073         ldb = ldb_module_get_ctx(module);
2074
2075         /* work out the index key from the parent DN */
2076         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2077         if (val.data == NULL) {
2078                 const char *dn_str = ldb_dn_get_linearized(dn);
2079                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2080                                        __location__
2081                                        ": Failed to get casefold DN "
2082                                        "from: %s",
2083                                        dn_str);
2084                 return LDB_ERR_OPERATIONS_ERROR;
2085         }
2086         val.length = strlen((char *)val.data);
2087         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
2088         if (!key) {
2089                 ldb_oom(ldb);
2090                 return LDB_ERR_OPERATIONS_ERROR;
2091         }
2092
2093         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
2094                                   DN_LIST_WILL_BE_READ_ONLY);
2095         talloc_free(key);
2096         if (ret != LDB_SUCCESS) {
2097                 return ret;
2098         }
2099
2100         if (list->count == 0) {
2101                 return LDB_ERR_NO_SUCH_OBJECT;
2102         }
2103
2104         return LDB_SUCCESS;
2105 }
2106
2107 /*
2108   return a list of matching objects using a one-level index
2109  */
2110 static int ldb_kv_index_dn_one(struct ldb_module *module,
2111                                struct ldb_kv_private *ldb_kv,
2112                                struct ldb_dn *parent_dn,
2113                                struct dn_list *list,
2114                                enum key_truncation *truncation)
2115 {
2116         /*
2117          * Ensure we do not shortcut on intersection for this list.
2118          * We must never be lazy and return an entry not in this
2119          * list.  This allows the index for
2120          * SCOPE_ONELEVEL to be trusted.
2121          */
2122
2123         list->strict = true;
2124         return ldb_kv_index_dn_attr(
2125             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
2126 }
2127
2128 /*
2129   return a list of matching objects using the DN index
2130  */
2131 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
2132                                    struct ldb_kv_private *ldb_kv,
2133                                    struct ldb_dn *base_dn,
2134                                    struct dn_list *dn_list,
2135                                    enum key_truncation *truncation)
2136 {
2137         const struct ldb_val *guid_val = NULL;
2138         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2139                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2140                 if (dn_list->dn == NULL) {
2141                         return ldb_module_oom(module);
2142                 }
2143                 dn_list->dn[0].data = discard_const_p(unsigned char,
2144                                                       ldb_dn_get_linearized(base_dn));
2145                 if (dn_list->dn[0].data == NULL) {
2146                         talloc_free(dn_list->dn);
2147                         return ldb_module_oom(module);
2148                 }
2149                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
2150                 dn_list->count = 1;
2151
2152                 return LDB_SUCCESS;
2153         }
2154
2155         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
2156                 guid_val = ldb_dn_get_extended_component(
2157                     base_dn, ldb_kv->cache->GUID_index_dn_component);
2158         }
2159
2160         if (guid_val != NULL) {
2161                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2162                 if (dn_list->dn == NULL) {
2163                         return ldb_module_oom(module);
2164                 }
2165                 dn_list->dn[0].data = guid_val->data;
2166                 dn_list->dn[0].length = guid_val->length;
2167                 dn_list->count = 1;
2168
2169                 return LDB_SUCCESS;
2170         }
2171
2172         return ldb_kv_index_dn_attr(
2173             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
2174 }
2175
2176 /*
2177   return a list of dn's that might match a indexed search or
2178   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
2179  */
2180 static int ldb_kv_index_dn(struct ldb_module *module,
2181                            struct ldb_kv_private *ldb_kv,
2182                            const struct ldb_parse_tree *tree,
2183                            struct dn_list *list)
2184 {
2185         int ret = LDB_ERR_OPERATIONS_ERROR;
2186
2187         switch (tree->operation) {
2188         case LDB_OP_AND:
2189                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
2190                 break;
2191
2192         case LDB_OP_OR:
2193                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
2194                 break;
2195
2196         case LDB_OP_NOT:
2197                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
2198                 break;
2199
2200         case LDB_OP_EQUALITY:
2201                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
2202                 break;
2203
2204         case LDB_OP_GREATER:
2205                 ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
2206                 break;
2207
2208         case LDB_OP_LESS:
2209                 ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
2210                 break;
2211
2212         case LDB_OP_SUBSTRING:
2213         case LDB_OP_PRESENT:
2214         case LDB_OP_APPROX:
2215         case LDB_OP_EXTENDED:
2216                 /* we can't index with fancy bitops yet */
2217                 ret = LDB_ERR_OPERATIONS_ERROR;
2218                 break;
2219         }
2220
2221         return ret;
2222 }
2223
2224 /*
2225   filter a candidate dn_list from an indexed search into a set of results
2226   extracting just the given attributes
2227 */
2228 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
2229                                const struct dn_list *dn_list,
2230                                struct ldb_kv_context *ac,
2231                                uint32_t *match_count,
2232                                enum key_truncation scope_one_truncation)
2233 {
2234         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2235         struct ldb_message *msg;
2236         struct ldb_message *filtered_msg;
2237         unsigned int i;
2238         unsigned int num_keys = 0;
2239         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
2240         struct ldb_val *keys = NULL;
2241
2242         /*
2243          * We have to allocate the key list (rather than just walk the
2244          * caller supplied list) as the callback could change the list
2245          * (by modifying an indexed attribute hosted in the in-memory
2246          * index cache!)
2247          */
2248         keys = talloc_array(ac, struct ldb_val, dn_list->count);
2249         if (keys == NULL) {
2250                 return ldb_module_oom(ac->module);
2251         }
2252
2253         if (ldb_kv->cache->GUID_index_attribute != NULL) {
2254                 /*
2255                  * We speculate that the keys will be GUID based and so
2256                  * pre-fill in enough space for a GUID (avoiding a pile of
2257                  * small allocations)
2258                  */
2259                 struct guid_tdb_key {
2260                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2261                 } *key_values = NULL;
2262
2263                 key_values = talloc_array(keys,
2264                                           struct guid_tdb_key,
2265                                           dn_list->count);
2266
2267                 if (key_values == NULL) {
2268                         talloc_free(keys);
2269                         return ldb_module_oom(ac->module);
2270                 }
2271                 for (i = 0; i < dn_list->count; i++) {
2272                         keys[i].data = key_values[i].guid_key;
2273                         keys[i].length = sizeof(key_values[i].guid_key);
2274                 }
2275         } else {
2276                 for (i = 0; i < dn_list->count; i++) {
2277                         keys[i].data = NULL;
2278                         keys[i].length = 0;
2279                 }
2280         }
2281
2282         for (i = 0; i < dn_list->count; i++) {
2283                 int ret;
2284
2285                 ret = ldb_kv_idx_to_key(
2286                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
2287                 if (ret != LDB_SUCCESS) {
2288                         talloc_free(keys);
2289                         return ret;
2290                 }
2291
2292                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2293                         /*
2294                          * If we are in GUID index mode, then the dn_list is
2295                          * sorted.  If we got a duplicate, forget about it, as
2296                          * otherwise we would send the same entry back more
2297                          * than once.
2298                          *
2299                          * This is needed in the truncated DN case, or if a
2300                          * duplicate was forced in via
2301                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2302                          */
2303
2304                         if (memcmp(previous_guid_key,
2305                                    keys[num_keys].data,
2306                                    sizeof(previous_guid_key)) == 0) {
2307                                 continue;
2308                         }
2309
2310                         memcpy(previous_guid_key,
2311                                keys[num_keys].data,
2312                                sizeof(previous_guid_key));
2313                 }
2314                 num_keys++;
2315         }
2316
2317
2318         /*
2319          * Now that the list is a safe copy, send the callbacks
2320          */
2321         for (i = 0; i < num_keys; i++) {
2322                 int ret;
2323                 bool matched;
2324                 msg = ldb_msg_new(ac);
2325                 if (!msg) {
2326                         talloc_free(keys);
2327                         return LDB_ERR_OPERATIONS_ERROR;
2328                 }
2329
2330                 ret =
2331                     ldb_kv_search_key(ac->module,
2332                                       ldb_kv,
2333                                       keys[i],
2334                                       msg,
2335                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
2336                                       /*
2337                                        * The entry point ldb_kv_search_indexed is
2338                                        * only called from the read-locked
2339                                        * ldb_kv_search.
2340                                        */
2341                                       LDB_UNPACK_DATA_FLAG_READ_LOCKED);
2342                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2343                         /*
2344                          * the record has disappeared? yes, this can
2345                          * happen if the entry is deleted by something
2346                          * operating in the callback (not another
2347                          * process, as we have a read lock)
2348                          */
2349                         talloc_free(msg);
2350                         continue;
2351                 }
2352
2353                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2354                         /* an internal error */
2355                         talloc_free(keys);
2356                         talloc_free(msg);
2357                         return LDB_ERR_OPERATIONS_ERROR;
2358                 }
2359
2360                 /*
2361                  * We trust the index for LDB_SCOPE_ONELEVEL
2362                  * unless the index key has been truncated.
2363                  *
2364                  * LDB_SCOPE_BASE is not passed in by our only caller.
2365                  */
2366                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2367                     ldb_kv->cache->one_level_indexes &&
2368                     scope_one_truncation == KEY_NOT_TRUNCATED) {
2369                         ret = ldb_match_message(ldb, msg, ac->tree,
2370                                                 ac->scope, &matched);
2371                 } else {
2372                         ret = ldb_match_msg_error(ldb, msg,
2373                                                   ac->tree, ac->base,
2374                                                   ac->scope, &matched);
2375                 }
2376
2377                 if (ret != LDB_SUCCESS) {
2378                         talloc_free(keys);
2379                         talloc_free(msg);
2380                         return ret;
2381                 }
2382                 if (!matched) {
2383                         talloc_free(msg);
2384                         continue;
2385                 }
2386
2387                 filtered_msg = ldb_msg_new(ac);
2388                 if (filtered_msg == NULL) {
2389                         TALLOC_FREE(keys);
2390                         TALLOC_FREE(msg);
2391                         return LDB_ERR_OPERATIONS_ERROR;
2392                 }
2393
2394                 filtered_msg->dn = talloc_steal(filtered_msg, msg->dn);
2395
2396                 /* filter the attributes that the user wants */
2397                 ret = ldb_kv_filter_attrs(ldb, msg, ac->attrs, filtered_msg);
2398
2399                 talloc_free(msg);
2400
2401                 if (ret == -1) {
2402                         TALLOC_FREE(filtered_msg);
2403                         talloc_free(keys);
2404                         return LDB_ERR_OPERATIONS_ERROR;
2405                 }
2406
2407                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
2408                 if (ret != LDB_SUCCESS) {
2409                         /* Regardless of success or failure, the msg
2410                          * is the callbacks responsiblity, and should
2411                          * not be talloc_free()'ed */
2412                         ac->request_terminated = true;
2413                         talloc_free(keys);
2414                         return ret;
2415                 }
2416
2417                 (*match_count)++;
2418         }
2419
2420         TALLOC_FREE(keys);
2421         return LDB_SUCCESS;
2422 }
2423
2424 /*
2425   sort a DN list
2426  */
2427 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
2428                                 struct dn_list *list)
2429 {
2430         if (list->count < 2) {
2431                 return;
2432         }
2433
2434         /* We know the list is sorted when using the GUID index */
2435         if (ltdb->cache->GUID_index_attribute != NULL) {
2436                 return;
2437         }
2438
2439         TYPESAFE_QSORT(list->dn, list->count,
2440                        ldb_val_equal_exact_for_qsort);
2441 }
2442
2443 /*
2444   search the database with a LDAP-like expression using indexes
2445   returns -1 if an indexed search is not possible, in which
2446   case the caller should call ltdb_search_full()
2447 */
2448 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
2449 {
2450         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2451         struct ldb_kv_private *ldb_kv = talloc_get_type(
2452             ldb_module_get_private(ac->module), struct ldb_kv_private);
2453         struct dn_list *dn_list;
2454         int ret;
2455         enum ldb_scope index_scope;
2456         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
2457
2458         /* see if indexing is enabled */
2459         if (!ldb_kv->cache->attribute_indexes &&
2460             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
2461                 /* fallback to a full search */
2462                 return LDB_ERR_OPERATIONS_ERROR;
2463         }
2464
2465         dn_list = talloc_zero(ac, struct dn_list);
2466         if (dn_list == NULL) {
2467                 return ldb_module_oom(ac->module);
2468         }
2469
2470         /*
2471          * For the purposes of selecting the switch arm below, if we
2472          * don't have a one-level index then treat it like a subtree
2473          * search
2474          */
2475         if (ac->scope == LDB_SCOPE_ONELEVEL &&
2476             !ldb_kv->cache->one_level_indexes) {
2477                 index_scope = LDB_SCOPE_SUBTREE;
2478         } else {
2479                 index_scope = ac->scope;
2480         }
2481
2482         switch (index_scope) {
2483         case LDB_SCOPE_BASE:
2484                 /*
2485                  * The only caller will have filtered the operation out
2486                  * so we should never get here
2487                  */
2488                 return ldb_operr(ldb);
2489
2490         case LDB_SCOPE_ONELEVEL:
2491
2492                 /*
2493                  * First, load all the one-level child objects (regardless of
2494                  * whether they match the search filter or not). The database
2495                  * maintains a one-level index, so retrieving this is quick.
2496                  */
2497                 ret = ldb_kv_index_dn_one(ac->module,
2498                                           ldb_kv,
2499                                           ac->base,
2500                                           dn_list,
2501                                           &scope_one_truncation);
2502                 if (ret != LDB_SUCCESS) {
2503                         talloc_free(dn_list);
2504                         return ret;
2505                 }
2506
2507                 /*
2508                  * If we have too many children, running ldb_kv_index_filter()
2509                  * over all the child objects can be quite expensive. So next
2510                  * we do a separate indexed query using the search filter.
2511                  *
2512                  * This should be quick, but it may return objects that are not
2513                  * the direct one-level child objects we're interested in.
2514                  *
2515                  * We only do this in the GUID index mode, which is
2516                  * O(n*log(m)) otherwise the intersection below will
2517                  * be too costly at O(n*m).
2518                  *
2519                  * We don't set a heuristic for 'too many' but instead
2520                  * do it always and rely on the index lookup being
2521                  * fast enough in the small case.
2522                  */
2523                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2524                         struct dn_list *indexed_search_result
2525                                 = talloc_zero(ac, struct dn_list);
2526                         if (indexed_search_result == NULL) {
2527                                 talloc_free(dn_list);
2528                                 return ldb_module_oom(ac->module);
2529                         }
2530
2531                         if (!ldb_kv->cache->attribute_indexes) {
2532                                 talloc_free(indexed_search_result);
2533                                 talloc_free(dn_list);
2534                                 return LDB_ERR_OPERATIONS_ERROR;
2535                         }
2536
2537                         /*
2538                          * Try to do an indexed database search
2539                          */
2540                         ret = ldb_kv_index_dn(
2541                             ac->module, ldb_kv, ac->tree,
2542                             indexed_search_result);
2543
2544                         /*
2545                          * We can stop if we're sure the object doesn't exist
2546                          */
2547                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2548                                 talloc_free(indexed_search_result);
2549                                 talloc_free(dn_list);
2550                                 return LDB_ERR_NO_SUCH_OBJECT;
2551                         }
2552
2553                         /*
2554                          * Once we have a successful search result, we
2555                          * intersect it with the one-level children (dn_list).
2556                          * This should give us exactly the result we're after
2557                          * (we still need to run ldb_kv_index_filter() to
2558                          * handle potential index truncation cases).
2559                          *
2560                          * The indexed search may fail because we don't support
2561                          * indexing on that type of search operation, e.g.
2562                          * matching against '*'. In which case we fall through
2563                          * and run ldb_kv_index_filter() over all the one-level
2564                          * children (which is still better than bailing out here
2565                          * and falling back to a full DB scan).
2566                          */
2567                         if (ret == LDB_SUCCESS) {
2568                                 if (!list_intersect(ldb_kv,
2569                                                     dn_list,
2570                                                     indexed_search_result)) {
2571                                         talloc_free(indexed_search_result);
2572                                         talloc_free(dn_list);
2573                                         return LDB_ERR_OPERATIONS_ERROR;
2574                                 }
2575                         }
2576                 }
2577                 break;
2578
2579         case LDB_SCOPE_SUBTREE:
2580         case LDB_SCOPE_DEFAULT:
2581                 if (!ldb_kv->cache->attribute_indexes) {
2582                         talloc_free(dn_list);
2583                         return LDB_ERR_OPERATIONS_ERROR;
2584                 }
2585                 /*
2586                  * Here we load the index for the tree.  We have no
2587                  * index for the subtree.
2588                  */
2589                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2590                 if (ret != LDB_SUCCESS) {
2591                         talloc_free(dn_list);
2592                         return ret;
2593                 }
2594                 break;
2595         }
2596
2597         /*
2598          * It is critical that this function do the re-filter even
2599          * on things found by the index as the index can over-match
2600          * in cases of truncation (as well as when it decides it is
2601          * not worth further filtering)
2602          *
2603          * If this changes, then the index code above would need to
2604          * pass up a flag to say if any index was truncated during
2605          * processing as the truncation here refers only to the
2606          * SCOPE_ONELEVEL index.
2607          */
2608         ret = ldb_kv_index_filter(
2609             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2610         talloc_free(dn_list);
2611         return ret;
2612 }
2613
2614 /**
2615  * @brief Add a DN in the index list of a given attribute name/value pair
2616  *
2617  * This function will add the DN in the index list for the index for
2618  * the given attribute name and value.
2619  *
2620  * @param[in]  module       A ldb_module structure
2621  *
2622  * @param[in]  dn           The string representation of the DN as it
2623  *                          will be stored in the index entry
2624  *
2625  * @param[in]  el           A ldb_message_element array, one of the entry
2626  *                          referred by the v_idx is the attribute name and
2627  *                          value pair which will be used to construct the
2628  *                          index name
2629  *
2630  * @param[in]  v_idx        The index of element in the el array to use
2631  *
2632  * @return                  An ldb error code
2633  */
2634 static int ldb_kv_index_add1(struct ldb_module *module,
2635                              struct ldb_kv_private *ldb_kv,
2636                              const struct ldb_message *msg,
2637                              struct ldb_message_element *el,
2638                              int v_idx)
2639 {
2640         struct ldb_context *ldb;
2641         struct ldb_dn *dn_key;
2642         int ret;
2643         const struct ldb_schema_attribute *a;
2644         struct dn_list *list;
2645         unsigned alloc_len;
2646         enum key_truncation truncation = KEY_TRUNCATED;
2647
2648
2649         ldb = ldb_module_get_ctx(module);
2650
2651         list = talloc_zero(module, struct dn_list);
2652         if (list == NULL) {
2653                 return LDB_ERR_OPERATIONS_ERROR;
2654         }
2655
2656         dn_key = ldb_kv_index_key(
2657             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2658         if (!dn_key) {
2659                 talloc_free(list);
2660                 return LDB_ERR_OPERATIONS_ERROR;
2661         }
2662         /*
2663          * Samba only maintains unique indexes on the objectSID and objectGUID
2664          * so if a unique index key exceeds the maximum length there is a
2665          * problem.
2666          */
2667         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2668                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2669                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2670
2671                 ldb_asprintf_errstring(
2672                     ldb,
2673                     __location__ ": unique index key on %s in %s, "
2674                                  "exceeds maximum key length of %u (encoded).",
2675                     el->name,
2676                     ldb_dn_get_linearized(msg->dn),
2677                     ldb_kv->max_key_length);
2678                 talloc_free(list);
2679                 return LDB_ERR_CONSTRAINT_VIOLATION;
2680         }
2681         talloc_steal(list, dn_key);
2682
2683         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
2684                                   DN_LIST_MUTABLE);
2685         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2686                 talloc_free(list);
2687                 return ret;
2688         }
2689
2690         /*
2691          * Check for duplicates in the @IDXDN DN -> GUID record
2692          *
2693          * This is very normal, it just means a duplicate DN creation
2694          * was attempted, so don't set the error string or print scary
2695          * messages.
2696          */
2697         if (list->count > 0 &&
2698             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2699             truncation == KEY_NOT_TRUNCATED) {
2700
2701                 talloc_free(list);
2702                 return LDB_ERR_CONSTRAINT_VIOLATION;
2703
2704         } else if (list->count > 0
2705                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2706
2707                 /*
2708                  * At least one existing entry in the DN->GUID index, which
2709                  * arises when the DN indexes have been truncated
2710                  *
2711                  * So need to pull the DN's to check if it's really a duplicate
2712                  */
2713                 unsigned int i;
2714                 for (i=0; i < list->count; i++) {
2715                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2716                         struct ldb_val key = {
2717                                 .data = guid_key,
2718                                 .length = sizeof(guid_key)
2719                         };
2720                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2721                         struct ldb_message *rec = ldb_msg_new(ldb);
2722                         if (rec == NULL) {
2723                                 return LDB_ERR_OPERATIONS_ERROR;
2724                         }
2725
2726                         ret = ldb_kv_idx_to_key(
2727                             module, ldb_kv, ldb, &list->dn[i], &key);
2728                         if (ret != LDB_SUCCESS) {
2729                                 TALLOC_FREE(list);
2730                                 TALLOC_FREE(rec);
2731                                 return ret;
2732                         }
2733
2734                         ret =
2735                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2736                         if (key.data != guid_key) {
2737                                 TALLOC_FREE(key.data);
2738                         }
2739                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2740                                 /*
2741                                  * the record has disappeared?
2742                                  * yes, this can happen
2743                                  */
2744                                 talloc_free(rec);
2745                                 continue;
2746                         }
2747
2748                         if (ret != LDB_SUCCESS) {
2749                                 /* an internal error */
2750                                 TALLOC_FREE(rec);
2751                                 TALLOC_FREE(list);
2752                                 return LDB_ERR_OPERATIONS_ERROR;
2753                         }
2754                         /*
2755                          * The DN we are trying to add to the DB and index
2756                          * is already here, so we must deny the addition
2757                          */
2758                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2759                                 TALLOC_FREE(rec);
2760                                 TALLOC_FREE(list);
2761                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2762                         }
2763                 }
2764         }
2765
2766         /*
2767          * Check for duplicates in unique indexes
2768          *
2769          * We don't need to do a loop test like the @IDXDN case
2770          * above as we have a ban on long unique index values
2771          * at the start of this function.
2772          */
2773         if (list->count > 0 &&
2774             ((a != NULL
2775               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2776                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2777                 /*
2778                  * We do not want to print info about a possibly
2779                  * confidential DN that the conflict was with in the
2780                  * user-visible error string
2781                  */
2782
2783                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2784                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2785                                   __location__
2786                                   ": unique index violation on %s in %s, "
2787                                   "conflicts with %*.*s in %s",
2788                                   el->name, ldb_dn_get_linearized(msg->dn),
2789                                   (int)list->dn[0].length,
2790                                   (int)list->dn[0].length,
2791                                   list->dn[0].data,
2792                                   ldb_dn_get_linearized(dn_key));
2793                 } else {
2794                         /* This can't fail, gives a default at worst */
2795                         const struct ldb_schema_attribute *attr =
2796                             ldb_schema_attribute_by_name(
2797                                 ldb, ldb_kv->cache->GUID_index_attribute);
2798                         struct ldb_val v;
2799                         ret = attr->syntax->ldif_write_fn(ldb, list,
2800                                                           &list->dn[0], &v);
2801                         if (ret == LDB_SUCCESS) {
2802                                 ldb_debug(ldb,
2803                                           LDB_DEBUG_WARNING,
2804                                           __location__
2805                                           ": unique index violation on %s in "
2806                                           "%s, conflicts with %s %*.*s in %s",
2807                                           el->name,
2808                                           ldb_dn_get_linearized(msg->dn),
2809                                           ldb_kv->cache->GUID_index_attribute,
2810                                           (int)v.length,
2811                                           (int)v.length,
2812                                           v.data,
2813                                           ldb_dn_get_linearized(dn_key));
2814                         }
2815                 }
2816                 ldb_asprintf_errstring(ldb,
2817                                        __location__ ": unique index violation "
2818                                        "on %s in %s",
2819                                        el->name,
2820                                        ldb_dn_get_linearized(msg->dn));
2821                 talloc_free(list);
2822                 return LDB_ERR_CONSTRAINT_VIOLATION;
2823         }
2824
2825         /* overallocate the list a bit, to reduce the number of
2826          * realloc trigered copies */
2827         alloc_len = ((list->count+1)+7) & ~7;
2828         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2829         if (list->dn == NULL) {
2830                 talloc_free(list);
2831                 return LDB_ERR_OPERATIONS_ERROR;
2832         }
2833
2834         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2835                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2836                 list->dn[list->count].data
2837                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2838                 if (list->dn[list->count].data == NULL) {
2839                         talloc_free(list);
2840                         return LDB_ERR_OPERATIONS_ERROR;
2841                 }
2842                 list->dn[list->count].length = strlen(dn_str);
2843         } else {
2844                 const struct ldb_val *key_val;
2845                 struct ldb_val *exact = NULL, *next = NULL;
2846                 key_val = ldb_msg_find_ldb_val(
2847                     msg, ldb_kv->cache->GUID_index_attribute);
2848                 if (key_val == NULL) {
2849                         talloc_free(list);
2850                         return ldb_module_operr(module);
2851                 }
2852
2853                 if (key_val->length != LDB_KV_GUID_SIZE) {
2854                         talloc_free(list);
2855                         return ldb_module_operr(module);
2856                 }
2857
2858                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2859                                         *key_val, ldb_val_equal_exact_ordered,
2860                                         exact, next);
2861
2862                 /*
2863                  * Give a warning rather than fail, this could be a
2864                  * duplicate value in the record allowed by a caller
2865                  * forcing in the value with
2866                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2867                  */
2868                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2869                         /* This can't fail, gives a default at worst */
2870                         const struct ldb_schema_attribute *attr =
2871                             ldb_schema_attribute_by_name(
2872                                 ldb, ldb_kv->cache->GUID_index_attribute);
2873                         struct ldb_val v;
2874                         ret = attr->syntax->ldif_write_fn(ldb, list,
2875                                                           exact, &v);
2876                         if (ret == LDB_SUCCESS) {
2877                                 ldb_debug(ldb,
2878                                           LDB_DEBUG_WARNING,
2879                                           __location__
2880                                           ": duplicate attribute value in %s "
2881                                           "for index on %s, "
2882                                           "duplicate of %s %*.*s in %s",
2883                                           ldb_dn_get_linearized(msg->dn),
2884                                           el->name,
2885                                           ldb_kv->cache->GUID_index_attribute,
2886                                           (int)v.length,
2887                                           (int)v.length,
2888                                           v.data,
2889                                           ldb_dn_get_linearized(dn_key));
2890                         }
2891                 }
2892
2893                 if (next == NULL) {
2894                         next = &list->dn[list->count];
2895                 } else {
2896                         memmove(&next[1], next,
2897                                 sizeof(*next) * (list->count - (next - list->dn)));
2898                 }
2899                 *next = ldb_val_dup(list->dn, key_val);
2900                 if (next->data == NULL) {
2901                         talloc_free(list);
2902                         return ldb_module_operr(module);
2903                 }
2904         }
2905         list->count++;
2906
2907         ret = ldb_kv_dn_list_store(module, dn_key, list);
2908
2909         talloc_free(list);
2910
2911         return ret;
2912 }
2913
2914 /*
2915   add index entries for one elements in a message
2916  */
2917 static int ldb_kv_index_add_el(struct ldb_module *module,
2918                                struct ldb_kv_private *ldb_kv,
2919                                const struct ldb_message *msg,
2920                                struct ldb_message_element *el)
2921 {
2922         unsigned int i;
2923         for (i = 0; i < el->num_values; i++) {
2924                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2925                 if (ret != LDB_SUCCESS) {
2926                         return ret;
2927                 }
2928         }
2929
2930         return LDB_SUCCESS;
2931 }
2932
2933 /*
2934   add index entries for all elements in a message
2935  */
2936 static int ldb_kv_index_add_all(struct ldb_module *module,
2937                                 struct ldb_kv_private *ldb_kv,
2938                                 const struct ldb_message *msg)
2939 {
2940         struct ldb_message_element *elements = msg->elements;
2941         unsigned int i;
2942         const char *dn_str;
2943         int ret;
2944
2945         if (ldb_dn_is_special(msg->dn)) {
2946                 return LDB_SUCCESS;
2947         }
2948
2949         dn_str = ldb_dn_get_linearized(msg->dn);
2950         if (dn_str == NULL) {
2951                 return LDB_ERR_OPERATIONS_ERROR;
2952         }
2953
2954         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2955         if (ret != LDB_SUCCESS) {
2956                 return ret;
2957         }
2958
2959         if (!ldb_kv->cache->attribute_indexes) {
2960                 /* no indexed fields */
2961                 return LDB_SUCCESS;
2962         }
2963
2964         for (i = 0; i < msg->num_elements; i++) {
2965                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2966                         continue;
2967                 }
2968                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2969                 if (ret != LDB_SUCCESS) {
2970                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2971                         ldb_asprintf_errstring(ldb,
2972                                                __location__ ": Failed to re-index %s in %s - %s",
2973                                                elements[i].name, dn_str,
2974                                                ldb_errstring(ldb));
2975                         return ret;
2976                 }
2977         }
2978
2979         return LDB_SUCCESS;
2980 }
2981
2982
2983 /*
2984   insert a DN index for a message
2985 */
2986 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2987                                   struct ldb_kv_private *ldb_kv,
2988                                   const struct ldb_message *msg,
2989                                   struct ldb_dn *dn,
2990                                   const char *index,
2991                                   int add)
2992 {
2993         struct ldb_message_element el;
2994         struct ldb_val val;
2995         int ret;
2996
2997         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2998         if (val.data == NULL) {
2999                 const char *dn_str = ldb_dn_get_linearized(dn);
3000                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3001                                        __location__ ": Failed to modify %s "
3002                                                     "against %s in %s: failed "
3003                                                     "to get casefold DN",
3004                                        index,
3005                                        ldb_kv->cache->GUID_index_attribute,
3006                                        dn_str);
3007                 return LDB_ERR_OPERATIONS_ERROR;
3008         }
3009
3010         val.length = strlen((char *)val.data);
3011         el.name = index;
3012         el.values = &val;
3013         el.num_values = 1;
3014
3015         if (add) {
3016                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
3017         } else { /* delete */
3018                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
3019         }
3020
3021         if (ret != LDB_SUCCESS) {
3022                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3023                 const char *dn_str = ldb_dn_get_linearized(dn);
3024                 ldb_asprintf_errstring(ldb,
3025                                        __location__ ": Failed to modify %s "
3026                                                     "against %s in %s - %s",
3027                                        index,
3028                                        ldb_kv->cache->GUID_index_attribute,
3029                                        dn_str,
3030                                        ldb_errstring(ldb));
3031                 return ret;
3032         }
3033         return ret;
3034 }
3035
3036 /*
3037   insert a one level index for a message
3038 */
3039 static int ldb_kv_index_onelevel(struct ldb_module *module,
3040                                  const struct ldb_message *msg,
3041                                  int add)
3042 {
3043         struct ldb_kv_private *ldb_kv = talloc_get_type(
3044             ldb_module_get_private(module), struct ldb_kv_private);
3045         struct ldb_dn *pdn;
3046         int ret;
3047
3048         /* We index for ONE Level only if requested */
3049         if (!ldb_kv->cache->one_level_indexes) {
3050                 return LDB_SUCCESS;
3051         }
3052
3053         pdn = ldb_dn_get_parent(module, msg->dn);
3054         if (pdn == NULL) {
3055                 return LDB_ERR_OPERATIONS_ERROR;
3056         }
3057         ret =
3058             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
3059
3060         talloc_free(pdn);
3061
3062         return ret;
3063 }
3064
3065 /*
3066   insert a one level index for a message
3067 */
3068 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
3069                                       const struct ldb_message *msg,
3070                                       int add)
3071 {
3072         int ret;
3073         struct ldb_kv_private *ldb_kv = talloc_get_type(
3074             ldb_module_get_private(module), struct ldb_kv_private);
3075
3076         /* We index for DN only if using a GUID index */
3077         if (ldb_kv->cache->GUID_index_attribute == NULL) {
3078                 return LDB_SUCCESS;
3079         }
3080
3081         ret = ldb_kv_modify_index_dn(
3082             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
3083
3084         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
3085                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3086                                        "Entry %s already exists",
3087                                        ldb_dn_get_linearized(msg->dn));
3088                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
3089         }
3090         return ret;
3091 }
3092
3093 /*
3094   add the index entries for a new element in a record
3095   The caller guarantees that these element values are not yet indexed
3096 */
3097 int ldb_kv_index_add_element(struct ldb_module *module,
3098                              struct ldb_kv_private *ldb_kv,
3099                              const struct ldb_message *msg,
3100                              struct ldb_message_element *el)
3101 {
3102         if (ldb_dn_is_special(msg->dn)) {
3103                 return LDB_SUCCESS;
3104         }
3105         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3106                 return LDB_SUCCESS;
3107         }
3108         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
3109 }
3110
3111 /*
3112   add the index entries for a new record
3113 */
3114 int ldb_kv_index_add_new(struct ldb_module *module,
3115                          struct ldb_kv_private *ldb_kv,
3116                          const struct ldb_message *msg)
3117 {
3118         int ret;
3119
3120         if (ldb_dn_is_special(msg->dn)) {
3121                 return LDB_SUCCESS;
3122         }
3123
3124         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3125         if (ret != LDB_SUCCESS) {
3126                 /*
3127                  * Because we can't trust the caller to be doing
3128                  * transactions properly, clean up any index for this
3129                  * entry rather than relying on a transaction
3130                  * cleanup
3131                  */
3132
3133                 ldb_kv_index_delete(module, msg);
3134                 return ret;
3135         }
3136
3137         ret = ldb_kv_index_onelevel(module, msg, 1);
3138         if (ret != LDB_SUCCESS) {
3139                 /*
3140                  * Because we can't trust the caller to be doing
3141                  * transactions properly, clean up any index for this
3142                  * entry rather than relying on a transaction
3143                  * cleanup
3144                  */
3145                 ldb_kv_index_delete(module, msg);
3146                 return ret;
3147         }
3148         return ret;
3149 }
3150
3151
3152 /*
3153   delete an index entry for one message element
3154 */
3155 int ldb_kv_index_del_value(struct ldb_module *module,
3156                            struct ldb_kv_private *ldb_kv,
3157                            const struct ldb_message *msg,
3158                            struct ldb_message_element *el,
3159                            unsigned int v_idx)
3160 {
3161         struct ldb_context *ldb;
3162         struct ldb_dn *dn_key;
3163         const char *dn_str;
3164         int ret, i;
3165         unsigned int j;
3166         struct dn_list *list;
3167         struct ldb_dn *dn = msg->dn;
3168         enum key_truncation truncation = KEY_NOT_TRUNCATED;
3169
3170         ldb = ldb_module_get_ctx(module);
3171
3172         dn_str = ldb_dn_get_linearized(dn);
3173         if (dn_str == NULL) {
3174                 return LDB_ERR_OPERATIONS_ERROR;
3175         }
3176
3177         if (dn_str[0] == '@') {
3178                 return LDB_SUCCESS;
3179         }
3180
3181         dn_key = ldb_kv_index_key(
3182             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
3183         /*
3184          * We ignore key truncation in ltdb_index_add1() so
3185          * match that by ignoring it here as well
3186          *
3187          * Multiple values are legitimate and accepted
3188          */
3189         if (!dn_key) {
3190                 return LDB_ERR_OPERATIONS_ERROR;
3191         }
3192
3193         list = talloc_zero(dn_key, struct dn_list);
3194         if (list == NULL) {
3195                 talloc_free(dn_key);
3196                 return LDB_ERR_OPERATIONS_ERROR;
3197         }
3198
3199         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
3200                                   DN_LIST_MUTABLE);
3201         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
3202                 /* it wasn't indexed. Did we have an earlier error? If we did then
3203                    its gone now */
3204                 talloc_free(dn_key);
3205                 return LDB_SUCCESS;
3206         }
3207
3208         if (ret != LDB_SUCCESS) {
3209                 talloc_free(dn_key);
3210                 return ret;
3211         }
3212
3213         /*
3214          * Find one of the values matching this message to remove
3215          */
3216         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
3217         if (i == -1) {
3218                 /* nothing to delete */
3219                 talloc_free(dn_key);
3220                 return LDB_SUCCESS;
3221         }
3222
3223         j = (unsigned int) i;
3224         if (j != list->count - 1) {
3225                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
3226         }
3227         list->count--;
3228         if (list->count == 0) {
3229                 talloc_free(list->dn);
3230                 list->dn = NULL;
3231         } else {
3232                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
3233         }
3234
3235         ret = ldb_kv_dn_list_store(module, dn_key, list);
3236
3237         talloc_free(dn_key);
3238
3239         return ret;
3240 }
3241
3242 /*
3243   delete the index entries for a element
3244   return -1 on failure
3245 */
3246 int ldb_kv_index_del_element(struct ldb_module *module,
3247                              struct ldb_kv_private *ldb_kv,
3248                              const struct ldb_message *msg,
3249                              struct ldb_message_element *el)
3250 {
3251         const char *dn_str;
3252         int ret;
3253         unsigned int i;
3254
3255         if (!ldb_kv->cache->attribute_indexes) {
3256                 /* no indexed fields */
3257                 return LDB_SUCCESS;
3258         }
3259
3260         dn_str = ldb_dn_get_linearized(msg->dn);
3261         if (dn_str == NULL) {
3262                 return LDB_ERR_OPERATIONS_ERROR;
3263         }
3264
3265         if (dn_str[0] == '@') {
3266                 return LDB_SUCCESS;
3267         }
3268
3269         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3270                 return LDB_SUCCESS;
3271         }
3272         for (i = 0; i < el->num_values; i++) {
3273                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
3274                 if (ret != LDB_SUCCESS) {
3275                         return ret;
3276                 }
3277         }
3278
3279         return LDB_SUCCESS;
3280 }
3281
3282 /*
3283   delete the index entries for a record
3284   return -1 on failure
3285 */
3286 int ldb_kv_index_delete(struct ldb_module *module,
3287                         const struct ldb_message *msg)
3288 {
3289         struct ldb_kv_private *ldb_kv = talloc_get_type(
3290             ldb_module_get_private(module), struct ldb_kv_private);
3291         int ret;
3292         unsigned int i;
3293
3294         if (ldb_dn_is_special(msg->dn)) {
3295                 return LDB_SUCCESS;
3296         }
3297
3298         ret = ldb_kv_index_onelevel(module, msg, 0);
3299         if (ret != LDB_SUCCESS) {
3300                 return ret;
3301         }
3302
3303         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
3304         if (ret != LDB_SUCCESS) {
3305                 return ret;
3306         }
3307
3308         if (!ldb_kv->cache->attribute_indexes) {
3309                 /* no indexed fields */
3310                 return LDB_SUCCESS;
3311         }
3312
3313         for (i = 0; i < msg->num_elements; i++) {
3314                 ret = ldb_kv_index_del_element(
3315                     module, ldb_kv, msg, &msg->elements[i]);
3316                 if (ret != LDB_SUCCESS) {
3317                         return ret;
3318                 }
3319         }
3320
3321         return LDB_SUCCESS;
3322 }
3323
3324
3325 /*
3326   traversal function that deletes all @INDEX records in the in-memory
3327   TDB.
3328
3329   This does not touch the actual DB, that is done at transaction
3330   commit, which in turn greatly reduces DB churn as we will likely
3331   be able to do a direct update into the old record.
3332 */
3333 static int delete_index(struct ldb_kv_private *ldb_kv,
3334                         struct ldb_val key,
3335                         _UNUSED_ struct ldb_val data,
3336                         void *state)
3337 {
3338         struct ldb_module *module = state;
3339         const char *dnstr = "DN=" LDB_KV_INDEX ":";
3340         struct dn_list list;
3341         struct ldb_dn *dn;
3342         struct ldb_val v;
3343         int ret;
3344
3345         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
3346                 return 0;
3347         }
3348         /* we need to put a empty list in the internal tdb for this
3349          * index entry */
3350         list.dn = NULL;
3351         list.count = 0;
3352
3353         /* the offset of 3 is to remove the DN= prefix. */
3354         v.data = key.data + 3;
3355         v.length = strnlen((char *)key.data, key.length) - 3;
3356
3357         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
3358
3359         /*
3360          * This does not actually touch the DB quite yet, just
3361          * the in-memory index cache
3362          */
3363         ret = ldb_kv_dn_list_store(module, dn, &list);
3364         if (ret != LDB_SUCCESS) {
3365                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3366                                        "Unable to store null index for %s\n",
3367                                                 ldb_dn_get_linearized(dn));
3368                 talloc_free(dn);
3369                 return -1;
3370         }
3371         talloc_free(dn);
3372         return 0;
3373 }
3374
3375 /*
3376   traversal function that adds @INDEX records during a re index TODO wrong comment
3377 */
3378 static int re_key(struct ldb_kv_private *ldb_kv,
3379                   struct ldb_val key,
3380                   struct ldb_val val,
3381                   void *state)
3382 {
3383         struct ldb_context *ldb;
3384         struct ldb_kv_reindex_context *ctx =
3385             (struct ldb_kv_reindex_context *)state;
3386         struct ldb_module *module = ldb_kv->module;
3387         struct ldb_message *msg;
3388         int ret;
3389         struct ldb_val key2;
3390         bool is_record;
3391
3392         ldb = ldb_module_get_ctx(module);
3393
3394         is_record = ldb_kv_key_is_normal_record(key);
3395         if (is_record == false) {
3396                 return 0;
3397         }
3398
3399         msg = ldb_msg_new(module);
3400         if (msg == NULL) {
3401                 return -1;
3402         }
3403
3404         ret = ldb_unpack_data(ldb, &val, msg);
3405         if (ret != 0) {
3406                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3407                                                 ldb_dn_get_linearized(msg->dn));
3408                 ctx->error = ret;
3409                 talloc_free(msg);
3410                 return -1;
3411         }
3412
3413         if (msg->dn == NULL) {
3414                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3415                           "Refusing to re-index as GUID "
3416                           "key %*.*s with no DN\n",
3417                           (int)key.length, (int)key.length,
3418                           (char *)key.data);
3419                 talloc_free(msg);
3420                 return -1;
3421         }
3422
3423         /* check if the DN key has changed, perhaps due to the case
3424            insensitivity of an element changing, or a change from DN
3425            to GUID keys */
3426         key2 = ldb_kv_key_msg(module, msg, msg);
3427         if (key2.data == NULL) {
3428                 /* probably a corrupt record ... darn */
3429                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
3430                                                 ldb_dn_get_linearized(msg->dn));
3431                 talloc_free(msg);
3432                 return 0;
3433         }
3434         if (key.length != key2.length ||
3435             (memcmp(key.data, key2.data, key.length) != 0)) {
3436                 ldb_kv->kv_ops->update_in_iterate(
3437                     ldb_kv, key, key2, val, ctx);
3438         }
3439         talloc_free(key2.data);
3440
3441         talloc_free(msg);
3442
3443         ctx->count++;
3444         if (ctx->count % 10000 == 0) {
3445                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3446                           "Reindexing: re-keyed %u records so far",
3447                           ctx->count);
3448         }
3449
3450         return 0;
3451 }
3452
3453 /*
3454   traversal function that adds @INDEX records during a re index
3455 */
3456 static int re_index(struct ldb_kv_private *ldb_kv,
3457                     struct ldb_val key,
3458                     struct ldb_val val,
3459                     void *state)
3460 {
3461         struct ldb_context *ldb;
3462         struct ldb_kv_reindex_context *ctx =
3463             (struct ldb_kv_reindex_context *)state;
3464         struct ldb_module *module = ldb_kv->module;
3465         struct ldb_message *msg;
3466         int ret;
3467         bool is_record;
3468
3469         ldb = ldb_module_get_ctx(module);
3470
3471         is_record = ldb_kv_key_is_normal_record(key);
3472         if (is_record == false) {
3473                 return 0;
3474         }
3475
3476         msg = ldb_msg_new(module);
3477         if (msg == NULL) {
3478                 return -1;
3479         }
3480
3481         ret = ldb_unpack_data(ldb, &val, msg);
3482         if (ret != 0) {
3483                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3484                                                 ldb_dn_get_linearized(msg->dn));
3485                 ctx->error = ret;
3486                 talloc_free(msg);
3487                 return -1;
3488         }
3489
3490         if (msg->dn == NULL) {
3491                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3492                           "Refusing to re-index as GUID "
3493                           "key %*.*s with no DN\n",
3494                           (int)key.length, (int)key.length,
3495                           (char *)key.data);
3496                 talloc_free(msg);
3497                 return -1;
3498         }
3499
3500         ret = ldb_kv_index_onelevel(module, msg, 1);
3501         if (ret != LDB_SUCCESS) {
3502                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3503                           "Adding special ONE LEVEL index failed (%s)!",
3504                                                 ldb_dn_get_linearized(msg->dn));
3505                 talloc_free(msg);
3506                 return -1;
3507         }
3508
3509         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3510
3511         if (ret != LDB_SUCCESS) {
3512                 ctx->error = ret;
3513                 talloc_free(msg);
3514                 return -1;
3515         }
3516
3517         talloc_free(msg);
3518
3519         ctx->count++;
3520         if (ctx->count % 10000 == 0) {
3521                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3522                           "Reindexing: re-indexed %u records so far",
3523                           ctx->count);
3524         }
3525
3526         return 0;
3527 }
3528
3529 static int re_pack(struct ldb_kv_private *ldb_kv,
3530                    _UNUSED_ struct ldb_val key,
3531                    struct ldb_val val,
3532                    void *state)
3533 {
3534         struct ldb_context *ldb;
3535         struct ldb_message *msg;
3536         struct ldb_module *module = ldb_kv->module;
3537         struct ldb_kv_repack_context *ctx =
3538             (struct ldb_kv_repack_context *)state;
3539         int ret;
3540
3541         ldb = ldb_module_get_ctx(module);
3542
3543         msg = ldb_msg_new(module);
3544         if (msg == NULL) {
3545                 return -1;
3546         }
3547
3548         ret = ldb_unpack_data(ldb, &val, msg);
3549         if (ret != 0) {
3550                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: unpack failed: %s\n",
3551                           ldb_dn_get_linearized(msg->dn));
3552                 ctx->error = ret;
3553                 talloc_free(msg);
3554                 return -1;
3555         }
3556
3557         ret = ldb_kv_store(module, msg, TDB_MODIFY);
3558         if (ret != LDB_SUCCESS) {
3559                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: store failed: %s\n",
3560                           ldb_dn_get_linearized(msg->dn));
3561                 ctx->error = ret;
3562                 talloc_free(msg);
3563                 return -1;
3564         }
3565
3566         /*
3567          * Warn the user that we're repacking the first time we see a normal
3568          * record. This means we never warn if we're repacking a database with
3569          * only @ records. This is because during database initialisation,
3570          * we might repack as initial settings are written out, and we don't
3571          * want to spam the log.
3572          */
3573         if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
3574                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3575                           "Repacking database with format %#010x",
3576                           ldb_kv->pack_format_version);
3577                 ctx->normal_record_seen = true;
3578         }
3579
3580         ctx->count++;
3581         if (ctx->count % 10000 == 0) {
3582                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3583                           "Repack: re-packed %u records so far",
3584                           ctx->count);
3585         }
3586
3587         return 0;
3588 }
3589
3590 int ldb_kv_repack(struct ldb_module *module)
3591 {
3592         struct ldb_kv_private *ldb_kv = talloc_get_type(
3593             ldb_module_get_private(module), struct ldb_kv_private);
3594         struct ldb_context *ldb = ldb_module_get_ctx(module);
3595         struct ldb_kv_repack_context ctx;
3596         int ret;
3597
3598         ctx.count = 0;
3599         ctx.error = LDB_SUCCESS;
3600         ctx.normal_record_seen = false;
3601
3602         /* Iterate all database records and repack them in the new format */
3603         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
3604         if (ret < 0) {
3605                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack traverse failed: %s",
3606                           ldb_errstring(ldb));
3607                 return LDB_ERR_OPERATIONS_ERROR;
3608         }
3609
3610         if (ctx.error != LDB_SUCCESS) {
3611                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack failed: %s",
3612                           ldb_errstring(ldb));
3613                 return ctx.error;
3614         }
3615
3616         return LDB_SUCCESS;
3617 }
3618
3619 /*
3620   force a complete reindex of the database
3621 */
3622 int ldb_kv_reindex(struct ldb_module *module)
3623 {
3624         struct ldb_kv_private *ldb_kv = talloc_get_type(
3625             ldb_module_get_private(module), struct ldb_kv_private);
3626         int ret;
3627         struct ldb_kv_reindex_context ctx;
3628         size_t index_cache_size = 0;
3629
3630         /*
3631          * Only triggered after a modification, but make clear we do
3632          * not re-index a read-only DB
3633          */
3634         if (ldb_kv->read_only) {
3635                 return LDB_ERR_UNWILLING_TO_PERFORM;
3636         }
3637
3638         if (ldb_kv_cache_reload(module) != 0) {
3639                 return LDB_ERR_OPERATIONS_ERROR;
3640         }
3641
3642         /*
3643          * Ensure we read (and so remove) the entries from the real
3644          * DB, no values stored so far are any use as we want to do a
3645          * re-index
3646          */
3647         ldb_kv_index_transaction_cancel(module);
3648         if (ldb_kv->nested_idx_ptr != NULL) {
3649                 ldb_kv_index_sub_transaction_cancel(ldb_kv);
3650         }
3651
3652         /*
3653          * Calculate the size of the index cache that we'll need for
3654          * the re-index
3655          */
3656         index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
3657         if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
3658                 index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
3659         }
3660
3661         /*
3662          * Note that we don't start an index sub transaction for re-indexing
3663          */
3664         ret = ldb_kv_index_transaction_start(module, index_cache_size);
3665         if (ret != LDB_SUCCESS) {
3666                 return ret;
3667         }
3668
3669         /* first traverse the database deleting any @INDEX records by
3670          * putting NULL entries in the in-memory tdb
3671          */
3672         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3673         if (ret < 0) {
3674                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3675                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3676                                        ldb_errstring(ldb));
3677                 return LDB_ERR_OPERATIONS_ERROR;
3678         }
3679
3680         ctx.error = 0;
3681         ctx.count = 0;
3682
3683         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3684         if (ret < 0) {
3685                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3686                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3687                                        ldb_errstring(ldb));
3688                 return LDB_ERR_OPERATIONS_ERROR;
3689         }
3690
3691         if (ctx.error != LDB_SUCCESS) {
3692                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3693                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3694                 return ctx.error;
3695         }
3696
3697         ctx.error = 0;
3698         ctx.count = 0;
3699
3700         /* now traverse adding any indexes for normal LDB records */
3701         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3702         if (ret < 0) {
3703                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3704                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3705                                        ldb_errstring(ldb));
3706                 return LDB_ERR_OPERATIONS_ERROR;
3707         }
3708
3709         if (ctx.error != LDB_SUCCESS) {
3710                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3711                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3712                 return ctx.error;
3713         }
3714
3715         if (ctx.count > 10000) {
3716                 ldb_debug(ldb_module_get_ctx(module),
3717                           LDB_DEBUG_WARNING,
3718                           "Reindexing: re_index successful on %s, "
3719                           "final index write-out will be in transaction commit",
3720                           ldb_kv->kv_ops->name(ldb_kv));
3721         }
3722         return LDB_SUCCESS;
3723 }
3724
3725 /*
3726  * Copy the contents of the nested transaction index cache record to the
3727  * transaction index cache.
3728  *
3729  * During this 'commit' of the subtransaction to the main transaction
3730  * (cache), care must be taken to free any existing index at the top
3731  * level because otherwise we would leak memory.
3732  */
3733 static int ldb_kv_sub_transaction_traverse(
3734         struct tdb_context *tdb,
3735         TDB_DATA key,
3736         TDB_DATA data,
3737         void *state)
3738 {
3739         struct ldb_module *module = state;
3740         struct ldb_kv_private *ldb_kv = talloc_get_type(
3741             ldb_module_get_private(module), struct ldb_kv_private);
3742         TDB_DATA rec = {0};
3743         struct dn_list *index_in_subtransaction = NULL;
3744         struct dn_list *index_in_top_level = NULL;
3745         int ret = 0;
3746
3747         /*
3748          * This unwraps the pointer in the DB into a pointer in
3749          * memory, we are abusing TDB as a hash map, not a linearised
3750          * database store
3751          */
3752         index_in_subtransaction = ldb_kv_index_idxptr(module, data);
3753         if (index_in_subtransaction == NULL) {
3754                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3755                 return -1;
3756         }
3757
3758         /*
3759          * Do we already have an entry in the primary transaction cache
3760          * If so free it's dn_list and replace it with the dn_list from
3761          * the secondary cache
3762          *
3763          * The TDB and so the fetched rec contains NO DATA, just a
3764          * pointer to data held in memory.
3765          */
3766         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
3767         if (rec.dptr != NULL) {
3768                 index_in_top_level = ldb_kv_index_idxptr(module, rec);
3769                 free(rec.dptr);
3770                 if (index_in_top_level == NULL) {
3771                         abort();
3772                 }
3773                 /*
3774                  * We had this key at the top level.  However we made a copy
3775                  * at the sub-transaction level so that we could possibly
3776                  * roll back.  We have to free the top level index memory
3777                  * otherwise we would leak
3778                  */
3779                 if (index_in_top_level->count > 0) {
3780                         TALLOC_FREE(index_in_top_level->dn);
3781                 }
3782                 index_in_top_level->dn
3783                         = talloc_steal(index_in_top_level,
3784                                        index_in_subtransaction->dn);
3785                 index_in_top_level->count = index_in_subtransaction->count;
3786                 return 0;
3787         }
3788
3789         index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
3790         if (index_in_top_level == NULL) {
3791                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3792                 return -1;
3793         }
3794         index_in_top_level->dn
3795                 = talloc_steal(index_in_top_level,
3796                                index_in_subtransaction->dn);
3797         index_in_top_level->count = index_in_subtransaction->count;
3798
3799         rec.dptr = (uint8_t *)&index_in_top_level;
3800         rec.dsize = sizeof(void *);
3801
3802
3803         /*
3804          * This is not a store into the main DB, but into an in-memory
3805          * TDB, so we don't need a guard on ltdb->read_only
3806          */
3807         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
3808         if (ret != 0) {
3809                 ldb_kv->idxptr->error = ltdb_err_map(
3810                     tdb_error(ldb_kv->idxptr->itdb));
3811                 return -1;
3812         }
3813         return 0;
3814 }
3815
3816 /*
3817  * Initialise the index cache for a sub transaction.
3818  */
3819 int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
3820 {
3821         ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
3822         if (ldb_kv->nested_idx_ptr == NULL) {
3823                 return LDB_ERR_OPERATIONS_ERROR;
3824         }
3825
3826         /*
3827          * We use a tiny hash size for the sub-database (11).
3828          *
3829          * The sub-transaction is only for one record at a time, we
3830          * would use a linked list but that would make the code even
3831          * more complex when manipulating the index, as it would have
3832          * to know if we were in a nested transaction (normal
3833          * operations) or the top one (a reindex).
3834          */
3835         ldb_kv->nested_idx_ptr->itdb =
3836                 tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
3837         if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3838                 return LDB_ERR_OPERATIONS_ERROR;
3839         }
3840         return LDB_SUCCESS;
3841 }
3842
3843 /*
3844  * Clear the contents of the nested transaction index cache when the nested
3845  * transaction is cancelled.
3846  */
3847 int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
3848 {
3849         if (ldb_kv->nested_idx_ptr != NULL) {
3850                 tdb_close(ldb_kv->nested_idx_ptr->itdb);
3851                 TALLOC_FREE(ldb_kv->nested_idx_ptr);
3852         }
3853         return LDB_SUCCESS;
3854 }
3855
3856 /*
3857  * Commit a nested transaction,
3858  * Copy the contents of the nested transaction index cache to the
3859  * transaction index cache.
3860  */
3861 int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
3862 {
3863         int ret = 0;
3864
3865         if (ldb_kv->nested_idx_ptr == NULL) {
3866                 return LDB_SUCCESS;
3867         }
3868         if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3869                 return LDB_SUCCESS;
3870         }
3871         tdb_traverse(
3872             ldb_kv->nested_idx_ptr->itdb,
3873             ldb_kv_sub_transaction_traverse,
3874             ldb_kv->module);
3875         tdb_close(ldb_kv->nested_idx_ptr->itdb);
3876         ldb_kv->nested_idx_ptr->itdb = NULL;
3877
3878         ret = ldb_kv->nested_idx_ptr->error;
3879         if (ret != LDB_SUCCESS) {
3880                 struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
3881                 if (!ldb_errstring(ldb)) {
3882                         ldb_set_errstring(ldb, ldb_strerror(ret));
3883                 }
3884                 ldb_asprintf_errstring(
3885                         ldb,
3886                         __location__": Failed to update index records in "
3887                         "sub transaction commit: %s",
3888                         ldb_errstring(ldb));
3889         }
3890         TALLOC_FREE(ldb_kv->nested_idx_ptr);
3891         return ret;
3892 }