lib ldb: rename ltdb_private to ldb_kv_private
[ambi/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of TDB key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_tdb.h"
148 #include "ldb_private.h"
149 #include "lib/util/binsearch.h"
150
151 struct dn_list {
152         unsigned int count;
153         struct ldb_val *dn;
154         /*
155          * Do not optimise the intersection of this list,
156          * we must never return an entry not in this
157          * list.  This allows the index for
158          * SCOPE_ONELEVEL to be trusted.
159          */
160         bool strict;
161 };
162
163 struct ltdb_idxptr {
164         struct tdb_context *itdb;
165         int error;
166 };
167
168 enum key_truncation {
169         KEY_NOT_TRUNCATED,
170         KEY_TRUNCATED,
171 };
172
173 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
174                                       const struct ldb_message *msg,
175                                       int add);
176 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
177                                    struct ldb_kv_private *ldb_kv,
178                                    struct ldb_dn *base_dn,
179                                    struct dn_list *dn_list,
180                                    enum key_truncation *truncation);
181
182 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
183                                 struct dn_list *list);
184
185 /* we put a @IDXVERSION attribute on index entries. This
186    allows us to tell if it was written by an older version
187 */
188 #define LTDB_INDEXING_VERSION 2
189
190 #define LTDB_GUID_INDEXING_VERSION 3
191
192 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
193 {
194         if (ldb_kv->max_key_length == 0){
195                 return UINT_MAX;
196         }
197         return ldb_kv->max_key_length;
198 }
199
200 /* enable the idxptr mode when transactions start */
201 int ldb_kv_index_transaction_start(struct ldb_module *module)
202 {
203         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module), struct ldb_kv_private);
204         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ltdb_idxptr);
205         if (ldb_kv->idxptr == NULL) {
206                 return ldb_oom(ldb_module_get_ctx(module));
207         }
208
209         return LDB_SUCCESS;
210 }
211
212 /*
213   see if two ldb_val structures contain exactly the same data
214   return -1 or 1 for a mismatch, 0 for match
215 */
216 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
217                                          const struct ldb_val *v2)
218 {
219         if (v1->length > v2->length) {
220                 return -1;
221         }
222         if (v1->length < v2->length) {
223                 return 1;
224         }
225         return memcmp(v1->data, v2->data, v1->length);
226 }
227
228 /*
229   see if two ldb_val structures contain exactly the same data
230   return -1 or 1 for a mismatch, 0 for match
231 */
232 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
233                                        const struct ldb_val *v2)
234 {
235         if (v1.length > v2->length) {
236                 return -1;
237         }
238         if (v1.length < v2->length) {
239                 return 1;
240         }
241         return memcmp(v1.data, v2->data, v1.length);
242 }
243
244
245 /*
246   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
247   binary-safe comparison for the 'dn' returns -1 if not found
248
249   This is therefore safe when the value is a GUID in the future
250  */
251 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
252                                    const struct dn_list *list,
253                                    const struct ldb_val *v)
254 {
255         unsigned int i;
256         struct ldb_val *exact = NULL, *next = NULL;
257
258         if (ldb_kv->cache->GUID_index_attribute == NULL) {
259                 for (i=0; i<list->count; i++) {
260                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
261                                 return i;
262                         }
263                 }
264                 return -1;
265         }
266
267         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
268                                 *v, ldb_val_equal_exact_ordered,
269                                 exact, next);
270         if (exact == NULL) {
271                 return -1;
272         }
273         /* Not required, but keeps the compiler quiet */
274         if (next != NULL) {
275                 return -1;
276         }
277
278         i = exact - list->dn;
279         return i;
280 }
281
282 /*
283   find a entry in a dn_list. Uses a case sensitive comparison with the dn
284   returns -1 if not found
285  */
286 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
287                                    struct dn_list *list,
288                                    const struct ldb_message *msg)
289 {
290         struct ldb_val v;
291         const struct ldb_val *key_val;
292         if (ldb_kv->cache->GUID_index_attribute == NULL) {
293                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
294                 v.data = discard_const_p(unsigned char, dn_str);
295                 v.length = strlen(dn_str);
296         } else {
297                 key_val = ldb_msg_find_ldb_val(msg,
298                                                ldb_kv->cache->GUID_index_attribute);
299                 if (key_val == NULL) {
300                         return -1;
301                 }
302                 v = *key_val;
303         }
304         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
305 }
306
307 /*
308   this is effectively a cast function, but with lots of paranoia
309   checks and also copes with CPUs that are fussy about pointer
310   alignment
311  */
312 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
313                                            TDB_DATA rec,
314                                            bool check_parent)
315 {
316         struct dn_list *list;
317         if (rec.dsize != sizeof(void *)) {
318                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
319                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
320                 return NULL;
321         }
322         /* note that we can't just use a cast here, as rec.dptr may
323            not be aligned sufficiently for a pointer. A cast would cause
324            platforms like some ARM CPUs to crash */
325         memcpy(&list, rec.dptr, sizeof(void *));
326         list = talloc_get_type(list, struct dn_list);
327         if (list == NULL) {
328                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
329                                        "Bad type '%s' for idxptr",
330                                        talloc_get_name(list));
331                 return NULL;
332         }
333         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
334                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
335                                        "Bad parent '%s' for idxptr",
336                                        talloc_get_name(talloc_parent(list->dn)));
337                 return NULL;
338         }
339         return list;
340 }
341
342 /*
343   return the @IDX list in an index entry for a dn as a
344   struct dn_list
345  */
346 static int ldb_kv_dn_list_load(struct ldb_module *module,
347                                struct ldb_kv_private *ldb_kv,
348                                struct ldb_dn *dn,
349                                struct dn_list *list)
350 {
351         struct ldb_message *msg;
352         int ret, version;
353         struct ldb_message_element *el;
354         TDB_DATA rec;
355         struct dn_list *list2;
356         TDB_DATA key;
357
358         list->dn = NULL;
359         list->count = 0;
360
361         /* see if we have any in-memory index entries */
362         if (ldb_kv->idxptr == NULL ||
363             ldb_kv->idxptr->itdb == NULL) {
364                 goto normal_index;
365         }
366
367         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
368         key.dsize = strlen((char *)key.dptr);
369
370         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
371         if (rec.dptr == NULL) {
372                 goto normal_index;
373         }
374
375         /* we've found an in-memory index entry */
376         list2 = ldb_kv_index_idxptr(module, rec, true);
377         if (list2 == NULL) {
378                 free(rec.dptr);
379                 return LDB_ERR_OPERATIONS_ERROR;
380         }
381         free(rec.dptr);
382
383         *list = *list2;
384         return LDB_SUCCESS;
385
386 normal_index:
387         msg = ldb_msg_new(list);
388         if (msg == NULL) {
389                 return LDB_ERR_OPERATIONS_ERROR;
390         }
391
392         ret = ldb_kv_search_dn1(module,
393                                 dn,
394                                 msg,
395                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
396                                     LDB_UNPACK_DATA_FLAG_NO_DN);
397         if (ret != LDB_SUCCESS) {
398                 talloc_free(msg);
399                 return ret;
400         }
401
402         el = ldb_msg_find_element(msg, LTDB_IDX);
403         if (!el) {
404                 talloc_free(msg);
405                 return LDB_SUCCESS;
406         }
407
408         version = ldb_msg_find_attr_as_int(msg, LTDB_IDXVERSION, 0);
409
410         /*
411          * we avoid copying the strings by stealing the list.  We have
412          * to steal msg onto el->values (which looks odd) because we
413          * asked for the memory to be allocated on msg, not on each
414          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
415          */
416         if (ldb_kv->cache->GUID_index_attribute == NULL) {
417                 /* check indexing version number */
418                 if (version != LTDB_INDEXING_VERSION) {
419                         ldb_debug_set(ldb_module_get_ctx(module),
420                                       LDB_DEBUG_ERROR,
421                                       "Wrong DN index version %d "
422                                       "expected %d for %s",
423                                       version, LTDB_INDEXING_VERSION,
424                                       ldb_dn_get_linearized(dn));
425                         talloc_free(msg);
426                         return LDB_ERR_OPERATIONS_ERROR;
427                 }
428
429                 talloc_steal(el->values, msg);
430                 list->dn = talloc_steal(list, el->values);
431                 list->count = el->num_values;
432         } else {
433                 unsigned int i;
434                 if (version != LTDB_GUID_INDEXING_VERSION) {
435                         /* This is quite likely during the DB startup
436                            on first upgrade to using a GUID index */
437                         ldb_debug_set(ldb_module_get_ctx(module),
438                                       LDB_DEBUG_ERROR,
439                                       "Wrong GUID index version %d "
440                                       "expected %d for %s",
441                                       version, LTDB_GUID_INDEXING_VERSION,
442                                       ldb_dn_get_linearized(dn));
443                         talloc_free(msg);
444                         return LDB_ERR_OPERATIONS_ERROR;
445                 }
446
447                 if (el->num_values == 0) {
448                         talloc_free(msg);
449                         return LDB_ERR_OPERATIONS_ERROR;
450                 }
451
452                 if ((el->values[0].length % LTDB_GUID_SIZE) != 0) {
453                         talloc_free(msg);
454                         return LDB_ERR_OPERATIONS_ERROR;
455                 }
456
457                 list->count = el->values[0].length / LTDB_GUID_SIZE;
458                 list->dn = talloc_array(list, struct ldb_val, list->count);
459                 if (list->dn == NULL) {
460                         talloc_free(msg);
461                         return LDB_ERR_OPERATIONS_ERROR;
462                 }
463
464                 /*
465                  * The actual data is on msg, due to
466                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
467                  */
468                 talloc_steal(list->dn, msg);
469                 for (i = 0; i < list->count; i++) {
470                         list->dn[i].data
471                                 = &el->values[0].data[i * LTDB_GUID_SIZE];
472                         list->dn[i].length = LTDB_GUID_SIZE;
473                 }
474         }
475
476         /* We don't need msg->elements any more */
477         talloc_free(msg->elements);
478         return LDB_SUCCESS;
479 }
480
481 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
482                            struct ldb_kv_private *ldb_kv,
483                            TALLOC_CTX *mem_ctx,
484                            struct ldb_dn *dn,
485                            TDB_DATA *tdb_key)
486 {
487         struct ldb_context *ldb = ldb_module_get_ctx(module);
488         int ret;
489         int index = 0;
490         enum key_truncation truncation = KEY_NOT_TRUNCATED;
491         struct dn_list *list = talloc(mem_ctx, struct dn_list);
492         if (list == NULL) {
493                 ldb_oom(ldb);
494                 return LDB_ERR_OPERATIONS_ERROR;
495         }
496
497         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
498         if (ret != LDB_SUCCESS) {
499                 TALLOC_FREE(list);
500                 return ret;
501         }
502
503         if (list->count == 0) {
504                 TALLOC_FREE(list);
505                 return LDB_ERR_NO_SUCH_OBJECT;
506         }
507
508         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
509                 const char *dn_str = ldb_dn_get_linearized(dn);
510                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
511                                        __location__
512                                        ": Failed to read DN index "
513                                        "against %s for %s: too many "
514                                        "values (%u > 1)",
515                                        ldb_kv->cache->GUID_index_attribute,
516                                        dn_str, list->count);
517                 TALLOC_FREE(list);
518                 return LDB_ERR_CONSTRAINT_VIOLATION;
519         }
520
521         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
522                 /*
523                  * DN key has been truncated, need to inspect the actual
524                  * records to locate the actual DN
525                  */
526                 int i;
527                 index = -1;
528                 for (i=0; i < list->count; i++) {
529                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
530                         TDB_DATA key = {
531                                 .dptr = guid_key,
532                                 .dsize = sizeof(guid_key)
533                         };
534                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
535                         struct ldb_message *rec = ldb_msg_new(ldb);
536                         if (rec == NULL) {
537                                 TALLOC_FREE(list);
538                                 return LDB_ERR_OPERATIONS_ERROR;
539                         }
540
541                         ret = ldb_kv_idx_to_key(
542                             module, ldb_kv, ldb, &list->dn[i], &key);
543                         if (ret != LDB_SUCCESS) {
544                                 TALLOC_FREE(list);
545                                 TALLOC_FREE(rec);
546                                 return ret;
547                         }
548
549                         ret = ldb_kv_search_key(module, ldb_kv, key, rec, flags);
550                         if (key.dptr != guid_key) {
551                                 TALLOC_FREE(key.dptr);
552                         }
553                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
554                                 /*
555                                  * the record has disappeared?
556                                  * yes, this can happen
557                                  */
558                                 TALLOC_FREE(rec);
559                                 continue;
560                         }
561
562                         if (ret != LDB_SUCCESS) {
563                                 /* an internal error */
564                                 TALLOC_FREE(rec);
565                                 TALLOC_FREE(list);
566                                 return LDB_ERR_OPERATIONS_ERROR;
567                         }
568
569                         /*
570                          * We found the actual DN that we wanted from in the
571                          * multiple values that matched the index
572                          * (due to truncation), so return that.
573                          *
574                          */
575                         if (ldb_dn_compare(dn, rec->dn) == 0) {
576                                 index = i;
577                                 TALLOC_FREE(rec);
578                                 break;
579                         }
580                 }
581
582                 /*
583                  * We matched the index but the actual DN we wanted
584                  * was not here.
585                  */
586                 if (index == -1) {
587                         TALLOC_FREE(list);
588                         return LDB_ERR_NO_SUCH_OBJECT;
589                 }
590         }
591
592         /* The tdb_key memory is allocated by the caller */
593         ret = ldb_kv_guid_to_key(module, ldb_kv, &list->dn[index], tdb_key);
594         TALLOC_FREE(list);
595
596         if (ret != LDB_SUCCESS) {
597                 return LDB_ERR_OPERATIONS_ERROR;
598         }
599
600         return LDB_SUCCESS;
601 }
602
603
604
605 /*
606   save a dn_list into a full @IDX style record
607  */
608 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
609                                      struct ldb_kv_private *ldb_kv,
610                                      struct ldb_dn *dn,
611                                      struct dn_list *list)
612 {
613         struct ldb_message *msg;
614         int ret;
615
616         msg = ldb_msg_new(module);
617         if (!msg) {
618                 return ldb_module_oom(module);
619         }
620
621         msg->dn = dn;
622
623         if (list->count == 0) {
624                 ret = ldb_kv_delete_noindex(module, msg);
625                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
626                         ret = LDB_SUCCESS;
627                 }
628                 talloc_free(msg);
629                 return ret;
630         }
631
632         if (ldb_kv->cache->GUID_index_attribute == NULL) {
633                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
634                                       LTDB_INDEXING_VERSION);
635                 if (ret != LDB_SUCCESS) {
636                         talloc_free(msg);
637                         return ldb_module_oom(module);
638                 }
639         } else {
640                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
641                                       LTDB_GUID_INDEXING_VERSION);
642                 if (ret != LDB_SUCCESS) {
643                         talloc_free(msg);
644                         return ldb_module_oom(module);
645                 }
646         }
647
648         if (list->count > 0) {
649                 struct ldb_message_element *el;
650
651                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
652                 if (ret != LDB_SUCCESS) {
653                         talloc_free(msg);
654                         return ldb_module_oom(module);
655                 }
656
657                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
658                         el->values = list->dn;
659                         el->num_values = list->count;
660                 } else {
661                         struct ldb_val v;
662                         unsigned int i;
663                         el->values = talloc_array(msg,
664                                                   struct ldb_val, 1);
665                         if (el->values == NULL) {
666                                 talloc_free(msg);
667                                 return ldb_module_oom(module);
668                         }
669
670                         v.data = talloc_array_size(el->values,
671                                                    list->count,
672                                                    LTDB_GUID_SIZE);
673                         if (v.data == NULL) {
674                                 talloc_free(msg);
675                                 return ldb_module_oom(module);
676                         }
677
678                         v.length = talloc_get_size(v.data);
679
680                         for (i = 0; i < list->count; i++) {
681                                 if (list->dn[i].length !=
682                                     LTDB_GUID_SIZE) {
683                                         talloc_free(msg);
684                                         return ldb_module_operr(module);
685                                 }
686                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
687                                        list->dn[i].data,
688                                        LTDB_GUID_SIZE);
689                         }
690                         el->values[0] = v;
691                         el->num_values = 1;
692                 }
693         }
694
695         ret = ldb_kv_store(module, msg, TDB_REPLACE);
696         talloc_free(msg);
697         return ret;
698 }
699
700 /*
701   save a dn_list into the database, in either @IDX or internal format
702  */
703 static int ldb_kv_dn_list_store(struct ldb_module *module,
704                                 struct ldb_dn *dn,
705                                 struct dn_list *list)
706 {
707         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module), struct ldb_kv_private);
708         TDB_DATA rec, key;
709         int ret;
710         struct dn_list *list2;
711
712         if (ldb_kv->idxptr == NULL) {
713                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
714         }
715
716         if (ldb_kv->idxptr->itdb == NULL) {
717                 ldb_kv->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
718                 if (ldb_kv->idxptr->itdb == NULL) {
719                         return LDB_ERR_OPERATIONS_ERROR;
720                 }
721         }
722
723         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
724         if (key.dptr == NULL) {
725                 return LDB_ERR_OPERATIONS_ERROR;
726         }
727         key.dsize = strlen((char *)key.dptr);
728
729         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
730         if (rec.dptr != NULL) {
731                 list2 = ldb_kv_index_idxptr(module, rec, false);
732                 if (list2 == NULL) {
733                         free(rec.dptr);
734                         return LDB_ERR_OPERATIONS_ERROR;
735                 }
736                 free(rec.dptr);
737                 list2->dn = talloc_steal(list2, list->dn);
738                 list2->count = list->count;
739                 return LDB_SUCCESS;
740         }
741
742         list2 = talloc(ldb_kv->idxptr, struct dn_list);
743         if (list2 == NULL) {
744                 return LDB_ERR_OPERATIONS_ERROR;
745         }
746         list2->dn = talloc_steal(list2, list->dn);
747         list2->count = list->count;
748
749         rec.dptr = (uint8_t *)&list2;
750         rec.dsize = sizeof(void *);
751
752
753         /*
754          * This is not a store into the main DB, but into an in-memory
755          * TDB, so we don't need a guard on ltdb->read_only
756          */
757         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
758         if (ret != 0) {
759                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
760         }
761         return LDB_SUCCESS;
762 }
763
764 /*
765   traverse function for storing the in-memory index entries on disk
766  */
767 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
768                                        TDB_DATA key,
769                                        TDB_DATA data,
770                                        void *state)
771 {
772         struct ldb_module *module = state;
773         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module), struct ldb_kv_private);
774         struct ldb_dn *dn;
775         struct ldb_context *ldb = ldb_module_get_ctx(module);
776         struct ldb_val v;
777         struct dn_list *list;
778
779         list = ldb_kv_index_idxptr(module, data, true);
780         if (list == NULL) {
781                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
782                 return -1;
783         }
784
785         v.data = key.dptr;
786         v.length = strnlen((char *)key.dptr, key.dsize);
787
788         dn = ldb_dn_from_ldb_val(module, ldb, &v);
789         if (dn == NULL) {
790                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
791                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
792                 return -1;
793         }
794
795         ldb_kv->idxptr->error = ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
796         talloc_free(dn);
797         if (ldb_kv->idxptr->error != 0) {
798                 return -1;
799         }
800         return 0;
801 }
802
803 /* cleanup the idxptr mode when transaction commits */
804 int ldb_kv_index_transaction_commit(struct ldb_module *module)
805 {
806         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module), struct ldb_kv_private);
807         int ret;
808
809         struct ldb_context *ldb = ldb_module_get_ctx(module);
810
811         ldb_reset_err_string(ldb);
812
813         if (ldb_kv->idxptr->itdb) {
814                 tdb_traverse(
815                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
816                 tdb_close(ldb_kv->idxptr->itdb);
817         }
818
819         ret = ldb_kv->idxptr->error;
820         if (ret != LDB_SUCCESS) {
821                 if (!ldb_errstring(ldb)) {
822                         ldb_set_errstring(ldb, ldb_strerror(ret));
823                 }
824                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
825         }
826
827         talloc_free(ldb_kv->idxptr);
828         ldb_kv->idxptr = NULL;
829         return ret;
830 }
831
832 /* cleanup the idxptr mode when transaction cancels */
833 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
834 {
835         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module), struct ldb_kv_private);
836         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
837                 tdb_close(ldb_kv->idxptr->itdb);
838         }
839         talloc_free(ldb_kv->idxptr);
840         ldb_kv->idxptr = NULL;
841         return LDB_SUCCESS;
842 }
843
844
845 /*
846   return the dn key to be used for an index
847   the caller is responsible for freeing
848 */
849 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
850                                        struct ldb_kv_private *ldb_kv,
851                                        const char *attr,
852                                        const struct ldb_val *value,
853                                        const struct ldb_schema_attribute **ap,
854                                        enum key_truncation *truncation)
855 {
856         struct ldb_dn *ret;
857         struct ldb_val v;
858         const struct ldb_schema_attribute *a = NULL;
859         char *attr_folded = NULL;
860         const char *attr_for_dn = NULL;
861         int r;
862         bool should_b64_encode;
863
864         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
865         size_t key_len = 0;
866         size_t attr_len = 0;
867         const size_t indx_len = sizeof(LTDB_INDEX) - 1;
868         unsigned frmt_len = 0;
869         const size_t additional_key_length = 4;
870         unsigned int num_separators = 3; /* Estimate for overflow check */
871         const size_t min_data = 1;
872         const size_t min_key_length = additional_key_length
873                 + indx_len + num_separators + min_data;
874
875         if (attr[0] == '@') {
876                 attr_for_dn = attr;
877                 v = *value;
878                 if (ap != NULL) {
879                         *ap = NULL;
880                 }
881         } else {
882                 attr_folded = ldb_attr_casefold(ldb, attr);
883                 if (!attr_folded) {
884                         return NULL;
885                 }
886
887                 attr_for_dn = attr_folded;
888
889                 a = ldb_schema_attribute_by_name(ldb, attr);
890                 if (ap) {
891                         *ap = a;
892                 }
893                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
894                 if (r != LDB_SUCCESS) {
895                         const char *errstr = ldb_errstring(ldb);
896                         /* canonicalisation can be refused. For
897                            example, a attribute that takes wildcards
898                            will refuse to canonicalise if the value
899                            contains a wildcard */
900                         ldb_asprintf_errstring(ldb,
901                                                "Failed to create index "
902                                                "key for attribute '%s':%s%s%s",
903                                                attr, ldb_strerror(r),
904                                                (errstr?":":""),
905                                                (errstr?errstr:""));
906                         talloc_free(attr_folded);
907                         return NULL;
908                 }
909         }
910         attr_len = strlen(attr_for_dn);
911
912         /*
913          * Check if there is any hope this will fit into the DB.
914          * Overflow here is not actually critical the code below
915          * checks again to make the printf and the DB does another
916          * check for too long keys
917          */
918         if (max_key_length - attr_len < min_key_length) {
919                 ldb_asprintf_errstring(
920                         ldb,
921                         __location__ ": max_key_length "
922                         "is too small (%u) < (%u)",
923                         max_key_length,
924                         (unsigned)(min_key_length + attr_len));
925                 talloc_free(attr_folded);
926                 return NULL;
927         }
928
929         /*
930          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
931          * "DN=" and a trailing string terminator
932          */
933         max_key_length -= additional_key_length;
934
935         /*
936          * We do not base 64 encode a DN in a key, it has already been
937          * casefold and lineraized, that is good enough.  That already
938          * avoids embedded NUL etc.
939          */
940         if (ldb_kv->cache->GUID_index_attribute != NULL) {
941                 if (strcmp(attr, LTDB_IDXDN) == 0) {
942                         should_b64_encode = false;
943                 } else if (strcmp(attr, LTDB_IDXONE) == 0) {
944                         /*
945                          * We can only change the behaviour for IDXONE
946                          * when the GUID index is enabled
947                          */
948                         should_b64_encode = false;
949                 } else {
950                         should_b64_encode
951                                 = ldb_should_b64_encode(ldb, &v);
952                 }
953         } else {
954                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
955         }
956
957         if (should_b64_encode) {
958                 size_t vstr_len = 0;
959                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
960                 if (!vstr) {
961                         talloc_free(attr_folded);
962                         return NULL;
963                 }
964                 vstr_len = strlen(vstr);
965                 /* 
966                  * Overflow here is not critical as we only use this
967                  * to choose the printf truncation
968                  */
969                 key_len = num_separators + indx_len + attr_len + vstr_len;
970                 if (key_len > max_key_length) {
971                         size_t excess = key_len - max_key_length;
972                         frmt_len = vstr_len - excess;
973                         *truncation = KEY_TRUNCATED;
974                         /*
975                         * Truncated keys are placed in a separate key space
976                         * from the non truncated keys
977                         * Note: the double hash "##" is not a typo and
978                         * indicates that the following value is base64 encoded
979                         */
980                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
981                                              LTDB_INDEX, attr_for_dn,
982                                              frmt_len, vstr);
983                 } else {
984                         frmt_len = vstr_len;
985                         *truncation = KEY_NOT_TRUNCATED;
986                         /*
987                          * Note: the double colon "::" is not a typo and
988                          * indicates that the following value is base64 encoded
989                          */
990                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
991                                              LTDB_INDEX, attr_for_dn,
992                                              frmt_len, vstr);
993                 }
994                 talloc_free(vstr);
995         } else {
996                 /* Only need two seperators */
997                 num_separators = 2;
998
999                 /*
1000                  * Overflow here is not critical as we only use this
1001                  * to choose the printf truncation
1002                  */
1003                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1004                 if (key_len > max_key_length) {
1005                         size_t excess = key_len - max_key_length;
1006                         frmt_len = v.length - excess;
1007                         *truncation = KEY_TRUNCATED;
1008                         /*
1009                          * Truncated keys are placed in a separate key space
1010                          * from the non truncated keys
1011                          */
1012                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1013                                              LTDB_INDEX, attr_for_dn,
1014                                              frmt_len, (char *)v.data);
1015                 } else {
1016                         frmt_len = v.length;
1017                         *truncation = KEY_NOT_TRUNCATED;
1018                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1019                                              LTDB_INDEX, attr_for_dn,
1020                                              frmt_len, (char *)v.data);
1021                 }
1022         }
1023
1024         if (v.data != value->data) {
1025                 talloc_free(v.data);
1026         }
1027         talloc_free(attr_folded);
1028
1029         return ret;
1030 }
1031
1032 /*
1033   see if a attribute value is in the list of indexed attributes
1034 */
1035 static bool ldb_kv_is_indexed(struct ldb_module *module,
1036                               struct ldb_kv_private *ldb_kv,
1037                               const char *attr)
1038 {
1039         struct ldb_context *ldb = ldb_module_get_ctx(module);
1040         unsigned int i;
1041         struct ldb_message_element *el;
1042
1043         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1044             (ldb_attr_cmp(attr,
1045                           ldb_kv->cache->GUID_index_attribute) == 0)) {
1046                 /* Implicity covered, this is the index key */
1047                 return false;
1048         }
1049         if (ldb->schema.index_handler_override) {
1050                 const struct ldb_schema_attribute *a
1051                         = ldb_schema_attribute_by_name(ldb, attr);
1052
1053                 if (a == NULL) {
1054                         return false;
1055                 }
1056
1057                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1058                         return true;
1059                 } else {
1060                         return false;
1061                 }
1062         }
1063
1064         if (!ldb_kv->cache->attribute_indexes) {
1065                 return false;
1066         }
1067
1068         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LTDB_IDXATTR);
1069         if (el == NULL) {
1070                 return false;
1071         }
1072
1073         /* TODO: this is too expensive! At least use a binary search */
1074         for (i=0; i<el->num_values; i++) {
1075                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1076                         return true;
1077                 }
1078         }
1079         return false;
1080 }
1081
1082 /*
1083   in the following logic functions, the return value is treated as
1084   follows:
1085
1086      LDB_SUCCESS: we found some matching index values
1087
1088      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1089
1090      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1091                                we'll need a full search
1092  */
1093
1094 /*
1095   return a list of dn's that might match a simple indexed search (an
1096   equality search only)
1097  */
1098 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1099                                   struct ldb_kv_private *ldb_kv,
1100                                   const struct ldb_parse_tree *tree,
1101                                   struct dn_list *list)
1102 {
1103         struct ldb_context *ldb;
1104         struct ldb_dn *dn;
1105         int ret;
1106         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1107
1108         ldb = ldb_module_get_ctx(module);
1109
1110         list->count = 0;
1111         list->dn = NULL;
1112
1113         /* if the attribute isn't in the list of indexed attributes then
1114            this node needs a full search */
1115         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1116                 return LDB_ERR_OPERATIONS_ERROR;
1117         }
1118
1119         /* the attribute is indexed. Pull the list of DNs that match the
1120            search criterion */
1121         dn = ldb_kv_index_key(ldb,
1122                               ldb_kv,
1123                               tree->u.equality.attr,
1124                               &tree->u.equality.value,
1125                               NULL,
1126                               &truncation);
1127         /*
1128          * We ignore truncation here and allow multi-valued matches
1129          * as ltdb_search_indexed will filter out the wrong one in
1130          * ltdb_index_filter() which calls ldb_match_message().
1131          */
1132         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1133
1134         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1135         talloc_free(dn);
1136         return ret;
1137 }
1138
1139
1140 static bool list_union(struct ldb_context *ldb,
1141                        struct ldb_kv_private *ldb_kv,
1142                        struct dn_list *list, struct dn_list *list2);
1143
1144 /*
1145   return a list of dn's that might match a leaf indexed search
1146  */
1147 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1148                                 struct ldb_kv_private *ldb_kv,
1149                                 const struct ldb_parse_tree *tree,
1150                                 struct dn_list *list)
1151 {
1152         if (ldb_kv->disallow_dn_filter &&
1153             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1154                 /* in AD mode we do not support "(dn=...)" search filters */
1155                 list->dn = NULL;
1156                 list->count = 0;
1157                 return LDB_SUCCESS;
1158         }
1159         if (tree->u.equality.attr[0] == '@') {
1160                 /* Do not allow a indexed search against an @ */
1161                 list->dn = NULL;
1162                 list->count = 0;
1163                 return LDB_SUCCESS;
1164         }
1165         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1166                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1167                 struct ldb_dn *dn
1168                         = ldb_dn_from_ldb_val(list,
1169                                               ldb_module_get_ctx(module),
1170                                               &tree->u.equality.value);
1171                 if (dn == NULL) {
1172                         /* If we can't parse it, no match */
1173                         list->dn = NULL;
1174                         list->count = 0;
1175                         return LDB_SUCCESS;
1176                 }
1177
1178                 /*
1179                  * Re-use the same code we use for a SCOPE_BASE
1180                  * search
1181                  *
1182                  * We can't call TALLOC_FREE(dn) as this must belong
1183                  * to list for the memory to remain valid.
1184                  */
1185                 return ldb_kv_index_dn_base_dn(
1186                     module, ldb_kv, dn, list, &truncation);
1187                 /*
1188                  * We ignore truncation here and allow multi-valued matches
1189                  * as ltdb_search_indexed will filter out the wrong one in
1190                  * ltdb_index_filter() which calls ldb_match_message().
1191                  */
1192
1193         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1194                    (ldb_attr_cmp(tree->u.equality.attr,
1195                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1196                 int ret;
1197                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1198                 list->dn = talloc_array(list, struct ldb_val, 1);
1199                 if (list->dn == NULL) {
1200                         ldb_module_oom(module);
1201                         return LDB_ERR_OPERATIONS_ERROR;
1202                 }
1203                 /*
1204                  * We need to go via the canonicalise_fn() to
1205                  * ensure we get the index in binary, rather
1206                  * than a string
1207                  */
1208                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(ldb,
1209                                                                list->dn,
1210                                                                &tree->u.equality.value,
1211                                                                &list->dn[0]);
1212                 if (ret != LDB_SUCCESS) {
1213                         return LDB_ERR_OPERATIONS_ERROR;
1214                 }
1215                 list->count = 1;
1216                 return LDB_SUCCESS;
1217         }
1218
1219         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1220 }
1221
1222
1223 /*
1224   list intersection
1225   list = list & list2
1226 */
1227 static bool list_intersect(struct ldb_context *ldb,
1228                            struct ldb_kv_private *ldb_kv,
1229                            struct dn_list *list, const struct dn_list *list2)
1230 {
1231         const struct dn_list *short_list, *long_list;
1232         struct dn_list *list3;
1233         unsigned int i;
1234
1235         if (list->count == 0) {
1236                 /* 0 & X == 0 */
1237                 return true;
1238         }
1239         if (list2->count == 0) {
1240                 /* X & 0 == 0 */
1241                 list->count = 0;
1242                 list->dn = NULL;
1243                 return true;
1244         }
1245
1246         /* the indexing code is allowed to return a longer list than
1247            what really matches, as all results are filtered by the
1248            full expression at the end - this shortcut avoids a lot of
1249            work in some cases */
1250         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1251                 return true;
1252         }
1253         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1254                 list->count = list2->count;
1255                 list->dn = list2->dn;
1256                 /* note that list2 may not be the parent of list2->dn,
1257                    as list2->dn may be owned by ltdb->idxptr. In that
1258                    case we expect this reparent call to fail, which is
1259                    OK */
1260                 talloc_reparent(list2, list, list2->dn);
1261                 return true;
1262         }
1263
1264         if (list->count > list2->count) {
1265                 short_list = list2;
1266                 long_list = list;
1267         } else {
1268                 short_list = list;
1269                 long_list = list2;
1270         }
1271
1272         list3 = talloc_zero(list, struct dn_list);
1273         if (list3 == NULL) {
1274                 return false;
1275         }
1276
1277         list3->dn = talloc_array(list3, struct ldb_val,
1278                                  MIN(list->count, list2->count));
1279         if (!list3->dn) {
1280                 talloc_free(list3);
1281                 return false;
1282         }
1283         list3->count = 0;
1284
1285         for (i=0;i<short_list->count;i++) {
1286                 /* For the GUID index case, this is a binary search */
1287                 if (ldb_kv_dn_list_find_val(
1288                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1289                         list3->dn[list3->count] = short_list->dn[i];
1290                         list3->count++;
1291                 }
1292         }
1293
1294         list->strict |= list2->strict;
1295         list->dn = talloc_steal(list, list3->dn);
1296         list->count = list3->count;
1297         talloc_free(list3);
1298
1299         return true;
1300 }
1301
1302
1303 /*
1304   list union
1305   list = list | list2
1306 */
1307 static bool list_union(struct ldb_context *ldb,
1308                        struct ldb_kv_private *ldb_kv,
1309                        struct dn_list *list, struct dn_list *list2)
1310 {
1311         struct ldb_val *dn3;
1312         unsigned int i = 0, j = 0, k = 0;
1313
1314         if (list2->count == 0) {
1315                 /* X | 0 == X */
1316                 return true;
1317         }
1318
1319         if (list->count == 0) {
1320                 /* 0 | X == X */
1321                 list->count = list2->count;
1322                 list->dn = list2->dn;
1323                 /* note that list2 may not be the parent of list2->dn,
1324                    as list2->dn may be owned by ltdb->idxptr. In that
1325                    case we expect this reparent call to fail, which is
1326                    OK */
1327                 talloc_reparent(list2, list, list2->dn);
1328                 return true;
1329         }
1330
1331         /*
1332          * Sort the lists (if not in GUID DN mode) so we can do
1333          * the de-duplication during the merge
1334          *
1335          * NOTE: This can sort the in-memory index values, as list or
1336          * list2 might not be a copy!
1337          */
1338         ldb_kv_dn_list_sort(ldb_kv, list);
1339         ldb_kv_dn_list_sort(ldb_kv, list2);
1340
1341         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1342         if (!dn3) {
1343                 ldb_oom(ldb);
1344                 return false;
1345         }
1346
1347         while (i < list->count || j < list2->count) {
1348                 int cmp;
1349                 if (i >= list->count) {
1350                         cmp = 1;
1351                 } else if (j >= list2->count) {
1352                         cmp = -1;
1353                 } else {
1354                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1355                                                           &list2->dn[j]);
1356                 }
1357
1358                 if (cmp < 0) {
1359                         /* Take list */
1360                         dn3[k] = list->dn[i];
1361                         i++;
1362                         k++;
1363                 } else if (cmp > 0) {
1364                         /* Take list2 */
1365                         dn3[k] = list2->dn[j];
1366                         j++;
1367                         k++;
1368                 } else {
1369                         /* Equal, take list */
1370                         dn3[k] = list->dn[i];
1371                         i++;
1372                         j++;
1373                         k++;
1374                 }
1375         }
1376
1377         list->dn = dn3;
1378         list->count = k;
1379
1380         return true;
1381 }
1382
1383 static int ldb_kv_index_dn(struct ldb_module *module,
1384                            struct ldb_kv_private *ldb_kv,
1385                            const struct ldb_parse_tree *tree,
1386                            struct dn_list *list);
1387
1388 /*
1389   process an OR list (a union)
1390  */
1391 static int ldb_kv_index_dn_or(struct ldb_module *module,
1392                               struct ldb_kv_private *ldb_kv,
1393                               const struct ldb_parse_tree *tree,
1394                               struct dn_list *list)
1395 {
1396         struct ldb_context *ldb;
1397         unsigned int i;
1398
1399         ldb = ldb_module_get_ctx(module);
1400
1401         list->dn = NULL;
1402         list->count = 0;
1403
1404         for (i=0; i<tree->u.list.num_elements; i++) {
1405                 struct dn_list *list2;
1406                 int ret;
1407
1408                 list2 = talloc_zero(list, struct dn_list);
1409                 if (list2 == NULL) {
1410                         return LDB_ERR_OPERATIONS_ERROR;
1411                 }
1412
1413                 ret = ldb_kv_index_dn(
1414                     module, ldb_kv, tree->u.list.elements[i], list2);
1415
1416                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1417                         /* X || 0 == X */
1418                         talloc_free(list2);
1419                         continue;
1420                 }
1421
1422                 if (ret != LDB_SUCCESS) {
1423                         /* X || * == * */
1424                         talloc_free(list2);
1425                         return ret;
1426                 }
1427
1428                 if (!list_union(ldb, ldb_kv, list, list2)) {
1429                         talloc_free(list2);
1430                         return LDB_ERR_OPERATIONS_ERROR;
1431                 }
1432         }
1433
1434         if (list->count == 0) {
1435                 return LDB_ERR_NO_SUCH_OBJECT;
1436         }
1437
1438         return LDB_SUCCESS;
1439 }
1440
1441
1442 /*
1443   NOT an index results
1444  */
1445 static int ldb_kv_index_dn_not(struct ldb_module *module,
1446                                struct ldb_kv_private *ldb_kv,
1447                                const struct ldb_parse_tree *tree,
1448                                struct dn_list *list)
1449 {
1450         /* the only way to do an indexed not would be if we could
1451            negate the not via another not or if we knew the total
1452            number of database elements so we could know that the
1453            existing expression covered the whole database.
1454
1455            instead, we just give up, and rely on a full index scan
1456            (unless an outer & manages to reduce the list)
1457         */
1458         return LDB_ERR_OPERATIONS_ERROR;
1459 }
1460
1461 /*
1462  * These things are unique, so avoid a full scan if this is a search
1463  * by GUID, DN or a unique attribute
1464  */
1465 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1466                                 struct ldb_kv_private *ldb_kv,
1467                                 const char *attr)
1468 {
1469         const struct ldb_schema_attribute *a;
1470         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1471                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0) {
1472                         return true;
1473                 }
1474         }
1475         if (ldb_attr_dn(attr) == 0) {
1476                 return true;
1477         }
1478
1479         a = ldb_schema_attribute_by_name(ldb, attr);
1480         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1481                 return true;
1482         }
1483         return false;
1484 }
1485
1486 /*
1487   process an AND expression (intersection)
1488  */
1489 static int ldb_kv_index_dn_and(struct ldb_module *module,
1490                                struct ldb_kv_private *ldb_kv,
1491                                const struct ldb_parse_tree *tree,
1492                                struct dn_list *list)
1493 {
1494         struct ldb_context *ldb;
1495         unsigned int i;
1496         bool found;
1497
1498         ldb = ldb_module_get_ctx(module);
1499
1500         list->dn = NULL;
1501         list->count = 0;
1502
1503         /* in the first pass we only look for unique simple
1504            equality tests, in the hope of avoiding having to look
1505            at any others */
1506         for (i=0; i<tree->u.list.num_elements; i++) {
1507                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1508                 int ret;
1509
1510                 if (subtree->operation != LDB_OP_EQUALITY ||
1511                     !ldb_kv_index_unique(ldb, ldb_kv, subtree->u.equality.attr)) {
1512                         continue;
1513                 }
1514
1515                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1516                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1517                         /* 0 && X == 0 */
1518                         return LDB_ERR_NO_SUCH_OBJECT;
1519                 }
1520                 if (ret == LDB_SUCCESS) {
1521                         /* a unique index match means we can
1522                          * stop. Note that we don't care if we return
1523                          * a few too many objects, due to later
1524                          * filtering */
1525                         return LDB_SUCCESS;
1526                 }
1527         }
1528
1529         /* now do a full intersection */
1530         found = false;
1531
1532         for (i=0; i<tree->u.list.num_elements; i++) {
1533                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1534                 struct dn_list *list2;
1535                 int ret;
1536
1537                 list2 = talloc_zero(list, struct dn_list);
1538                 if (list2 == NULL) {
1539                         return ldb_module_oom(module);
1540                 }
1541
1542                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1543
1544                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1545                         /* X && 0 == 0 */
1546                         list->dn = NULL;
1547                         list->count = 0;
1548                         talloc_free(list2);
1549                         return LDB_ERR_NO_SUCH_OBJECT;
1550                 }
1551
1552                 if (ret != LDB_SUCCESS) {
1553                         /* this didn't adding anything */
1554                         talloc_free(list2);
1555                         continue;
1556                 }
1557
1558                 if (!found) {
1559                         talloc_reparent(list2, list, list->dn);
1560                         list->dn = list2->dn;
1561                         list->count = list2->count;
1562                         found = true;
1563                 } else if (!list_intersect(ldb, ldb_kv,
1564                                            list, list2)) {
1565                         talloc_free(list2);
1566                         return LDB_ERR_OPERATIONS_ERROR;
1567                 }
1568
1569                 if (list->count == 0) {
1570                         list->dn = NULL;
1571                         return LDB_ERR_NO_SUCH_OBJECT;
1572                 }
1573
1574                 if (list->count < 2) {
1575                         /* it isn't worth loading the next part of the tree */
1576                         return LDB_SUCCESS;
1577                 }
1578         }
1579
1580         if (!found) {
1581                 /* none of the attributes were indexed */
1582                 return LDB_ERR_OPERATIONS_ERROR;
1583         }
1584
1585         return LDB_SUCCESS;
1586 }
1587
1588 /*
1589   return a list of matching objects using a one-level index
1590  */
1591 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1592                                 struct ldb_kv_private *ldb_kv,
1593                                 const char *attr,
1594                                 struct ldb_dn *dn,
1595                                 struct dn_list *list,
1596                                 enum key_truncation *truncation)
1597 {
1598         struct ldb_context *ldb;
1599         struct ldb_dn *key;
1600         struct ldb_val val;
1601         int ret;
1602
1603         ldb = ldb_module_get_ctx(module);
1604
1605         /* work out the index key from the parent DN */
1606         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1607         val.length = strlen((char *)val.data);
1608         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1609         if (!key) {
1610                 ldb_oom(ldb);
1611                 return LDB_ERR_OPERATIONS_ERROR;
1612         }
1613
1614         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1615         talloc_free(key);
1616         if (ret != LDB_SUCCESS) {
1617                 return ret;
1618         }
1619
1620         if (list->count == 0) {
1621                 return LDB_ERR_NO_SUCH_OBJECT;
1622         }
1623
1624         return LDB_SUCCESS;
1625 }
1626
1627 /*
1628   return a list of matching objects using a one-level index
1629  */
1630 static int ldb_kv_index_dn_one(struct ldb_module *module,
1631                                struct ldb_kv_private *ldb_kv,
1632                                struct ldb_dn *parent_dn,
1633                                struct dn_list *list,
1634                                enum key_truncation *truncation)
1635 {
1636         /* Ensure we do not shortcut on intersection for this list */
1637         list->strict = true;
1638         return ldb_kv_index_dn_attr(
1639             module, ldb_kv, LTDB_IDXONE, parent_dn, list, truncation);
1640 }
1641
1642 /*
1643   return a list of matching objects using the DN index
1644  */
1645 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
1646                                    struct ldb_kv_private *ldb_kv,
1647                                    struct ldb_dn *base_dn,
1648                                    struct dn_list *dn_list,
1649                                    enum key_truncation *truncation)
1650 {
1651         const struct ldb_val *guid_val = NULL;
1652         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1653                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1654                 if (dn_list->dn == NULL) {
1655                         return ldb_module_oom(module);
1656                 }
1657                 dn_list->dn[0].data = discard_const_p(unsigned char,
1658                                                       ldb_dn_get_linearized(base_dn));
1659                 if (dn_list->dn[0].data == NULL) {
1660                         return ldb_module_oom(module);
1661                 }
1662                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1663                 dn_list->count = 1;
1664
1665                 return LDB_SUCCESS;
1666         }
1667
1668         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
1669                 guid_val = ldb_dn_get_extended_component(base_dn,
1670                                                          ldb_kv->cache->GUID_index_dn_component);
1671         }
1672
1673         if (guid_val != NULL) {
1674                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1675                 if (dn_list->dn == NULL) {
1676                         return ldb_module_oom(module);
1677                 }
1678                 dn_list->dn[0].data = guid_val->data;
1679                 dn_list->dn[0].length = guid_val->length;
1680                 dn_list->count = 1;
1681
1682                 return LDB_SUCCESS;
1683         }
1684
1685         return ldb_kv_index_dn_attr(
1686             module, ldb_kv, LTDB_IDXDN, base_dn, dn_list, truncation);
1687 }
1688
1689 /*
1690   return a list of dn's that might match a indexed search or
1691   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1692  */
1693 static int ldb_kv_index_dn(struct ldb_module *module,
1694                            struct ldb_kv_private *ldb_kv,
1695                            const struct ldb_parse_tree *tree,
1696                            struct dn_list *list)
1697 {
1698         int ret = LDB_ERR_OPERATIONS_ERROR;
1699
1700         switch (tree->operation) {
1701         case LDB_OP_AND:
1702                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
1703                 break;
1704
1705         case LDB_OP_OR:
1706                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
1707                 break;
1708
1709         case LDB_OP_NOT:
1710                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
1711                 break;
1712
1713         case LDB_OP_EQUALITY:
1714                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
1715                 break;
1716
1717         case LDB_OP_SUBSTRING:
1718         case LDB_OP_GREATER:
1719         case LDB_OP_LESS:
1720         case LDB_OP_PRESENT:
1721         case LDB_OP_APPROX:
1722         case LDB_OP_EXTENDED:
1723                 /* we can't index with fancy bitops yet */
1724                 ret = LDB_ERR_OPERATIONS_ERROR;
1725                 break;
1726         }
1727
1728         return ret;
1729 }
1730
1731 /*
1732   filter a candidate dn_list from an indexed search into a set of results
1733   extracting just the given attributes
1734 */
1735 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
1736                                const struct dn_list *dn_list,
1737                                struct ldb_kv_context *ac,
1738                                uint32_t *match_count,
1739                                enum key_truncation scope_one_truncation)
1740 {
1741         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1742         struct ldb_message *msg;
1743         struct ldb_message *filtered_msg;
1744         unsigned int i;
1745         unsigned int num_keys = 0;
1746         uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
1747         TDB_DATA *keys = NULL;
1748
1749         /*
1750          * We have to allocate the key list (rather than just walk the
1751          * caller supplied list) as the callback could change the list
1752          * (by modifying an indexed attribute hosted in the in-memory
1753          * index cache!)
1754          */
1755         keys = talloc_array(ac, TDB_DATA, dn_list->count);
1756         if (keys == NULL) {
1757                 return ldb_module_oom(ac->module);
1758         }
1759
1760         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1761                 /*
1762                  * We speculate that the keys will be GUID based and so
1763                  * pre-fill in enough space for a GUID (avoiding a pile of
1764                  * small allocations)
1765                  */
1766                 struct guid_tdb_key {
1767                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
1768                 } *key_values = NULL;
1769
1770                 key_values = talloc_array(keys,
1771                                           struct guid_tdb_key,
1772                                           dn_list->count);
1773
1774                 if (key_values == NULL) {
1775                         talloc_free(keys);
1776                         return ldb_module_oom(ac->module);
1777                 }
1778                 for (i = 0; i < dn_list->count; i++) {
1779                         keys[i].dptr = key_values[i].guid_key;
1780                         keys[i].dsize = sizeof(key_values[i].guid_key);
1781                 }
1782         } else {
1783                 for (i = 0; i < dn_list->count; i++) {
1784                         keys[i].dptr = NULL;
1785                         keys[i].dsize = 0;
1786                 }
1787         }
1788
1789         for (i = 0; i < dn_list->count; i++) {
1790                 int ret;
1791
1792                 ret = ldb_kv_idx_to_key(
1793                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
1794                 if (ret != LDB_SUCCESS) {
1795                         talloc_free(keys);
1796                         return ret;
1797                 }
1798
1799                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1800                         /*
1801                          * If we are in GUID index mode, then the dn_list is
1802                          * sorted.  If we got a duplicate, forget about it, as
1803                          * otherwise we would send the same entry back more
1804                          * than once.
1805                          *
1806                          * This is needed in the truncated DN case, or if a
1807                          * duplicate was forced in via
1808                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1809                          */
1810
1811                         if (memcmp(previous_guid_key,
1812                                    keys[num_keys].dptr,
1813                                    sizeof(previous_guid_key)) == 0) {
1814                                 continue;
1815                         }
1816
1817                         memcpy(previous_guid_key,
1818                                keys[num_keys].dptr,
1819                                sizeof(previous_guid_key));
1820                 }
1821                 num_keys++;
1822         }
1823
1824
1825         /*
1826          * Now that the list is a safe copy, send the callbacks
1827          */
1828         for (i = 0; i < num_keys; i++) {
1829                 int ret;
1830                 bool matched;
1831                 msg = ldb_msg_new(ac);
1832                 if (!msg) {
1833                         talloc_free(keys);
1834                         return LDB_ERR_OPERATIONS_ERROR;
1835                 }
1836
1837                 ret =
1838                     ldb_kv_search_key(ac->module,
1839                                       ldb_kv,
1840                                       keys[i],
1841                                       msg,
1842                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
1843                                           LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1844                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1845                         /*
1846                          * the record has disappeared? yes, this can
1847                          * happen if the entry is deleted by something
1848                          * operating in the callback (not another
1849                          * process, as we have a read lock)
1850                          */
1851                         talloc_free(msg);
1852                         continue;
1853                 }
1854
1855                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1856                         /* an internal error */
1857                         talloc_free(keys);
1858                         talloc_free(msg);
1859                         return LDB_ERR_OPERATIONS_ERROR;
1860                 }
1861
1862                 /*
1863                  * We trust the index for LDB_SCOPE_ONELEVEL
1864                  * unless the index key has been truncated.
1865                  *
1866                  * LDB_SCOPE_BASE is not passed in by our only caller.
1867                  */
1868                 if (ac->scope == LDB_SCOPE_ONELEVEL
1869                     && ldb_kv->cache->one_level_indexes
1870                     && scope_one_truncation == KEY_NOT_TRUNCATED) {
1871                         ret = ldb_match_message(ldb, msg, ac->tree,
1872                                                 ac->scope, &matched);
1873                 } else {
1874                         ret = ldb_match_msg_error(ldb, msg,
1875                                                   ac->tree, ac->base,
1876                                                   ac->scope, &matched);
1877                 }
1878
1879                 if (ret != LDB_SUCCESS) {
1880                         talloc_free(keys);
1881                         talloc_free(msg);
1882                         return ret;
1883                 }
1884                 if (!matched) {
1885                         talloc_free(msg);
1886                         continue;
1887                 }
1888
1889                 /* filter the attributes that the user wants */
1890                 ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1891
1892                 talloc_free(msg);
1893
1894                 if (ret == -1) {
1895                         talloc_free(keys);
1896                         return LDB_ERR_OPERATIONS_ERROR;
1897                 }
1898
1899                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1900                 if (ret != LDB_SUCCESS) {
1901                         /* Regardless of success or failure, the msg
1902                          * is the callbacks responsiblity, and should
1903                          * not be talloc_free()'ed */
1904                         ac->request_terminated = true;
1905                         talloc_free(keys);
1906                         return ret;
1907                 }
1908
1909                 (*match_count)++;
1910         }
1911
1912         TALLOC_FREE(keys);
1913         return LDB_SUCCESS;
1914 }
1915
1916 /*
1917   sort a DN list
1918  */
1919 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb, struct dn_list *list)
1920 {
1921         if (list->count < 2) {
1922                 return;
1923         }
1924
1925         /* We know the list is sorted when using the GUID index */
1926         if (ltdb->cache->GUID_index_attribute != NULL) {
1927                 return;
1928         }
1929
1930         TYPESAFE_QSORT(list->dn, list->count,
1931                        ldb_val_equal_exact_for_qsort);
1932 }
1933
1934 /*
1935   search the database with a LDAP-like expression using indexes
1936   returns -1 if an indexed search is not possible, in which
1937   case the caller should call ltdb_search_full()
1938 */
1939 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
1940 {
1941         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1942         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(ac->module), struct ldb_kv_private);
1943         struct dn_list *dn_list;
1944         int ret;
1945         enum ldb_scope index_scope;
1946         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1947
1948         /* see if indexing is enabled */
1949         if (!ldb_kv->cache->attribute_indexes &&
1950             !ldb_kv->cache->one_level_indexes &&
1951             ac->scope != LDB_SCOPE_BASE) {
1952                 /* fallback to a full search */
1953                 return LDB_ERR_OPERATIONS_ERROR;
1954         }
1955
1956         dn_list = talloc_zero(ac, struct dn_list);
1957         if (dn_list == NULL) {
1958                 return ldb_module_oom(ac->module);
1959         }
1960
1961         /*
1962          * For the purposes of selecting the switch arm below, if we
1963          * don't have a one-level index then treat it like a subtree
1964          * search
1965          */
1966         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1967             !ldb_kv->cache->one_level_indexes) {
1968                 index_scope = LDB_SCOPE_SUBTREE;
1969         } else {
1970                 index_scope = ac->scope;
1971         }
1972
1973         switch (index_scope) {
1974         case LDB_SCOPE_BASE:
1975                 /*
1976                  * The only caller will have filtered the operation out
1977                  * so we should never get here
1978                  */
1979                 return ldb_operr(ldb);
1980
1981         case LDB_SCOPE_ONELEVEL:
1982                 /*
1983                  * If we ever start to also load the index values for
1984                  * the tree, we must ensure we strictly intersect with
1985                  * this list, as we trust the ONELEVEL index
1986                  */
1987                 ret = ldb_kv_index_dn_one(
1988                     ac->module, ldb_kv, ac->base, dn_list, &scope_one_truncation);
1989                 if (ret != LDB_SUCCESS) {
1990                         talloc_free(dn_list);
1991                         return ret;
1992                 }
1993
1994                 /*
1995                  * If we have too many matches, running the filter
1996                  * tree over the SCOPE_ONELEVEL can be quite expensive
1997                  * so we now check the filter tree index as well.
1998                  *
1999                  * We only do this in the GUID index mode, which is
2000                  * O(n*log(m)) otherwise the intersection below will
2001                  * be too costly at O(n*m).
2002                  *
2003                  * We don't set a heuristic for 'too many' but instead
2004                  * do it always and rely on the index lookup being
2005                  * fast enough in the small case.
2006                  */
2007                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2008                         struct dn_list *idx_one_tree_list
2009                                 = talloc_zero(ac, struct dn_list);
2010                         if (idx_one_tree_list == NULL) {
2011                                 return ldb_module_oom(ac->module);
2012                         }
2013
2014                         if (!ldb_kv->cache->attribute_indexes) {
2015                                 talloc_free(idx_one_tree_list);
2016                                 talloc_free(dn_list);
2017                                 return LDB_ERR_OPERATIONS_ERROR;
2018                         }
2019                         /*
2020                          * Here we load the index for the tree.
2021                          *
2022                          * We only care if this is successful, if the
2023                          * index can't trim the result list down then
2024                          * the ONELEVEL index is still good enough.
2025                          */
2026                         ret = ldb_kv_index_dn(
2027                             ac->module, ldb_kv, ac->tree, idx_one_tree_list);
2028                         if (ret == LDB_SUCCESS) {
2029                                 if (!list_intersect(ldb, ldb_kv,
2030                                                     dn_list,
2031                                                     idx_one_tree_list)) {
2032                                         talloc_free(idx_one_tree_list);
2033                                         talloc_free(dn_list);
2034                                         return LDB_ERR_OPERATIONS_ERROR;
2035                                 }
2036                         }
2037                 }
2038                 break;
2039
2040         case LDB_SCOPE_SUBTREE:
2041         case LDB_SCOPE_DEFAULT:
2042                 if (!ldb_kv->cache->attribute_indexes) {
2043                         talloc_free(dn_list);
2044                         return LDB_ERR_OPERATIONS_ERROR;
2045                 }
2046                 /*
2047                  * Here we load the index for the tree.  We have no
2048                  * index for the subtree.
2049                  */
2050                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2051                 if (ret != LDB_SUCCESS) {
2052                         talloc_free(dn_list);
2053                         return ret;
2054                 }
2055                 break;
2056         }
2057
2058         /*
2059          * It is critical that this function do the re-filter even
2060          * on things found by the index as the index can over-match
2061          * in cases of truncation (as well as when it decides it is
2062          * not worth further filtering)
2063          *
2064          * If this changes, then the index code above would need to
2065          * pass up a flag to say if any index was truncated during
2066          * processing as the truncation here refers only to the
2067          * SCOPE_ONELEVEL index.
2068          */
2069         ret = ldb_kv_index_filter(
2070             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2071         talloc_free(dn_list);
2072         return ret;
2073 }
2074
2075 /**
2076  * @brief Add a DN in the index list of a given attribute name/value pair
2077  *
2078  * This function will add the DN in the index list for the index for
2079  * the given attribute name and value.
2080  *
2081  * @param[in]  module       A ldb_module structure
2082  *
2083  * @param[in]  dn           The string representation of the DN as it
2084  *                          will be stored in the index entry
2085  *
2086  * @param[in]  el           A ldb_message_element array, one of the entry
2087  *                          referred by the v_idx is the attribute name and
2088  *                          value pair which will be used to construct the
2089  *                          index name
2090  *
2091  * @param[in]  v_idx        The index of element in the el array to use
2092  *
2093  * @return                  An ldb error code
2094  */
2095 static int ldb_kv_index_add1(struct ldb_module *module,
2096                              struct ldb_kv_private *ldb_kv,
2097                              const struct ldb_message *msg,
2098                              struct ldb_message_element *el,
2099                              int v_idx)
2100 {
2101         struct ldb_context *ldb;
2102         struct ldb_dn *dn_key;
2103         int ret;
2104         const struct ldb_schema_attribute *a;
2105         struct dn_list *list;
2106         unsigned alloc_len;
2107         enum key_truncation truncation = KEY_TRUNCATED;
2108
2109
2110         ldb = ldb_module_get_ctx(module);
2111
2112         list = talloc_zero(module, struct dn_list);
2113         if (list == NULL) {
2114                 return LDB_ERR_OPERATIONS_ERROR;
2115         }
2116
2117         dn_key = ldb_kv_index_key(
2118             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2119         if (!dn_key) {
2120                 talloc_free(list);
2121                 return LDB_ERR_OPERATIONS_ERROR;
2122         }
2123         /*
2124          * Samba only maintains unique indexes on the objectSID and objectGUID
2125          * so if a unique index key exceeds the maximum length there is a
2126          * problem.
2127          */
2128         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2129                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2130                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2131
2132                 ldb_asprintf_errstring(
2133                         ldb,
2134                         __location__ ": unique index key on %s in %s, "
2135                         "exceeds maximum key length of %u (encoded).",
2136                         el->name,
2137                         ldb_dn_get_linearized(msg->dn),
2138                         ldb_kv->max_key_length);
2139                 talloc_free(list);
2140                 return LDB_ERR_CONSTRAINT_VIOLATION;
2141         }
2142         talloc_steal(list, dn_key);
2143
2144         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2145         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2146                 talloc_free(list);
2147                 return ret;
2148         }
2149
2150         /*
2151          * Check for duplicates in the @IDXDN DN -> GUID record
2152          *
2153          * This is very normal, it just means a duplicate DN creation
2154          * was attempted, so don't set the error string or print scary
2155          * messages.
2156          */
2157         if (list->count > 0 &&
2158             ldb_attr_cmp(el->name, LTDB_IDXDN) == 0 &&
2159             truncation == KEY_NOT_TRUNCATED) {
2160
2161                 talloc_free(list);
2162                 return LDB_ERR_CONSTRAINT_VIOLATION;
2163
2164         } else if (list->count > 0
2165                    && ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
2166
2167                 /*
2168                  * At least one existing entry in the DN->GUID index, which
2169                  * arises when the DN indexes have been truncated
2170                  *
2171                  * So need to pull the DN's to check if it's really a duplicate
2172                  */
2173                 int i;
2174                 for (i=0; i < list->count; i++) {
2175                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
2176                         TDB_DATA key = {
2177                                 .dptr = guid_key,
2178                                 .dsize = sizeof(guid_key)
2179                         };
2180                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2181                         struct ldb_message *rec = ldb_msg_new(ldb);
2182                         if (rec == NULL) {
2183                                 return LDB_ERR_OPERATIONS_ERROR;
2184                         }
2185
2186                         ret = ldb_kv_idx_to_key(
2187                             module, ldb_kv, ldb, &list->dn[i], &key);
2188                         if (ret != LDB_SUCCESS) {
2189                                 TALLOC_FREE(list);
2190                                 TALLOC_FREE(rec);
2191                                 return ret;
2192                         }
2193
2194                         ret = ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2195                         if (key.dptr != guid_key) {
2196                                 TALLOC_FREE(key.dptr);
2197                         }
2198                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2199                                 /*
2200                                  * the record has disappeared?
2201                                  * yes, this can happen
2202                                  */
2203                                 talloc_free(rec);
2204                                 continue;
2205                         }
2206
2207                         if (ret != LDB_SUCCESS) {
2208                                 /* an internal error */
2209                                 TALLOC_FREE(rec);
2210                                 TALLOC_FREE(list);
2211                                 return LDB_ERR_OPERATIONS_ERROR;
2212                         }
2213                         /*
2214                          * The DN we are trying to add to the DB and index
2215                          * is already here, so we must deny the addition
2216                          */
2217                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2218                                 TALLOC_FREE(rec);
2219                                 TALLOC_FREE(list);
2220                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2221                         }
2222                 }
2223         }
2224
2225         /*
2226          * Check for duplicates in unique indexes
2227          *
2228          * We don't need to do a loop test like the @IDXDN case
2229          * above as we have a ban on long unique index values
2230          * at the start of this function.
2231          */
2232         if (list->count > 0 &&
2233             ((a != NULL
2234               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2235                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2236                 /*
2237                  * We do not want to print info about a possibly
2238                  * confidential DN that the conflict was with in the
2239                  * user-visible error string
2240                  */
2241
2242                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2243                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2244                                   __location__
2245                                   ": unique index violation on %s in %s, "
2246                                   "conficts with %*.*s in %s",
2247                                   el->name, ldb_dn_get_linearized(msg->dn),
2248                                   (int)list->dn[0].length,
2249                                   (int)list->dn[0].length,
2250                                   list->dn[0].data,
2251                                   ldb_dn_get_linearized(dn_key));
2252                 } else {
2253                         /* This can't fail, gives a default at worst */
2254                         const struct ldb_schema_attribute *attr
2255                                 = ldb_schema_attribute_by_name(
2256                                         ldb,
2257                                         ldb_kv->cache->GUID_index_attribute);
2258                         struct ldb_val v;
2259                         ret = attr->syntax->ldif_write_fn(ldb, list,
2260                                                           &list->dn[0], &v);
2261                         if (ret == LDB_SUCCESS) {
2262                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2263                                           __location__
2264                                           ": unique index violation on %s in "
2265                                           "%s, conficts with %s %*.*s in %s",
2266                                           el->name,
2267                                           ldb_dn_get_linearized(msg->dn),
2268                                           ldb_kv->cache->GUID_index_attribute,
2269                                           (int)v.length,
2270                                           (int)v.length,
2271                                           v.data,
2272                                           ldb_dn_get_linearized(dn_key));
2273                         }
2274                 }
2275                 ldb_asprintf_errstring(ldb,
2276                                        __location__ ": unique index violation "
2277                                        "on %s in %s",
2278                                        el->name,
2279                                        ldb_dn_get_linearized(msg->dn));
2280                 talloc_free(list);
2281                 return LDB_ERR_CONSTRAINT_VIOLATION;
2282         }
2283
2284         /* overallocate the list a bit, to reduce the number of
2285          * realloc trigered copies */
2286         alloc_len = ((list->count+1)+7) & ~7;
2287         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2288         if (list->dn == NULL) {
2289                 talloc_free(list);
2290                 return LDB_ERR_OPERATIONS_ERROR;
2291         }
2292
2293         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2294                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2295                 list->dn[list->count].data
2296                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2297                 if (list->dn[list->count].data == NULL) {
2298                         talloc_free(list);
2299                         return LDB_ERR_OPERATIONS_ERROR;
2300                 }
2301                 list->dn[list->count].length = strlen(dn_str);
2302         } else {
2303                 const struct ldb_val *key_val;
2304                 struct ldb_val *exact = NULL, *next = NULL;
2305                 key_val = ldb_msg_find_ldb_val(msg,
2306                                                ldb_kv->cache->GUID_index_attribute);
2307                 if (key_val == NULL) {
2308                         talloc_free(list);
2309                         return ldb_module_operr(module);
2310                 }
2311
2312                 if (key_val->length != LTDB_GUID_SIZE) {
2313                         talloc_free(list);
2314                         return ldb_module_operr(module);
2315                 }
2316
2317                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2318                                         *key_val, ldb_val_equal_exact_ordered,
2319                                         exact, next);
2320
2321                 /*
2322                  * Give a warning rather than fail, this could be a
2323                  * duplicate value in the record allowed by a caller
2324                  * forcing in the value with
2325                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2326                  */
2327                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2328                         /* This can't fail, gives a default at worst */
2329                         const struct ldb_schema_attribute *attr
2330                                 = ldb_schema_attribute_by_name(
2331                                         ldb,
2332                                         ldb_kv->cache->GUID_index_attribute);
2333                         struct ldb_val v;
2334                         ret = attr->syntax->ldif_write_fn(ldb, list,
2335                                                           exact, &v);
2336                         if (ret == LDB_SUCCESS) {
2337                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2338                                           __location__
2339                                           ": duplicate attribute value in %s "
2340                                           "for index on %s, "
2341                                           "duplicate of %s %*.*s in %s",
2342                                           ldb_dn_get_linearized(msg->dn),
2343                                           el->name,
2344                                           ldb_kv->cache->GUID_index_attribute,
2345                                           (int)v.length,
2346                                           (int)v.length,
2347                                           v.data,
2348                                           ldb_dn_get_linearized(dn_key));
2349                         }
2350                 }
2351
2352                 if (next == NULL) {
2353                         next = &list->dn[list->count];
2354                 } else {
2355                         memmove(&next[1], next,
2356                                 sizeof(*next) * (list->count - (next - list->dn)));
2357                 }
2358                 *next = ldb_val_dup(list->dn, key_val);
2359                 if (next->data == NULL) {
2360                         talloc_free(list);
2361                         return ldb_module_operr(module);
2362                 }
2363         }
2364         list->count++;
2365
2366         ret = ldb_kv_dn_list_store(module, dn_key, list);
2367
2368         talloc_free(list);
2369
2370         return ret;
2371 }
2372
2373 /*
2374   add index entries for one elements in a message
2375  */
2376 static int ldb_kv_index_add_el(struct ldb_module *module,
2377                                struct ldb_kv_private *ldb_kv,
2378                                const struct ldb_message *msg,
2379                                struct ldb_message_element *el)
2380 {
2381         unsigned int i;
2382         for (i = 0; i < el->num_values; i++) {
2383                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2384                 if (ret != LDB_SUCCESS) {
2385                         return ret;
2386                 }
2387         }
2388
2389         return LDB_SUCCESS;
2390 }
2391
2392 /*
2393   add index entries for all elements in a message
2394  */
2395 static int ldb_kv_index_add_all(struct ldb_module *module,
2396                                 struct ldb_kv_private *ldb_kv,
2397                                 const struct ldb_message *msg)
2398 {
2399         struct ldb_message_element *elements = msg->elements;
2400         unsigned int i;
2401         const char *dn_str;
2402         int ret;
2403
2404         if (ldb_dn_is_special(msg->dn)) {
2405                 return LDB_SUCCESS;
2406         }
2407
2408         dn_str = ldb_dn_get_linearized(msg->dn);
2409         if (dn_str == NULL) {
2410                 return LDB_ERR_OPERATIONS_ERROR;
2411         }
2412
2413         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2414         if (ret != LDB_SUCCESS) {
2415                 return ret;
2416         }
2417
2418         if (!ldb_kv->cache->attribute_indexes) {
2419                 /* no indexed fields */
2420                 return LDB_SUCCESS;
2421         }
2422
2423         for (i = 0; i < msg->num_elements; i++) {
2424                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2425                         continue;
2426                 }
2427                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2428                 if (ret != LDB_SUCCESS) {
2429                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2430                         ldb_asprintf_errstring(ldb,
2431                                                __location__ ": Failed to re-index %s in %s - %s",
2432                                                elements[i].name, dn_str,
2433                                                ldb_errstring(ldb));
2434                         return ret;
2435                 }
2436         }
2437
2438         return LDB_SUCCESS;
2439 }
2440
2441
2442 /*
2443   insert a DN index for a message
2444 */
2445 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2446                                   struct ldb_kv_private *ldb_kv,
2447                                   const struct ldb_message *msg,
2448                                   struct ldb_dn *dn,
2449                                   const char *index,
2450                                   int add)
2451 {
2452         struct ldb_message_element el;
2453         struct ldb_val val;
2454         int ret;
2455
2456         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2457         if (val.data == NULL) {
2458                 const char *dn_str = ldb_dn_get_linearized(dn);
2459                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2460                                        __location__
2461                                        ": Failed to modify %s "
2462                                        "against %s in %s: failed "
2463                                        "to get casefold DN",
2464                                        index,
2465                                        ldb_kv->cache->GUID_index_attribute,
2466                                        dn_str);
2467                 return LDB_ERR_OPERATIONS_ERROR;
2468         }
2469
2470         val.length = strlen((char *)val.data);
2471         el.name = index;
2472         el.values = &val;
2473         el.num_values = 1;
2474
2475         if (add) {
2476                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2477         } else { /* delete */
2478                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2479         }
2480
2481         if (ret != LDB_SUCCESS) {
2482                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2483                 const char *dn_str = ldb_dn_get_linearized(dn);
2484                 ldb_asprintf_errstring(ldb,
2485                                        __location__
2486                                        ": Failed to modify %s "
2487                                        "against %s in %s - %s",
2488                                        index,
2489                                        ldb_kv->cache->GUID_index_attribute,
2490                                        dn_str, ldb_errstring(ldb));
2491                 return ret;
2492         }
2493         return ret;
2494 }
2495
2496 /*
2497   insert a one level index for a message
2498 */
2499 static int ldb_kv_index_onelevel(struct ldb_module *module,
2500                                  const struct ldb_message *msg,
2501                                  int add)
2502 {
2503         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module),
2504                                                     struct ldb_kv_private);
2505         struct ldb_dn *pdn;
2506         int ret;
2507
2508         /* We index for ONE Level only if requested */
2509         if (!ldb_kv->cache->one_level_indexes) {
2510                 return LDB_SUCCESS;
2511         }
2512
2513         pdn = ldb_dn_get_parent(module, msg->dn);
2514         if (pdn == NULL) {
2515                 return LDB_ERR_OPERATIONS_ERROR;
2516         }
2517         ret = ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LTDB_IDXONE, add);
2518
2519         talloc_free(pdn);
2520
2521         return ret;
2522 }
2523
2524 /*
2525   insert a one level index for a message
2526 */
2527 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2528                                       const struct ldb_message *msg,
2529                                       int add)
2530 {
2531         int ret;
2532         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module),
2533                                                     struct ldb_kv_private);
2534
2535         /* We index for DN only if using a GUID index */
2536         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2537                 return LDB_SUCCESS;
2538         }
2539
2540         ret =
2541             ldb_kv_modify_index_dn(module, ldb_kv, msg, msg->dn, LTDB_IDXDN, add);
2542
2543         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2544                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2545                                        "Entry %s already exists",
2546                                        ldb_dn_get_linearized(msg->dn));
2547                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2548         }
2549         return ret;
2550 }
2551
2552 /*
2553   add the index entries for a new element in a record
2554   The caller guarantees that these element values are not yet indexed
2555 */
2556 int ldb_kv_index_add_element(struct ldb_module *module,
2557                              struct ldb_kv_private *ldb_kv,
2558                              const struct ldb_message *msg,
2559                              struct ldb_message_element *el)
2560 {
2561         if (ldb_dn_is_special(msg->dn)) {
2562                 return LDB_SUCCESS;
2563         }
2564         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2565                 return LDB_SUCCESS;
2566         }
2567         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2568 }
2569
2570 /*
2571   add the index entries for a new record
2572 */
2573 int ldb_kv_index_add_new(struct ldb_module *module,
2574                          struct ldb_kv_private *ldb_kv,
2575                          const struct ldb_message *msg)
2576 {
2577         int ret;
2578
2579         if (ldb_dn_is_special(msg->dn)) {
2580                 return LDB_SUCCESS;
2581         }
2582
2583         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2584         if (ret != LDB_SUCCESS) {
2585                 /*
2586                  * Because we can't trust the caller to be doing
2587                  * transactions properly, clean up any index for this
2588                  * entry rather than relying on a transaction
2589                  * cleanup
2590                  */
2591
2592                 ldb_kv_index_delete(module, msg);
2593                 return ret;
2594         }
2595
2596         ret = ldb_kv_index_onelevel(module, msg, 1);
2597         if (ret != LDB_SUCCESS) {
2598                 /*
2599                  * Because we can't trust the caller to be doing
2600                  * transactions properly, clean up any index for this
2601                  * entry rather than relying on a transaction
2602                  * cleanup
2603                  */
2604                 ldb_kv_index_delete(module, msg);
2605                 return ret;
2606         }
2607         return ret;
2608 }
2609
2610
2611 /*
2612   delete an index entry for one message element
2613 */
2614 int ldb_kv_index_del_value(struct ldb_module *module,
2615                            struct ldb_kv_private *ldb_kv,
2616                            const struct ldb_message *msg,
2617                            struct ldb_message_element *el,
2618                            unsigned int v_idx)
2619 {
2620         struct ldb_context *ldb;
2621         struct ldb_dn *dn_key;
2622         const char *dn_str;
2623         int ret, i;
2624         unsigned int j;
2625         struct dn_list *list;
2626         struct ldb_dn *dn = msg->dn;
2627         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2628
2629         ldb = ldb_module_get_ctx(module);
2630
2631         dn_str = ldb_dn_get_linearized(dn);
2632         if (dn_str == NULL) {
2633                 return LDB_ERR_OPERATIONS_ERROR;
2634         }
2635
2636         if (dn_str[0] == '@') {
2637                 return LDB_SUCCESS;
2638         }
2639
2640         dn_key = ldb_kv_index_key(
2641             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
2642         /*
2643          * We ignore key truncation in ltdb_index_add1() so
2644          * match that by ignoring it here as well
2645          *
2646          * Multiple values are legitimate and accepted
2647          */
2648         if (!dn_key) {
2649                 return LDB_ERR_OPERATIONS_ERROR;
2650         }
2651
2652         list = talloc_zero(dn_key, struct dn_list);
2653         if (list == NULL) {
2654                 talloc_free(dn_key);
2655                 return LDB_ERR_OPERATIONS_ERROR;
2656         }
2657
2658         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2659         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2660                 /* it wasn't indexed. Did we have an earlier error? If we did then
2661                    its gone now */
2662                 talloc_free(dn_key);
2663                 return LDB_SUCCESS;
2664         }
2665
2666         if (ret != LDB_SUCCESS) {
2667                 talloc_free(dn_key);
2668                 return ret;
2669         }
2670
2671         /*
2672          * Find one of the values matching this message to remove
2673          */
2674         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
2675         if (i == -1) {
2676                 /* nothing to delete */
2677                 talloc_free(dn_key);
2678                 return LDB_SUCCESS;
2679         }
2680
2681         j = (unsigned int) i;
2682         if (j != list->count - 1) {
2683                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2684         }
2685         list->count--;
2686         if (list->count == 0) {
2687                 talloc_free(list->dn);
2688                 list->dn = NULL;
2689         } else {
2690                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2691         }
2692
2693         ret = ldb_kv_dn_list_store(module, dn_key, list);
2694
2695         talloc_free(dn_key);
2696
2697         return ret;
2698 }
2699
2700 /*
2701   delete the index entries for a element
2702   return -1 on failure
2703 */
2704 int ldb_kv_index_del_element(struct ldb_module *module,
2705                              struct ldb_kv_private *ldb_kv,
2706                              const struct ldb_message *msg,
2707                              struct ldb_message_element *el)
2708 {
2709         const char *dn_str;
2710         int ret;
2711         unsigned int i;
2712
2713         if (!ldb_kv->cache->attribute_indexes) {
2714                 /* no indexed fields */
2715                 return LDB_SUCCESS;
2716         }
2717
2718         dn_str = ldb_dn_get_linearized(msg->dn);
2719         if (dn_str == NULL) {
2720                 return LDB_ERR_OPERATIONS_ERROR;
2721         }
2722
2723         if (dn_str[0] == '@') {
2724                 return LDB_SUCCESS;
2725         }
2726
2727         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2728                 return LDB_SUCCESS;
2729         }
2730         for (i = 0; i < el->num_values; i++) {
2731                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
2732                 if (ret != LDB_SUCCESS) {
2733                         return ret;
2734                 }
2735         }
2736
2737         return LDB_SUCCESS;
2738 }
2739
2740 /*
2741   delete the index entries for a record
2742   return -1 on failure
2743 */
2744 int ldb_kv_index_delete(struct ldb_module *module,
2745                         const struct ldb_message *msg)
2746 {
2747         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module), struct ldb_kv_private);
2748         int ret;
2749         unsigned int i;
2750
2751         if (ldb_dn_is_special(msg->dn)) {
2752                 return LDB_SUCCESS;
2753         }
2754
2755         ret = ldb_kv_index_onelevel(module, msg, 0);
2756         if (ret != LDB_SUCCESS) {
2757                 return ret;
2758         }
2759
2760         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
2761         if (ret != LDB_SUCCESS) {
2762                 return ret;
2763         }
2764
2765         if (!ldb_kv->cache->attribute_indexes) {
2766                 /* no indexed fields */
2767                 return LDB_SUCCESS;
2768         }
2769
2770         for (i = 0; i < msg->num_elements; i++) {
2771                 ret = ldb_kv_index_del_element(
2772                     module, ldb_kv, msg, &msg->elements[i]);
2773                 if (ret != LDB_SUCCESS) {
2774                         return ret;
2775                 }
2776         }
2777
2778         return LDB_SUCCESS;
2779 }
2780
2781
2782 /*
2783   traversal function that deletes all @INDEX records in the in-memory
2784   TDB.
2785
2786   This does not touch the actual DB, that is done at transaction
2787   commit, which in turn greatly reduces DB churn as we will likely
2788   be able to do a direct update into the old record.
2789 */
2790 static int delete_index(struct ldb_kv_private *ldb_kv, struct ldb_val key, struct ldb_val data, void *state)
2791 {
2792         struct ldb_module *module = state;
2793         const char *dnstr = "DN=" LTDB_INDEX ":";
2794         struct dn_list list;
2795         struct ldb_dn *dn;
2796         struct ldb_val v;
2797         int ret;
2798
2799         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2800                 return 0;
2801         }
2802         /* we need to put a empty list in the internal tdb for this
2803          * index entry */
2804         list.dn = NULL;
2805         list.count = 0;
2806
2807         /* the offset of 3 is to remove the DN= prefix. */
2808         v.data = key.data + 3;
2809         v.length = strnlen((char *)key.data, key.length) - 3;
2810
2811         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
2812
2813         /*
2814          * This does not actually touch the DB quite yet, just
2815          * the in-memory index cache
2816          */
2817         ret = ldb_kv_dn_list_store(module, dn, &list);
2818         if (ret != LDB_SUCCESS) {
2819                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2820                                        "Unable to store null index for %s\n",
2821                                                 ldb_dn_get_linearized(dn));
2822                 talloc_free(dn);
2823                 return -1;
2824         }
2825         talloc_free(dn);
2826         return 0;
2827 }
2828
2829 /*
2830   traversal function that adds @INDEX records during a re index TODO wrong comment
2831 */
2832 static int re_key(struct ldb_kv_private *ldb_kv, struct ldb_val ldb_key, struct ldb_val val, void *state)
2833 {
2834         struct ldb_context *ldb;
2835         struct ldb_kv_reindex_context *ctx =
2836             (struct ldb_kv_reindex_context *)state;
2837         struct ldb_module *module = ctx->module;
2838         struct ldb_message *msg;
2839         unsigned int nb_elements_in_db;
2840         int ret;
2841         TDB_DATA key2;
2842         bool is_record;
2843         TDB_DATA key = {
2844                 .dptr = ldb_key.data,
2845                 .dsize = ldb_key.length
2846         };
2847         
2848         ldb = ldb_module_get_ctx(module);
2849
2850         if (key.dsize > 4 &&
2851             memcmp(key.dptr, "DN=@", 4) == 0) {
2852                 return 0;
2853         }
2854
2855         is_record = ldb_kv_key_is_record(key);
2856         if (is_record == false) {
2857                 return 0;
2858         }
2859         
2860         msg = ldb_msg_new(module);
2861         if (msg == NULL) {
2862                 return -1;
2863         }
2864
2865         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2866                                                    msg,
2867                                                    NULL, 0,
2868                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2869                                                    &nb_elements_in_db);
2870         if (ret != 0) {
2871                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2872                                                 ldb_dn_get_linearized(msg->dn));
2873                 ctx->error = ret;
2874                 talloc_free(msg);
2875                 return -1;
2876         }
2877
2878         if (msg->dn == NULL) {
2879                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2880                           "Refusing to re-index as GUID "
2881                           "key %*.*s with no DN\n",
2882                           (int)key.dsize, (int)key.dsize,
2883                           (char *)key.dptr);
2884                 talloc_free(msg);
2885                 return -1;
2886         }
2887         
2888         /* check if the DN key has changed, perhaps due to the case
2889            insensitivity of an element changing, or a change from DN
2890            to GUID keys */
2891         key2 = ldb_kv_key_msg(module, msg, msg);
2892         if (key2.dptr == NULL) {
2893                 /* probably a corrupt record ... darn */
2894                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2895                                                 ldb_dn_get_linearized(msg->dn));
2896                 talloc_free(msg);
2897                 return 0;
2898         }
2899         if (key.dsize != key2.dsize ||
2900             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
2901                 struct ldb_val ldb_key2 = {
2902                         .data = key2.dptr,
2903                         .length = key2.dsize
2904                 };
2905                 ldb_kv->kv_ops->update_in_iterate(ldb_kv, ldb_key, ldb_key2, val, ctx);
2906         }
2907         talloc_free(key2.dptr);
2908
2909         talloc_free(msg);
2910
2911         ctx->count++;
2912         if (ctx->count % 10000 == 0) {
2913                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2914                           "Reindexing: re-keyed %u records so far",
2915                           ctx->count);
2916         }
2917
2918         return 0;
2919 }
2920
2921 /*
2922   traversal function that adds @INDEX records during a re index
2923 */
2924 static int re_index(struct ldb_kv_private *ldb_kv, struct ldb_val ldb_key, struct ldb_val val, void *state)
2925 {
2926         struct ldb_context *ldb;
2927         struct ldb_kv_reindex_context *ctx =
2928             (struct ldb_kv_reindex_context *)state;
2929         struct ldb_module *module = ctx->module;
2930         struct ldb_message *msg;
2931         unsigned int nb_elements_in_db;
2932         TDB_DATA key = {
2933                 .dptr = ldb_key.data,
2934                 .dsize = ldb_key.length
2935         };
2936         int ret;
2937         bool is_record;
2938         
2939         ldb = ldb_module_get_ctx(module);
2940
2941         if (key.dsize > 4 &&
2942             memcmp(key.dptr, "DN=@", 4) == 0) {
2943                 return 0;
2944         }
2945
2946         is_record = ldb_kv_key_is_record(key);
2947         if (is_record == false) {
2948                 return 0;
2949         }
2950         
2951         msg = ldb_msg_new(module);
2952         if (msg == NULL) {
2953                 return -1;
2954         }
2955
2956         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2957                                                    msg,
2958                                                    NULL, 0,
2959                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2960                                                    &nb_elements_in_db);
2961         if (ret != 0) {
2962                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2963                                                 ldb_dn_get_linearized(msg->dn));
2964                 ctx->error = ret;
2965                 talloc_free(msg);
2966                 return -1;
2967         }
2968
2969         if (msg->dn == NULL) {
2970                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2971                           "Refusing to re-index as GUID "
2972                           "key %*.*s with no DN\n",
2973                           (int)key.dsize, (int)key.dsize,
2974                           (char *)key.dptr);
2975                 talloc_free(msg);
2976                 return -1;
2977         }
2978
2979         ret = ldb_kv_index_onelevel(module, msg, 1);
2980         if (ret != LDB_SUCCESS) {
2981                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2982                           "Adding special ONE LEVEL index failed (%s)!",
2983                                                 ldb_dn_get_linearized(msg->dn));
2984                 talloc_free(msg);
2985                 return -1;
2986         }
2987
2988         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2989
2990         if (ret != LDB_SUCCESS) {
2991                 ctx->error = ret;
2992                 talloc_free(msg);
2993                 return -1;
2994         }
2995
2996         talloc_free(msg);
2997
2998         ctx->count++;
2999         if (ctx->count % 10000 == 0) {
3000                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3001                           "Reindexing: re-indexed %u records so far",
3002                           ctx->count);
3003         }
3004
3005         return 0;
3006 }
3007
3008 /*
3009   force a complete reindex of the database
3010 */
3011 int ldb_kv_reindex(struct ldb_module *module)
3012 {
3013         struct ldb_kv_private *ldb_kv = talloc_get_type(ldb_module_get_private(module), struct ldb_kv_private);
3014         int ret;
3015         struct ldb_kv_reindex_context ctx;
3016
3017         /*
3018          * Only triggered after a modification, but make clear we do
3019          * not re-index a read-only DB
3020          */
3021         if (ldb_kv->read_only) {
3022                 return LDB_ERR_UNWILLING_TO_PERFORM;
3023         }
3024
3025         if (ldb_kv_cache_reload(module) != 0) {
3026                 return LDB_ERR_OPERATIONS_ERROR;
3027         }
3028
3029         /*
3030          * Ensure we read (and so remove) the entries from the real
3031          * DB, no values stored so far are any use as we want to do a
3032          * re-index
3033          */
3034         ldb_kv_index_transaction_cancel(module);
3035
3036         ret = ldb_kv_index_transaction_start(module);
3037         if (ret != LDB_SUCCESS) {
3038                 return ret;
3039         }
3040
3041         /* first traverse the database deleting any @INDEX records by
3042          * putting NULL entries in the in-memory tdb
3043          */
3044         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3045         if (ret < 0) {
3046                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3047                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3048                                        ldb_errstring(ldb));
3049                 return LDB_ERR_OPERATIONS_ERROR;
3050         }
3051
3052         ctx.module = module;
3053         ctx.error = 0;
3054         ctx.count = 0;
3055
3056         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3057         if (ret < 0) {
3058                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3059                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3060                                        ldb_errstring(ldb));
3061                 return LDB_ERR_OPERATIONS_ERROR;
3062         }
3063
3064         if (ctx.error != LDB_SUCCESS) {
3065                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3066                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3067                 return ctx.error;
3068         }
3069
3070         ctx.error = 0;
3071         ctx.count = 0;
3072
3073         /* now traverse adding any indexes for normal LDB records */
3074         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3075         if (ret < 0) {
3076                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3077                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3078                                        ldb_errstring(ldb));
3079                 return LDB_ERR_OPERATIONS_ERROR;
3080         }
3081
3082         if (ctx.error != LDB_SUCCESS) {
3083                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3084                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3085                 return ctx.error;
3086         }
3087
3088         if (ctx.count > 10000) {
3089                 ldb_debug(ldb_module_get_ctx(module),
3090                           LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
3091                           "final index write-out will be in transaction commit",
3092                           ldb_kv->kv_ops->name(ldb_kv));
3093         }
3094         return LDB_SUCCESS;
3095 }