Spelling fixes s/conficts/conflicts/
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151
152 struct dn_list {
153         unsigned int count;
154         struct ldb_val *dn;
155         /*
156          * Do not optimise the intersection of this list,
157          * we must never return an entry not in this
158          * list.  This allows the index for
159          * SCOPE_ONELEVEL to be trusted.
160          */
161         bool strict;
162 };
163
164 struct ldb_kv_idxptr {
165         struct tdb_context *itdb;
166         int error;
167 };
168
169 enum key_truncation {
170         KEY_NOT_TRUNCATED,
171         KEY_TRUNCATED,
172 };
173
174 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
175                                       const struct ldb_message *msg,
176                                       int add);
177 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
178                                    struct ldb_kv_private *ldb_kv,
179                                    struct ldb_dn *base_dn,
180                                    struct dn_list *dn_list,
181                                    enum key_truncation *truncation);
182
183 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
184                                 struct dn_list *list);
185
186 /* we put a @IDXVERSION attribute on index entries. This
187    allows us to tell if it was written by an older version
188 */
189 #define LDB_KV_INDEXING_VERSION 2
190
191 #define LDB_KV_GUID_INDEXING_VERSION 3
192
193 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
194 {
195         if (ldb_kv->max_key_length == 0) {
196                 return UINT_MAX;
197         }
198         return ldb_kv->max_key_length;
199 }
200
201 /* enable the idxptr mode when transactions start */
202 int ldb_kv_index_transaction_start(struct ldb_module *module)
203 {
204         struct ldb_kv_private *ldb_kv = talloc_get_type(
205             ldb_module_get_private(module), struct ldb_kv_private);
206         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
207         if (ldb_kv->idxptr == NULL) {
208                 return ldb_oom(ldb_module_get_ctx(module));
209         }
210
211         return LDB_SUCCESS;
212 }
213
214 /*
215   see if two ldb_val structures contain exactly the same data
216   return -1 or 1 for a mismatch, 0 for match
217 */
218 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
219                                          const struct ldb_val *v2)
220 {
221         if (v1->length > v2->length) {
222                 return -1;
223         }
224         if (v1->length < v2->length) {
225                 return 1;
226         }
227         return memcmp(v1->data, v2->data, v1->length);
228 }
229
230 /*
231   see if two ldb_val structures contain exactly the same data
232   return -1 or 1 for a mismatch, 0 for match
233 */
234 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
235                                        const struct ldb_val *v2)
236 {
237         if (v1.length > v2->length) {
238                 return -1;
239         }
240         if (v1.length < v2->length) {
241                 return 1;
242         }
243         return memcmp(v1.data, v2->data, v1.length);
244 }
245
246
247 /*
248   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
249   binary-safe comparison for the 'dn' returns -1 if not found
250
251   This is therefore safe when the value is a GUID in the future
252  */
253 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
254                                    const struct dn_list *list,
255                                    const struct ldb_val *v)
256 {
257         unsigned int i;
258         struct ldb_val *exact = NULL, *next = NULL;
259
260         if (ldb_kv->cache->GUID_index_attribute == NULL) {
261                 for (i=0; i<list->count; i++) {
262                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
263                                 return i;
264                         }
265                 }
266                 return -1;
267         }
268
269         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
270                                 *v, ldb_val_equal_exact_ordered,
271                                 exact, next);
272         if (exact == NULL) {
273                 return -1;
274         }
275         /* Not required, but keeps the compiler quiet */
276         if (next != NULL) {
277                 return -1;
278         }
279
280         i = exact - list->dn;
281         return i;
282 }
283
284 /*
285   find a entry in a dn_list. Uses a case sensitive comparison with the dn
286   returns -1 if not found
287  */
288 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
289                                    struct dn_list *list,
290                                    const struct ldb_message *msg)
291 {
292         struct ldb_val v;
293         const struct ldb_val *key_val;
294         if (ldb_kv->cache->GUID_index_attribute == NULL) {
295                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
296                 v.data = discard_const_p(unsigned char, dn_str);
297                 v.length = strlen(dn_str);
298         } else {
299                 key_val = ldb_msg_find_ldb_val(
300                     msg, ldb_kv->cache->GUID_index_attribute);
301                 if (key_val == NULL) {
302                         return -1;
303                 }
304                 v = *key_val;
305         }
306         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
307 }
308
309 /*
310   this is effectively a cast function, but with lots of paranoia
311   checks and also copes with CPUs that are fussy about pointer
312   alignment
313  */
314 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
315                                            TDB_DATA rec,
316                                            bool check_parent)
317 {
318         struct dn_list *list;
319         if (rec.dsize != sizeof(void *)) {
320                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
321                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
322                 return NULL;
323         }
324         /* note that we can't just use a cast here, as rec.dptr may
325            not be aligned sufficiently for a pointer. A cast would cause
326            platforms like some ARM CPUs to crash */
327         memcpy(&list, rec.dptr, sizeof(void *));
328         list = talloc_get_type(list, struct dn_list);
329         if (list == NULL) {
330                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
331                                        "Bad type '%s' for idxptr",
332                                        talloc_get_name(list));
333                 return NULL;
334         }
335         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
336                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
337                                        "Bad parent '%s' for idxptr",
338                                        talloc_get_name(talloc_parent(list->dn)));
339                 return NULL;
340         }
341         return list;
342 }
343
344 /*
345   return the @IDX list in an index entry for a dn as a
346   struct dn_list
347  */
348 static int ldb_kv_dn_list_load(struct ldb_module *module,
349                                struct ldb_kv_private *ldb_kv,
350                                struct ldb_dn *dn,
351                                struct dn_list *list)
352 {
353         struct ldb_message *msg;
354         int ret, version;
355         struct ldb_message_element *el;
356         TDB_DATA rec;
357         struct dn_list *list2;
358         TDB_DATA key;
359
360         list->dn = NULL;
361         list->count = 0;
362
363         /* see if we have any in-memory index entries */
364         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
365                 goto normal_index;
366         }
367
368         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
369         key.dsize = strlen((char *)key.dptr);
370
371         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
372         if (rec.dptr == NULL) {
373                 goto normal_index;
374         }
375
376         /* we've found an in-memory index entry */
377         list2 = ldb_kv_index_idxptr(module, rec, true);
378         if (list2 == NULL) {
379                 free(rec.dptr);
380                 return LDB_ERR_OPERATIONS_ERROR;
381         }
382         free(rec.dptr);
383
384         *list = *list2;
385         return LDB_SUCCESS;
386
387 normal_index:
388         msg = ldb_msg_new(list);
389         if (msg == NULL) {
390                 return LDB_ERR_OPERATIONS_ERROR;
391         }
392
393         ret = ldb_kv_search_dn1(module,
394                                 dn,
395                                 msg,
396                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
397                                     LDB_UNPACK_DATA_FLAG_NO_DN);
398         if (ret != LDB_SUCCESS) {
399                 talloc_free(msg);
400                 return ret;
401         }
402
403         el = ldb_msg_find_element(msg, LDB_KV_IDX);
404         if (!el) {
405                 talloc_free(msg);
406                 return LDB_SUCCESS;
407         }
408
409         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
410
411         /*
412          * we avoid copying the strings by stealing the list.  We have
413          * to steal msg onto el->values (which looks odd) because we
414          * asked for the memory to be allocated on msg, not on each
415          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
416          */
417         if (ldb_kv->cache->GUID_index_attribute == NULL) {
418                 /* check indexing version number */
419                 if (version != LDB_KV_INDEXING_VERSION) {
420                         ldb_debug_set(ldb_module_get_ctx(module),
421                                       LDB_DEBUG_ERROR,
422                                       "Wrong DN index version %d "
423                                       "expected %d for %s",
424                                       version, LDB_KV_INDEXING_VERSION,
425                                       ldb_dn_get_linearized(dn));
426                         talloc_free(msg);
427                         return LDB_ERR_OPERATIONS_ERROR;
428                 }
429
430                 talloc_steal(el->values, msg);
431                 list->dn = talloc_steal(list, el->values);
432                 list->count = el->num_values;
433         } else {
434                 unsigned int i;
435                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
436                         /* This is quite likely during the DB startup
437                            on first upgrade to using a GUID index */
438                         ldb_debug_set(ldb_module_get_ctx(module),
439                                       LDB_DEBUG_ERROR,
440                                       "Wrong GUID index version %d "
441                                       "expected %d for %s",
442                                       version, LDB_KV_GUID_INDEXING_VERSION,
443                                       ldb_dn_get_linearized(dn));
444                         talloc_free(msg);
445                         return LDB_ERR_OPERATIONS_ERROR;
446                 }
447
448                 if (el->num_values == 0) {
449                         talloc_free(msg);
450                         return LDB_ERR_OPERATIONS_ERROR;
451                 }
452
453                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
454                         talloc_free(msg);
455                         return LDB_ERR_OPERATIONS_ERROR;
456                 }
457
458                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
459                 list->dn = talloc_array(list, struct ldb_val, list->count);
460                 if (list->dn == NULL) {
461                         talloc_free(msg);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464
465                 /*
466                  * The actual data is on msg, due to
467                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
468                  */
469                 talloc_steal(list->dn, msg);
470                 for (i = 0; i < list->count; i++) {
471                         list->dn[i].data
472                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
473                         list->dn[i].length = LDB_KV_GUID_SIZE;
474                 }
475         }
476
477         /* We don't need msg->elements any more */
478         talloc_free(msg->elements);
479         return LDB_SUCCESS;
480 }
481
482 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
483                            struct ldb_kv_private *ldb_kv,
484                            TALLOC_CTX *mem_ctx,
485                            struct ldb_dn *dn,
486                            struct ldb_val *ldb_key)
487 {
488         struct ldb_context *ldb = ldb_module_get_ctx(module);
489         int ret;
490         int index = 0;
491         enum key_truncation truncation = KEY_NOT_TRUNCATED;
492         struct dn_list *list = talloc(mem_ctx, struct dn_list);
493         if (list == NULL) {
494                 ldb_oom(ldb);
495                 return LDB_ERR_OPERATIONS_ERROR;
496         }
497
498         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
499         if (ret != LDB_SUCCESS) {
500                 TALLOC_FREE(list);
501                 return ret;
502         }
503
504         if (list->count == 0) {
505                 TALLOC_FREE(list);
506                 return LDB_ERR_NO_SUCH_OBJECT;
507         }
508
509         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
510                 const char *dn_str = ldb_dn_get_linearized(dn);
511                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
512                                        __location__
513                                        ": Failed to read DN index "
514                                        "against %s for %s: too many "
515                                        "values (%u > 1)",
516                                        ldb_kv->cache->GUID_index_attribute,
517                                        dn_str,
518                                        list->count);
519                 TALLOC_FREE(list);
520                 return LDB_ERR_CONSTRAINT_VIOLATION;
521         }
522
523         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
524                 /*
525                  * DN key has been truncated, need to inspect the actual
526                  * records to locate the actual DN
527                  */
528                 int i;
529                 index = -1;
530                 for (i=0; i < list->count; i++) {
531                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
532                         struct ldb_val key = {
533                                 .data = guid_key,
534                                 .length = sizeof(guid_key)
535                         };
536                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
537                         struct ldb_message *rec = ldb_msg_new(ldb);
538                         if (rec == NULL) {
539                                 TALLOC_FREE(list);
540                                 return LDB_ERR_OPERATIONS_ERROR;
541                         }
542
543                         ret = ldb_kv_idx_to_key(
544                             module, ldb_kv, ldb, &list->dn[i], &key);
545                         if (ret != LDB_SUCCESS) {
546                                 TALLOC_FREE(list);
547                                 TALLOC_FREE(rec);
548                                 return ret;
549                         }
550
551                         ret =
552                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
553                         if (key.data != guid_key) {
554                                 TALLOC_FREE(key.data);
555                         }
556                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
557                                 /*
558                                  * the record has disappeared?
559                                  * yes, this can happen
560                                  */
561                                 TALLOC_FREE(rec);
562                                 continue;
563                         }
564
565                         if (ret != LDB_SUCCESS) {
566                                 /* an internal error */
567                                 TALLOC_FREE(rec);
568                                 TALLOC_FREE(list);
569                                 return LDB_ERR_OPERATIONS_ERROR;
570                         }
571
572                         /*
573                          * We found the actual DN that we wanted from in the
574                          * multiple values that matched the index
575                          * (due to truncation), so return that.
576                          *
577                          */
578                         if (ldb_dn_compare(dn, rec->dn) == 0) {
579                                 index = i;
580                                 TALLOC_FREE(rec);
581                                 break;
582                         }
583                 }
584
585                 /*
586                  * We matched the index but the actual DN we wanted
587                  * was not here.
588                  */
589                 if (index == -1) {
590                         TALLOC_FREE(list);
591                         return LDB_ERR_NO_SUCH_OBJECT;
592                 }
593         }
594
595         /* The ldb_key memory is allocated by the caller */
596         ret = ldb_kv_guid_to_key(module, ldb_kv, &list->dn[index], ldb_key);
597         TALLOC_FREE(list);
598
599         if (ret != LDB_SUCCESS) {
600                 return LDB_ERR_OPERATIONS_ERROR;
601         }
602
603         return LDB_SUCCESS;
604 }
605
606
607
608 /*
609   save a dn_list into a full @IDX style record
610  */
611 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
612                                      struct ldb_kv_private *ldb_kv,
613                                      struct ldb_dn *dn,
614                                      struct dn_list *list)
615 {
616         struct ldb_message *msg;
617         int ret;
618
619         msg = ldb_msg_new(module);
620         if (!msg) {
621                 return ldb_module_oom(module);
622         }
623
624         msg->dn = dn;
625
626         if (list->count == 0) {
627                 ret = ldb_kv_delete_noindex(module, msg);
628                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
629                         ret = LDB_SUCCESS;
630                 }
631                 talloc_free(msg);
632                 return ret;
633         }
634
635         if (ldb_kv->cache->GUID_index_attribute == NULL) {
636                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
637                                       LDB_KV_INDEXING_VERSION);
638                 if (ret != LDB_SUCCESS) {
639                         talloc_free(msg);
640                         return ldb_module_oom(module);
641                 }
642         } else {
643                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
644                                       LDB_KV_GUID_INDEXING_VERSION);
645                 if (ret != LDB_SUCCESS) {
646                         talloc_free(msg);
647                         return ldb_module_oom(module);
648                 }
649         }
650
651         if (list->count > 0) {
652                 struct ldb_message_element *el;
653
654                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
655                 if (ret != LDB_SUCCESS) {
656                         talloc_free(msg);
657                         return ldb_module_oom(module);
658                 }
659
660                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
661                         el->values = list->dn;
662                         el->num_values = list->count;
663                 } else {
664                         struct ldb_val v;
665                         unsigned int i;
666                         el->values = talloc_array(msg,
667                                                   struct ldb_val, 1);
668                         if (el->values == NULL) {
669                                 talloc_free(msg);
670                                 return ldb_module_oom(module);
671                         }
672
673                         v.data = talloc_array_size(el->values,
674                                                    list->count,
675                                                    LDB_KV_GUID_SIZE);
676                         if (v.data == NULL) {
677                                 talloc_free(msg);
678                                 return ldb_module_oom(module);
679                         }
680
681                         v.length = talloc_get_size(v.data);
682
683                         for (i = 0; i < list->count; i++) {
684                                 if (list->dn[i].length !=
685                                     LDB_KV_GUID_SIZE) {
686                                         talloc_free(msg);
687                                         return ldb_module_operr(module);
688                                 }
689                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
690                                        list->dn[i].data,
691                                        LDB_KV_GUID_SIZE);
692                         }
693                         el->values[0] = v;
694                         el->num_values = 1;
695                 }
696         }
697
698         ret = ldb_kv_store(module, msg, TDB_REPLACE);
699         talloc_free(msg);
700         return ret;
701 }
702
703 /*
704   save a dn_list into the database, in either @IDX or internal format
705  */
706 static int ldb_kv_dn_list_store(struct ldb_module *module,
707                                 struct ldb_dn *dn,
708                                 struct dn_list *list)
709 {
710         struct ldb_kv_private *ldb_kv = talloc_get_type(
711             ldb_module_get_private(module), struct ldb_kv_private);
712         TDB_DATA rec, key;
713         int ret;
714         struct dn_list *list2;
715
716         if (ldb_kv->idxptr == NULL) {
717                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
718         }
719
720         if (ldb_kv->idxptr->itdb == NULL) {
721                 ldb_kv->idxptr->itdb =
722                     tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
723                 if (ldb_kv->idxptr->itdb == NULL) {
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726         }
727
728         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
729         if (key.dptr == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         key.dsize = strlen((char *)key.dptr);
733
734         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
735         if (rec.dptr != NULL) {
736                 list2 = ldb_kv_index_idxptr(module, rec, false);
737                 if (list2 == NULL) {
738                         free(rec.dptr);
739                         return LDB_ERR_OPERATIONS_ERROR;
740                 }
741                 free(rec.dptr);
742                 list2->dn = talloc_steal(list2, list->dn);
743                 list2->count = list->count;
744                 return LDB_SUCCESS;
745         }
746
747         list2 = talloc(ldb_kv->idxptr, struct dn_list);
748         if (list2 == NULL) {
749                 return LDB_ERR_OPERATIONS_ERROR;
750         }
751         list2->dn = talloc_steal(list2, list->dn);
752         list2->count = list->count;
753
754         rec.dptr = (uint8_t *)&list2;
755         rec.dsize = sizeof(void *);
756
757
758         /*
759          * This is not a store into the main DB, but into an in-memory
760          * TDB, so we don't need a guard on ltdb->read_only
761          */
762         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
763         if (ret != 0) {
764                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
765         }
766         return LDB_SUCCESS;
767 }
768
769 /*
770   traverse function for storing the in-memory index entries on disk
771  */
772 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
773                                        TDB_DATA key,
774                                        TDB_DATA data,
775                                        void *state)
776 {
777         struct ldb_module *module = state;
778         struct ldb_kv_private *ldb_kv = talloc_get_type(
779             ldb_module_get_private(module), struct ldb_kv_private);
780         struct ldb_dn *dn;
781         struct ldb_context *ldb = ldb_module_get_ctx(module);
782         struct ldb_val v;
783         struct dn_list *list;
784
785         list = ldb_kv_index_idxptr(module, data, true);
786         if (list == NULL) {
787                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
788                 return -1;
789         }
790
791         v.data = key.dptr;
792         v.length = strnlen((char *)key.dptr, key.dsize);
793
794         dn = ldb_dn_from_ldb_val(module, ldb, &v);
795         if (dn == NULL) {
796                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
797                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
798                 return -1;
799         }
800
801         ldb_kv->idxptr->error =
802             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
803         talloc_free(dn);
804         if (ldb_kv->idxptr->error != 0) {
805                 return -1;
806         }
807         return 0;
808 }
809
810 /* cleanup the idxptr mode when transaction commits */
811 int ldb_kv_index_transaction_commit(struct ldb_module *module)
812 {
813         struct ldb_kv_private *ldb_kv = talloc_get_type(
814             ldb_module_get_private(module), struct ldb_kv_private);
815         int ret;
816
817         struct ldb_context *ldb = ldb_module_get_ctx(module);
818
819         ldb_reset_err_string(ldb);
820
821         if (ldb_kv->idxptr->itdb) {
822                 tdb_traverse(
823                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
824                 tdb_close(ldb_kv->idxptr->itdb);
825         }
826
827         ret = ldb_kv->idxptr->error;
828         if (ret != LDB_SUCCESS) {
829                 if (!ldb_errstring(ldb)) {
830                         ldb_set_errstring(ldb, ldb_strerror(ret));
831                 }
832                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
833         }
834
835         talloc_free(ldb_kv->idxptr);
836         ldb_kv->idxptr = NULL;
837         return ret;
838 }
839
840 /* cleanup the idxptr mode when transaction cancels */
841 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
842 {
843         struct ldb_kv_private *ldb_kv = talloc_get_type(
844             ldb_module_get_private(module), struct ldb_kv_private);
845         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
846                 tdb_close(ldb_kv->idxptr->itdb);
847         }
848         talloc_free(ldb_kv->idxptr);
849         ldb_kv->idxptr = NULL;
850         return LDB_SUCCESS;
851 }
852
853
854 /*
855   return the dn key to be used for an index
856   the caller is responsible for freeing
857 */
858 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
859                                        struct ldb_kv_private *ldb_kv,
860                                        const char *attr,
861                                        const struct ldb_val *value,
862                                        const struct ldb_schema_attribute **ap,
863                                        enum key_truncation *truncation)
864 {
865         struct ldb_dn *ret;
866         struct ldb_val v;
867         const struct ldb_schema_attribute *a = NULL;
868         char *attr_folded = NULL;
869         const char *attr_for_dn = NULL;
870         int r;
871         bool should_b64_encode;
872
873         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
874         size_t key_len = 0;
875         size_t attr_len = 0;
876         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
877         unsigned frmt_len = 0;
878         const size_t additional_key_length = 4;
879         unsigned int num_separators = 3; /* Estimate for overflow check */
880         const size_t min_data = 1;
881         const size_t min_key_length = additional_key_length
882                 + indx_len + num_separators + min_data;
883
884         if (attr[0] == '@') {
885                 attr_for_dn = attr;
886                 v = *value;
887                 if (ap != NULL) {
888                         *ap = NULL;
889                 }
890         } else {
891                 attr_folded = ldb_attr_casefold(ldb, attr);
892                 if (!attr_folded) {
893                         return NULL;
894                 }
895
896                 attr_for_dn = attr_folded;
897
898                 a = ldb_schema_attribute_by_name(ldb, attr);
899                 if (ap) {
900                         *ap = a;
901                 }
902                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
903                 if (r != LDB_SUCCESS) {
904                         const char *errstr = ldb_errstring(ldb);
905                         /* canonicalisation can be refused. For
906                            example, a attribute that takes wildcards
907                            will refuse to canonicalise if the value
908                            contains a wildcard */
909                         ldb_asprintf_errstring(ldb,
910                                                "Failed to create index "
911                                                "key for attribute '%s':%s%s%s",
912                                                attr, ldb_strerror(r),
913                                                (errstr?":":""),
914                                                (errstr?errstr:""));
915                         talloc_free(attr_folded);
916                         return NULL;
917                 }
918         }
919         attr_len = strlen(attr_for_dn);
920
921         /*
922          * Check if there is any hope this will fit into the DB.
923          * Overflow here is not actually critical the code below
924          * checks again to make the printf and the DB does another
925          * check for too long keys
926          */
927         if (max_key_length - attr_len < min_key_length) {
928                 ldb_asprintf_errstring(
929                         ldb,
930                         __location__ ": max_key_length "
931                         "is too small (%u) < (%u)",
932                         max_key_length,
933                         (unsigned)(min_key_length + attr_len));
934                 talloc_free(attr_folded);
935                 return NULL;
936         }
937
938         /*
939          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
940          * "DN=" and a trailing string terminator
941          */
942         max_key_length -= additional_key_length;
943
944         /*
945          * We do not base 64 encode a DN in a key, it has already been
946          * casefold and lineraized, that is good enough.  That already
947          * avoids embedded NUL etc.
948          */
949         if (ldb_kv->cache->GUID_index_attribute != NULL) {
950                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
951                         should_b64_encode = false;
952                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
953                         /*
954                          * We can only change the behaviour for IDXONE
955                          * when the GUID index is enabled
956                          */
957                         should_b64_encode = false;
958                 } else {
959                         should_b64_encode
960                                 = ldb_should_b64_encode(ldb, &v);
961                 }
962         } else {
963                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
964         }
965
966         if (should_b64_encode) {
967                 size_t vstr_len = 0;
968                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
969                 if (!vstr) {
970                         talloc_free(attr_folded);
971                         return NULL;
972                 }
973                 vstr_len = strlen(vstr);
974                 /*
975                  * Overflow here is not critical as we only use this
976                  * to choose the printf truncation
977                  */
978                 key_len = num_separators + indx_len + attr_len + vstr_len;
979                 if (key_len > max_key_length) {
980                         size_t excess = key_len - max_key_length;
981                         frmt_len = vstr_len - excess;
982                         *truncation = KEY_TRUNCATED;
983                         /*
984                         * Truncated keys are placed in a separate key space
985                         * from the non truncated keys
986                         * Note: the double hash "##" is not a typo and
987                         * indicates that the following value is base64 encoded
988                         */
989                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
990                                              LDB_KV_INDEX, attr_for_dn,
991                                              frmt_len, vstr);
992                 } else {
993                         frmt_len = vstr_len;
994                         *truncation = KEY_NOT_TRUNCATED;
995                         /*
996                          * Note: the double colon "::" is not a typo and
997                          * indicates that the following value is base64 encoded
998                          */
999                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1000                                              LDB_KV_INDEX, attr_for_dn,
1001                                              frmt_len, vstr);
1002                 }
1003                 talloc_free(vstr);
1004         } else {
1005                 /* Only need two seperators */
1006                 num_separators = 2;
1007
1008                 /*
1009                  * Overflow here is not critical as we only use this
1010                  * to choose the printf truncation
1011                  */
1012                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1013                 if (key_len > max_key_length) {
1014                         size_t excess = key_len - max_key_length;
1015                         frmt_len = v.length - excess;
1016                         *truncation = KEY_TRUNCATED;
1017                         /*
1018                          * Truncated keys are placed in a separate key space
1019                          * from the non truncated keys
1020                          */
1021                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1022                                              LDB_KV_INDEX, attr_for_dn,
1023                                              frmt_len, (char *)v.data);
1024                 } else {
1025                         frmt_len = v.length;
1026                         *truncation = KEY_NOT_TRUNCATED;
1027                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1028                                              LDB_KV_INDEX, attr_for_dn,
1029                                              frmt_len, (char *)v.data);
1030                 }
1031         }
1032
1033         if (v.data != value->data) {
1034                 talloc_free(v.data);
1035         }
1036         talloc_free(attr_folded);
1037
1038         return ret;
1039 }
1040
1041 /*
1042   see if a attribute value is in the list of indexed attributes
1043 */
1044 static bool ldb_kv_is_indexed(struct ldb_module *module,
1045                               struct ldb_kv_private *ldb_kv,
1046                               const char *attr)
1047 {
1048         struct ldb_context *ldb = ldb_module_get_ctx(module);
1049         unsigned int i;
1050         struct ldb_message_element *el;
1051
1052         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1053             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1054                 /* Implicity covered, this is the index key */
1055                 return false;
1056         }
1057         if (ldb->schema.index_handler_override) {
1058                 const struct ldb_schema_attribute *a
1059                         = ldb_schema_attribute_by_name(ldb, attr);
1060
1061                 if (a == NULL) {
1062                         return false;
1063                 }
1064
1065                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1066                         return true;
1067                 } else {
1068                         return false;
1069                 }
1070         }
1071
1072         if (!ldb_kv->cache->attribute_indexes) {
1073                 return false;
1074         }
1075
1076         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1077         if (el == NULL) {
1078                 return false;
1079         }
1080
1081         /* TODO: this is too expensive! At least use a binary search */
1082         for (i=0; i<el->num_values; i++) {
1083                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1084                         return true;
1085                 }
1086         }
1087         return false;
1088 }
1089
1090 /*
1091   in the following logic functions, the return value is treated as
1092   follows:
1093
1094      LDB_SUCCESS: we found some matching index values
1095
1096      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1097
1098      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1099                                we'll need a full search
1100  */
1101
1102 /*
1103   return a list of dn's that might match a simple indexed search (an
1104   equality search only)
1105  */
1106 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1107                                   struct ldb_kv_private *ldb_kv,
1108                                   const struct ldb_parse_tree *tree,
1109                                   struct dn_list *list)
1110 {
1111         struct ldb_context *ldb;
1112         struct ldb_dn *dn;
1113         int ret;
1114         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1115
1116         ldb = ldb_module_get_ctx(module);
1117
1118         list->count = 0;
1119         list->dn = NULL;
1120
1121         /* if the attribute isn't in the list of indexed attributes then
1122            this node needs a full search */
1123         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1124                 return LDB_ERR_OPERATIONS_ERROR;
1125         }
1126
1127         /* the attribute is indexed. Pull the list of DNs that match the
1128            search criterion */
1129         dn = ldb_kv_index_key(ldb,
1130                               ldb_kv,
1131                               tree->u.equality.attr,
1132                               &tree->u.equality.value,
1133                               NULL,
1134                               &truncation);
1135         /*
1136          * We ignore truncation here and allow multi-valued matches
1137          * as ltdb_search_indexed will filter out the wrong one in
1138          * ltdb_index_filter() which calls ldb_match_message().
1139          */
1140         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1141
1142         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1143         talloc_free(dn);
1144         return ret;
1145 }
1146
1147 static bool list_union(struct ldb_context *ldb,
1148                        struct ldb_kv_private *ldb_kv,
1149                        struct dn_list *list,
1150                        struct dn_list *list2);
1151
1152 /*
1153   return a list of dn's that might match a leaf indexed search
1154  */
1155 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1156                                 struct ldb_kv_private *ldb_kv,
1157                                 const struct ldb_parse_tree *tree,
1158                                 struct dn_list *list)
1159 {
1160         if (ldb_kv->disallow_dn_filter &&
1161             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1162                 /* in AD mode we do not support "(dn=...)" search filters */
1163                 list->dn = NULL;
1164                 list->count = 0;
1165                 return LDB_SUCCESS;
1166         }
1167         if (tree->u.equality.attr[0] == '@') {
1168                 /* Do not allow a indexed search against an @ */
1169                 list->dn = NULL;
1170                 list->count = 0;
1171                 return LDB_SUCCESS;
1172         }
1173         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1174                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1175                 bool valid_dn = false;
1176                 struct ldb_dn *dn
1177                         = ldb_dn_from_ldb_val(list,
1178                                               ldb_module_get_ctx(module),
1179                                               &tree->u.equality.value);
1180                 if (dn == NULL) {
1181                         /* If we can't parse it, no match */
1182                         list->dn = NULL;
1183                         list->count = 0;
1184                         return LDB_SUCCESS;
1185                 }
1186
1187                 valid_dn = ldb_dn_validate(dn);
1188                 if (valid_dn == false) {
1189                         /* If we can't parse it, no match */
1190                         list->dn = NULL;
1191                         list->count = 0;
1192                         return LDB_SUCCESS;
1193                 }
1194
1195                 /*
1196                  * Re-use the same code we use for a SCOPE_BASE
1197                  * search
1198                  *
1199                  * We can't call TALLOC_FREE(dn) as this must belong
1200                  * to list for the memory to remain valid.
1201                  */
1202                 return ldb_kv_index_dn_base_dn(
1203                     module, ldb_kv, dn, list, &truncation);
1204                 /*
1205                  * We ignore truncation here and allow multi-valued matches
1206                  * as ltdb_search_indexed will filter out the wrong one in
1207                  * ltdb_index_filter() which calls ldb_match_message().
1208                  */
1209
1210         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1211                    (ldb_attr_cmp(tree->u.equality.attr,
1212                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1213                 int ret;
1214                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1215                 list->dn = talloc_array(list, struct ldb_val, 1);
1216                 if (list->dn == NULL) {
1217                         ldb_module_oom(module);
1218                         return LDB_ERR_OPERATIONS_ERROR;
1219                 }
1220                 /*
1221                  * We need to go via the canonicalise_fn() to
1222                  * ensure we get the index in binary, rather
1223                  * than a string
1224                  */
1225                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1226                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1227                 if (ret != LDB_SUCCESS) {
1228                         return LDB_ERR_OPERATIONS_ERROR;
1229                 }
1230                 list->count = 1;
1231                 return LDB_SUCCESS;
1232         }
1233
1234         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1235 }
1236
1237
1238 /*
1239   list intersection
1240   list = list & list2
1241 */
1242 static bool list_intersect(struct ldb_context *ldb,
1243                            struct ldb_kv_private *ldb_kv,
1244                            struct dn_list *list,
1245                            const struct dn_list *list2)
1246 {
1247         const struct dn_list *short_list, *long_list;
1248         struct dn_list *list3;
1249         unsigned int i;
1250
1251         if (list->count == 0) {
1252                 /* 0 & X == 0 */
1253                 return true;
1254         }
1255         if (list2->count == 0) {
1256                 /* X & 0 == 0 */
1257                 list->count = 0;
1258                 list->dn = NULL;
1259                 return true;
1260         }
1261
1262         /*
1263          * In both of the below we check for strict and in that
1264          * case do not optimise the intersection of this list,
1265          * we must never return an entry not in this
1266          * list.  This allows the index for
1267          * SCOPE_ONELEVEL to be trusted.
1268          */
1269
1270         /* the indexing code is allowed to return a longer list than
1271            what really matches, as all results are filtered by the
1272            full expression at the end - this shortcut avoids a lot of
1273            work in some cases */
1274         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1275                 return true;
1276         }
1277         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1278                 list->count = list2->count;
1279                 list->dn = list2->dn;
1280                 /* note that list2 may not be the parent of list2->dn,
1281                    as list2->dn may be owned by ltdb->idxptr. In that
1282                    case we expect this reparent call to fail, which is
1283                    OK */
1284                 talloc_reparent(list2, list, list2->dn);
1285                 return true;
1286         }
1287
1288         if (list->count > list2->count) {
1289                 short_list = list2;
1290                 long_list = list;
1291         } else {
1292                 short_list = list;
1293                 long_list = list2;
1294         }
1295
1296         list3 = talloc_zero(list, struct dn_list);
1297         if (list3 == NULL) {
1298                 return false;
1299         }
1300
1301         list3->dn = talloc_array(list3, struct ldb_val,
1302                                  MIN(list->count, list2->count));
1303         if (!list3->dn) {
1304                 talloc_free(list3);
1305                 return false;
1306         }
1307         list3->count = 0;
1308
1309         for (i=0;i<short_list->count;i++) {
1310                 /* For the GUID index case, this is a binary search */
1311                 if (ldb_kv_dn_list_find_val(
1312                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1313                         list3->dn[list3->count] = short_list->dn[i];
1314                         list3->count++;
1315                 }
1316         }
1317
1318         list->strict |= list2->strict;
1319         list->dn = talloc_steal(list, list3->dn);
1320         list->count = list3->count;
1321         talloc_free(list3);
1322
1323         return true;
1324 }
1325
1326
1327 /*
1328   list union
1329   list = list | list2
1330 */
1331 static bool list_union(struct ldb_context *ldb,
1332                        struct ldb_kv_private *ldb_kv,
1333                        struct dn_list *list,
1334                        struct dn_list *list2)
1335 {
1336         struct ldb_val *dn3;
1337         unsigned int i = 0, j = 0, k = 0;
1338
1339         if (list2->count == 0) {
1340                 /* X | 0 == X */
1341                 return true;
1342         }
1343
1344         if (list->count == 0) {
1345                 /* 0 | X == X */
1346                 list->count = list2->count;
1347                 list->dn = list2->dn;
1348                 /* note that list2 may not be the parent of list2->dn,
1349                    as list2->dn may be owned by ltdb->idxptr. In that
1350                    case we expect this reparent call to fail, which is
1351                    OK */
1352                 talloc_reparent(list2, list, list2->dn);
1353                 return true;
1354         }
1355
1356         /*
1357          * Sort the lists (if not in GUID DN mode) so we can do
1358          * the de-duplication during the merge
1359          *
1360          * NOTE: This can sort the in-memory index values, as list or
1361          * list2 might not be a copy!
1362          */
1363         ldb_kv_dn_list_sort(ldb_kv, list);
1364         ldb_kv_dn_list_sort(ldb_kv, list2);
1365
1366         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1367         if (!dn3) {
1368                 ldb_oom(ldb);
1369                 return false;
1370         }
1371
1372         while (i < list->count || j < list2->count) {
1373                 int cmp;
1374                 if (i >= list->count) {
1375                         cmp = 1;
1376                 } else if (j >= list2->count) {
1377                         cmp = -1;
1378                 } else {
1379                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1380                                                           &list2->dn[j]);
1381                 }
1382
1383                 if (cmp < 0) {
1384                         /* Take list */
1385                         dn3[k] = list->dn[i];
1386                         i++;
1387                         k++;
1388                 } else if (cmp > 0) {
1389                         /* Take list2 */
1390                         dn3[k] = list2->dn[j];
1391                         j++;
1392                         k++;
1393                 } else {
1394                         /* Equal, take list */
1395                         dn3[k] = list->dn[i];
1396                         i++;
1397                         j++;
1398                         k++;
1399                 }
1400         }
1401
1402         list->dn = dn3;
1403         list->count = k;
1404
1405         return true;
1406 }
1407
1408 static int ldb_kv_index_dn(struct ldb_module *module,
1409                            struct ldb_kv_private *ldb_kv,
1410                            const struct ldb_parse_tree *tree,
1411                            struct dn_list *list);
1412
1413 /*
1414   process an OR list (a union)
1415  */
1416 static int ldb_kv_index_dn_or(struct ldb_module *module,
1417                               struct ldb_kv_private *ldb_kv,
1418                               const struct ldb_parse_tree *tree,
1419                               struct dn_list *list)
1420 {
1421         struct ldb_context *ldb;
1422         unsigned int i;
1423
1424         ldb = ldb_module_get_ctx(module);
1425
1426         list->dn = NULL;
1427         list->count = 0;
1428
1429         for (i=0; i<tree->u.list.num_elements; i++) {
1430                 struct dn_list *list2;
1431                 int ret;
1432
1433                 list2 = talloc_zero(list, struct dn_list);
1434                 if (list2 == NULL) {
1435                         return LDB_ERR_OPERATIONS_ERROR;
1436                 }
1437
1438                 ret = ldb_kv_index_dn(
1439                     module, ldb_kv, tree->u.list.elements[i], list2);
1440
1441                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1442                         /* X || 0 == X */
1443                         talloc_free(list2);
1444                         continue;
1445                 }
1446
1447                 if (ret != LDB_SUCCESS) {
1448                         /* X || * == * */
1449                         talloc_free(list2);
1450                         return ret;
1451                 }
1452
1453                 if (!list_union(ldb, ldb_kv, list, list2)) {
1454                         talloc_free(list2);
1455                         return LDB_ERR_OPERATIONS_ERROR;
1456                 }
1457         }
1458
1459         if (list->count == 0) {
1460                 return LDB_ERR_NO_SUCH_OBJECT;
1461         }
1462
1463         return LDB_SUCCESS;
1464 }
1465
1466
1467 /*
1468   NOT an index results
1469  */
1470 static int ldb_kv_index_dn_not(struct ldb_module *module,
1471                                struct ldb_kv_private *ldb_kv,
1472                                const struct ldb_parse_tree *tree,
1473                                struct dn_list *list)
1474 {
1475         /* the only way to do an indexed not would be if we could
1476            negate the not via another not or if we knew the total
1477            number of database elements so we could know that the
1478            existing expression covered the whole database.
1479
1480            instead, we just give up, and rely on a full index scan
1481            (unless an outer & manages to reduce the list)
1482         */
1483         return LDB_ERR_OPERATIONS_ERROR;
1484 }
1485
1486 /*
1487  * These things are unique, so avoid a full scan if this is a search
1488  * by GUID, DN or a unique attribute
1489  */
1490 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1491                                 struct ldb_kv_private *ldb_kv,
1492                                 const char *attr)
1493 {
1494         const struct ldb_schema_attribute *a;
1495         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1496                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1497                     0) {
1498                         return true;
1499                 }
1500         }
1501         if (ldb_attr_dn(attr) == 0) {
1502                 return true;
1503         }
1504
1505         a = ldb_schema_attribute_by_name(ldb, attr);
1506         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1507                 return true;
1508         }
1509         return false;
1510 }
1511
1512 /*
1513   process an AND expression (intersection)
1514  */
1515 static int ldb_kv_index_dn_and(struct ldb_module *module,
1516                                struct ldb_kv_private *ldb_kv,
1517                                const struct ldb_parse_tree *tree,
1518                                struct dn_list *list)
1519 {
1520         struct ldb_context *ldb;
1521         unsigned int i;
1522         bool found;
1523
1524         ldb = ldb_module_get_ctx(module);
1525
1526         list->dn = NULL;
1527         list->count = 0;
1528
1529         /* in the first pass we only look for unique simple
1530            equality tests, in the hope of avoiding having to look
1531            at any others */
1532         for (i=0; i<tree->u.list.num_elements; i++) {
1533                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1534                 int ret;
1535
1536                 if (subtree->operation != LDB_OP_EQUALITY ||
1537                     !ldb_kv_index_unique(
1538                         ldb, ldb_kv, subtree->u.equality.attr)) {
1539                         continue;
1540                 }
1541
1542                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1543                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1544                         /* 0 && X == 0 */
1545                         return LDB_ERR_NO_SUCH_OBJECT;
1546                 }
1547                 if (ret == LDB_SUCCESS) {
1548                         /* a unique index match means we can
1549                          * stop. Note that we don't care if we return
1550                          * a few too many objects, due to later
1551                          * filtering */
1552                         return LDB_SUCCESS;
1553                 }
1554         }
1555
1556         /* now do a full intersection */
1557         found = false;
1558
1559         for (i=0; i<tree->u.list.num_elements; i++) {
1560                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1561                 struct dn_list *list2;
1562                 int ret;
1563
1564                 list2 = talloc_zero(list, struct dn_list);
1565                 if (list2 == NULL) {
1566                         return ldb_module_oom(module);
1567                 }
1568
1569                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1570
1571                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1572                         /* X && 0 == 0 */
1573                         list->dn = NULL;
1574                         list->count = 0;
1575                         talloc_free(list2);
1576                         return LDB_ERR_NO_SUCH_OBJECT;
1577                 }
1578
1579                 if (ret != LDB_SUCCESS) {
1580                         /* this didn't adding anything */
1581                         talloc_free(list2);
1582                         continue;
1583                 }
1584
1585                 if (!found) {
1586                         talloc_reparent(list2, list, list->dn);
1587                         list->dn = list2->dn;
1588                         list->count = list2->count;
1589                         found = true;
1590                 } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
1591                         talloc_free(list2);
1592                         return LDB_ERR_OPERATIONS_ERROR;
1593                 }
1594
1595                 if (list->count == 0) {
1596                         list->dn = NULL;
1597                         return LDB_ERR_NO_SUCH_OBJECT;
1598                 }
1599
1600                 if (list->count < 2) {
1601                         /* it isn't worth loading the next part of the tree */
1602                         return LDB_SUCCESS;
1603                 }
1604         }
1605
1606         if (!found) {
1607                 /* none of the attributes were indexed */
1608                 return LDB_ERR_OPERATIONS_ERROR;
1609         }
1610
1611         return LDB_SUCCESS;
1612 }
1613
1614 /*
1615   return a list of matching objects using a one-level index
1616  */
1617 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1618                                 struct ldb_kv_private *ldb_kv,
1619                                 const char *attr,
1620                                 struct ldb_dn *dn,
1621                                 struct dn_list *list,
1622                                 enum key_truncation *truncation)
1623 {
1624         struct ldb_context *ldb;
1625         struct ldb_dn *key;
1626         struct ldb_val val;
1627         int ret;
1628
1629         ldb = ldb_module_get_ctx(module);
1630
1631         /* work out the index key from the parent DN */
1632         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1633         if (val.data == NULL) {
1634                 const char *dn_str = ldb_dn_get_linearized(dn);
1635                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1636                                        __location__
1637                                        ": Failed to get casefold DN "
1638                                        "from: %s",
1639                                        dn_str);
1640                 return LDB_ERR_OPERATIONS_ERROR;
1641         }
1642         val.length = strlen((char *)val.data);
1643         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1644         if (!key) {
1645                 ldb_oom(ldb);
1646                 return LDB_ERR_OPERATIONS_ERROR;
1647         }
1648
1649         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1650         talloc_free(key);
1651         if (ret != LDB_SUCCESS) {
1652                 return ret;
1653         }
1654
1655         if (list->count == 0) {
1656                 return LDB_ERR_NO_SUCH_OBJECT;
1657         }
1658
1659         return LDB_SUCCESS;
1660 }
1661
1662 /*
1663   return a list of matching objects using a one-level index
1664  */
1665 static int ldb_kv_index_dn_one(struct ldb_module *module,
1666                                struct ldb_kv_private *ldb_kv,
1667                                struct ldb_dn *parent_dn,
1668                                struct dn_list *list,
1669                                enum key_truncation *truncation)
1670 {
1671         /*
1672          * Ensure we do not shortcut on intersection for this list.
1673          * We must never be lazy and return an entry not in this
1674          * list.  This allows the index for
1675          * SCOPE_ONELEVEL to be trusted.
1676          */
1677
1678         list->strict = true;
1679         return ldb_kv_index_dn_attr(
1680             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
1681 }
1682
1683 /*
1684   return a list of matching objects using the DN index
1685  */
1686 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
1687                                    struct ldb_kv_private *ldb_kv,
1688                                    struct ldb_dn *base_dn,
1689                                    struct dn_list *dn_list,
1690                                    enum key_truncation *truncation)
1691 {
1692         const struct ldb_val *guid_val = NULL;
1693         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1694                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1695                 if (dn_list->dn == NULL) {
1696                         return ldb_module_oom(module);
1697                 }
1698                 dn_list->dn[0].data = discard_const_p(unsigned char,
1699                                                       ldb_dn_get_linearized(base_dn));
1700                 if (dn_list->dn[0].data == NULL) {
1701                         talloc_free(dn_list->dn);
1702                         return ldb_module_oom(module);
1703                 }
1704                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1705                 dn_list->count = 1;
1706
1707                 return LDB_SUCCESS;
1708         }
1709
1710         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
1711                 guid_val = ldb_dn_get_extended_component(
1712                     base_dn, ldb_kv->cache->GUID_index_dn_component);
1713         }
1714
1715         if (guid_val != NULL) {
1716                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1717                 if (dn_list->dn == NULL) {
1718                         return ldb_module_oom(module);
1719                 }
1720                 dn_list->dn[0].data = guid_val->data;
1721                 dn_list->dn[0].length = guid_val->length;
1722                 dn_list->count = 1;
1723
1724                 return LDB_SUCCESS;
1725         }
1726
1727         return ldb_kv_index_dn_attr(
1728             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
1729 }
1730
1731 /*
1732   return a list of dn's that might match a indexed search or
1733   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1734  */
1735 static int ldb_kv_index_dn(struct ldb_module *module,
1736                            struct ldb_kv_private *ldb_kv,
1737                            const struct ldb_parse_tree *tree,
1738                            struct dn_list *list)
1739 {
1740         int ret = LDB_ERR_OPERATIONS_ERROR;
1741
1742         switch (tree->operation) {
1743         case LDB_OP_AND:
1744                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
1745                 break;
1746
1747         case LDB_OP_OR:
1748                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
1749                 break;
1750
1751         case LDB_OP_NOT:
1752                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
1753                 break;
1754
1755         case LDB_OP_EQUALITY:
1756                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
1757                 break;
1758
1759         case LDB_OP_SUBSTRING:
1760         case LDB_OP_GREATER:
1761         case LDB_OP_LESS:
1762         case LDB_OP_PRESENT:
1763         case LDB_OP_APPROX:
1764         case LDB_OP_EXTENDED:
1765                 /* we can't index with fancy bitops yet */
1766                 ret = LDB_ERR_OPERATIONS_ERROR;
1767                 break;
1768         }
1769
1770         return ret;
1771 }
1772
1773 /*
1774   filter a candidate dn_list from an indexed search into a set of results
1775   extracting just the given attributes
1776 */
1777 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
1778                                const struct dn_list *dn_list,
1779                                struct ldb_kv_context *ac,
1780                                uint32_t *match_count,
1781                                enum key_truncation scope_one_truncation)
1782 {
1783         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1784         struct ldb_message *msg;
1785         struct ldb_message *filtered_msg;
1786         unsigned int i;
1787         unsigned int num_keys = 0;
1788         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
1789         struct ldb_val *keys = NULL;
1790
1791         /*
1792          * We have to allocate the key list (rather than just walk the
1793          * caller supplied list) as the callback could change the list
1794          * (by modifying an indexed attribute hosted in the in-memory
1795          * index cache!)
1796          */
1797         keys = talloc_array(ac, struct ldb_val, dn_list->count);
1798         if (keys == NULL) {
1799                 return ldb_module_oom(ac->module);
1800         }
1801
1802         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1803                 /*
1804                  * We speculate that the keys will be GUID based and so
1805                  * pre-fill in enough space for a GUID (avoiding a pile of
1806                  * small allocations)
1807                  */
1808                 struct guid_tdb_key {
1809                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
1810                 } *key_values = NULL;
1811
1812                 key_values = talloc_array(keys,
1813                                           struct guid_tdb_key,
1814                                           dn_list->count);
1815
1816                 if (key_values == NULL) {
1817                         talloc_free(keys);
1818                         return ldb_module_oom(ac->module);
1819                 }
1820                 for (i = 0; i < dn_list->count; i++) {
1821                         keys[i].data = key_values[i].guid_key;
1822                         keys[i].length = sizeof(key_values[i].guid_key);
1823                 }
1824         } else {
1825                 for (i = 0; i < dn_list->count; i++) {
1826                         keys[i].data = NULL;
1827                         keys[i].length = 0;
1828                 }
1829         }
1830
1831         for (i = 0; i < dn_list->count; i++) {
1832                 int ret;
1833
1834                 ret = ldb_kv_idx_to_key(
1835                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
1836                 if (ret != LDB_SUCCESS) {
1837                         talloc_free(keys);
1838                         return ret;
1839                 }
1840
1841                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1842                         /*
1843                          * If we are in GUID index mode, then the dn_list is
1844                          * sorted.  If we got a duplicate, forget about it, as
1845                          * otherwise we would send the same entry back more
1846                          * than once.
1847                          *
1848                          * This is needed in the truncated DN case, or if a
1849                          * duplicate was forced in via
1850                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1851                          */
1852
1853                         if (memcmp(previous_guid_key,
1854                                    keys[num_keys].data,
1855                                    sizeof(previous_guid_key)) == 0) {
1856                                 continue;
1857                         }
1858
1859                         memcpy(previous_guid_key,
1860                                keys[num_keys].data,
1861                                sizeof(previous_guid_key));
1862                 }
1863                 num_keys++;
1864         }
1865
1866
1867         /*
1868          * Now that the list is a safe copy, send the callbacks
1869          */
1870         for (i = 0; i < num_keys; i++) {
1871                 int ret;
1872                 bool matched;
1873                 msg = ldb_msg_new(ac);
1874                 if (!msg) {
1875                         talloc_free(keys);
1876                         return LDB_ERR_OPERATIONS_ERROR;
1877                 }
1878
1879                 ret =
1880                     ldb_kv_search_key(ac->module,
1881                                       ldb_kv,
1882                                       keys[i],
1883                                       msg,
1884                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
1885                                           LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1886                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1887                         /*
1888                          * the record has disappeared? yes, this can
1889                          * happen if the entry is deleted by something
1890                          * operating in the callback (not another
1891                          * process, as we have a read lock)
1892                          */
1893                         talloc_free(msg);
1894                         continue;
1895                 }
1896
1897                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1898                         /* an internal error */
1899                         talloc_free(keys);
1900                         talloc_free(msg);
1901                         return LDB_ERR_OPERATIONS_ERROR;
1902                 }
1903
1904                 /*
1905                  * We trust the index for LDB_SCOPE_ONELEVEL
1906                  * unless the index key has been truncated.
1907                  *
1908                  * LDB_SCOPE_BASE is not passed in by our only caller.
1909                  */
1910                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
1911                     ldb_kv->cache->one_level_indexes &&
1912                     scope_one_truncation == KEY_NOT_TRUNCATED) {
1913                         ret = ldb_match_message(ldb, msg, ac->tree,
1914                                                 ac->scope, &matched);
1915                 } else {
1916                         ret = ldb_match_msg_error(ldb, msg,
1917                                                   ac->tree, ac->base,
1918                                                   ac->scope, &matched);
1919                 }
1920
1921                 if (ret != LDB_SUCCESS) {
1922                         talloc_free(keys);
1923                         talloc_free(msg);
1924                         return ret;
1925                 }
1926                 if (!matched) {
1927                         talloc_free(msg);
1928                         continue;
1929                 }
1930
1931                 /* filter the attributes that the user wants */
1932                 ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1933
1934                 talloc_free(msg);
1935
1936                 if (ret == -1) {
1937                         talloc_free(keys);
1938                         return LDB_ERR_OPERATIONS_ERROR;
1939                 }
1940
1941                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1942                 if (ret != LDB_SUCCESS) {
1943                         /* Regardless of success or failure, the msg
1944                          * is the callbacks responsiblity, and should
1945                          * not be talloc_free()'ed */
1946                         ac->request_terminated = true;
1947                         talloc_free(keys);
1948                         return ret;
1949                 }
1950
1951                 (*match_count)++;
1952         }
1953
1954         TALLOC_FREE(keys);
1955         return LDB_SUCCESS;
1956 }
1957
1958 /*
1959   sort a DN list
1960  */
1961 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
1962                                 struct dn_list *list)
1963 {
1964         if (list->count < 2) {
1965                 return;
1966         }
1967
1968         /* We know the list is sorted when using the GUID index */
1969         if (ltdb->cache->GUID_index_attribute != NULL) {
1970                 return;
1971         }
1972
1973         TYPESAFE_QSORT(list->dn, list->count,
1974                        ldb_val_equal_exact_for_qsort);
1975 }
1976
1977 /*
1978   search the database with a LDAP-like expression using indexes
1979   returns -1 if an indexed search is not possible, in which
1980   case the caller should call ltdb_search_full()
1981 */
1982 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
1983 {
1984         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1985         struct ldb_kv_private *ldb_kv = talloc_get_type(
1986             ldb_module_get_private(ac->module), struct ldb_kv_private);
1987         struct dn_list *dn_list;
1988         int ret;
1989         enum ldb_scope index_scope;
1990         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1991
1992         /* see if indexing is enabled */
1993         if (!ldb_kv->cache->attribute_indexes &&
1994             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
1995                 /* fallback to a full search */
1996                 return LDB_ERR_OPERATIONS_ERROR;
1997         }
1998
1999         dn_list = talloc_zero(ac, struct dn_list);
2000         if (dn_list == NULL) {
2001                 return ldb_module_oom(ac->module);
2002         }
2003
2004         /*
2005          * For the purposes of selecting the switch arm below, if we
2006          * don't have a one-level index then treat it like a subtree
2007          * search
2008          */
2009         if (ac->scope == LDB_SCOPE_ONELEVEL &&
2010             !ldb_kv->cache->one_level_indexes) {
2011                 index_scope = LDB_SCOPE_SUBTREE;
2012         } else {
2013                 index_scope = ac->scope;
2014         }
2015
2016         switch (index_scope) {
2017         case LDB_SCOPE_BASE:
2018                 /*
2019                  * The only caller will have filtered the operation out
2020                  * so we should never get here
2021                  */
2022                 return ldb_operr(ldb);
2023
2024         case LDB_SCOPE_ONELEVEL:
2025
2026                 /*
2027                  * First, load all the one-level child objects (regardless of
2028                  * whether they match the search filter or not). The database
2029                  * maintains a one-level index, so retrieving this is quick.
2030                  */
2031                 ret = ldb_kv_index_dn_one(ac->module,
2032                                           ldb_kv,
2033                                           ac->base,
2034                                           dn_list,
2035                                           &scope_one_truncation);
2036                 if (ret != LDB_SUCCESS) {
2037                         talloc_free(dn_list);
2038                         return ret;
2039                 }
2040
2041                 /*
2042                  * If we have too many children, running ldb_kv_index_filter()
2043                  * over all the child objects can be quite expensive. So next
2044                  * we do a separate indexed query using the search filter.
2045                  *
2046                  * This should be quick, but it may return objects that are not
2047                  * the direct one-level child objects we're interested in.
2048                  *
2049                  * We only do this in the GUID index mode, which is
2050                  * O(n*log(m)) otherwise the intersection below will
2051                  * be too costly at O(n*m).
2052                  *
2053                  * We don't set a heuristic for 'too many' but instead
2054                  * do it always and rely on the index lookup being
2055                  * fast enough in the small case.
2056                  */
2057                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2058                         struct dn_list *indexed_search_result
2059                                 = talloc_zero(ac, struct dn_list);
2060                         if (indexed_search_result == NULL) {
2061                                 talloc_free(dn_list);
2062                                 return ldb_module_oom(ac->module);
2063                         }
2064
2065                         if (!ldb_kv->cache->attribute_indexes) {
2066                                 talloc_free(indexed_search_result);
2067                                 talloc_free(dn_list);
2068                                 return LDB_ERR_OPERATIONS_ERROR;
2069                         }
2070
2071                         /*
2072                          * Try to do an indexed database search
2073                          */
2074                         ret = ldb_kv_index_dn(
2075                             ac->module, ldb_kv, ac->tree,
2076                             indexed_search_result);
2077
2078                         /*
2079                          * We can stop if we're sure the object doesn't exist
2080                          */
2081                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2082                                 talloc_free(indexed_search_result);
2083                                 talloc_free(dn_list);
2084                                 return LDB_ERR_NO_SUCH_OBJECT;
2085                         }
2086
2087                         /*
2088                          * Once we have a successful search result, we
2089                          * intersect it with the one-level children (dn_list).
2090                          * This should give us exactly the result we're after
2091                          * (we still need to run ldb_kv_index_filter() to
2092                          * handle potential index truncation cases).
2093                          *
2094                          * The indexed search may fail because we don't support
2095                          * indexing on that type of search operation, e.g.
2096                          * matching against '*'. In which case we fall through
2097                          * and run ldb_kv_index_filter() over all the one-level
2098                          * children (which is still better than bailing out here
2099                          * and falling back to a full DB scan).
2100                          */
2101                         if (ret == LDB_SUCCESS) {
2102                                 if (!list_intersect(ldb,
2103                                                     ldb_kv,
2104                                                     dn_list,
2105                                                     indexed_search_result)) {
2106                                         talloc_free(indexed_search_result);
2107                                         talloc_free(dn_list);
2108                                         return LDB_ERR_OPERATIONS_ERROR;
2109                                 }
2110                         }
2111                 }
2112                 break;
2113
2114         case LDB_SCOPE_SUBTREE:
2115         case LDB_SCOPE_DEFAULT:
2116                 if (!ldb_kv->cache->attribute_indexes) {
2117                         talloc_free(dn_list);
2118                         return LDB_ERR_OPERATIONS_ERROR;
2119                 }
2120                 /*
2121                  * Here we load the index for the tree.  We have no
2122                  * index for the subtree.
2123                  */
2124                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2125                 if (ret != LDB_SUCCESS) {
2126                         talloc_free(dn_list);
2127                         return ret;
2128                 }
2129                 break;
2130         }
2131
2132         /*
2133          * It is critical that this function do the re-filter even
2134          * on things found by the index as the index can over-match
2135          * in cases of truncation (as well as when it decides it is
2136          * not worth further filtering)
2137          *
2138          * If this changes, then the index code above would need to
2139          * pass up a flag to say if any index was truncated during
2140          * processing as the truncation here refers only to the
2141          * SCOPE_ONELEVEL index.
2142          */
2143         ret = ldb_kv_index_filter(
2144             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2145         talloc_free(dn_list);
2146         return ret;
2147 }
2148
2149 /**
2150  * @brief Add a DN in the index list of a given attribute name/value pair
2151  *
2152  * This function will add the DN in the index list for the index for
2153  * the given attribute name and value.
2154  *
2155  * @param[in]  module       A ldb_module structure
2156  *
2157  * @param[in]  dn           The string representation of the DN as it
2158  *                          will be stored in the index entry
2159  *
2160  * @param[in]  el           A ldb_message_element array, one of the entry
2161  *                          referred by the v_idx is the attribute name and
2162  *                          value pair which will be used to construct the
2163  *                          index name
2164  *
2165  * @param[in]  v_idx        The index of element in the el array to use
2166  *
2167  * @return                  An ldb error code
2168  */
2169 static int ldb_kv_index_add1(struct ldb_module *module,
2170                              struct ldb_kv_private *ldb_kv,
2171                              const struct ldb_message *msg,
2172                              struct ldb_message_element *el,
2173                              int v_idx)
2174 {
2175         struct ldb_context *ldb;
2176         struct ldb_dn *dn_key;
2177         int ret;
2178         const struct ldb_schema_attribute *a;
2179         struct dn_list *list;
2180         unsigned alloc_len;
2181         enum key_truncation truncation = KEY_TRUNCATED;
2182
2183
2184         ldb = ldb_module_get_ctx(module);
2185
2186         list = talloc_zero(module, struct dn_list);
2187         if (list == NULL) {
2188                 return LDB_ERR_OPERATIONS_ERROR;
2189         }
2190
2191         dn_key = ldb_kv_index_key(
2192             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2193         if (!dn_key) {
2194                 talloc_free(list);
2195                 return LDB_ERR_OPERATIONS_ERROR;
2196         }
2197         /*
2198          * Samba only maintains unique indexes on the objectSID and objectGUID
2199          * so if a unique index key exceeds the maximum length there is a
2200          * problem.
2201          */
2202         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2203                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2204                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2205
2206                 ldb_asprintf_errstring(
2207                     ldb,
2208                     __location__ ": unique index key on %s in %s, "
2209                                  "exceeds maximum key length of %u (encoded).",
2210                     el->name,
2211                     ldb_dn_get_linearized(msg->dn),
2212                     ldb_kv->max_key_length);
2213                 talloc_free(list);
2214                 return LDB_ERR_CONSTRAINT_VIOLATION;
2215         }
2216         talloc_steal(list, dn_key);
2217
2218         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2219         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2220                 talloc_free(list);
2221                 return ret;
2222         }
2223
2224         /*
2225          * Check for duplicates in the @IDXDN DN -> GUID record
2226          *
2227          * This is very normal, it just means a duplicate DN creation
2228          * was attempted, so don't set the error string or print scary
2229          * messages.
2230          */
2231         if (list->count > 0 &&
2232             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2233             truncation == KEY_NOT_TRUNCATED) {
2234
2235                 talloc_free(list);
2236                 return LDB_ERR_CONSTRAINT_VIOLATION;
2237
2238         } else if (list->count > 0
2239                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2240
2241                 /*
2242                  * At least one existing entry in the DN->GUID index, which
2243                  * arises when the DN indexes have been truncated
2244                  *
2245                  * So need to pull the DN's to check if it's really a duplicate
2246                  */
2247                 int i;
2248                 for (i=0; i < list->count; i++) {
2249                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2250                         struct ldb_val key = {
2251                                 .data = guid_key,
2252                                 .length = sizeof(guid_key)
2253                         };
2254                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2255                         struct ldb_message *rec = ldb_msg_new(ldb);
2256                         if (rec == NULL) {
2257                                 return LDB_ERR_OPERATIONS_ERROR;
2258                         }
2259
2260                         ret = ldb_kv_idx_to_key(
2261                             module, ldb_kv, ldb, &list->dn[i], &key);
2262                         if (ret != LDB_SUCCESS) {
2263                                 TALLOC_FREE(list);
2264                                 TALLOC_FREE(rec);
2265                                 return ret;
2266                         }
2267
2268                         ret =
2269                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2270                         if (key.data != guid_key) {
2271                                 TALLOC_FREE(key.data);
2272                         }
2273                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2274                                 /*
2275                                  * the record has disappeared?
2276                                  * yes, this can happen
2277                                  */
2278                                 talloc_free(rec);
2279                                 continue;
2280                         }
2281
2282                         if (ret != LDB_SUCCESS) {
2283                                 /* an internal error */
2284                                 TALLOC_FREE(rec);
2285                                 TALLOC_FREE(list);
2286                                 return LDB_ERR_OPERATIONS_ERROR;
2287                         }
2288                         /*
2289                          * The DN we are trying to add to the DB and index
2290                          * is already here, so we must deny the addition
2291                          */
2292                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2293                                 TALLOC_FREE(rec);
2294                                 TALLOC_FREE(list);
2295                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2296                         }
2297                 }
2298         }
2299
2300         /*
2301          * Check for duplicates in unique indexes
2302          *
2303          * We don't need to do a loop test like the @IDXDN case
2304          * above as we have a ban on long unique index values
2305          * at the start of this function.
2306          */
2307         if (list->count > 0 &&
2308             ((a != NULL
2309               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2310                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2311                 /*
2312                  * We do not want to print info about a possibly
2313                  * confidential DN that the conflict was with in the
2314                  * user-visible error string
2315                  */
2316
2317                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2318                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2319                                   __location__
2320                                   ": unique index violation on %s in %s, "
2321                                   "conflicts with %*.*s in %s",
2322                                   el->name, ldb_dn_get_linearized(msg->dn),
2323                                   (int)list->dn[0].length,
2324                                   (int)list->dn[0].length,
2325                                   list->dn[0].data,
2326                                   ldb_dn_get_linearized(dn_key));
2327                 } else {
2328                         /* This can't fail, gives a default at worst */
2329                         const struct ldb_schema_attribute *attr =
2330                             ldb_schema_attribute_by_name(
2331                                 ldb, ldb_kv->cache->GUID_index_attribute);
2332                         struct ldb_val v;
2333                         ret = attr->syntax->ldif_write_fn(ldb, list,
2334                                                           &list->dn[0], &v);
2335                         if (ret == LDB_SUCCESS) {
2336                                 ldb_debug(ldb,
2337                                           LDB_DEBUG_WARNING,
2338                                           __location__
2339                                           ": unique index violation on %s in "
2340                                           "%s, conflicts with %s %*.*s in %s",
2341                                           el->name,
2342                                           ldb_dn_get_linearized(msg->dn),
2343                                           ldb_kv->cache->GUID_index_attribute,
2344                                           (int)v.length,
2345                                           (int)v.length,
2346                                           v.data,
2347                                           ldb_dn_get_linearized(dn_key));
2348                         }
2349                 }
2350                 ldb_asprintf_errstring(ldb,
2351                                        __location__ ": unique index violation "
2352                                        "on %s in %s",
2353                                        el->name,
2354                                        ldb_dn_get_linearized(msg->dn));
2355                 talloc_free(list);
2356                 return LDB_ERR_CONSTRAINT_VIOLATION;
2357         }
2358
2359         /* overallocate the list a bit, to reduce the number of
2360          * realloc trigered copies */
2361         alloc_len = ((list->count+1)+7) & ~7;
2362         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2363         if (list->dn == NULL) {
2364                 talloc_free(list);
2365                 return LDB_ERR_OPERATIONS_ERROR;
2366         }
2367
2368         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2369                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2370                 list->dn[list->count].data
2371                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2372                 if (list->dn[list->count].data == NULL) {
2373                         talloc_free(list);
2374                         return LDB_ERR_OPERATIONS_ERROR;
2375                 }
2376                 list->dn[list->count].length = strlen(dn_str);
2377         } else {
2378                 const struct ldb_val *key_val;
2379                 struct ldb_val *exact = NULL, *next = NULL;
2380                 key_val = ldb_msg_find_ldb_val(
2381                     msg, ldb_kv->cache->GUID_index_attribute);
2382                 if (key_val == NULL) {
2383                         talloc_free(list);
2384                         return ldb_module_operr(module);
2385                 }
2386
2387                 if (key_val->length != LDB_KV_GUID_SIZE) {
2388                         talloc_free(list);
2389                         return ldb_module_operr(module);
2390                 }
2391
2392                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2393                                         *key_val, ldb_val_equal_exact_ordered,
2394                                         exact, next);
2395
2396                 /*
2397                  * Give a warning rather than fail, this could be a
2398                  * duplicate value in the record allowed by a caller
2399                  * forcing in the value with
2400                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2401                  */
2402                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2403                         /* This can't fail, gives a default at worst */
2404                         const struct ldb_schema_attribute *attr =
2405                             ldb_schema_attribute_by_name(
2406                                 ldb, ldb_kv->cache->GUID_index_attribute);
2407                         struct ldb_val v;
2408                         ret = attr->syntax->ldif_write_fn(ldb, list,
2409                                                           exact, &v);
2410                         if (ret == LDB_SUCCESS) {
2411                                 ldb_debug(ldb,
2412                                           LDB_DEBUG_WARNING,
2413                                           __location__
2414                                           ": duplicate attribute value in %s "
2415                                           "for index on %s, "
2416                                           "duplicate of %s %*.*s in %s",
2417                                           ldb_dn_get_linearized(msg->dn),
2418                                           el->name,
2419                                           ldb_kv->cache->GUID_index_attribute,
2420                                           (int)v.length,
2421                                           (int)v.length,
2422                                           v.data,
2423                                           ldb_dn_get_linearized(dn_key));
2424                         }
2425                 }
2426
2427                 if (next == NULL) {
2428                         next = &list->dn[list->count];
2429                 } else {
2430                         memmove(&next[1], next,
2431                                 sizeof(*next) * (list->count - (next - list->dn)));
2432                 }
2433                 *next = ldb_val_dup(list->dn, key_val);
2434                 if (next->data == NULL) {
2435                         talloc_free(list);
2436                         return ldb_module_operr(module);
2437                 }
2438         }
2439         list->count++;
2440
2441         ret = ldb_kv_dn_list_store(module, dn_key, list);
2442
2443         talloc_free(list);
2444
2445         return ret;
2446 }
2447
2448 /*
2449   add index entries for one elements in a message
2450  */
2451 static int ldb_kv_index_add_el(struct ldb_module *module,
2452                                struct ldb_kv_private *ldb_kv,
2453                                const struct ldb_message *msg,
2454                                struct ldb_message_element *el)
2455 {
2456         unsigned int i;
2457         for (i = 0; i < el->num_values; i++) {
2458                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2459                 if (ret != LDB_SUCCESS) {
2460                         return ret;
2461                 }
2462         }
2463
2464         return LDB_SUCCESS;
2465 }
2466
2467 /*
2468   add index entries for all elements in a message
2469  */
2470 static int ldb_kv_index_add_all(struct ldb_module *module,
2471                                 struct ldb_kv_private *ldb_kv,
2472                                 const struct ldb_message *msg)
2473 {
2474         struct ldb_message_element *elements = msg->elements;
2475         unsigned int i;
2476         const char *dn_str;
2477         int ret;
2478
2479         if (ldb_dn_is_special(msg->dn)) {
2480                 return LDB_SUCCESS;
2481         }
2482
2483         dn_str = ldb_dn_get_linearized(msg->dn);
2484         if (dn_str == NULL) {
2485                 return LDB_ERR_OPERATIONS_ERROR;
2486         }
2487
2488         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2489         if (ret != LDB_SUCCESS) {
2490                 return ret;
2491         }
2492
2493         if (!ldb_kv->cache->attribute_indexes) {
2494                 /* no indexed fields */
2495                 return LDB_SUCCESS;
2496         }
2497
2498         for (i = 0; i < msg->num_elements; i++) {
2499                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2500                         continue;
2501                 }
2502                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2503                 if (ret != LDB_SUCCESS) {
2504                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2505                         ldb_asprintf_errstring(ldb,
2506                                                __location__ ": Failed to re-index %s in %s - %s",
2507                                                elements[i].name, dn_str,
2508                                                ldb_errstring(ldb));
2509                         return ret;
2510                 }
2511         }
2512
2513         return LDB_SUCCESS;
2514 }
2515
2516
2517 /*
2518   insert a DN index for a message
2519 */
2520 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2521                                   struct ldb_kv_private *ldb_kv,
2522                                   const struct ldb_message *msg,
2523                                   struct ldb_dn *dn,
2524                                   const char *index,
2525                                   int add)
2526 {
2527         struct ldb_message_element el;
2528         struct ldb_val val;
2529         int ret;
2530
2531         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2532         if (val.data == NULL) {
2533                 const char *dn_str = ldb_dn_get_linearized(dn);
2534                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2535                                        __location__ ": Failed to modify %s "
2536                                                     "against %s in %s: failed "
2537                                                     "to get casefold DN",
2538                                        index,
2539                                        ldb_kv->cache->GUID_index_attribute,
2540                                        dn_str);
2541                 return LDB_ERR_OPERATIONS_ERROR;
2542         }
2543
2544         val.length = strlen((char *)val.data);
2545         el.name = index;
2546         el.values = &val;
2547         el.num_values = 1;
2548
2549         if (add) {
2550                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2551         } else { /* delete */
2552                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2553         }
2554
2555         if (ret != LDB_SUCCESS) {
2556                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2557                 const char *dn_str = ldb_dn_get_linearized(dn);
2558                 ldb_asprintf_errstring(ldb,
2559                                        __location__ ": Failed to modify %s "
2560                                                     "against %s in %s - %s",
2561                                        index,
2562                                        ldb_kv->cache->GUID_index_attribute,
2563                                        dn_str,
2564                                        ldb_errstring(ldb));
2565                 return ret;
2566         }
2567         return ret;
2568 }
2569
2570 /*
2571   insert a one level index for a message
2572 */
2573 static int ldb_kv_index_onelevel(struct ldb_module *module,
2574                                  const struct ldb_message *msg,
2575                                  int add)
2576 {
2577         struct ldb_kv_private *ldb_kv = talloc_get_type(
2578             ldb_module_get_private(module), struct ldb_kv_private);
2579         struct ldb_dn *pdn;
2580         int ret;
2581
2582         /* We index for ONE Level only if requested */
2583         if (!ldb_kv->cache->one_level_indexes) {
2584                 return LDB_SUCCESS;
2585         }
2586
2587         pdn = ldb_dn_get_parent(module, msg->dn);
2588         if (pdn == NULL) {
2589                 return LDB_ERR_OPERATIONS_ERROR;
2590         }
2591         ret =
2592             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2593
2594         talloc_free(pdn);
2595
2596         return ret;
2597 }
2598
2599 /*
2600   insert a one level index for a message
2601 */
2602 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2603                                       const struct ldb_message *msg,
2604                                       int add)
2605 {
2606         int ret;
2607         struct ldb_kv_private *ldb_kv = talloc_get_type(
2608             ldb_module_get_private(module), struct ldb_kv_private);
2609
2610         /* We index for DN only if using a GUID index */
2611         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2612                 return LDB_SUCCESS;
2613         }
2614
2615         ret = ldb_kv_modify_index_dn(
2616             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2617
2618         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2619                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2620                                        "Entry %s already exists",
2621                                        ldb_dn_get_linearized(msg->dn));
2622                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2623         }
2624         return ret;
2625 }
2626
2627 /*
2628   add the index entries for a new element in a record
2629   The caller guarantees that these element values are not yet indexed
2630 */
2631 int ldb_kv_index_add_element(struct ldb_module *module,
2632                              struct ldb_kv_private *ldb_kv,
2633                              const struct ldb_message *msg,
2634                              struct ldb_message_element *el)
2635 {
2636         if (ldb_dn_is_special(msg->dn)) {
2637                 return LDB_SUCCESS;
2638         }
2639         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2640                 return LDB_SUCCESS;
2641         }
2642         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2643 }
2644
2645 /*
2646   add the index entries for a new record
2647 */
2648 int ldb_kv_index_add_new(struct ldb_module *module,
2649                          struct ldb_kv_private *ldb_kv,
2650                          const struct ldb_message *msg)
2651 {
2652         int ret;
2653
2654         if (ldb_dn_is_special(msg->dn)) {
2655                 return LDB_SUCCESS;
2656         }
2657
2658         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2659         if (ret != LDB_SUCCESS) {
2660                 /*
2661                  * Because we can't trust the caller to be doing
2662                  * transactions properly, clean up any index for this
2663                  * entry rather than relying on a transaction
2664                  * cleanup
2665                  */
2666
2667                 ldb_kv_index_delete(module, msg);
2668                 return ret;
2669         }
2670
2671         ret = ldb_kv_index_onelevel(module, msg, 1);
2672         if (ret != LDB_SUCCESS) {
2673                 /*
2674                  * Because we can't trust the caller to be doing
2675                  * transactions properly, clean up any index for this
2676                  * entry rather than relying on a transaction
2677                  * cleanup
2678                  */
2679                 ldb_kv_index_delete(module, msg);
2680                 return ret;
2681         }
2682         return ret;
2683 }
2684
2685
2686 /*
2687   delete an index entry for one message element
2688 */
2689 int ldb_kv_index_del_value(struct ldb_module *module,
2690                            struct ldb_kv_private *ldb_kv,
2691                            const struct ldb_message *msg,
2692                            struct ldb_message_element *el,
2693                            unsigned int v_idx)
2694 {
2695         struct ldb_context *ldb;
2696         struct ldb_dn *dn_key;
2697         const char *dn_str;
2698         int ret, i;
2699         unsigned int j;
2700         struct dn_list *list;
2701         struct ldb_dn *dn = msg->dn;
2702         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2703
2704         ldb = ldb_module_get_ctx(module);
2705
2706         dn_str = ldb_dn_get_linearized(dn);
2707         if (dn_str == NULL) {
2708                 return LDB_ERR_OPERATIONS_ERROR;
2709         }
2710
2711         if (dn_str[0] == '@') {
2712                 return LDB_SUCCESS;
2713         }
2714
2715         dn_key = ldb_kv_index_key(
2716             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
2717         /*
2718          * We ignore key truncation in ltdb_index_add1() so
2719          * match that by ignoring it here as well
2720          *
2721          * Multiple values are legitimate and accepted
2722          */
2723         if (!dn_key) {
2724                 return LDB_ERR_OPERATIONS_ERROR;
2725         }
2726
2727         list = talloc_zero(dn_key, struct dn_list);
2728         if (list == NULL) {
2729                 talloc_free(dn_key);
2730                 return LDB_ERR_OPERATIONS_ERROR;
2731         }
2732
2733         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2734         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2735                 /* it wasn't indexed. Did we have an earlier error? If we did then
2736                    its gone now */
2737                 talloc_free(dn_key);
2738                 return LDB_SUCCESS;
2739         }
2740
2741         if (ret != LDB_SUCCESS) {
2742                 talloc_free(dn_key);
2743                 return ret;
2744         }
2745
2746         /*
2747          * Find one of the values matching this message to remove
2748          */
2749         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
2750         if (i == -1) {
2751                 /* nothing to delete */
2752                 talloc_free(dn_key);
2753                 return LDB_SUCCESS;
2754         }
2755
2756         j = (unsigned int) i;
2757         if (j != list->count - 1) {
2758                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2759         }
2760         list->count--;
2761         if (list->count == 0) {
2762                 talloc_free(list->dn);
2763                 list->dn = NULL;
2764         } else {
2765                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2766         }
2767
2768         ret = ldb_kv_dn_list_store(module, dn_key, list);
2769
2770         talloc_free(dn_key);
2771
2772         return ret;
2773 }
2774
2775 /*
2776   delete the index entries for a element
2777   return -1 on failure
2778 */
2779 int ldb_kv_index_del_element(struct ldb_module *module,
2780                              struct ldb_kv_private *ldb_kv,
2781                              const struct ldb_message *msg,
2782                              struct ldb_message_element *el)
2783 {
2784         const char *dn_str;
2785         int ret;
2786         unsigned int i;
2787
2788         if (!ldb_kv->cache->attribute_indexes) {
2789                 /* no indexed fields */
2790                 return LDB_SUCCESS;
2791         }
2792
2793         dn_str = ldb_dn_get_linearized(msg->dn);
2794         if (dn_str == NULL) {
2795                 return LDB_ERR_OPERATIONS_ERROR;
2796         }
2797
2798         if (dn_str[0] == '@') {
2799                 return LDB_SUCCESS;
2800         }
2801
2802         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2803                 return LDB_SUCCESS;
2804         }
2805         for (i = 0; i < el->num_values; i++) {
2806                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
2807                 if (ret != LDB_SUCCESS) {
2808                         return ret;
2809                 }
2810         }
2811
2812         return LDB_SUCCESS;
2813 }
2814
2815 /*
2816   delete the index entries for a record
2817   return -1 on failure
2818 */
2819 int ldb_kv_index_delete(struct ldb_module *module,
2820                         const struct ldb_message *msg)
2821 {
2822         struct ldb_kv_private *ldb_kv = talloc_get_type(
2823             ldb_module_get_private(module), struct ldb_kv_private);
2824         int ret;
2825         unsigned int i;
2826
2827         if (ldb_dn_is_special(msg->dn)) {
2828                 return LDB_SUCCESS;
2829         }
2830
2831         ret = ldb_kv_index_onelevel(module, msg, 0);
2832         if (ret != LDB_SUCCESS) {
2833                 return ret;
2834         }
2835
2836         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
2837         if (ret != LDB_SUCCESS) {
2838                 return ret;
2839         }
2840
2841         if (!ldb_kv->cache->attribute_indexes) {
2842                 /* no indexed fields */
2843                 return LDB_SUCCESS;
2844         }
2845
2846         for (i = 0; i < msg->num_elements; i++) {
2847                 ret = ldb_kv_index_del_element(
2848                     module, ldb_kv, msg, &msg->elements[i]);
2849                 if (ret != LDB_SUCCESS) {
2850                         return ret;
2851                 }
2852         }
2853
2854         return LDB_SUCCESS;
2855 }
2856
2857
2858 /*
2859   traversal function that deletes all @INDEX records in the in-memory
2860   TDB.
2861
2862   This does not touch the actual DB, that is done at transaction
2863   commit, which in turn greatly reduces DB churn as we will likely
2864   be able to do a direct update into the old record.
2865 */
2866 static int delete_index(struct ldb_kv_private *ldb_kv,
2867                         struct ldb_val key,
2868                         struct ldb_val data,
2869                         void *state)
2870 {
2871         struct ldb_module *module = state;
2872         const char *dnstr = "DN=" LDB_KV_INDEX ":";
2873         struct dn_list list;
2874         struct ldb_dn *dn;
2875         struct ldb_val v;
2876         int ret;
2877
2878         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2879                 return 0;
2880         }
2881         /* we need to put a empty list in the internal tdb for this
2882          * index entry */
2883         list.dn = NULL;
2884         list.count = 0;
2885
2886         /* the offset of 3 is to remove the DN= prefix. */
2887         v.data = key.data + 3;
2888         v.length = strnlen((char *)key.data, key.length) - 3;
2889
2890         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
2891
2892         /*
2893          * This does not actually touch the DB quite yet, just
2894          * the in-memory index cache
2895          */
2896         ret = ldb_kv_dn_list_store(module, dn, &list);
2897         if (ret != LDB_SUCCESS) {
2898                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2899                                        "Unable to store null index for %s\n",
2900                                                 ldb_dn_get_linearized(dn));
2901                 talloc_free(dn);
2902                 return -1;
2903         }
2904         talloc_free(dn);
2905         return 0;
2906 }
2907
2908 /*
2909   traversal function that adds @INDEX records during a re index TODO wrong comment
2910 */
2911 static int re_key(struct ldb_kv_private *ldb_kv,
2912                   struct ldb_val key,
2913                   struct ldb_val val,
2914                   void *state)
2915 {
2916         struct ldb_context *ldb;
2917         struct ldb_kv_reindex_context *ctx =
2918             (struct ldb_kv_reindex_context *)state;
2919         struct ldb_module *module = ctx->module;
2920         struct ldb_message *msg;
2921         unsigned int nb_elements_in_db;
2922         int ret;
2923         struct ldb_val key2;
2924         bool is_record;
2925
2926         ldb = ldb_module_get_ctx(module);
2927
2928         if (key.length > 4 &&
2929             memcmp(key.data, "DN=@", 4) == 0) {
2930                 return 0;
2931         }
2932
2933         is_record = ldb_kv_key_is_record(key);
2934         if (is_record == false) {
2935                 return 0;
2936         }
2937
2938         msg = ldb_msg_new(module);
2939         if (msg == NULL) {
2940                 return -1;
2941         }
2942
2943         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2944                                                    msg,
2945                                                    NULL, 0,
2946                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2947                                                    &nb_elements_in_db);
2948         if (ret != 0) {
2949                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2950                                                 ldb_dn_get_linearized(msg->dn));
2951                 ctx->error = ret;
2952                 talloc_free(msg);
2953                 return -1;
2954         }
2955
2956         if (msg->dn == NULL) {
2957                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2958                           "Refusing to re-index as GUID "
2959                           "key %*.*s with no DN\n",
2960                           (int)key.length, (int)key.length,
2961                           (char *)key.data);
2962                 talloc_free(msg);
2963                 return -1;
2964         }
2965
2966         /* check if the DN key has changed, perhaps due to the case
2967            insensitivity of an element changing, or a change from DN
2968            to GUID keys */
2969         key2 = ldb_kv_key_msg(module, msg, msg);
2970         if (key2.data == NULL) {
2971                 /* probably a corrupt record ... darn */
2972                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2973                                                 ldb_dn_get_linearized(msg->dn));
2974                 talloc_free(msg);
2975                 return 0;
2976         }
2977         if (key.length != key2.length ||
2978             (memcmp(key.data, key2.data, key.length) != 0)) {
2979                 ldb_kv->kv_ops->update_in_iterate(
2980                     ldb_kv, key, key2, val, ctx);
2981         }
2982         talloc_free(key2.data);
2983
2984         talloc_free(msg);
2985
2986         ctx->count++;
2987         if (ctx->count % 10000 == 0) {
2988                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2989                           "Reindexing: re-keyed %u records so far",
2990                           ctx->count);
2991         }
2992
2993         return 0;
2994 }
2995
2996 /*
2997   traversal function that adds @INDEX records during a re index
2998 */
2999 static int re_index(struct ldb_kv_private *ldb_kv,
3000                     struct ldb_val key,
3001                     struct ldb_val val,
3002                     void *state)
3003 {
3004         struct ldb_context *ldb;
3005         struct ldb_kv_reindex_context *ctx =
3006             (struct ldb_kv_reindex_context *)state;
3007         struct ldb_module *module = ctx->module;
3008         struct ldb_message *msg;
3009         unsigned int nb_elements_in_db;
3010         int ret;
3011         bool is_record;
3012
3013         ldb = ldb_module_get_ctx(module);
3014
3015         if (key.length > 4 &&
3016             memcmp(key.data, "DN=@", 4) == 0) {
3017                 return 0;
3018         }
3019
3020         is_record = ldb_kv_key_is_record(key);
3021         if (is_record == false) {
3022                 return 0;
3023         }
3024
3025         msg = ldb_msg_new(module);
3026         if (msg == NULL) {
3027                 return -1;
3028         }
3029
3030         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
3031                                                    msg,
3032                                                    NULL, 0,
3033                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
3034                                                    &nb_elements_in_db);
3035         if (ret != 0) {
3036                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3037                                                 ldb_dn_get_linearized(msg->dn));
3038                 ctx->error = ret;
3039                 talloc_free(msg);
3040                 return -1;
3041         }
3042
3043         if (msg->dn == NULL) {
3044                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3045                           "Refusing to re-index as GUID "
3046                           "key %*.*s with no DN\n",
3047                           (int)key.length, (int)key.length,
3048                           (char *)key.data);
3049                 talloc_free(msg);
3050                 return -1;
3051         }
3052
3053         ret = ldb_kv_index_onelevel(module, msg, 1);
3054         if (ret != LDB_SUCCESS) {
3055                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3056                           "Adding special ONE LEVEL index failed (%s)!",
3057                                                 ldb_dn_get_linearized(msg->dn));
3058                 talloc_free(msg);
3059                 return -1;
3060         }
3061
3062         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3063
3064         if (ret != LDB_SUCCESS) {
3065                 ctx->error = ret;
3066                 talloc_free(msg);
3067                 return -1;
3068         }
3069
3070         talloc_free(msg);
3071
3072         ctx->count++;
3073         if (ctx->count % 10000 == 0) {
3074                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3075                           "Reindexing: re-indexed %u records so far",
3076                           ctx->count);
3077         }
3078
3079         return 0;
3080 }
3081
3082 /*
3083   force a complete reindex of the database
3084 */
3085 int ldb_kv_reindex(struct ldb_module *module)
3086 {
3087         struct ldb_kv_private *ldb_kv = talloc_get_type(
3088             ldb_module_get_private(module), struct ldb_kv_private);
3089         int ret;
3090         struct ldb_kv_reindex_context ctx;
3091
3092         /*
3093          * Only triggered after a modification, but make clear we do
3094          * not re-index a read-only DB
3095          */
3096         if (ldb_kv->read_only) {
3097                 return LDB_ERR_UNWILLING_TO_PERFORM;
3098         }
3099
3100         if (ldb_kv_cache_reload(module) != 0) {
3101                 return LDB_ERR_OPERATIONS_ERROR;
3102         }
3103
3104         /*
3105          * Ensure we read (and so remove) the entries from the real
3106          * DB, no values stored so far are any use as we want to do a
3107          * re-index
3108          */
3109         ldb_kv_index_transaction_cancel(module);
3110
3111         ret = ldb_kv_index_transaction_start(module);
3112         if (ret != LDB_SUCCESS) {
3113                 return ret;
3114         }
3115
3116         /* first traverse the database deleting any @INDEX records by
3117          * putting NULL entries in the in-memory tdb
3118          */
3119         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3120         if (ret < 0) {
3121                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3122                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3123                                        ldb_errstring(ldb));
3124                 return LDB_ERR_OPERATIONS_ERROR;
3125         }
3126
3127         ctx.module = module;
3128         ctx.error = 0;
3129         ctx.count = 0;
3130
3131         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3132         if (ret < 0) {
3133                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3134                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3135                                        ldb_errstring(ldb));
3136                 return LDB_ERR_OPERATIONS_ERROR;
3137         }
3138
3139         if (ctx.error != LDB_SUCCESS) {
3140                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3141                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3142                 return ctx.error;
3143         }
3144
3145         ctx.error = 0;
3146         ctx.count = 0;
3147
3148         /* now traverse adding any indexes for normal LDB records */
3149         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3150         if (ret < 0) {
3151                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3152                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3153                                        ldb_errstring(ldb));
3154                 return LDB_ERR_OPERATIONS_ERROR;
3155         }
3156
3157         if (ctx.error != LDB_SUCCESS) {
3158                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3159                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3160                 return ctx.error;
3161         }
3162
3163         if (ctx.count > 10000) {
3164                 ldb_debug(ldb_module_get_ctx(module),
3165                           LDB_DEBUG_WARNING,
3166                           "Reindexing: re_index successful on %s, "
3167                           "final index write-out will be in transaction commit",
3168                           ldb_kv->kv_ops->name(ldb_kv));
3169         }
3170         return LDB_SUCCESS;
3171 }