lib ldb key value: convert TDB_DATA structs to ldb_val
[amitay/samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151
152 struct dn_list {
153         unsigned int count;
154         struct ldb_val *dn;
155         /*
156          * Do not optimise the intersection of this list,
157          * we must never return an entry not in this
158          * list.  This allows the index for
159          * SCOPE_ONELEVEL to be trusted.
160          */
161         bool strict;
162 };
163
164 struct ldb_kv_idxptr {
165         struct tdb_context *itdb;
166         int error;
167 };
168
169 enum key_truncation {
170         KEY_NOT_TRUNCATED,
171         KEY_TRUNCATED,
172 };
173
174 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
175                                       const struct ldb_message *msg,
176                                       int add);
177 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
178                                    struct ldb_kv_private *ldb_kv,
179                                    struct ldb_dn *base_dn,
180                                    struct dn_list *dn_list,
181                                    enum key_truncation *truncation);
182
183 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
184                                 struct dn_list *list);
185
186 /* we put a @IDXVERSION attribute on index entries. This
187    allows us to tell if it was written by an older version
188 */
189 #define LDB_KV_INDEXING_VERSION 2
190
191 #define LDB_KV_GUID_INDEXING_VERSION 3
192
193 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
194 {
195         if (ldb_kv->max_key_length == 0) {
196                 return UINT_MAX;
197         }
198         return ldb_kv->max_key_length;
199 }
200
201 /* enable the idxptr mode when transactions start */
202 int ldb_kv_index_transaction_start(struct ldb_module *module)
203 {
204         struct ldb_kv_private *ldb_kv = talloc_get_type(
205             ldb_module_get_private(module), struct ldb_kv_private);
206         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
207         if (ldb_kv->idxptr == NULL) {
208                 return ldb_oom(ldb_module_get_ctx(module));
209         }
210
211         return LDB_SUCCESS;
212 }
213
214 /*
215   see if two ldb_val structures contain exactly the same data
216   return -1 or 1 for a mismatch, 0 for match
217 */
218 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
219                                          const struct ldb_val *v2)
220 {
221         if (v1->length > v2->length) {
222                 return -1;
223         }
224         if (v1->length < v2->length) {
225                 return 1;
226         }
227         return memcmp(v1->data, v2->data, v1->length);
228 }
229
230 /*
231   see if two ldb_val structures contain exactly the same data
232   return -1 or 1 for a mismatch, 0 for match
233 */
234 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
235                                        const struct ldb_val *v2)
236 {
237         if (v1.length > v2->length) {
238                 return -1;
239         }
240         if (v1.length < v2->length) {
241                 return 1;
242         }
243         return memcmp(v1.data, v2->data, v1.length);
244 }
245
246
247 /*
248   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
249   binary-safe comparison for the 'dn' returns -1 if not found
250
251   This is therefore safe when the value is a GUID in the future
252  */
253 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
254                                    const struct dn_list *list,
255                                    const struct ldb_val *v)
256 {
257         unsigned int i;
258         struct ldb_val *exact = NULL, *next = NULL;
259
260         if (ldb_kv->cache->GUID_index_attribute == NULL) {
261                 for (i=0; i<list->count; i++) {
262                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
263                                 return i;
264                         }
265                 }
266                 return -1;
267         }
268
269         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
270                                 *v, ldb_val_equal_exact_ordered,
271                                 exact, next);
272         if (exact == NULL) {
273                 return -1;
274         }
275         /* Not required, but keeps the compiler quiet */
276         if (next != NULL) {
277                 return -1;
278         }
279
280         i = exact - list->dn;
281         return i;
282 }
283
284 /*
285   find a entry in a dn_list. Uses a case sensitive comparison with the dn
286   returns -1 if not found
287  */
288 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
289                                    struct dn_list *list,
290                                    const struct ldb_message *msg)
291 {
292         struct ldb_val v;
293         const struct ldb_val *key_val;
294         if (ldb_kv->cache->GUID_index_attribute == NULL) {
295                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
296                 v.data = discard_const_p(unsigned char, dn_str);
297                 v.length = strlen(dn_str);
298         } else {
299                 key_val = ldb_msg_find_ldb_val(
300                     msg, ldb_kv->cache->GUID_index_attribute);
301                 if (key_val == NULL) {
302                         return -1;
303                 }
304                 v = *key_val;
305         }
306         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
307 }
308
309 /*
310   this is effectively a cast function, but with lots of paranoia
311   checks and also copes with CPUs that are fussy about pointer
312   alignment
313  */
314 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
315                                            TDB_DATA rec,
316                                            bool check_parent)
317 {
318         struct dn_list *list;
319         if (rec.dsize != sizeof(void *)) {
320                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
321                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
322                 return NULL;
323         }
324         /* note that we can't just use a cast here, as rec.dptr may
325            not be aligned sufficiently for a pointer. A cast would cause
326            platforms like some ARM CPUs to crash */
327         memcpy(&list, rec.dptr, sizeof(void *));
328         list = talloc_get_type(list, struct dn_list);
329         if (list == NULL) {
330                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
331                                        "Bad type '%s' for idxptr",
332                                        talloc_get_name(list));
333                 return NULL;
334         }
335         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
336                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
337                                        "Bad parent '%s' for idxptr",
338                                        talloc_get_name(talloc_parent(list->dn)));
339                 return NULL;
340         }
341         return list;
342 }
343
344 /*
345   return the @IDX list in an index entry for a dn as a
346   struct dn_list
347  */
348 static int ldb_kv_dn_list_load(struct ldb_module *module,
349                                struct ldb_kv_private *ldb_kv,
350                                struct ldb_dn *dn,
351                                struct dn_list *list)
352 {
353         struct ldb_message *msg;
354         int ret, version;
355         struct ldb_message_element *el;
356         TDB_DATA rec;
357         struct dn_list *list2;
358         TDB_DATA key;
359
360         list->dn = NULL;
361         list->count = 0;
362
363         /* see if we have any in-memory index entries */
364         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
365                 goto normal_index;
366         }
367
368         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
369         key.dsize = strlen((char *)key.dptr);
370
371         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
372         if (rec.dptr == NULL) {
373                 goto normal_index;
374         }
375
376         /* we've found an in-memory index entry */
377         list2 = ldb_kv_index_idxptr(module, rec, true);
378         if (list2 == NULL) {
379                 free(rec.dptr);
380                 return LDB_ERR_OPERATIONS_ERROR;
381         }
382         free(rec.dptr);
383
384         *list = *list2;
385         return LDB_SUCCESS;
386
387 normal_index:
388         msg = ldb_msg_new(list);
389         if (msg == NULL) {
390                 return LDB_ERR_OPERATIONS_ERROR;
391         }
392
393         ret = ldb_kv_search_dn1(module,
394                                 dn,
395                                 msg,
396                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
397                                     LDB_UNPACK_DATA_FLAG_NO_DN);
398         if (ret != LDB_SUCCESS) {
399                 talloc_free(msg);
400                 return ret;
401         }
402
403         el = ldb_msg_find_element(msg, LDB_KV_IDX);
404         if (!el) {
405                 talloc_free(msg);
406                 return LDB_SUCCESS;
407         }
408
409         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
410
411         /*
412          * we avoid copying the strings by stealing the list.  We have
413          * to steal msg onto el->values (which looks odd) because we
414          * asked for the memory to be allocated on msg, not on each
415          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
416          */
417         if (ldb_kv->cache->GUID_index_attribute == NULL) {
418                 /* check indexing version number */
419                 if (version != LDB_KV_INDEXING_VERSION) {
420                         ldb_debug_set(ldb_module_get_ctx(module),
421                                       LDB_DEBUG_ERROR,
422                                       "Wrong DN index version %d "
423                                       "expected %d for %s",
424                                       version, LDB_KV_INDEXING_VERSION,
425                                       ldb_dn_get_linearized(dn));
426                         talloc_free(msg);
427                         return LDB_ERR_OPERATIONS_ERROR;
428                 }
429
430                 talloc_steal(el->values, msg);
431                 list->dn = talloc_steal(list, el->values);
432                 list->count = el->num_values;
433         } else {
434                 unsigned int i;
435                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
436                         /* This is quite likely during the DB startup
437                            on first upgrade to using a GUID index */
438                         ldb_debug_set(ldb_module_get_ctx(module),
439                                       LDB_DEBUG_ERROR,
440                                       "Wrong GUID index version %d "
441                                       "expected %d for %s",
442                                       version, LDB_KV_GUID_INDEXING_VERSION,
443                                       ldb_dn_get_linearized(dn));
444                         talloc_free(msg);
445                         return LDB_ERR_OPERATIONS_ERROR;
446                 }
447
448                 if (el->num_values == 0) {
449                         talloc_free(msg);
450                         return LDB_ERR_OPERATIONS_ERROR;
451                 }
452
453                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
454                         talloc_free(msg);
455                         return LDB_ERR_OPERATIONS_ERROR;
456                 }
457
458                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
459                 list->dn = talloc_array(list, struct ldb_val, list->count);
460                 if (list->dn == NULL) {
461                         talloc_free(msg);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464
465                 /*
466                  * The actual data is on msg, due to
467                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
468                  */
469                 talloc_steal(list->dn, msg);
470                 for (i = 0; i < list->count; i++) {
471                         list->dn[i].data
472                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
473                         list->dn[i].length = LDB_KV_GUID_SIZE;
474                 }
475         }
476
477         /* We don't need msg->elements any more */
478         talloc_free(msg->elements);
479         return LDB_SUCCESS;
480 }
481
482 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
483                            struct ldb_kv_private *ldb_kv,
484                            TALLOC_CTX *mem_ctx,
485                            struct ldb_dn *dn,
486                            struct ldb_val *ldb_key)
487 {
488         struct ldb_context *ldb = ldb_module_get_ctx(module);
489         int ret;
490         int index = 0;
491         enum key_truncation truncation = KEY_NOT_TRUNCATED;
492         struct dn_list *list = talloc(mem_ctx, struct dn_list);
493         if (list == NULL) {
494                 ldb_oom(ldb);
495                 return LDB_ERR_OPERATIONS_ERROR;
496         }
497
498         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
499         if (ret != LDB_SUCCESS) {
500                 TALLOC_FREE(list);
501                 return ret;
502         }
503
504         if (list->count == 0) {
505                 TALLOC_FREE(list);
506                 return LDB_ERR_NO_SUCH_OBJECT;
507         }
508
509         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
510                 const char *dn_str = ldb_dn_get_linearized(dn);
511                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
512                                        __location__
513                                        ": Failed to read DN index "
514                                        "against %s for %s: too many "
515                                        "values (%u > 1)",
516                                        ldb_kv->cache->GUID_index_attribute,
517                                        dn_str,
518                                        list->count);
519                 TALLOC_FREE(list);
520                 return LDB_ERR_CONSTRAINT_VIOLATION;
521         }
522
523         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
524                 /*
525                  * DN key has been truncated, need to inspect the actual
526                  * records to locate the actual DN
527                  */
528                 int i;
529                 index = -1;
530                 for (i=0; i < list->count; i++) {
531                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
532                         struct ldb_val key = {
533                                 .data = guid_key,
534                                 .length = sizeof(guid_key)
535                         };
536                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
537                         struct ldb_message *rec = ldb_msg_new(ldb);
538                         if (rec == NULL) {
539                                 TALLOC_FREE(list);
540                                 return LDB_ERR_OPERATIONS_ERROR;
541                         }
542
543                         ret = ldb_kv_idx_to_key(
544                             module, ldb_kv, ldb, &list->dn[i], &key);
545                         if (ret != LDB_SUCCESS) {
546                                 TALLOC_FREE(list);
547                                 TALLOC_FREE(rec);
548                                 return ret;
549                         }
550
551                         ret =
552                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
553                         if (key.data != guid_key) {
554                                 TALLOC_FREE(key.data);
555                         }
556                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
557                                 /*
558                                  * the record has disappeared?
559                                  * yes, this can happen
560                                  */
561                                 TALLOC_FREE(rec);
562                                 continue;
563                         }
564
565                         if (ret != LDB_SUCCESS) {
566                                 /* an internal error */
567                                 TALLOC_FREE(rec);
568                                 TALLOC_FREE(list);
569                                 return LDB_ERR_OPERATIONS_ERROR;
570                         }
571
572                         /*
573                          * We found the actual DN that we wanted from in the
574                          * multiple values that matched the index
575                          * (due to truncation), so return that.
576                          *
577                          */
578                         if (ldb_dn_compare(dn, rec->dn) == 0) {
579                                 index = i;
580                                 TALLOC_FREE(rec);
581                                 break;
582                         }
583                 }
584
585                 /*
586                  * We matched the index but the actual DN we wanted
587                  * was not here.
588                  */
589                 if (index == -1) {
590                         TALLOC_FREE(list);
591                         return LDB_ERR_NO_SUCH_OBJECT;
592                 }
593         }
594
595         /* The ldb_key memory is allocated by the caller */
596         ret = ldb_kv_guid_to_key(module, ldb_kv, &list->dn[index], ldb_key);
597         TALLOC_FREE(list);
598
599         if (ret != LDB_SUCCESS) {
600                 return LDB_ERR_OPERATIONS_ERROR;
601         }
602
603         return LDB_SUCCESS;
604 }
605
606
607
608 /*
609   save a dn_list into a full @IDX style record
610  */
611 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
612                                      struct ldb_kv_private *ldb_kv,
613                                      struct ldb_dn *dn,
614                                      struct dn_list *list)
615 {
616         struct ldb_message *msg;
617         int ret;
618
619         msg = ldb_msg_new(module);
620         if (!msg) {
621                 return ldb_module_oom(module);
622         }
623
624         msg->dn = dn;
625
626         if (list->count == 0) {
627                 ret = ldb_kv_delete_noindex(module, msg);
628                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
629                         ret = LDB_SUCCESS;
630                 }
631                 talloc_free(msg);
632                 return ret;
633         }
634
635         if (ldb_kv->cache->GUID_index_attribute == NULL) {
636                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
637                                       LDB_KV_INDEXING_VERSION);
638                 if (ret != LDB_SUCCESS) {
639                         talloc_free(msg);
640                         return ldb_module_oom(module);
641                 }
642         } else {
643                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
644                                       LDB_KV_GUID_INDEXING_VERSION);
645                 if (ret != LDB_SUCCESS) {
646                         talloc_free(msg);
647                         return ldb_module_oom(module);
648                 }
649         }
650
651         if (list->count > 0) {
652                 struct ldb_message_element *el;
653
654                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
655                 if (ret != LDB_SUCCESS) {
656                         talloc_free(msg);
657                         return ldb_module_oom(module);
658                 }
659
660                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
661                         el->values = list->dn;
662                         el->num_values = list->count;
663                 } else {
664                         struct ldb_val v;
665                         unsigned int i;
666                         el->values = talloc_array(msg,
667                                                   struct ldb_val, 1);
668                         if (el->values == NULL) {
669                                 talloc_free(msg);
670                                 return ldb_module_oom(module);
671                         }
672
673                         v.data = talloc_array_size(el->values,
674                                                    list->count,
675                                                    LDB_KV_GUID_SIZE);
676                         if (v.data == NULL) {
677                                 talloc_free(msg);
678                                 return ldb_module_oom(module);
679                         }
680
681                         v.length = talloc_get_size(v.data);
682
683                         for (i = 0; i < list->count; i++) {
684                                 if (list->dn[i].length !=
685                                     LDB_KV_GUID_SIZE) {
686                                         talloc_free(msg);
687                                         return ldb_module_operr(module);
688                                 }
689                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
690                                        list->dn[i].data,
691                                        LDB_KV_GUID_SIZE);
692                         }
693                         el->values[0] = v;
694                         el->num_values = 1;
695                 }
696         }
697
698         ret = ldb_kv_store(module, msg, TDB_REPLACE);
699         talloc_free(msg);
700         return ret;
701 }
702
703 /*
704   save a dn_list into the database, in either @IDX or internal format
705  */
706 static int ldb_kv_dn_list_store(struct ldb_module *module,
707                                 struct ldb_dn *dn,
708                                 struct dn_list *list)
709 {
710         struct ldb_kv_private *ldb_kv = talloc_get_type(
711             ldb_module_get_private(module), struct ldb_kv_private);
712         TDB_DATA rec, key;
713         int ret;
714         struct dn_list *list2;
715
716         if (ldb_kv->idxptr == NULL) {
717                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
718         }
719
720         if (ldb_kv->idxptr->itdb == NULL) {
721                 ldb_kv->idxptr->itdb =
722                     tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
723                 if (ldb_kv->idxptr->itdb == NULL) {
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726         }
727
728         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
729         if (key.dptr == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         key.dsize = strlen((char *)key.dptr);
733
734         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
735         if (rec.dptr != NULL) {
736                 list2 = ldb_kv_index_idxptr(module, rec, false);
737                 if (list2 == NULL) {
738                         free(rec.dptr);
739                         return LDB_ERR_OPERATIONS_ERROR;
740                 }
741                 free(rec.dptr);
742                 list2->dn = talloc_steal(list2, list->dn);
743                 list2->count = list->count;
744                 return LDB_SUCCESS;
745         }
746
747         list2 = talloc(ldb_kv->idxptr, struct dn_list);
748         if (list2 == NULL) {
749                 return LDB_ERR_OPERATIONS_ERROR;
750         }
751         list2->dn = talloc_steal(list2, list->dn);
752         list2->count = list->count;
753
754         rec.dptr = (uint8_t *)&list2;
755         rec.dsize = sizeof(void *);
756
757
758         /*
759          * This is not a store into the main DB, but into an in-memory
760          * TDB, so we don't need a guard on ltdb->read_only
761          */
762         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
763         if (ret != 0) {
764                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
765         }
766         return LDB_SUCCESS;
767 }
768
769 /*
770   traverse function for storing the in-memory index entries on disk
771  */
772 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
773                                        TDB_DATA key,
774                                        TDB_DATA data,
775                                        void *state)
776 {
777         struct ldb_module *module = state;
778         struct ldb_kv_private *ldb_kv = talloc_get_type(
779             ldb_module_get_private(module), struct ldb_kv_private);
780         struct ldb_dn *dn;
781         struct ldb_context *ldb = ldb_module_get_ctx(module);
782         struct ldb_val v;
783         struct dn_list *list;
784
785         list = ldb_kv_index_idxptr(module, data, true);
786         if (list == NULL) {
787                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
788                 return -1;
789         }
790
791         v.data = key.dptr;
792         v.length = strnlen((char *)key.dptr, key.dsize);
793
794         dn = ldb_dn_from_ldb_val(module, ldb, &v);
795         if (dn == NULL) {
796                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
797                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
798                 return -1;
799         }
800
801         ldb_kv->idxptr->error =
802             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
803         talloc_free(dn);
804         if (ldb_kv->idxptr->error != 0) {
805                 return -1;
806         }
807         return 0;
808 }
809
810 /* cleanup the idxptr mode when transaction commits */
811 int ldb_kv_index_transaction_commit(struct ldb_module *module)
812 {
813         struct ldb_kv_private *ldb_kv = talloc_get_type(
814             ldb_module_get_private(module), struct ldb_kv_private);
815         int ret;
816
817         struct ldb_context *ldb = ldb_module_get_ctx(module);
818
819         ldb_reset_err_string(ldb);
820
821         if (ldb_kv->idxptr->itdb) {
822                 tdb_traverse(
823                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
824                 tdb_close(ldb_kv->idxptr->itdb);
825         }
826
827         ret = ldb_kv->idxptr->error;
828         if (ret != LDB_SUCCESS) {
829                 if (!ldb_errstring(ldb)) {
830                         ldb_set_errstring(ldb, ldb_strerror(ret));
831                 }
832                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
833         }
834
835         talloc_free(ldb_kv->idxptr);
836         ldb_kv->idxptr = NULL;
837         return ret;
838 }
839
840 /* cleanup the idxptr mode when transaction cancels */
841 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
842 {
843         struct ldb_kv_private *ldb_kv = talloc_get_type(
844             ldb_module_get_private(module), struct ldb_kv_private);
845         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
846                 tdb_close(ldb_kv->idxptr->itdb);
847         }
848         talloc_free(ldb_kv->idxptr);
849         ldb_kv->idxptr = NULL;
850         return LDB_SUCCESS;
851 }
852
853
854 /*
855   return the dn key to be used for an index
856   the caller is responsible for freeing
857 */
858 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
859                                        struct ldb_kv_private *ldb_kv,
860                                        const char *attr,
861                                        const struct ldb_val *value,
862                                        const struct ldb_schema_attribute **ap,
863                                        enum key_truncation *truncation)
864 {
865         struct ldb_dn *ret;
866         struct ldb_val v;
867         const struct ldb_schema_attribute *a = NULL;
868         char *attr_folded = NULL;
869         const char *attr_for_dn = NULL;
870         int r;
871         bool should_b64_encode;
872
873         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
874         size_t key_len = 0;
875         size_t attr_len = 0;
876         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
877         unsigned frmt_len = 0;
878         const size_t additional_key_length = 4;
879         unsigned int num_separators = 3; /* Estimate for overflow check */
880         const size_t min_data = 1;
881         const size_t min_key_length = additional_key_length
882                 + indx_len + num_separators + min_data;
883
884         if (attr[0] == '@') {
885                 attr_for_dn = attr;
886                 v = *value;
887                 if (ap != NULL) {
888                         *ap = NULL;
889                 }
890         } else {
891                 attr_folded = ldb_attr_casefold(ldb, attr);
892                 if (!attr_folded) {
893                         return NULL;
894                 }
895
896                 attr_for_dn = attr_folded;
897
898                 a = ldb_schema_attribute_by_name(ldb, attr);
899                 if (ap) {
900                         *ap = a;
901                 }
902                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
903                 if (r != LDB_SUCCESS) {
904                         const char *errstr = ldb_errstring(ldb);
905                         /* canonicalisation can be refused. For
906                            example, a attribute that takes wildcards
907                            will refuse to canonicalise if the value
908                            contains a wildcard */
909                         ldb_asprintf_errstring(ldb,
910                                                "Failed to create index "
911                                                "key for attribute '%s':%s%s%s",
912                                                attr, ldb_strerror(r),
913                                                (errstr?":":""),
914                                                (errstr?errstr:""));
915                         talloc_free(attr_folded);
916                         return NULL;
917                 }
918         }
919         attr_len = strlen(attr_for_dn);
920
921         /*
922          * Check if there is any hope this will fit into the DB.
923          * Overflow here is not actually critical the code below
924          * checks again to make the printf and the DB does another
925          * check for too long keys
926          */
927         if (max_key_length - attr_len < min_key_length) {
928                 ldb_asprintf_errstring(
929                         ldb,
930                         __location__ ": max_key_length "
931                         "is too small (%u) < (%u)",
932                         max_key_length,
933                         (unsigned)(min_key_length + attr_len));
934                 talloc_free(attr_folded);
935                 return NULL;
936         }
937
938         /*
939          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
940          * "DN=" and a trailing string terminator
941          */
942         max_key_length -= additional_key_length;
943
944         /*
945          * We do not base 64 encode a DN in a key, it has already been
946          * casefold and lineraized, that is good enough.  That already
947          * avoids embedded NUL etc.
948          */
949         if (ldb_kv->cache->GUID_index_attribute != NULL) {
950                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
951                         should_b64_encode = false;
952                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
953                         /*
954                          * We can only change the behaviour for IDXONE
955                          * when the GUID index is enabled
956                          */
957                         should_b64_encode = false;
958                 } else {
959                         should_b64_encode
960                                 = ldb_should_b64_encode(ldb, &v);
961                 }
962         } else {
963                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
964         }
965
966         if (should_b64_encode) {
967                 size_t vstr_len = 0;
968                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
969                 if (!vstr) {
970                         talloc_free(attr_folded);
971                         return NULL;
972                 }
973                 vstr_len = strlen(vstr);
974                 /*
975                  * Overflow here is not critical as we only use this
976                  * to choose the printf truncation
977                  */
978                 key_len = num_separators + indx_len + attr_len + vstr_len;
979                 if (key_len > max_key_length) {
980                         size_t excess = key_len - max_key_length;
981                         frmt_len = vstr_len - excess;
982                         *truncation = KEY_TRUNCATED;
983                         /*
984                         * Truncated keys are placed in a separate key space
985                         * from the non truncated keys
986                         * Note: the double hash "##" is not a typo and
987                         * indicates that the following value is base64 encoded
988                         */
989                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
990                                              LDB_KV_INDEX, attr_for_dn,
991                                              frmt_len, vstr);
992                 } else {
993                         frmt_len = vstr_len;
994                         *truncation = KEY_NOT_TRUNCATED;
995                         /*
996                          * Note: the double colon "::" is not a typo and
997                          * indicates that the following value is base64 encoded
998                          */
999                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1000                                              LDB_KV_INDEX, attr_for_dn,
1001                                              frmt_len, vstr);
1002                 }
1003                 talloc_free(vstr);
1004         } else {
1005                 /* Only need two seperators */
1006                 num_separators = 2;
1007
1008                 /*
1009                  * Overflow here is not critical as we only use this
1010                  * to choose the printf truncation
1011                  */
1012                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1013                 if (key_len > max_key_length) {
1014                         size_t excess = key_len - max_key_length;
1015                         frmt_len = v.length - excess;
1016                         *truncation = KEY_TRUNCATED;
1017                         /*
1018                          * Truncated keys are placed in a separate key space
1019                          * from the non truncated keys
1020                          */
1021                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1022                                              LDB_KV_INDEX, attr_for_dn,
1023                                              frmt_len, (char *)v.data);
1024                 } else {
1025                         frmt_len = v.length;
1026                         *truncation = KEY_NOT_TRUNCATED;
1027                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1028                                              LDB_KV_INDEX, attr_for_dn,
1029                                              frmt_len, (char *)v.data);
1030                 }
1031         }
1032
1033         if (v.data != value->data) {
1034                 talloc_free(v.data);
1035         }
1036         talloc_free(attr_folded);
1037
1038         return ret;
1039 }
1040
1041 /*
1042   see if a attribute value is in the list of indexed attributes
1043 */
1044 static bool ldb_kv_is_indexed(struct ldb_module *module,
1045                               struct ldb_kv_private *ldb_kv,
1046                               const char *attr)
1047 {
1048         struct ldb_context *ldb = ldb_module_get_ctx(module);
1049         unsigned int i;
1050         struct ldb_message_element *el;
1051
1052         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1053             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1054                 /* Implicity covered, this is the index key */
1055                 return false;
1056         }
1057         if (ldb->schema.index_handler_override) {
1058                 const struct ldb_schema_attribute *a
1059                         = ldb_schema_attribute_by_name(ldb, attr);
1060
1061                 if (a == NULL) {
1062                         return false;
1063                 }
1064
1065                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1066                         return true;
1067                 } else {
1068                         return false;
1069                 }
1070         }
1071
1072         if (!ldb_kv->cache->attribute_indexes) {
1073                 return false;
1074         }
1075
1076         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1077         if (el == NULL) {
1078                 return false;
1079         }
1080
1081         /* TODO: this is too expensive! At least use a binary search */
1082         for (i=0; i<el->num_values; i++) {
1083                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1084                         return true;
1085                 }
1086         }
1087         return false;
1088 }
1089
1090 /*
1091   in the following logic functions, the return value is treated as
1092   follows:
1093
1094      LDB_SUCCESS: we found some matching index values
1095
1096      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1097
1098      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1099                                we'll need a full search
1100  */
1101
1102 /*
1103   return a list of dn's that might match a simple indexed search (an
1104   equality search only)
1105  */
1106 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1107                                   struct ldb_kv_private *ldb_kv,
1108                                   const struct ldb_parse_tree *tree,
1109                                   struct dn_list *list)
1110 {
1111         struct ldb_context *ldb;
1112         struct ldb_dn *dn;
1113         int ret;
1114         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1115
1116         ldb = ldb_module_get_ctx(module);
1117
1118         list->count = 0;
1119         list->dn = NULL;
1120
1121         /* if the attribute isn't in the list of indexed attributes then
1122            this node needs a full search */
1123         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1124                 return LDB_ERR_OPERATIONS_ERROR;
1125         }
1126
1127         /* the attribute is indexed. Pull the list of DNs that match the
1128            search criterion */
1129         dn = ldb_kv_index_key(ldb,
1130                               ldb_kv,
1131                               tree->u.equality.attr,
1132                               &tree->u.equality.value,
1133                               NULL,
1134                               &truncation);
1135         /*
1136          * We ignore truncation here and allow multi-valued matches
1137          * as ltdb_search_indexed will filter out the wrong one in
1138          * ltdb_index_filter() which calls ldb_match_message().
1139          */
1140         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1141
1142         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1143         talloc_free(dn);
1144         return ret;
1145 }
1146
1147 static bool list_union(struct ldb_context *ldb,
1148                        struct ldb_kv_private *ldb_kv,
1149                        struct dn_list *list,
1150                        struct dn_list *list2);
1151
1152 /*
1153   return a list of dn's that might match a leaf indexed search
1154  */
1155 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1156                                 struct ldb_kv_private *ldb_kv,
1157                                 const struct ldb_parse_tree *tree,
1158                                 struct dn_list *list)
1159 {
1160         if (ldb_kv->disallow_dn_filter &&
1161             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1162                 /* in AD mode we do not support "(dn=...)" search filters */
1163                 list->dn = NULL;
1164                 list->count = 0;
1165                 return LDB_SUCCESS;
1166         }
1167         if (tree->u.equality.attr[0] == '@') {
1168                 /* Do not allow a indexed search against an @ */
1169                 list->dn = NULL;
1170                 list->count = 0;
1171                 return LDB_SUCCESS;
1172         }
1173         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1174                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1175                 struct ldb_dn *dn
1176                         = ldb_dn_from_ldb_val(list,
1177                                               ldb_module_get_ctx(module),
1178                                               &tree->u.equality.value);
1179                 if (dn == NULL) {
1180                         /* If we can't parse it, no match */
1181                         list->dn = NULL;
1182                         list->count = 0;
1183                         return LDB_SUCCESS;
1184                 }
1185
1186                 /*
1187                  * Re-use the same code we use for a SCOPE_BASE
1188                  * search
1189                  *
1190                  * We can't call TALLOC_FREE(dn) as this must belong
1191                  * to list for the memory to remain valid.
1192                  */
1193                 return ldb_kv_index_dn_base_dn(
1194                     module, ldb_kv, dn, list, &truncation);
1195                 /*
1196                  * We ignore truncation here and allow multi-valued matches
1197                  * as ltdb_search_indexed will filter out the wrong one in
1198                  * ltdb_index_filter() which calls ldb_match_message().
1199                  */
1200
1201         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1202                    (ldb_attr_cmp(tree->u.equality.attr,
1203                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1204                 int ret;
1205                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1206                 list->dn = talloc_array(list, struct ldb_val, 1);
1207                 if (list->dn == NULL) {
1208                         ldb_module_oom(module);
1209                         return LDB_ERR_OPERATIONS_ERROR;
1210                 }
1211                 /*
1212                  * We need to go via the canonicalise_fn() to
1213                  * ensure we get the index in binary, rather
1214                  * than a string
1215                  */
1216                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1217                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1218                 if (ret != LDB_SUCCESS) {
1219                         return LDB_ERR_OPERATIONS_ERROR;
1220                 }
1221                 list->count = 1;
1222                 return LDB_SUCCESS;
1223         }
1224
1225         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1226 }
1227
1228
1229 /*
1230   list intersection
1231   list = list & list2
1232 */
1233 static bool list_intersect(struct ldb_context *ldb,
1234                            struct ldb_kv_private *ldb_kv,
1235                            struct dn_list *list,
1236                            const struct dn_list *list2)
1237 {
1238         const struct dn_list *short_list, *long_list;
1239         struct dn_list *list3;
1240         unsigned int i;
1241
1242         if (list->count == 0) {
1243                 /* 0 & X == 0 */
1244                 return true;
1245         }
1246         if (list2->count == 0) {
1247                 /* X & 0 == 0 */
1248                 list->count = 0;
1249                 list->dn = NULL;
1250                 return true;
1251         }
1252
1253         /* the indexing code is allowed to return a longer list than
1254            what really matches, as all results are filtered by the
1255            full expression at the end - this shortcut avoids a lot of
1256            work in some cases */
1257         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1258                 return true;
1259         }
1260         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1261                 list->count = list2->count;
1262                 list->dn = list2->dn;
1263                 /* note that list2 may not be the parent of list2->dn,
1264                    as list2->dn may be owned by ltdb->idxptr. In that
1265                    case we expect this reparent call to fail, which is
1266                    OK */
1267                 talloc_reparent(list2, list, list2->dn);
1268                 return true;
1269         }
1270
1271         if (list->count > list2->count) {
1272                 short_list = list2;
1273                 long_list = list;
1274         } else {
1275                 short_list = list;
1276                 long_list = list2;
1277         }
1278
1279         list3 = talloc_zero(list, struct dn_list);
1280         if (list3 == NULL) {
1281                 return false;
1282         }
1283
1284         list3->dn = talloc_array(list3, struct ldb_val,
1285                                  MIN(list->count, list2->count));
1286         if (!list3->dn) {
1287                 talloc_free(list3);
1288                 return false;
1289         }
1290         list3->count = 0;
1291
1292         for (i=0;i<short_list->count;i++) {
1293                 /* For the GUID index case, this is a binary search */
1294                 if (ldb_kv_dn_list_find_val(
1295                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1296                         list3->dn[list3->count] = short_list->dn[i];
1297                         list3->count++;
1298                 }
1299         }
1300
1301         list->strict |= list2->strict;
1302         list->dn = talloc_steal(list, list3->dn);
1303         list->count = list3->count;
1304         talloc_free(list3);
1305
1306         return true;
1307 }
1308
1309
1310 /*
1311   list union
1312   list = list | list2
1313 */
1314 static bool list_union(struct ldb_context *ldb,
1315                        struct ldb_kv_private *ldb_kv,
1316                        struct dn_list *list,
1317                        struct dn_list *list2)
1318 {
1319         struct ldb_val *dn3;
1320         unsigned int i = 0, j = 0, k = 0;
1321
1322         if (list2->count == 0) {
1323                 /* X | 0 == X */
1324                 return true;
1325         }
1326
1327         if (list->count == 0) {
1328                 /* 0 | X == X */
1329                 list->count = list2->count;
1330                 list->dn = list2->dn;
1331                 /* note that list2 may not be the parent of list2->dn,
1332                    as list2->dn may be owned by ltdb->idxptr. In that
1333                    case we expect this reparent call to fail, which is
1334                    OK */
1335                 talloc_reparent(list2, list, list2->dn);
1336                 return true;
1337         }
1338
1339         /*
1340          * Sort the lists (if not in GUID DN mode) so we can do
1341          * the de-duplication during the merge
1342          *
1343          * NOTE: This can sort the in-memory index values, as list or
1344          * list2 might not be a copy!
1345          */
1346         ldb_kv_dn_list_sort(ldb_kv, list);
1347         ldb_kv_dn_list_sort(ldb_kv, list2);
1348
1349         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1350         if (!dn3) {
1351                 ldb_oom(ldb);
1352                 return false;
1353         }
1354
1355         while (i < list->count || j < list2->count) {
1356                 int cmp;
1357                 if (i >= list->count) {
1358                         cmp = 1;
1359                 } else if (j >= list2->count) {
1360                         cmp = -1;
1361                 } else {
1362                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1363                                                           &list2->dn[j]);
1364                 }
1365
1366                 if (cmp < 0) {
1367                         /* Take list */
1368                         dn3[k] = list->dn[i];
1369                         i++;
1370                         k++;
1371                 } else if (cmp > 0) {
1372                         /* Take list2 */
1373                         dn3[k] = list2->dn[j];
1374                         j++;
1375                         k++;
1376                 } else {
1377                         /* Equal, take list */
1378                         dn3[k] = list->dn[i];
1379                         i++;
1380                         j++;
1381                         k++;
1382                 }
1383         }
1384
1385         list->dn = dn3;
1386         list->count = k;
1387
1388         return true;
1389 }
1390
1391 static int ldb_kv_index_dn(struct ldb_module *module,
1392                            struct ldb_kv_private *ldb_kv,
1393                            const struct ldb_parse_tree *tree,
1394                            struct dn_list *list);
1395
1396 /*
1397   process an OR list (a union)
1398  */
1399 static int ldb_kv_index_dn_or(struct ldb_module *module,
1400                               struct ldb_kv_private *ldb_kv,
1401                               const struct ldb_parse_tree *tree,
1402                               struct dn_list *list)
1403 {
1404         struct ldb_context *ldb;
1405         unsigned int i;
1406
1407         ldb = ldb_module_get_ctx(module);
1408
1409         list->dn = NULL;
1410         list->count = 0;
1411
1412         for (i=0; i<tree->u.list.num_elements; i++) {
1413                 struct dn_list *list2;
1414                 int ret;
1415
1416                 list2 = talloc_zero(list, struct dn_list);
1417                 if (list2 == NULL) {
1418                         return LDB_ERR_OPERATIONS_ERROR;
1419                 }
1420
1421                 ret = ldb_kv_index_dn(
1422                     module, ldb_kv, tree->u.list.elements[i], list2);
1423
1424                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1425                         /* X || 0 == X */
1426                         talloc_free(list2);
1427                         continue;
1428                 }
1429
1430                 if (ret != LDB_SUCCESS) {
1431                         /* X || * == * */
1432                         talloc_free(list2);
1433                         return ret;
1434                 }
1435
1436                 if (!list_union(ldb, ldb_kv, list, list2)) {
1437                         talloc_free(list2);
1438                         return LDB_ERR_OPERATIONS_ERROR;
1439                 }
1440         }
1441
1442         if (list->count == 0) {
1443                 return LDB_ERR_NO_SUCH_OBJECT;
1444         }
1445
1446         return LDB_SUCCESS;
1447 }
1448
1449
1450 /*
1451   NOT an index results
1452  */
1453 static int ldb_kv_index_dn_not(struct ldb_module *module,
1454                                struct ldb_kv_private *ldb_kv,
1455                                const struct ldb_parse_tree *tree,
1456                                struct dn_list *list)
1457 {
1458         /* the only way to do an indexed not would be if we could
1459            negate the not via another not or if we knew the total
1460            number of database elements so we could know that the
1461            existing expression covered the whole database.
1462
1463            instead, we just give up, and rely on a full index scan
1464            (unless an outer & manages to reduce the list)
1465         */
1466         return LDB_ERR_OPERATIONS_ERROR;
1467 }
1468
1469 /*
1470  * These things are unique, so avoid a full scan if this is a search
1471  * by GUID, DN or a unique attribute
1472  */
1473 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1474                                 struct ldb_kv_private *ldb_kv,
1475                                 const char *attr)
1476 {
1477         const struct ldb_schema_attribute *a;
1478         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1479                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1480                     0) {
1481                         return true;
1482                 }
1483         }
1484         if (ldb_attr_dn(attr) == 0) {
1485                 return true;
1486         }
1487
1488         a = ldb_schema_attribute_by_name(ldb, attr);
1489         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1490                 return true;
1491         }
1492         return false;
1493 }
1494
1495 /*
1496   process an AND expression (intersection)
1497  */
1498 static int ldb_kv_index_dn_and(struct ldb_module *module,
1499                                struct ldb_kv_private *ldb_kv,
1500                                const struct ldb_parse_tree *tree,
1501                                struct dn_list *list)
1502 {
1503         struct ldb_context *ldb;
1504         unsigned int i;
1505         bool found;
1506
1507         ldb = ldb_module_get_ctx(module);
1508
1509         list->dn = NULL;
1510         list->count = 0;
1511
1512         /* in the first pass we only look for unique simple
1513            equality tests, in the hope of avoiding having to look
1514            at any others */
1515         for (i=0; i<tree->u.list.num_elements; i++) {
1516                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1517                 int ret;
1518
1519                 if (subtree->operation != LDB_OP_EQUALITY ||
1520                     !ldb_kv_index_unique(
1521                         ldb, ldb_kv, subtree->u.equality.attr)) {
1522                         continue;
1523                 }
1524
1525                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1526                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1527                         /* 0 && X == 0 */
1528                         return LDB_ERR_NO_SUCH_OBJECT;
1529                 }
1530                 if (ret == LDB_SUCCESS) {
1531                         /* a unique index match means we can
1532                          * stop. Note that we don't care if we return
1533                          * a few too many objects, due to later
1534                          * filtering */
1535                         return LDB_SUCCESS;
1536                 }
1537         }
1538
1539         /* now do a full intersection */
1540         found = false;
1541
1542         for (i=0; i<tree->u.list.num_elements; i++) {
1543                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1544                 struct dn_list *list2;
1545                 int ret;
1546
1547                 list2 = talloc_zero(list, struct dn_list);
1548                 if (list2 == NULL) {
1549                         return ldb_module_oom(module);
1550                 }
1551
1552                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1553
1554                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1555                         /* X && 0 == 0 */
1556                         list->dn = NULL;
1557                         list->count = 0;
1558                         talloc_free(list2);
1559                         return LDB_ERR_NO_SUCH_OBJECT;
1560                 }
1561
1562                 if (ret != LDB_SUCCESS) {
1563                         /* this didn't adding anything */
1564                         talloc_free(list2);
1565                         continue;
1566                 }
1567
1568                 if (!found) {
1569                         talloc_reparent(list2, list, list->dn);
1570                         list->dn = list2->dn;
1571                         list->count = list2->count;
1572                         found = true;
1573                 } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
1574                         talloc_free(list2);
1575                         return LDB_ERR_OPERATIONS_ERROR;
1576                 }
1577
1578                 if (list->count == 0) {
1579                         list->dn = NULL;
1580                         return LDB_ERR_NO_SUCH_OBJECT;
1581                 }
1582
1583                 if (list->count < 2) {
1584                         /* it isn't worth loading the next part of the tree */
1585                         return LDB_SUCCESS;
1586                 }
1587         }
1588
1589         if (!found) {
1590                 /* none of the attributes were indexed */
1591                 return LDB_ERR_OPERATIONS_ERROR;
1592         }
1593
1594         return LDB_SUCCESS;
1595 }
1596
1597 /*
1598   return a list of matching objects using a one-level index
1599  */
1600 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1601                                 struct ldb_kv_private *ldb_kv,
1602                                 const char *attr,
1603                                 struct ldb_dn *dn,
1604                                 struct dn_list *list,
1605                                 enum key_truncation *truncation)
1606 {
1607         struct ldb_context *ldb;
1608         struct ldb_dn *key;
1609         struct ldb_val val;
1610         int ret;
1611
1612         ldb = ldb_module_get_ctx(module);
1613
1614         /* work out the index key from the parent DN */
1615         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1616         val.length = strlen((char *)val.data);
1617         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1618         if (!key) {
1619                 ldb_oom(ldb);
1620                 return LDB_ERR_OPERATIONS_ERROR;
1621         }
1622
1623         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1624         talloc_free(key);
1625         if (ret != LDB_SUCCESS) {
1626                 return ret;
1627         }
1628
1629         if (list->count == 0) {
1630                 return LDB_ERR_NO_SUCH_OBJECT;
1631         }
1632
1633         return LDB_SUCCESS;
1634 }
1635
1636 /*
1637   return a list of matching objects using a one-level index
1638  */
1639 static int ldb_kv_index_dn_one(struct ldb_module *module,
1640                                struct ldb_kv_private *ldb_kv,
1641                                struct ldb_dn *parent_dn,
1642                                struct dn_list *list,
1643                                enum key_truncation *truncation)
1644 {
1645         /* Ensure we do not shortcut on intersection for this list */
1646         list->strict = true;
1647         return ldb_kv_index_dn_attr(
1648             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
1649 }
1650
1651 /*
1652   return a list of matching objects using the DN index
1653  */
1654 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
1655                                    struct ldb_kv_private *ldb_kv,
1656                                    struct ldb_dn *base_dn,
1657                                    struct dn_list *dn_list,
1658                                    enum key_truncation *truncation)
1659 {
1660         const struct ldb_val *guid_val = NULL;
1661         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1662                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1663                 if (dn_list->dn == NULL) {
1664                         return ldb_module_oom(module);
1665                 }
1666                 dn_list->dn[0].data = discard_const_p(unsigned char,
1667                                                       ldb_dn_get_linearized(base_dn));
1668                 if (dn_list->dn[0].data == NULL) {
1669                         return ldb_module_oom(module);
1670                 }
1671                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1672                 dn_list->count = 1;
1673
1674                 return LDB_SUCCESS;
1675         }
1676
1677         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
1678                 guid_val = ldb_dn_get_extended_component(
1679                     base_dn, ldb_kv->cache->GUID_index_dn_component);
1680         }
1681
1682         if (guid_val != NULL) {
1683                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1684                 if (dn_list->dn == NULL) {
1685                         return ldb_module_oom(module);
1686                 }
1687                 dn_list->dn[0].data = guid_val->data;
1688                 dn_list->dn[0].length = guid_val->length;
1689                 dn_list->count = 1;
1690
1691                 return LDB_SUCCESS;
1692         }
1693
1694         return ldb_kv_index_dn_attr(
1695             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
1696 }
1697
1698 /*
1699   return a list of dn's that might match a indexed search or
1700   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1701  */
1702 static int ldb_kv_index_dn(struct ldb_module *module,
1703                            struct ldb_kv_private *ldb_kv,
1704                            const struct ldb_parse_tree *tree,
1705                            struct dn_list *list)
1706 {
1707         int ret = LDB_ERR_OPERATIONS_ERROR;
1708
1709         switch (tree->operation) {
1710         case LDB_OP_AND:
1711                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
1712                 break;
1713
1714         case LDB_OP_OR:
1715                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
1716                 break;
1717
1718         case LDB_OP_NOT:
1719                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
1720                 break;
1721
1722         case LDB_OP_EQUALITY:
1723                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
1724                 break;
1725
1726         case LDB_OP_SUBSTRING:
1727         case LDB_OP_GREATER:
1728         case LDB_OP_LESS:
1729         case LDB_OP_PRESENT:
1730         case LDB_OP_APPROX:
1731         case LDB_OP_EXTENDED:
1732                 /* we can't index with fancy bitops yet */
1733                 ret = LDB_ERR_OPERATIONS_ERROR;
1734                 break;
1735         }
1736
1737         return ret;
1738 }
1739
1740 /*
1741   filter a candidate dn_list from an indexed search into a set of results
1742   extracting just the given attributes
1743 */
1744 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
1745                                const struct dn_list *dn_list,
1746                                struct ldb_kv_context *ac,
1747                                uint32_t *match_count,
1748                                enum key_truncation scope_one_truncation)
1749 {
1750         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1751         struct ldb_message *msg;
1752         struct ldb_message *filtered_msg;
1753         unsigned int i;
1754         unsigned int num_keys = 0;
1755         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
1756         struct ldb_val *keys = NULL;
1757
1758         /*
1759          * We have to allocate the key list (rather than just walk the
1760          * caller supplied list) as the callback could change the list
1761          * (by modifying an indexed attribute hosted in the in-memory
1762          * index cache!)
1763          */
1764         keys = talloc_array(ac, struct ldb_val, dn_list->count);
1765         if (keys == NULL) {
1766                 return ldb_module_oom(ac->module);
1767         }
1768
1769         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1770                 /*
1771                  * We speculate that the keys will be GUID based and so
1772                  * pre-fill in enough space for a GUID (avoiding a pile of
1773                  * small allocations)
1774                  */
1775                 struct guid_tdb_key {
1776                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
1777                 } *key_values = NULL;
1778
1779                 key_values = talloc_array(keys,
1780                                           struct guid_tdb_key,
1781                                           dn_list->count);
1782
1783                 if (key_values == NULL) {
1784                         talloc_free(keys);
1785                         return ldb_module_oom(ac->module);
1786                 }
1787                 for (i = 0; i < dn_list->count; i++) {
1788                         keys[i].data = key_values[i].guid_key;
1789                         keys[i].length = sizeof(key_values[i].guid_key);
1790                 }
1791         } else {
1792                 for (i = 0; i < dn_list->count; i++) {
1793                         keys[i].data = NULL;
1794                         keys[i].length = 0;
1795                 }
1796         }
1797
1798         for (i = 0; i < dn_list->count; i++) {
1799                 int ret;
1800
1801                 ret = ldb_kv_idx_to_key(
1802                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
1803                 if (ret != LDB_SUCCESS) {
1804                         talloc_free(keys);
1805                         return ret;
1806                 }
1807
1808                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1809                         /*
1810                          * If we are in GUID index mode, then the dn_list is
1811                          * sorted.  If we got a duplicate, forget about it, as
1812                          * otherwise we would send the same entry back more
1813                          * than once.
1814                          *
1815                          * This is needed in the truncated DN case, or if a
1816                          * duplicate was forced in via
1817                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1818                          */
1819
1820                         if (memcmp(previous_guid_key,
1821                                    keys[num_keys].data,
1822                                    sizeof(previous_guid_key)) == 0) {
1823                                 continue;
1824                         }
1825
1826                         memcpy(previous_guid_key,
1827                                keys[num_keys].data,
1828                                sizeof(previous_guid_key));
1829                 }
1830                 num_keys++;
1831         }
1832
1833
1834         /*
1835          * Now that the list is a safe copy, send the callbacks
1836          */
1837         for (i = 0; i < num_keys; i++) {
1838                 int ret;
1839                 bool matched;
1840                 msg = ldb_msg_new(ac);
1841                 if (!msg) {
1842                         talloc_free(keys);
1843                         return LDB_ERR_OPERATIONS_ERROR;
1844                 }
1845
1846                 ret =
1847                     ldb_kv_search_key(ac->module,
1848                                       ldb_kv,
1849                                       keys[i],
1850                                       msg,
1851                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
1852                                           LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1853                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1854                         /*
1855                          * the record has disappeared? yes, this can
1856                          * happen if the entry is deleted by something
1857                          * operating in the callback (not another
1858                          * process, as we have a read lock)
1859                          */
1860                         talloc_free(msg);
1861                         continue;
1862                 }
1863
1864                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1865                         /* an internal error */
1866                         talloc_free(keys);
1867                         talloc_free(msg);
1868                         return LDB_ERR_OPERATIONS_ERROR;
1869                 }
1870
1871                 /*
1872                  * We trust the index for LDB_SCOPE_ONELEVEL
1873                  * unless the index key has been truncated.
1874                  *
1875                  * LDB_SCOPE_BASE is not passed in by our only caller.
1876                  */
1877                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
1878                     ldb_kv->cache->one_level_indexes &&
1879                     scope_one_truncation == KEY_NOT_TRUNCATED) {
1880                         ret = ldb_match_message(ldb, msg, ac->tree,
1881                                                 ac->scope, &matched);
1882                 } else {
1883                         ret = ldb_match_msg_error(ldb, msg,
1884                                                   ac->tree, ac->base,
1885                                                   ac->scope, &matched);
1886                 }
1887
1888                 if (ret != LDB_SUCCESS) {
1889                         talloc_free(keys);
1890                         talloc_free(msg);
1891                         return ret;
1892                 }
1893                 if (!matched) {
1894                         talloc_free(msg);
1895                         continue;
1896                 }
1897
1898                 /* filter the attributes that the user wants */
1899                 ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1900
1901                 talloc_free(msg);
1902
1903                 if (ret == -1) {
1904                         talloc_free(keys);
1905                         return LDB_ERR_OPERATIONS_ERROR;
1906                 }
1907
1908                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1909                 if (ret != LDB_SUCCESS) {
1910                         /* Regardless of success or failure, the msg
1911                          * is the callbacks responsiblity, and should
1912                          * not be talloc_free()'ed */
1913                         ac->request_terminated = true;
1914                         talloc_free(keys);
1915                         return ret;
1916                 }
1917
1918                 (*match_count)++;
1919         }
1920
1921         TALLOC_FREE(keys);
1922         return LDB_SUCCESS;
1923 }
1924
1925 /*
1926   sort a DN list
1927  */
1928 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
1929                                 struct dn_list *list)
1930 {
1931         if (list->count < 2) {
1932                 return;
1933         }
1934
1935         /* We know the list is sorted when using the GUID index */
1936         if (ltdb->cache->GUID_index_attribute != NULL) {
1937                 return;
1938         }
1939
1940         TYPESAFE_QSORT(list->dn, list->count,
1941                        ldb_val_equal_exact_for_qsort);
1942 }
1943
1944 /*
1945   search the database with a LDAP-like expression using indexes
1946   returns -1 if an indexed search is not possible, in which
1947   case the caller should call ltdb_search_full()
1948 */
1949 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
1950 {
1951         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1952         struct ldb_kv_private *ldb_kv = talloc_get_type(
1953             ldb_module_get_private(ac->module), struct ldb_kv_private);
1954         struct dn_list *dn_list;
1955         int ret;
1956         enum ldb_scope index_scope;
1957         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1958
1959         /* see if indexing is enabled */
1960         if (!ldb_kv->cache->attribute_indexes &&
1961             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
1962                 /* fallback to a full search */
1963                 return LDB_ERR_OPERATIONS_ERROR;
1964         }
1965
1966         dn_list = talloc_zero(ac, struct dn_list);
1967         if (dn_list == NULL) {
1968                 return ldb_module_oom(ac->module);
1969         }
1970
1971         /*
1972          * For the purposes of selecting the switch arm below, if we
1973          * don't have a one-level index then treat it like a subtree
1974          * search
1975          */
1976         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1977             !ldb_kv->cache->one_level_indexes) {
1978                 index_scope = LDB_SCOPE_SUBTREE;
1979         } else {
1980                 index_scope = ac->scope;
1981         }
1982
1983         switch (index_scope) {
1984         case LDB_SCOPE_BASE:
1985                 /*
1986                  * The only caller will have filtered the operation out
1987                  * so we should never get here
1988                  */
1989                 return ldb_operr(ldb);
1990
1991         case LDB_SCOPE_ONELEVEL:
1992                 /*
1993                  * If we ever start to also load the index values for
1994                  * the tree, we must ensure we strictly intersect with
1995                  * this list, as we trust the ONELEVEL index
1996                  */
1997                 ret = ldb_kv_index_dn_one(ac->module,
1998                                           ldb_kv,
1999                                           ac->base,
2000                                           dn_list,
2001                                           &scope_one_truncation);
2002                 if (ret != LDB_SUCCESS) {
2003                         talloc_free(dn_list);
2004                         return ret;
2005                 }
2006
2007                 /*
2008                  * If we have too many matches, running the filter
2009                  * tree over the SCOPE_ONELEVEL can be quite expensive
2010                  * so we now check the filter tree index as well.
2011                  *
2012                  * We only do this in the GUID index mode, which is
2013                  * O(n*log(m)) otherwise the intersection below will
2014                  * be too costly at O(n*m).
2015                  *
2016                  * We don't set a heuristic for 'too many' but instead
2017                  * do it always and rely on the index lookup being
2018                  * fast enough in the small case.
2019                  */
2020                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2021                         struct dn_list *idx_one_tree_list
2022                                 = talloc_zero(ac, struct dn_list);
2023                         if (idx_one_tree_list == NULL) {
2024                                 return ldb_module_oom(ac->module);
2025                         }
2026
2027                         if (!ldb_kv->cache->attribute_indexes) {
2028                                 talloc_free(idx_one_tree_list);
2029                                 talloc_free(dn_list);
2030                                 return LDB_ERR_OPERATIONS_ERROR;
2031                         }
2032                         /*
2033                          * Here we load the index for the tree.
2034                          *
2035                          * We only care if this is successful, if the
2036                          * index can't trim the result list down then
2037                          * the ONELEVEL index is still good enough.
2038                          */
2039                         ret = ldb_kv_index_dn(
2040                             ac->module, ldb_kv, ac->tree, idx_one_tree_list);
2041                         if (ret == LDB_SUCCESS) {
2042                                 if (!list_intersect(ldb,
2043                                                     ldb_kv,
2044                                                     dn_list,
2045                                                     idx_one_tree_list)) {
2046                                         talloc_free(idx_one_tree_list);
2047                                         talloc_free(dn_list);
2048                                         return LDB_ERR_OPERATIONS_ERROR;
2049                                 }
2050                         }
2051                 }
2052                 break;
2053
2054         case LDB_SCOPE_SUBTREE:
2055         case LDB_SCOPE_DEFAULT:
2056                 if (!ldb_kv->cache->attribute_indexes) {
2057                         talloc_free(dn_list);
2058                         return LDB_ERR_OPERATIONS_ERROR;
2059                 }
2060                 /*
2061                  * Here we load the index for the tree.  We have no
2062                  * index for the subtree.
2063                  */
2064                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2065                 if (ret != LDB_SUCCESS) {
2066                         talloc_free(dn_list);
2067                         return ret;
2068                 }
2069                 break;
2070         }
2071
2072         /*
2073          * It is critical that this function do the re-filter even
2074          * on things found by the index as the index can over-match
2075          * in cases of truncation (as well as when it decides it is
2076          * not worth further filtering)
2077          *
2078          * If this changes, then the index code above would need to
2079          * pass up a flag to say if any index was truncated during
2080          * processing as the truncation here refers only to the
2081          * SCOPE_ONELEVEL index.
2082          */
2083         ret = ldb_kv_index_filter(
2084             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2085         talloc_free(dn_list);
2086         return ret;
2087 }
2088
2089 /**
2090  * @brief Add a DN in the index list of a given attribute name/value pair
2091  *
2092  * This function will add the DN in the index list for the index for
2093  * the given attribute name and value.
2094  *
2095  * @param[in]  module       A ldb_module structure
2096  *
2097  * @param[in]  dn           The string representation of the DN as it
2098  *                          will be stored in the index entry
2099  *
2100  * @param[in]  el           A ldb_message_element array, one of the entry
2101  *                          referred by the v_idx is the attribute name and
2102  *                          value pair which will be used to construct the
2103  *                          index name
2104  *
2105  * @param[in]  v_idx        The index of element in the el array to use
2106  *
2107  * @return                  An ldb error code
2108  */
2109 static int ldb_kv_index_add1(struct ldb_module *module,
2110                              struct ldb_kv_private *ldb_kv,
2111                              const struct ldb_message *msg,
2112                              struct ldb_message_element *el,
2113                              int v_idx)
2114 {
2115         struct ldb_context *ldb;
2116         struct ldb_dn *dn_key;
2117         int ret;
2118         const struct ldb_schema_attribute *a;
2119         struct dn_list *list;
2120         unsigned alloc_len;
2121         enum key_truncation truncation = KEY_TRUNCATED;
2122
2123
2124         ldb = ldb_module_get_ctx(module);
2125
2126         list = talloc_zero(module, struct dn_list);
2127         if (list == NULL) {
2128                 return LDB_ERR_OPERATIONS_ERROR;
2129         }
2130
2131         dn_key = ldb_kv_index_key(
2132             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2133         if (!dn_key) {
2134                 talloc_free(list);
2135                 return LDB_ERR_OPERATIONS_ERROR;
2136         }
2137         /*
2138          * Samba only maintains unique indexes on the objectSID and objectGUID
2139          * so if a unique index key exceeds the maximum length there is a
2140          * problem.
2141          */
2142         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2143                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2144                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2145
2146                 ldb_asprintf_errstring(
2147                     ldb,
2148                     __location__ ": unique index key on %s in %s, "
2149                                  "exceeds maximum key length of %u (encoded).",
2150                     el->name,
2151                     ldb_dn_get_linearized(msg->dn),
2152                     ldb_kv->max_key_length);
2153                 talloc_free(list);
2154                 return LDB_ERR_CONSTRAINT_VIOLATION;
2155         }
2156         talloc_steal(list, dn_key);
2157
2158         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2159         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2160                 talloc_free(list);
2161                 return ret;
2162         }
2163
2164         /*
2165          * Check for duplicates in the @IDXDN DN -> GUID record
2166          *
2167          * This is very normal, it just means a duplicate DN creation
2168          * was attempted, so don't set the error string or print scary
2169          * messages.
2170          */
2171         if (list->count > 0 &&
2172             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2173             truncation == KEY_NOT_TRUNCATED) {
2174
2175                 talloc_free(list);
2176                 return LDB_ERR_CONSTRAINT_VIOLATION;
2177
2178         } else if (list->count > 0
2179                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2180
2181                 /*
2182                  * At least one existing entry in the DN->GUID index, which
2183                  * arises when the DN indexes have been truncated
2184                  *
2185                  * So need to pull the DN's to check if it's really a duplicate
2186                  */
2187                 int i;
2188                 for (i=0; i < list->count; i++) {
2189                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2190                         struct ldb_val key = {
2191                                 .data = guid_key,
2192                                 .length = sizeof(guid_key)
2193                         };
2194                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2195                         struct ldb_message *rec = ldb_msg_new(ldb);
2196                         if (rec == NULL) {
2197                                 return LDB_ERR_OPERATIONS_ERROR;
2198                         }
2199
2200                         ret = ldb_kv_idx_to_key(
2201                             module, ldb_kv, ldb, &list->dn[i], &key);
2202                         if (ret != LDB_SUCCESS) {
2203                                 TALLOC_FREE(list);
2204                                 TALLOC_FREE(rec);
2205                                 return ret;
2206                         }
2207
2208                         ret =
2209                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2210                         if (key.data != guid_key) {
2211                                 TALLOC_FREE(key.data);
2212                         }
2213                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2214                                 /*
2215                                  * the record has disappeared?
2216                                  * yes, this can happen
2217                                  */
2218                                 talloc_free(rec);
2219                                 continue;
2220                         }
2221
2222                         if (ret != LDB_SUCCESS) {
2223                                 /* an internal error */
2224                                 TALLOC_FREE(rec);
2225                                 TALLOC_FREE(list);
2226                                 return LDB_ERR_OPERATIONS_ERROR;
2227                         }
2228                         /*
2229                          * The DN we are trying to add to the DB and index
2230                          * is already here, so we must deny the addition
2231                          */
2232                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2233                                 TALLOC_FREE(rec);
2234                                 TALLOC_FREE(list);
2235                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2236                         }
2237                 }
2238         }
2239
2240         /*
2241          * Check for duplicates in unique indexes
2242          *
2243          * We don't need to do a loop test like the @IDXDN case
2244          * above as we have a ban on long unique index values
2245          * at the start of this function.
2246          */
2247         if (list->count > 0 &&
2248             ((a != NULL
2249               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2250                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2251                 /*
2252                  * We do not want to print info about a possibly
2253                  * confidential DN that the conflict was with in the
2254                  * user-visible error string
2255                  */
2256
2257                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2258                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2259                                   __location__
2260                                   ": unique index violation on %s in %s, "
2261                                   "conficts with %*.*s in %s",
2262                                   el->name, ldb_dn_get_linearized(msg->dn),
2263                                   (int)list->dn[0].length,
2264                                   (int)list->dn[0].length,
2265                                   list->dn[0].data,
2266                                   ldb_dn_get_linearized(dn_key));
2267                 } else {
2268                         /* This can't fail, gives a default at worst */
2269                         const struct ldb_schema_attribute *attr =
2270                             ldb_schema_attribute_by_name(
2271                                 ldb, ldb_kv->cache->GUID_index_attribute);
2272                         struct ldb_val v;
2273                         ret = attr->syntax->ldif_write_fn(ldb, list,
2274                                                           &list->dn[0], &v);
2275                         if (ret == LDB_SUCCESS) {
2276                                 ldb_debug(ldb,
2277                                           LDB_DEBUG_WARNING,
2278                                           __location__
2279                                           ": unique index violation on %s in "
2280                                           "%s, conficts with %s %*.*s in %s",
2281                                           el->name,
2282                                           ldb_dn_get_linearized(msg->dn),
2283                                           ldb_kv->cache->GUID_index_attribute,
2284                                           (int)v.length,
2285                                           (int)v.length,
2286                                           v.data,
2287                                           ldb_dn_get_linearized(dn_key));
2288                         }
2289                 }
2290                 ldb_asprintf_errstring(ldb,
2291                                        __location__ ": unique index violation "
2292                                        "on %s in %s",
2293                                        el->name,
2294                                        ldb_dn_get_linearized(msg->dn));
2295                 talloc_free(list);
2296                 return LDB_ERR_CONSTRAINT_VIOLATION;
2297         }
2298
2299         /* overallocate the list a bit, to reduce the number of
2300          * realloc trigered copies */
2301         alloc_len = ((list->count+1)+7) & ~7;
2302         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2303         if (list->dn == NULL) {
2304                 talloc_free(list);
2305                 return LDB_ERR_OPERATIONS_ERROR;
2306         }
2307
2308         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2309                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2310                 list->dn[list->count].data
2311                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2312                 if (list->dn[list->count].data == NULL) {
2313                         talloc_free(list);
2314                         return LDB_ERR_OPERATIONS_ERROR;
2315                 }
2316                 list->dn[list->count].length = strlen(dn_str);
2317         } else {
2318                 const struct ldb_val *key_val;
2319                 struct ldb_val *exact = NULL, *next = NULL;
2320                 key_val = ldb_msg_find_ldb_val(
2321                     msg, ldb_kv->cache->GUID_index_attribute);
2322                 if (key_val == NULL) {
2323                         talloc_free(list);
2324                         return ldb_module_operr(module);
2325                 }
2326
2327                 if (key_val->length != LDB_KV_GUID_SIZE) {
2328                         talloc_free(list);
2329                         return ldb_module_operr(module);
2330                 }
2331
2332                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2333                                         *key_val, ldb_val_equal_exact_ordered,
2334                                         exact, next);
2335
2336                 /*
2337                  * Give a warning rather than fail, this could be a
2338                  * duplicate value in the record allowed by a caller
2339                  * forcing in the value with
2340                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2341                  */
2342                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2343                         /* This can't fail, gives a default at worst */
2344                         const struct ldb_schema_attribute *attr =
2345                             ldb_schema_attribute_by_name(
2346                                 ldb, ldb_kv->cache->GUID_index_attribute);
2347                         struct ldb_val v;
2348                         ret = attr->syntax->ldif_write_fn(ldb, list,
2349                                                           exact, &v);
2350                         if (ret == LDB_SUCCESS) {
2351                                 ldb_debug(ldb,
2352                                           LDB_DEBUG_WARNING,
2353                                           __location__
2354                                           ": duplicate attribute value in %s "
2355                                           "for index on %s, "
2356                                           "duplicate of %s %*.*s in %s",
2357                                           ldb_dn_get_linearized(msg->dn),
2358                                           el->name,
2359                                           ldb_kv->cache->GUID_index_attribute,
2360                                           (int)v.length,
2361                                           (int)v.length,
2362                                           v.data,
2363                                           ldb_dn_get_linearized(dn_key));
2364                         }
2365                 }
2366
2367                 if (next == NULL) {
2368                         next = &list->dn[list->count];
2369                 } else {
2370                         memmove(&next[1], next,
2371                                 sizeof(*next) * (list->count - (next - list->dn)));
2372                 }
2373                 *next = ldb_val_dup(list->dn, key_val);
2374                 if (next->data == NULL) {
2375                         talloc_free(list);
2376                         return ldb_module_operr(module);
2377                 }
2378         }
2379         list->count++;
2380
2381         ret = ldb_kv_dn_list_store(module, dn_key, list);
2382
2383         talloc_free(list);
2384
2385         return ret;
2386 }
2387
2388 /*
2389   add index entries for one elements in a message
2390  */
2391 static int ldb_kv_index_add_el(struct ldb_module *module,
2392                                struct ldb_kv_private *ldb_kv,
2393                                const struct ldb_message *msg,
2394                                struct ldb_message_element *el)
2395 {
2396         unsigned int i;
2397         for (i = 0; i < el->num_values; i++) {
2398                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2399                 if (ret != LDB_SUCCESS) {
2400                         return ret;
2401                 }
2402         }
2403
2404         return LDB_SUCCESS;
2405 }
2406
2407 /*
2408   add index entries for all elements in a message
2409  */
2410 static int ldb_kv_index_add_all(struct ldb_module *module,
2411                                 struct ldb_kv_private *ldb_kv,
2412                                 const struct ldb_message *msg)
2413 {
2414         struct ldb_message_element *elements = msg->elements;
2415         unsigned int i;
2416         const char *dn_str;
2417         int ret;
2418
2419         if (ldb_dn_is_special(msg->dn)) {
2420                 return LDB_SUCCESS;
2421         }
2422
2423         dn_str = ldb_dn_get_linearized(msg->dn);
2424         if (dn_str == NULL) {
2425                 return LDB_ERR_OPERATIONS_ERROR;
2426         }
2427
2428         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2429         if (ret != LDB_SUCCESS) {
2430                 return ret;
2431         }
2432
2433         if (!ldb_kv->cache->attribute_indexes) {
2434                 /* no indexed fields */
2435                 return LDB_SUCCESS;
2436         }
2437
2438         for (i = 0; i < msg->num_elements; i++) {
2439                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2440                         continue;
2441                 }
2442                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2443                 if (ret != LDB_SUCCESS) {
2444                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2445                         ldb_asprintf_errstring(ldb,
2446                                                __location__ ": Failed to re-index %s in %s - %s",
2447                                                elements[i].name, dn_str,
2448                                                ldb_errstring(ldb));
2449                         return ret;
2450                 }
2451         }
2452
2453         return LDB_SUCCESS;
2454 }
2455
2456
2457 /*
2458   insert a DN index for a message
2459 */
2460 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2461                                   struct ldb_kv_private *ldb_kv,
2462                                   const struct ldb_message *msg,
2463                                   struct ldb_dn *dn,
2464                                   const char *index,
2465                                   int add)
2466 {
2467         struct ldb_message_element el;
2468         struct ldb_val val;
2469         int ret;
2470
2471         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2472         if (val.data == NULL) {
2473                 const char *dn_str = ldb_dn_get_linearized(dn);
2474                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2475                                        __location__ ": Failed to modify %s "
2476                                                     "against %s in %s: failed "
2477                                                     "to get casefold DN",
2478                                        index,
2479                                        ldb_kv->cache->GUID_index_attribute,
2480                                        dn_str);
2481                 return LDB_ERR_OPERATIONS_ERROR;
2482         }
2483
2484         val.length = strlen((char *)val.data);
2485         el.name = index;
2486         el.values = &val;
2487         el.num_values = 1;
2488
2489         if (add) {
2490                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2491         } else { /* delete */
2492                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2493         }
2494
2495         if (ret != LDB_SUCCESS) {
2496                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2497                 const char *dn_str = ldb_dn_get_linearized(dn);
2498                 ldb_asprintf_errstring(ldb,
2499                                        __location__ ": Failed to modify %s "
2500                                                     "against %s in %s - %s",
2501                                        index,
2502                                        ldb_kv->cache->GUID_index_attribute,
2503                                        dn_str,
2504                                        ldb_errstring(ldb));
2505                 return ret;
2506         }
2507         return ret;
2508 }
2509
2510 /*
2511   insert a one level index for a message
2512 */
2513 static int ldb_kv_index_onelevel(struct ldb_module *module,
2514                                  const struct ldb_message *msg,
2515                                  int add)
2516 {
2517         struct ldb_kv_private *ldb_kv = talloc_get_type(
2518             ldb_module_get_private(module), struct ldb_kv_private);
2519         struct ldb_dn *pdn;
2520         int ret;
2521
2522         /* We index for ONE Level only if requested */
2523         if (!ldb_kv->cache->one_level_indexes) {
2524                 return LDB_SUCCESS;
2525         }
2526
2527         pdn = ldb_dn_get_parent(module, msg->dn);
2528         if (pdn == NULL) {
2529                 return LDB_ERR_OPERATIONS_ERROR;
2530         }
2531         ret =
2532             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2533
2534         talloc_free(pdn);
2535
2536         return ret;
2537 }
2538
2539 /*
2540   insert a one level index for a message
2541 */
2542 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2543                                       const struct ldb_message *msg,
2544                                       int add)
2545 {
2546         int ret;
2547         struct ldb_kv_private *ldb_kv = talloc_get_type(
2548             ldb_module_get_private(module), struct ldb_kv_private);
2549
2550         /* We index for DN only if using a GUID index */
2551         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2552                 return LDB_SUCCESS;
2553         }
2554
2555         ret = ldb_kv_modify_index_dn(
2556             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2557
2558         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2559                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2560                                        "Entry %s already exists",
2561                                        ldb_dn_get_linearized(msg->dn));
2562                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2563         }
2564         return ret;
2565 }
2566
2567 /*
2568   add the index entries for a new element in a record
2569   The caller guarantees that these element values are not yet indexed
2570 */
2571 int ldb_kv_index_add_element(struct ldb_module *module,
2572                              struct ldb_kv_private *ldb_kv,
2573                              const struct ldb_message *msg,
2574                              struct ldb_message_element *el)
2575 {
2576         if (ldb_dn_is_special(msg->dn)) {
2577                 return LDB_SUCCESS;
2578         }
2579         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2580                 return LDB_SUCCESS;
2581         }
2582         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2583 }
2584
2585 /*
2586   add the index entries for a new record
2587 */
2588 int ldb_kv_index_add_new(struct ldb_module *module,
2589                          struct ldb_kv_private *ldb_kv,
2590                          const struct ldb_message *msg)
2591 {
2592         int ret;
2593
2594         if (ldb_dn_is_special(msg->dn)) {
2595                 return LDB_SUCCESS;
2596         }
2597
2598         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2599         if (ret != LDB_SUCCESS) {
2600                 /*
2601                  * Because we can't trust the caller to be doing
2602                  * transactions properly, clean up any index for this
2603                  * entry rather than relying on a transaction
2604                  * cleanup
2605                  */
2606
2607                 ldb_kv_index_delete(module, msg);
2608                 return ret;
2609         }
2610
2611         ret = ldb_kv_index_onelevel(module, msg, 1);
2612         if (ret != LDB_SUCCESS) {
2613                 /*
2614                  * Because we can't trust the caller to be doing
2615                  * transactions properly, clean up any index for this
2616                  * entry rather than relying on a transaction
2617                  * cleanup
2618                  */
2619                 ldb_kv_index_delete(module, msg);
2620                 return ret;
2621         }
2622         return ret;
2623 }
2624
2625
2626 /*
2627   delete an index entry for one message element
2628 */
2629 int ldb_kv_index_del_value(struct ldb_module *module,
2630                            struct ldb_kv_private *ldb_kv,
2631                            const struct ldb_message *msg,
2632                            struct ldb_message_element *el,
2633                            unsigned int v_idx)
2634 {
2635         struct ldb_context *ldb;
2636         struct ldb_dn *dn_key;
2637         const char *dn_str;
2638         int ret, i;
2639         unsigned int j;
2640         struct dn_list *list;
2641         struct ldb_dn *dn = msg->dn;
2642         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2643
2644         ldb = ldb_module_get_ctx(module);
2645
2646         dn_str = ldb_dn_get_linearized(dn);
2647         if (dn_str == NULL) {
2648                 return LDB_ERR_OPERATIONS_ERROR;
2649         }
2650
2651         if (dn_str[0] == '@') {
2652                 return LDB_SUCCESS;
2653         }
2654
2655         dn_key = ldb_kv_index_key(
2656             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
2657         /*
2658          * We ignore key truncation in ltdb_index_add1() so
2659          * match that by ignoring it here as well
2660          *
2661          * Multiple values are legitimate and accepted
2662          */
2663         if (!dn_key) {
2664                 return LDB_ERR_OPERATIONS_ERROR;
2665         }
2666
2667         list = talloc_zero(dn_key, struct dn_list);
2668         if (list == NULL) {
2669                 talloc_free(dn_key);
2670                 return LDB_ERR_OPERATIONS_ERROR;
2671         }
2672
2673         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2674         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2675                 /* it wasn't indexed. Did we have an earlier error? If we did then
2676                    its gone now */
2677                 talloc_free(dn_key);
2678                 return LDB_SUCCESS;
2679         }
2680
2681         if (ret != LDB_SUCCESS) {
2682                 talloc_free(dn_key);
2683                 return ret;
2684         }
2685
2686         /*
2687          * Find one of the values matching this message to remove
2688          */
2689         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
2690         if (i == -1) {
2691                 /* nothing to delete */
2692                 talloc_free(dn_key);
2693                 return LDB_SUCCESS;
2694         }
2695
2696         j = (unsigned int) i;
2697         if (j != list->count - 1) {
2698                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2699         }
2700         list->count--;
2701         if (list->count == 0) {
2702                 talloc_free(list->dn);
2703                 list->dn = NULL;
2704         } else {
2705                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2706         }
2707
2708         ret = ldb_kv_dn_list_store(module, dn_key, list);
2709
2710         talloc_free(dn_key);
2711
2712         return ret;
2713 }
2714
2715 /*
2716   delete the index entries for a element
2717   return -1 on failure
2718 */
2719 int ldb_kv_index_del_element(struct ldb_module *module,
2720                              struct ldb_kv_private *ldb_kv,
2721                              const struct ldb_message *msg,
2722                              struct ldb_message_element *el)
2723 {
2724         const char *dn_str;
2725         int ret;
2726         unsigned int i;
2727
2728         if (!ldb_kv->cache->attribute_indexes) {
2729                 /* no indexed fields */
2730                 return LDB_SUCCESS;
2731         }
2732
2733         dn_str = ldb_dn_get_linearized(msg->dn);
2734         if (dn_str == NULL) {
2735                 return LDB_ERR_OPERATIONS_ERROR;
2736         }
2737
2738         if (dn_str[0] == '@') {
2739                 return LDB_SUCCESS;
2740         }
2741
2742         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2743                 return LDB_SUCCESS;
2744         }
2745         for (i = 0; i < el->num_values; i++) {
2746                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
2747                 if (ret != LDB_SUCCESS) {
2748                         return ret;
2749                 }
2750         }
2751
2752         return LDB_SUCCESS;
2753 }
2754
2755 /*
2756   delete the index entries for a record
2757   return -1 on failure
2758 */
2759 int ldb_kv_index_delete(struct ldb_module *module,
2760                         const struct ldb_message *msg)
2761 {
2762         struct ldb_kv_private *ldb_kv = talloc_get_type(
2763             ldb_module_get_private(module), struct ldb_kv_private);
2764         int ret;
2765         unsigned int i;
2766
2767         if (ldb_dn_is_special(msg->dn)) {
2768                 return LDB_SUCCESS;
2769         }
2770
2771         ret = ldb_kv_index_onelevel(module, msg, 0);
2772         if (ret != LDB_SUCCESS) {
2773                 return ret;
2774         }
2775
2776         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
2777         if (ret != LDB_SUCCESS) {
2778                 return ret;
2779         }
2780
2781         if (!ldb_kv->cache->attribute_indexes) {
2782                 /* no indexed fields */
2783                 return LDB_SUCCESS;
2784         }
2785
2786         for (i = 0; i < msg->num_elements; i++) {
2787                 ret = ldb_kv_index_del_element(
2788                     module, ldb_kv, msg, &msg->elements[i]);
2789                 if (ret != LDB_SUCCESS) {
2790                         return ret;
2791                 }
2792         }
2793
2794         return LDB_SUCCESS;
2795 }
2796
2797
2798 /*
2799   traversal function that deletes all @INDEX records in the in-memory
2800   TDB.
2801
2802   This does not touch the actual DB, that is done at transaction
2803   commit, which in turn greatly reduces DB churn as we will likely
2804   be able to do a direct update into the old record.
2805 */
2806 static int delete_index(struct ldb_kv_private *ldb_kv,
2807                         struct ldb_val key,
2808                         struct ldb_val data,
2809                         void *state)
2810 {
2811         struct ldb_module *module = state;
2812         const char *dnstr = "DN=" LDB_KV_INDEX ":";
2813         struct dn_list list;
2814         struct ldb_dn *dn;
2815         struct ldb_val v;
2816         int ret;
2817
2818         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2819                 return 0;
2820         }
2821         /* we need to put a empty list in the internal tdb for this
2822          * index entry */
2823         list.dn = NULL;
2824         list.count = 0;
2825
2826         /* the offset of 3 is to remove the DN= prefix. */
2827         v.data = key.data + 3;
2828         v.length = strnlen((char *)key.data, key.length) - 3;
2829
2830         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
2831
2832         /*
2833          * This does not actually touch the DB quite yet, just
2834          * the in-memory index cache
2835          */
2836         ret = ldb_kv_dn_list_store(module, dn, &list);
2837         if (ret != LDB_SUCCESS) {
2838                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2839                                        "Unable to store null index for %s\n",
2840                                                 ldb_dn_get_linearized(dn));
2841                 talloc_free(dn);
2842                 return -1;
2843         }
2844         talloc_free(dn);
2845         return 0;
2846 }
2847
2848 /*
2849   traversal function that adds @INDEX records during a re index TODO wrong comment
2850 */
2851 static int re_key(struct ldb_kv_private *ldb_kv,
2852                   struct ldb_val key,
2853                   struct ldb_val val,
2854                   void *state)
2855 {
2856         struct ldb_context *ldb;
2857         struct ldb_kv_reindex_context *ctx =
2858             (struct ldb_kv_reindex_context *)state;
2859         struct ldb_module *module = ctx->module;
2860         struct ldb_message *msg;
2861         unsigned int nb_elements_in_db;
2862         int ret;
2863         struct ldb_val key2;
2864         bool is_record;
2865
2866         ldb = ldb_module_get_ctx(module);
2867
2868         if (key.length > 4 &&
2869             memcmp(key.data, "DN=@", 4) == 0) {
2870                 return 0;
2871         }
2872
2873         is_record = ldb_kv_key_is_record(key);
2874         if (is_record == false) {
2875                 return 0;
2876         }
2877
2878         msg = ldb_msg_new(module);
2879         if (msg == NULL) {
2880                 return -1;
2881         }
2882
2883         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2884                                                    msg,
2885                                                    NULL, 0,
2886                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2887                                                    &nb_elements_in_db);
2888         if (ret != 0) {
2889                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2890                                                 ldb_dn_get_linearized(msg->dn));
2891                 ctx->error = ret;
2892                 talloc_free(msg);
2893                 return -1;
2894         }
2895
2896         if (msg->dn == NULL) {
2897                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2898                           "Refusing to re-index as GUID "
2899                           "key %*.*s with no DN\n",
2900                           (int)key.length, (int)key.length,
2901                           (char *)key.data);
2902                 talloc_free(msg);
2903                 return -1;
2904         }
2905
2906         /* check if the DN key has changed, perhaps due to the case
2907            insensitivity of an element changing, or a change from DN
2908            to GUID keys */
2909         key2 = ldb_kv_key_msg(module, msg, msg);
2910         if (key2.data == NULL) {
2911                 /* probably a corrupt record ... darn */
2912                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2913                                                 ldb_dn_get_linearized(msg->dn));
2914                 talloc_free(msg);
2915                 return 0;
2916         }
2917         if (key.length != key2.length ||
2918             (memcmp(key.data, key2.data, key.length) != 0)) {
2919                 ldb_kv->kv_ops->update_in_iterate(
2920                     ldb_kv, key, key2, val, ctx);
2921         }
2922         talloc_free(key2.data);
2923
2924         talloc_free(msg);
2925
2926         ctx->count++;
2927         if (ctx->count % 10000 == 0) {
2928                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2929                           "Reindexing: re-keyed %u records so far",
2930                           ctx->count);
2931         }
2932
2933         return 0;
2934 }
2935
2936 /*
2937   traversal function that adds @INDEX records during a re index
2938 */
2939 static int re_index(struct ldb_kv_private *ldb_kv,
2940                     struct ldb_val key,
2941                     struct ldb_val val,
2942                     void *state)
2943 {
2944         struct ldb_context *ldb;
2945         struct ldb_kv_reindex_context *ctx =
2946             (struct ldb_kv_reindex_context *)state;
2947         struct ldb_module *module = ctx->module;
2948         struct ldb_message *msg;
2949         unsigned int nb_elements_in_db;
2950         int ret;
2951         bool is_record;
2952
2953         ldb = ldb_module_get_ctx(module);
2954
2955         if (key.length > 4 &&
2956             memcmp(key.data, "DN=@", 4) == 0) {
2957                 return 0;
2958         }
2959
2960         is_record = ldb_kv_key_is_record(key);
2961         if (is_record == false) {
2962                 return 0;
2963         }
2964
2965         msg = ldb_msg_new(module);
2966         if (msg == NULL) {
2967                 return -1;
2968         }
2969
2970         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2971                                                    msg,
2972                                                    NULL, 0,
2973                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2974                                                    &nb_elements_in_db);
2975         if (ret != 0) {
2976                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2977                                                 ldb_dn_get_linearized(msg->dn));
2978                 ctx->error = ret;
2979                 talloc_free(msg);
2980                 return -1;
2981         }
2982
2983         if (msg->dn == NULL) {
2984                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2985                           "Refusing to re-index as GUID "
2986                           "key %*.*s with no DN\n",
2987                           (int)key.length, (int)key.length,
2988                           (char *)key.data);
2989                 talloc_free(msg);
2990                 return -1;
2991         }
2992
2993         ret = ldb_kv_index_onelevel(module, msg, 1);
2994         if (ret != LDB_SUCCESS) {
2995                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2996                           "Adding special ONE LEVEL index failed (%s)!",
2997                                                 ldb_dn_get_linearized(msg->dn));
2998                 talloc_free(msg);
2999                 return -1;
3000         }
3001
3002         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3003
3004         if (ret != LDB_SUCCESS) {
3005                 ctx->error = ret;
3006                 talloc_free(msg);
3007                 return -1;
3008         }
3009
3010         talloc_free(msg);
3011
3012         ctx->count++;
3013         if (ctx->count % 10000 == 0) {
3014                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3015                           "Reindexing: re-indexed %u records so far",
3016                           ctx->count);
3017         }
3018
3019         return 0;
3020 }
3021
3022 /*
3023   force a complete reindex of the database
3024 */
3025 int ldb_kv_reindex(struct ldb_module *module)
3026 {
3027         struct ldb_kv_private *ldb_kv = talloc_get_type(
3028             ldb_module_get_private(module), struct ldb_kv_private);
3029         int ret;
3030         struct ldb_kv_reindex_context ctx;
3031
3032         /*
3033          * Only triggered after a modification, but make clear we do
3034          * not re-index a read-only DB
3035          */
3036         if (ldb_kv->read_only) {
3037                 return LDB_ERR_UNWILLING_TO_PERFORM;
3038         }
3039
3040         if (ldb_kv_cache_reload(module) != 0) {
3041                 return LDB_ERR_OPERATIONS_ERROR;
3042         }
3043
3044         /*
3045          * Ensure we read (and so remove) the entries from the real
3046          * DB, no values stored so far are any use as we want to do a
3047          * re-index
3048          */
3049         ldb_kv_index_transaction_cancel(module);
3050
3051         ret = ldb_kv_index_transaction_start(module);
3052         if (ret != LDB_SUCCESS) {
3053                 return ret;
3054         }
3055
3056         /* first traverse the database deleting any @INDEX records by
3057          * putting NULL entries in the in-memory tdb
3058          */
3059         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3060         if (ret < 0) {
3061                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3062                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3063                                        ldb_errstring(ldb));
3064                 return LDB_ERR_OPERATIONS_ERROR;
3065         }
3066
3067         ctx.module = module;
3068         ctx.error = 0;
3069         ctx.count = 0;
3070
3071         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3072         if (ret < 0) {
3073                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3074                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3075                                        ldb_errstring(ldb));
3076                 return LDB_ERR_OPERATIONS_ERROR;
3077         }
3078
3079         if (ctx.error != LDB_SUCCESS) {
3080                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3081                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3082                 return ctx.error;
3083         }
3084
3085         ctx.error = 0;
3086         ctx.count = 0;
3087
3088         /* now traverse adding any indexes for normal LDB records */
3089         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3090         if (ret < 0) {
3091                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3092                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3093                                        ldb_errstring(ldb));
3094                 return LDB_ERR_OPERATIONS_ERROR;
3095         }
3096
3097         if (ctx.error != LDB_SUCCESS) {
3098                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3099                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3100                 return ctx.error;
3101         }
3102
3103         if (ctx.count > 10000) {
3104                 ldb_debug(ldb_module_get_ctx(module),
3105                           LDB_DEBUG_WARNING,
3106                           "Reindexing: re_index successful on %s, "
3107                           "final index write-out will be in transaction commit",
3108                           ldb_kv->kv_ops->name(ldb_kv));
3109         }
3110         return LDB_SUCCESS;
3111 }