97cc6109662f27fcd0dcffa2e3052d3d1e26f80b
[sfrench/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of TDB key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_tdb.h"
148 #include "ldb_private.h"
149 #include "lib/util/binsearch.h"
150
151 struct dn_list {
152         unsigned int count;
153         struct ldb_val *dn;
154         /*
155          * Do not optimise the intersection of this list,
156          * we must never return an entry not in this
157          * list.  This allows the index for
158          * SCOPE_ONELEVEL to be trusted.
159          */
160         bool strict;
161 };
162
163 struct ltdb_idxptr {
164         struct tdb_context *itdb;
165         int error;
166 };
167
168 enum key_truncation {
169         KEY_NOT_TRUNCATED,
170         KEY_TRUNCATED,
171 };
172
173 static int ltdb_write_index_dn_guid(struct ldb_module *module,
174                                     const struct ldb_message *msg,
175                                     int add);
176 static int ltdb_index_dn_base_dn(struct ldb_module *module,
177                                  struct ltdb_private *ltdb,
178                                  struct ldb_dn *base_dn,
179                                  struct dn_list *dn_list,
180                                  enum key_truncation *truncation);
181
182 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
183                               struct dn_list *list);
184
185 /* we put a @IDXVERSION attribute on index entries. This
186    allows us to tell if it was written by an older version
187 */
188 #define LTDB_INDEXING_VERSION 2
189
190 #define LTDB_GUID_INDEXING_VERSION 3
191
192 static unsigned ltdb_max_key_length(struct ltdb_private *ltdb) {
193         if (ltdb->max_key_length == 0){
194                 return UINT_MAX;
195         }
196         return ltdb->max_key_length;
197 }
198
199 /* enable the idxptr mode when transactions start */
200 int ltdb_index_transaction_start(struct ldb_module *module)
201 {
202         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
203         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
204         if (ltdb->idxptr == NULL) {
205                 return ldb_oom(ldb_module_get_ctx(module));
206         }
207
208         return LDB_SUCCESS;
209 }
210
211 /*
212   see if two ldb_val structures contain exactly the same data
213   return -1 or 1 for a mismatch, 0 for match
214 */
215 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
216                                          const struct ldb_val *v2)
217 {
218         if (v1->length > v2->length) {
219                 return -1;
220         }
221         if (v1->length < v2->length) {
222                 return 1;
223         }
224         return memcmp(v1->data, v2->data, v1->length);
225 }
226
227 /*
228   see if two ldb_val structures contain exactly the same data
229   return -1 or 1 for a mismatch, 0 for match
230 */
231 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
232                                        const struct ldb_val *v2)
233 {
234         if (v1.length > v2->length) {
235                 return -1;
236         }
237         if (v1.length < v2->length) {
238                 return 1;
239         }
240         return memcmp(v1.data, v2->data, v1.length);
241 }
242
243
244 /*
245   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
246   binary-safe comparison for the 'dn' returns -1 if not found
247
248   This is therefore safe when the value is a GUID in the future
249  */
250 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
251                                  const struct dn_list *list,
252                                  const struct ldb_val *v)
253 {
254         unsigned int i;
255         struct ldb_val *exact = NULL, *next = NULL;
256
257         if (ltdb->cache->GUID_index_attribute == NULL) {
258                 for (i=0; i<list->count; i++) {
259                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
260                                 return i;
261                         }
262                 }
263                 return -1;
264         }
265
266         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
267                                 *v, ldb_val_equal_exact_ordered,
268                                 exact, next);
269         if (exact == NULL) {
270                 return -1;
271         }
272         /* Not required, but keeps the compiler quiet */
273         if (next != NULL) {
274                 return -1;
275         }
276
277         i = exact - list->dn;
278         return i;
279 }
280
281 /*
282   find a entry in a dn_list. Uses a case sensitive comparison with the dn
283   returns -1 if not found
284  */
285 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
286                                  struct dn_list *list,
287                                  const struct ldb_message *msg)
288 {
289         struct ldb_val v;
290         const struct ldb_val *key_val;
291         if (ltdb->cache->GUID_index_attribute == NULL) {
292                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
293                 v.data = discard_const_p(unsigned char, dn_str);
294                 v.length = strlen(dn_str);
295         } else {
296                 key_val = ldb_msg_find_ldb_val(msg,
297                                                ltdb->cache->GUID_index_attribute);
298                 if (key_val == NULL) {
299                         return -1;
300                 }
301                 v = *key_val;
302         }
303         return ltdb_dn_list_find_val(ltdb, list, &v);
304 }
305
306 /*
307   this is effectively a cast function, but with lots of paranoia
308   checks and also copes with CPUs that are fussy about pointer
309   alignment
310  */
311 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
312 {
313         struct dn_list *list;
314         if (rec.dsize != sizeof(void *)) {
315                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
316                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
317                 return NULL;
318         }
319         /* note that we can't just use a cast here, as rec.dptr may
320            not be aligned sufficiently for a pointer. A cast would cause
321            platforms like some ARM CPUs to crash */
322         memcpy(&list, rec.dptr, sizeof(void *));
323         list = talloc_get_type(list, struct dn_list);
324         if (list == NULL) {
325                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
326                                        "Bad type '%s' for idxptr",
327                                        talloc_get_name(list));
328                 return NULL;
329         }
330         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
331                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
332                                        "Bad parent '%s' for idxptr",
333                                        talloc_get_name(talloc_parent(list->dn)));
334                 return NULL;
335         }
336         return list;
337 }
338
339 /*
340   return the @IDX list in an index entry for a dn as a
341   struct dn_list
342  */
343 static int ltdb_dn_list_load(struct ldb_module *module,
344                              struct ltdb_private *ltdb,
345                              struct ldb_dn *dn, struct dn_list *list)
346 {
347         struct ldb_message *msg;
348         int ret, version;
349         struct ldb_message_element *el;
350         TDB_DATA rec;
351         struct dn_list *list2;
352         TDB_DATA key;
353
354         list->dn = NULL;
355         list->count = 0;
356
357         /* see if we have any in-memory index entries */
358         if (ltdb->idxptr == NULL ||
359             ltdb->idxptr->itdb == NULL) {
360                 goto normal_index;
361         }
362
363         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
364         key.dsize = strlen((char *)key.dptr);
365
366         rec = tdb_fetch(ltdb->idxptr->itdb, key);
367         if (rec.dptr == NULL) {
368                 goto normal_index;
369         }
370
371         /* we've found an in-memory index entry */
372         list2 = ltdb_index_idxptr(module, rec, true);
373         if (list2 == NULL) {
374                 free(rec.dptr);
375                 return LDB_ERR_OPERATIONS_ERROR;
376         }
377         free(rec.dptr);
378
379         *list = *list2;
380         return LDB_SUCCESS;
381
382 normal_index:
383         msg = ldb_msg_new(list);
384         if (msg == NULL) {
385                 return LDB_ERR_OPERATIONS_ERROR;
386         }
387
388         ret = ltdb_search_dn1(module, dn, msg,
389                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
390                               |LDB_UNPACK_DATA_FLAG_NO_DN);
391         if (ret != LDB_SUCCESS) {
392                 talloc_free(msg);
393                 return ret;
394         }
395
396         el = ldb_msg_find_element(msg, LTDB_IDX);
397         if (!el) {
398                 talloc_free(msg);
399                 return LDB_SUCCESS;
400         }
401
402         version = ldb_msg_find_attr_as_int(msg, LTDB_IDXVERSION, 0);
403
404         /*
405          * we avoid copying the strings by stealing the list.  We have
406          * to steal msg onto el->values (which looks odd) because we
407          * asked for the memory to be allocated on msg, not on each
408          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
409          */
410         if (ltdb->cache->GUID_index_attribute == NULL) {
411                 /* check indexing version number */
412                 if (version != LTDB_INDEXING_VERSION) {
413                         ldb_debug_set(ldb_module_get_ctx(module),
414                                       LDB_DEBUG_ERROR,
415                                       "Wrong DN index version %d "
416                                       "expected %d for %s",
417                                       version, LTDB_INDEXING_VERSION,
418                                       ldb_dn_get_linearized(dn));
419                         return LDB_ERR_OPERATIONS_ERROR;
420                 }
421
422                 talloc_steal(el->values, msg);
423                 list->dn = talloc_steal(list, el->values);
424                 list->count = el->num_values;
425         } else {
426                 unsigned int i;
427                 if (version != LTDB_GUID_INDEXING_VERSION) {
428                         /* This is quite likely during the DB startup
429                            on first upgrade to using a GUID index */
430                         ldb_debug_set(ldb_module_get_ctx(module),
431                                       LDB_DEBUG_ERROR,
432                                       "Wrong GUID index version %d "
433                                       "expected %d for %s",
434                                       version, LTDB_GUID_INDEXING_VERSION,
435                                       ldb_dn_get_linearized(dn));
436                         return LDB_ERR_OPERATIONS_ERROR;
437                 }
438
439                 if (el->num_values == 0) {
440                         return LDB_ERR_OPERATIONS_ERROR;
441                 }
442
443                 if ((el->values[0].length % LTDB_GUID_SIZE) != 0) {
444                         return LDB_ERR_OPERATIONS_ERROR;
445                 }
446
447                 list->count = el->values[0].length / LTDB_GUID_SIZE;
448                 list->dn = talloc_array(list, struct ldb_val, list->count);
449
450                 /*
451                  * The actual data is on msg, due to
452                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
453                  */
454                 talloc_steal(list->dn, msg);
455                 for (i = 0; i < list->count; i++) {
456                         list->dn[i].data
457                                 = &el->values[0].data[i * LTDB_GUID_SIZE];
458                         list->dn[i].length = LTDB_GUID_SIZE;
459                 }
460         }
461
462         /* We don't need msg->elements any more */
463         talloc_free(msg->elements);
464         return LDB_SUCCESS;
465 }
466
467 int ltdb_key_dn_from_idx(struct ldb_module *module,
468                          struct ltdb_private *ltdb,
469                          TALLOC_CTX *mem_ctx,
470                          struct ldb_dn *dn,
471                          TDB_DATA *tdb_key)
472 {
473         struct ldb_context *ldb = ldb_module_get_ctx(module);
474         int ret;
475         int index = 0;
476         enum key_truncation truncation = KEY_NOT_TRUNCATED;
477         struct dn_list *list = talloc(mem_ctx, struct dn_list);
478         if (list == NULL) {
479                 ldb_oom(ldb);
480                 return LDB_ERR_OPERATIONS_ERROR;
481         }
482
483
484         ret = ltdb_index_dn_base_dn(module, ltdb, dn, list, &truncation);
485         if (ret != LDB_SUCCESS) {
486                 TALLOC_FREE(list);
487                 return ret;
488         }
489
490         if (list->count == 0) {
491                 TALLOC_FREE(list);
492                 return LDB_ERR_NO_SUCH_OBJECT;
493         }
494
495         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
496                 const char *dn_str = ldb_dn_get_linearized(dn);
497                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
498                                        __location__
499                                        ": Failed to read DN index "
500                                        "against %s for %s: too many "
501                                        "values (%u > 1)",
502                                        ltdb->cache->GUID_index_attribute,
503                                        dn_str, list->count);
504                 TALLOC_FREE(list);
505                 return LDB_ERR_CONSTRAINT_VIOLATION;
506         }
507
508         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
509                 /*
510                  * DN key has been truncated, need to inspect the actual
511                  * records to locate the actual DN
512                  */
513                 int i;
514                 index = -1;
515                 for (i=0; i < list->count; i++) {
516                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
517                         TDB_DATA key = {
518                                 .dptr = guid_key,
519                                 .dsize = sizeof(guid_key)
520                         };
521                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
522                         struct ldb_message *rec = ldb_msg_new(ldb);
523                         if (rec == NULL) {
524                                 return LDB_ERR_OPERATIONS_ERROR;
525                         }
526
527                         ret = ltdb_idx_to_key(module, ltdb,
528                                               ldb, &list->dn[i],
529                                               &key);
530                         if (ret != LDB_SUCCESS) {
531                                 TALLOC_FREE(list);
532                                 TALLOC_FREE(rec);
533                                 return ret;
534                         }
535
536                         ret = ltdb_search_key(module, ltdb, key,
537                                               rec, flags);
538                         if (key.dptr != guid_key) {
539                                 TALLOC_FREE(key.dptr);
540                         }
541                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
542                                 /*
543                                  * the record has disappeared?
544                                  * yes, this can happen
545                                  */
546                                 TALLOC_FREE(rec);
547                                 continue;
548                         }
549
550                         if (ret != LDB_SUCCESS) {
551                                 /* an internal error */
552                                 TALLOC_FREE(rec);
553                                 TALLOC_FREE(list);
554                                 return LDB_ERR_OPERATIONS_ERROR;
555                         }
556
557                         /*
558                          * We found the actual DN that we wanted from in the
559                          * multiple values that matched the index
560                          * (due to truncation), so return that.
561                          *
562                          */
563                         if (ldb_dn_compare(dn, rec->dn) == 0) {
564                                 index = i;
565                                 TALLOC_FREE(rec);
566                                 break;
567                         }
568                 }
569
570                 /*
571                  * We matched the index but the actual DN we wanted
572                  * was not here.
573                  */
574                 if (index == -1) {
575                         TALLOC_FREE(list);
576                         return LDB_ERR_NO_SUCH_OBJECT;
577                 }
578         }
579
580         /* The tdb_key memory is allocated by the caller */
581         ret = ltdb_guid_to_key(module, ltdb,
582                                &list->dn[index], tdb_key);
583         TALLOC_FREE(list);
584
585         if (ret != LDB_SUCCESS) {
586                 return LDB_ERR_OPERATIONS_ERROR;
587         }
588
589         return LDB_SUCCESS;
590 }
591
592
593
594 /*
595   save a dn_list into a full @IDX style record
596  */
597 static int ltdb_dn_list_store_full(struct ldb_module *module,
598                                    struct ltdb_private *ltdb,
599                                    struct ldb_dn *dn,
600                                    struct dn_list *list)
601 {
602         struct ldb_message *msg;
603         int ret;
604
605         msg = ldb_msg_new(module);
606         if (!msg) {
607                 return ldb_module_oom(module);
608         }
609
610         msg->dn = dn;
611
612         if (list->count == 0) {
613                 ret = ltdb_delete_noindex(module, msg);
614                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
615                         talloc_free(msg);
616                         return LDB_SUCCESS;
617                 }
618                 return ret;
619         }
620
621         if (ltdb->cache->GUID_index_attribute == NULL) {
622                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
623                                       LTDB_INDEXING_VERSION);
624                 if (ret != LDB_SUCCESS) {
625                         talloc_free(msg);
626                         return ldb_module_oom(module);
627                 }
628         } else {
629                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
630                                       LTDB_GUID_INDEXING_VERSION);
631                 if (ret != LDB_SUCCESS) {
632                         talloc_free(msg);
633                         return ldb_module_oom(module);
634                 }
635         }
636
637         if (list->count > 0) {
638                 struct ldb_message_element *el;
639
640                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
641                 if (ret != LDB_SUCCESS) {
642                         talloc_free(msg);
643                         return ldb_module_oom(module);
644                 }
645
646                 if (ltdb->cache->GUID_index_attribute == NULL) {
647                         el->values = list->dn;
648                         el->num_values = list->count;
649                 } else {
650                         struct ldb_val v;
651                         unsigned int i;
652                         el->values = talloc_array(msg,
653                                                   struct ldb_val, 1);
654                         if (el->values == NULL) {
655                                 talloc_free(msg);
656                                 return ldb_module_oom(module);
657                         }
658
659                         v.data = talloc_array_size(el->values,
660                                                    list->count,
661                                                    LTDB_GUID_SIZE);
662                         if (v.data == NULL) {
663                                 talloc_free(msg);
664                                 return ldb_module_oom(module);
665                         }
666
667                         v.length = talloc_get_size(v.data);
668
669                         for (i = 0; i < list->count; i++) {
670                                 if (list->dn[i].length !=
671                                     LTDB_GUID_SIZE) {
672                                         talloc_free(msg);
673                                         return ldb_module_operr(module);
674                                 }
675                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
676                                        list->dn[i].data,
677                                        LTDB_GUID_SIZE);
678                         }
679                         el->values[0] = v;
680                         el->num_values = 1;
681                 }
682         }
683
684         ret = ltdb_store(module, msg, TDB_REPLACE);
685         talloc_free(msg);
686         return ret;
687 }
688
689 /*
690   save a dn_list into the database, in either @IDX or internal format
691  */
692 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
693                               struct dn_list *list)
694 {
695         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
696         TDB_DATA rec, key;
697         int ret;
698         struct dn_list *list2;
699
700         if (ltdb->idxptr == NULL) {
701                 return ltdb_dn_list_store_full(module, ltdb,
702                                                dn, list);
703         }
704
705         if (ltdb->idxptr->itdb == NULL) {
706                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
707                 if (ltdb->idxptr->itdb == NULL) {
708                         return LDB_ERR_OPERATIONS_ERROR;
709                 }
710         }
711
712         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
713         key.dsize = strlen((char *)key.dptr);
714
715         rec = tdb_fetch(ltdb->idxptr->itdb, key);
716         if (rec.dptr != NULL) {
717                 list2 = ltdb_index_idxptr(module, rec, false);
718                 if (list2 == NULL) {
719                         free(rec.dptr);
720                         return LDB_ERR_OPERATIONS_ERROR;
721                 }
722                 free(rec.dptr);
723                 list2->dn = talloc_steal(list2, list->dn);
724                 list2->count = list->count;
725                 return LDB_SUCCESS;
726         }
727
728         list2 = talloc(ltdb->idxptr, struct dn_list);
729         if (list2 == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         list2->dn = talloc_steal(list2, list->dn);
733         list2->count = list->count;
734
735         rec.dptr = (uint8_t *)&list2;
736         rec.dsize = sizeof(void *);
737
738
739         /*
740          * This is not a store into the main DB, but into an in-memory
741          * TDB, so we don't need a guard on ltdb->read_only
742          */
743         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
744         if (ret != 0) {
745                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
746         }
747         return LDB_SUCCESS;
748 }
749
750 /*
751   traverse function for storing the in-memory index entries on disk
752  */
753 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
754 {
755         struct ldb_module *module = state;
756         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
757         struct ldb_dn *dn;
758         struct ldb_context *ldb = ldb_module_get_ctx(module);
759         struct ldb_val v;
760         struct dn_list *list;
761
762         list = ltdb_index_idxptr(module, data, true);
763         if (list == NULL) {
764                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
765                 return -1;
766         }
767
768         v.data = key.dptr;
769         v.length = strnlen((char *)key.dptr, key.dsize);
770
771         dn = ldb_dn_from_ldb_val(module, ldb, &v);
772         if (dn == NULL) {
773                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
774                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
775                 return -1;
776         }
777
778         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
779                                                       dn, list);
780         talloc_free(dn);
781         if (ltdb->idxptr->error != 0) {
782                 return -1;
783         }
784         return 0;
785 }
786
787 /* cleanup the idxptr mode when transaction commits */
788 int ltdb_index_transaction_commit(struct ldb_module *module)
789 {
790         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
791         int ret;
792
793         struct ldb_context *ldb = ldb_module_get_ctx(module);
794
795         ldb_reset_err_string(ldb);
796
797         if (ltdb->idxptr->itdb) {
798                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
799                 tdb_close(ltdb->idxptr->itdb);
800         }
801
802         ret = ltdb->idxptr->error;
803         if (ret != LDB_SUCCESS) {
804                 if (!ldb_errstring(ldb)) {
805                         ldb_set_errstring(ldb, ldb_strerror(ret));
806                 }
807                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
808         }
809
810         talloc_free(ltdb->idxptr);
811         ltdb->idxptr = NULL;
812         return ret;
813 }
814
815 /* cleanup the idxptr mode when transaction cancels */
816 int ltdb_index_transaction_cancel(struct ldb_module *module)
817 {
818         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
819         if (ltdb->idxptr && ltdb->idxptr->itdb) {
820                 tdb_close(ltdb->idxptr->itdb);
821         }
822         talloc_free(ltdb->idxptr);
823         ltdb->idxptr = NULL;
824         return LDB_SUCCESS;
825 }
826
827
828 /*
829   return the dn key to be used for an index
830   the caller is responsible for freeing
831 */
832 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
833                                      struct ltdb_private *ltdb,
834                                      const char *attr, const struct ldb_val *value,
835                                      const struct ldb_schema_attribute **ap,
836                                      enum key_truncation *truncation)
837 {
838         struct ldb_dn *ret;
839         struct ldb_val v;
840         const struct ldb_schema_attribute *a = NULL;
841         char *attr_folded = NULL;
842         const char *attr_for_dn = NULL;
843         int r;
844         bool should_b64_encode;
845         unsigned max_key_length = ltdb_max_key_length(ltdb);
846         unsigned key_len = 0;
847         unsigned attr_len = 0;
848         unsigned indx_len = 0;
849         unsigned frmt_len = 0;
850
851         if (attr[0] == '@') {
852                 attr_for_dn = attr;
853                 v = *value;
854                 if (ap != NULL) {
855                         *ap = NULL;
856                 }
857         } else {
858                 attr_folded = ldb_attr_casefold(ldb, attr);
859                 if (!attr_folded) {
860                         return NULL;
861                 }
862
863                 attr_for_dn = attr_folded;
864
865                 a = ldb_schema_attribute_by_name(ldb, attr);
866                 if (ap) {
867                         *ap = a;
868                 }
869                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
870                 if (r != LDB_SUCCESS) {
871                         const char *errstr = ldb_errstring(ldb);
872                         /* canonicalisation can be refused. For
873                            example, a attribute that takes wildcards
874                            will refuse to canonicalise if the value
875                            contains a wildcard */
876                         ldb_asprintf_errstring(ldb,
877                                                "Failed to create index "
878                                                "key for attribute '%s':%s%s%s",
879                                                attr, ldb_strerror(r),
880                                                (errstr?":":""),
881                                                (errstr?errstr:""));
882                         talloc_free(attr_folded);
883                         return NULL;
884                 }
885         }
886         attr_len = strlen(attr_for_dn);
887         indx_len = strlen(LTDB_INDEX);
888
889         /*
890          * We do not base 64 encode a DN in a key, it has already been
891          * casefold and lineraized, that is good enough.  That already
892          * avoids embedded NUL etc.
893          */
894         if (ltdb->cache->GUID_index_attribute != NULL) {
895                 if (strcmp(attr, LTDB_IDXDN) == 0) {
896                         should_b64_encode = false;
897                 } else if (strcmp(attr, LTDB_IDXONE) == 0) {
898                         /*
899                          * We can only change the behaviour for IDXONE
900                          * when the GUID index is enabled
901                          */
902                         should_b64_encode = false;
903                 } else {
904                         should_b64_encode
905                                 = ldb_should_b64_encode(ldb, &v);
906                 }
907         } else {
908                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
909         }
910
911         if (should_b64_encode) {
912                 unsigned vstr_len = 0;
913                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
914                 if (!vstr) {
915                         talloc_free(attr_folded);
916                         return NULL;
917                 }
918                 vstr_len = strlen(vstr);
919                 key_len = 3 + indx_len + attr_len + vstr_len;
920                 if (key_len > max_key_length) {
921                         unsigned excess = key_len - max_key_length;
922                         frmt_len = vstr_len - excess;
923                         *truncation = KEY_TRUNCATED;
924                         /*
925                         * Truncated keys are placed in a separate key space
926                         * from the non truncated keys
927                         * Note: the double hash "##" is not a typo and
928                         * indicates that the following value is base64 encoded
929                         */
930                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
931                                              LTDB_INDEX, attr_for_dn,
932                                              frmt_len, vstr);
933                 } else {
934                         frmt_len = vstr_len;
935                         *truncation = KEY_NOT_TRUNCATED;
936                         /*
937                          * Note: the double colon "::" is not a typo and
938                          * indicates that the following value is base64 encoded
939                          */
940                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
941                                              LTDB_INDEX, attr_for_dn,
942                                              frmt_len, vstr);
943                 }
944                 talloc_free(vstr);
945         } else {
946                 key_len = 2 + indx_len + attr_len + (int)v.length;
947                 if (key_len > max_key_length) {
948                         unsigned excess = key_len - max_key_length;
949                         frmt_len = v.length - excess;
950                         *truncation = KEY_TRUNCATED;
951                         /*
952                          * Truncated keys are placed in a separate key space
953                          * from the non truncated keys
954                          */
955                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
956                                              LTDB_INDEX, attr_for_dn,
957                                              frmt_len, (char *)v.data);
958                 } else {
959                         frmt_len = v.length;
960                         *truncation = KEY_NOT_TRUNCATED;
961                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
962                                              LTDB_INDEX, attr_for_dn,
963                                              frmt_len, (char *)v.data);
964                 }
965         }
966
967         if (v.data != value->data) {
968                 talloc_free(v.data);
969         }
970         talloc_free(attr_folded);
971
972         return ret;
973 }
974
975 /*
976   see if a attribute value is in the list of indexed attributes
977 */
978 static bool ltdb_is_indexed(struct ldb_module *module,
979                             struct ltdb_private *ltdb,
980                             const char *attr)
981 {
982         struct ldb_context *ldb = ldb_module_get_ctx(module);
983         unsigned int i;
984         struct ldb_message_element *el;
985
986         if ((ltdb->cache->GUID_index_attribute != NULL) &&
987             (ldb_attr_cmp(attr,
988                           ltdb->cache->GUID_index_attribute) == 0)) {
989                 /* Implicity covered, this is the index key */
990                 return false;
991         }
992         if (ldb->schema.index_handler_override) {
993                 const struct ldb_schema_attribute *a
994                         = ldb_schema_attribute_by_name(ldb, attr);
995
996                 if (a == NULL) {
997                         return false;
998                 }
999
1000                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1001                         return true;
1002                 } else {
1003                         return false;
1004                 }
1005         }
1006
1007         if (!ltdb->cache->attribute_indexes) {
1008                 return false;
1009         }
1010
1011         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
1012         if (el == NULL) {
1013                 return false;
1014         }
1015
1016         /* TODO: this is too expensive! At least use a binary search */
1017         for (i=0; i<el->num_values; i++) {
1018                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1019                         return true;
1020                 }
1021         }
1022         return false;
1023 }
1024
1025 /*
1026   in the following logic functions, the return value is treated as
1027   follows:
1028
1029      LDB_SUCCESS: we found some matching index values
1030
1031      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1032
1033      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1034                                we'll need a full search
1035  */
1036
1037 /*
1038   return a list of dn's that might match a simple indexed search (an
1039   equality search only)
1040  */
1041 static int ltdb_index_dn_simple(struct ldb_module *module,
1042                                 struct ltdb_private *ltdb,
1043                                 const struct ldb_parse_tree *tree,
1044                                 struct dn_list *list)
1045 {
1046         struct ldb_context *ldb;
1047         struct ldb_dn *dn;
1048         int ret;
1049         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1050
1051         ldb = ldb_module_get_ctx(module);
1052
1053         list->count = 0;
1054         list->dn = NULL;
1055
1056         /* if the attribute isn't in the list of indexed attributes then
1057            this node needs a full search */
1058         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
1059                 return LDB_ERR_OPERATIONS_ERROR;
1060         }
1061
1062         /* the attribute is indexed. Pull the list of DNs that match the
1063            search criterion */
1064         dn = ltdb_index_key(ldb, ltdb,
1065                             tree->u.equality.attr,
1066                             &tree->u.equality.value, NULL, &truncation);
1067         /*
1068          * We ignore truncation here and allow multi-valued matches
1069          * as ltdb_search_indexed will filter out the wrong one in
1070          * ltdb_index_filter() which calls ldb_match_message().
1071          */
1072         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1073
1074         ret = ltdb_dn_list_load(module, ltdb, dn, list);
1075         talloc_free(dn);
1076         return ret;
1077 }
1078
1079
1080 static bool list_union(struct ldb_context *ldb,
1081                        struct ltdb_private *ltdb,
1082                        struct dn_list *list, struct dn_list *list2);
1083
1084 /*
1085   return a list of dn's that might match a leaf indexed search
1086  */
1087 static int ltdb_index_dn_leaf(struct ldb_module *module,
1088                               struct ltdb_private *ltdb,
1089                               const struct ldb_parse_tree *tree,
1090                               struct dn_list *list)
1091 {
1092         if (ltdb->disallow_dn_filter &&
1093             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1094                 /* in AD mode we do not support "(dn=...)" search filters */
1095                 list->dn = NULL;
1096                 list->count = 0;
1097                 return LDB_SUCCESS;
1098         }
1099         if (tree->u.equality.attr[0] == '@') {
1100                 /* Do not allow a indexed search against an @ */
1101                 list->dn = NULL;
1102                 list->count = 0;
1103                 return LDB_SUCCESS;
1104         }
1105         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1106                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1107                 struct ldb_dn *dn
1108                         = ldb_dn_from_ldb_val(list,
1109                                               ldb_module_get_ctx(module),
1110                                               &tree->u.equality.value);
1111                 if (dn == NULL) {
1112                         /* If we can't parse it, no match */
1113                         list->dn = NULL;
1114                         list->count = 0;
1115                         return LDB_SUCCESS;
1116                 }
1117
1118                 /*
1119                  * Re-use the same code we use for a SCOPE_BASE
1120                  * search
1121                  *
1122                  * We can't call TALLOC_FREE(dn) as this must belong
1123                  * to list for the memory to remain valid.
1124                  */
1125                 return ltdb_index_dn_base_dn(module, ltdb, dn, list,
1126                                              &truncation);
1127                 /*
1128                  * We ignore truncation here and allow multi-valued matches
1129                  * as ltdb_search_indexed will filter out the wrong one in
1130                  * ltdb_index_filter() which calls ldb_match_message().
1131                  */
1132
1133         } else if ((ltdb->cache->GUID_index_attribute != NULL) &&
1134                    (ldb_attr_cmp(tree->u.equality.attr,
1135                                  ltdb->cache->GUID_index_attribute) == 0)) {
1136                 int ret;
1137                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1138                 list->dn = talloc_array(list, struct ldb_val, 1);
1139                 if (list->dn == NULL) {
1140                         ldb_module_oom(module);
1141                         return LDB_ERR_OPERATIONS_ERROR;
1142                 }
1143                 /*
1144                  * We need to go via the canonicalise_fn() to
1145                  * ensure we get the index in binary, rather
1146                  * than a string
1147                  */
1148                 ret = ltdb->GUID_index_syntax->canonicalise_fn(ldb,
1149                                                                list->dn,
1150                                                                &tree->u.equality.value,
1151                                                                &list->dn[0]);
1152                 if (ret != LDB_SUCCESS) {
1153                         return LDB_ERR_OPERATIONS_ERROR;
1154                 }
1155                 list->count = 1;
1156                 return LDB_SUCCESS;
1157         }
1158
1159         return ltdb_index_dn_simple(module, ltdb, tree, list);
1160 }
1161
1162
1163 /*
1164   list intersection
1165   list = list & list2
1166 */
1167 static bool list_intersect(struct ldb_context *ldb,
1168                            struct ltdb_private *ltdb,
1169                            struct dn_list *list, const struct dn_list *list2)
1170 {
1171         const struct dn_list *short_list, *long_list;
1172         struct dn_list *list3;
1173         unsigned int i;
1174
1175         if (list->count == 0) {
1176                 /* 0 & X == 0 */
1177                 return true;
1178         }
1179         if (list2->count == 0) {
1180                 /* X & 0 == 0 */
1181                 list->count = 0;
1182                 list->dn = NULL;
1183                 return true;
1184         }
1185
1186         /* the indexing code is allowed to return a longer list than
1187            what really matches, as all results are filtered by the
1188            full expression at the end - this shortcut avoids a lot of
1189            work in some cases */
1190         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1191                 return true;
1192         }
1193         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1194                 list->count = list2->count;
1195                 list->dn = list2->dn;
1196                 /* note that list2 may not be the parent of list2->dn,
1197                    as list2->dn may be owned by ltdb->idxptr. In that
1198                    case we expect this reparent call to fail, which is
1199                    OK */
1200                 talloc_reparent(list2, list, list2->dn);
1201                 return true;
1202         }
1203
1204         if (list->count > list2->count) {
1205                 short_list = list2;
1206                 long_list = list;
1207         } else {
1208                 short_list = list;
1209                 long_list = list2;
1210         }
1211
1212         list3 = talloc_zero(list, struct dn_list);
1213         if (list3 == NULL) {
1214                 return false;
1215         }
1216
1217         list3->dn = talloc_array(list3, struct ldb_val,
1218                                  MIN(list->count, list2->count));
1219         if (!list3->dn) {
1220                 talloc_free(list3);
1221                 return false;
1222         }
1223         list3->count = 0;
1224
1225         for (i=0;i<short_list->count;i++) {
1226                 /* For the GUID index case, this is a binary search */
1227                 if (ltdb_dn_list_find_val(ltdb, long_list,
1228                                           &short_list->dn[i]) != -1) {
1229                         list3->dn[list3->count] = short_list->dn[i];
1230                         list3->count++;
1231                 }
1232         }
1233
1234         list->strict |= list2->strict;
1235         list->dn = talloc_steal(list, list3->dn);
1236         list->count = list3->count;
1237         talloc_free(list3);
1238
1239         return true;
1240 }
1241
1242
1243 /*
1244   list union
1245   list = list | list2
1246 */
1247 static bool list_union(struct ldb_context *ldb,
1248                        struct ltdb_private *ltdb,
1249                        struct dn_list *list, struct dn_list *list2)
1250 {
1251         struct ldb_val *dn3;
1252         unsigned int i = 0, j = 0, k = 0;
1253
1254         if (list2->count == 0) {
1255                 /* X | 0 == X */
1256                 return true;
1257         }
1258
1259         if (list->count == 0) {
1260                 /* 0 | X == X */
1261                 list->count = list2->count;
1262                 list->dn = list2->dn;
1263                 /* note that list2 may not be the parent of list2->dn,
1264                    as list2->dn may be owned by ltdb->idxptr. In that
1265                    case we expect this reparent call to fail, which is
1266                    OK */
1267                 talloc_reparent(list2, list, list2->dn);
1268                 return true;
1269         }
1270
1271         /*
1272          * Sort the lists (if not in GUID DN mode) so we can do
1273          * the de-duplication during the merge
1274          */
1275         ltdb_dn_list_sort(ltdb, list);
1276         ltdb_dn_list_sort(ltdb, list2);
1277
1278         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1279         if (!dn3) {
1280                 ldb_oom(ldb);
1281                 return false;
1282         }
1283
1284         while (i < list->count || j < list2->count) {
1285                 int cmp;
1286                 if (i >= list->count) {
1287                         cmp = 1;
1288                 } else if (j >= list2->count) {
1289                         cmp = -1;
1290                 } else {
1291                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1292                                                           &list2->dn[j]);
1293                 }
1294
1295                 if (cmp < 0) {
1296                         /* Take list */
1297                         dn3[k] = list->dn[i];
1298                         i++;
1299                         k++;
1300                 } else if (cmp > 0) {
1301                         /* Take list2 */
1302                         dn3[k] = list2->dn[j];
1303                         j++;
1304                         k++;
1305                 } else {
1306                         /* Equal, take list */
1307                         dn3[k] = list->dn[i];
1308                         i++;
1309                         j++;
1310                         k++;
1311                 }
1312         }
1313
1314         list->dn = dn3;
1315         list->count = k;
1316
1317         return true;
1318 }
1319
1320 static int ltdb_index_dn(struct ldb_module *module,
1321                          struct ltdb_private *ltdb,
1322                          const struct ldb_parse_tree *tree,
1323                          struct dn_list *list);
1324
1325
1326 /*
1327   process an OR list (a union)
1328  */
1329 static int ltdb_index_dn_or(struct ldb_module *module,
1330                             struct ltdb_private *ltdb,
1331                             const struct ldb_parse_tree *tree,
1332                             struct dn_list *list)
1333 {
1334         struct ldb_context *ldb;
1335         unsigned int i;
1336
1337         ldb = ldb_module_get_ctx(module);
1338
1339         list->dn = NULL;
1340         list->count = 0;
1341
1342         for (i=0; i<tree->u.list.num_elements; i++) {
1343                 struct dn_list *list2;
1344                 int ret;
1345
1346                 list2 = talloc_zero(list, struct dn_list);
1347                 if (list2 == NULL) {
1348                         return LDB_ERR_OPERATIONS_ERROR;
1349                 }
1350
1351                 ret = ltdb_index_dn(module, ltdb,
1352                                     tree->u.list.elements[i], list2);
1353
1354                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1355                         /* X || 0 == X */
1356                         talloc_free(list2);
1357                         continue;
1358                 }
1359
1360                 if (ret != LDB_SUCCESS) {
1361                         /* X || * == * */
1362                         talloc_free(list2);
1363                         return ret;
1364                 }
1365
1366                 if (!list_union(ldb, ltdb, list, list2)) {
1367                         talloc_free(list2);
1368                         return LDB_ERR_OPERATIONS_ERROR;
1369                 }
1370         }
1371
1372         if (list->count == 0) {
1373                 return LDB_ERR_NO_SUCH_OBJECT;
1374         }
1375
1376         return LDB_SUCCESS;
1377 }
1378
1379
1380 /*
1381   NOT an index results
1382  */
1383 static int ltdb_index_dn_not(struct ldb_module *module,
1384                              struct ltdb_private *ltdb,
1385                              const struct ldb_parse_tree *tree,
1386                              struct dn_list *list)
1387 {
1388         /* the only way to do an indexed not would be if we could
1389            negate the not via another not or if we knew the total
1390            number of database elements so we could know that the
1391            existing expression covered the whole database.
1392
1393            instead, we just give up, and rely on a full index scan
1394            (unless an outer & manages to reduce the list)
1395         */
1396         return LDB_ERR_OPERATIONS_ERROR;
1397 }
1398
1399 /*
1400  * These things are unique, so avoid a full scan if this is a search
1401  * by GUID, DN or a unique attribute
1402  */
1403 static bool ltdb_index_unique(struct ldb_context *ldb,
1404                               struct ltdb_private *ltdb,
1405                               const char *attr)
1406 {
1407         const struct ldb_schema_attribute *a;
1408         if (ltdb->cache->GUID_index_attribute != NULL) {
1409                 if (ldb_attr_cmp(attr, ltdb->cache->GUID_index_attribute) == 0) {
1410                         return true;
1411                 }
1412         }
1413         if (ldb_attr_dn(attr) == 0) {
1414                 return true;
1415         }
1416
1417         a = ldb_schema_attribute_by_name(ldb, attr);
1418         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1419                 return true;
1420         }
1421         return false;
1422 }
1423
1424 /*
1425   process an AND expression (intersection)
1426  */
1427 static int ltdb_index_dn_and(struct ldb_module *module,
1428                              struct ltdb_private *ltdb,
1429                              const struct ldb_parse_tree *tree,
1430                              struct dn_list *list)
1431 {
1432         struct ldb_context *ldb;
1433         unsigned int i;
1434         bool found;
1435
1436         ldb = ldb_module_get_ctx(module);
1437
1438         list->dn = NULL;
1439         list->count = 0;
1440
1441         /* in the first pass we only look for unique simple
1442            equality tests, in the hope of avoiding having to look
1443            at any others */
1444         for (i=0; i<tree->u.list.num_elements; i++) {
1445                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1446                 int ret;
1447
1448                 if (subtree->operation != LDB_OP_EQUALITY ||
1449                     !ltdb_index_unique(ldb, ltdb,
1450                                        subtree->u.equality.attr)) {
1451                         continue;
1452                 }
1453
1454                 ret = ltdb_index_dn(module, ltdb, subtree, list);
1455                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1456                         /* 0 && X == 0 */
1457                         return LDB_ERR_NO_SUCH_OBJECT;
1458                 }
1459                 if (ret == LDB_SUCCESS) {
1460                         /* a unique index match means we can
1461                          * stop. Note that we don't care if we return
1462                          * a few too many objects, due to later
1463                          * filtering */
1464                         return LDB_SUCCESS;
1465                 }
1466         }
1467
1468         /* now do a full intersection */
1469         found = false;
1470
1471         for (i=0; i<tree->u.list.num_elements; i++) {
1472                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1473                 struct dn_list *list2;
1474                 int ret;
1475
1476                 list2 = talloc_zero(list, struct dn_list);
1477                 if (list2 == NULL) {
1478                         return ldb_module_oom(module);
1479                 }
1480
1481                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
1482
1483                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1484                         /* X && 0 == 0 */
1485                         list->dn = NULL;
1486                         list->count = 0;
1487                         talloc_free(list2);
1488                         return LDB_ERR_NO_SUCH_OBJECT;
1489                 }
1490
1491                 if (ret != LDB_SUCCESS) {
1492                         /* this didn't adding anything */
1493                         talloc_free(list2);
1494                         continue;
1495                 }
1496
1497                 if (!found) {
1498                         talloc_reparent(list2, list, list->dn);
1499                         list->dn = list2->dn;
1500                         list->count = list2->count;
1501                         found = true;
1502                 } else if (!list_intersect(ldb, ltdb,
1503                                            list, list2)) {
1504                         talloc_free(list2);
1505                         return LDB_ERR_OPERATIONS_ERROR;
1506                 }
1507
1508                 if (list->count == 0) {
1509                         list->dn = NULL;
1510                         return LDB_ERR_NO_SUCH_OBJECT;
1511                 }
1512
1513                 if (list->count < 2) {
1514                         /* it isn't worth loading the next part of the tree */
1515                         return LDB_SUCCESS;
1516                 }
1517         }
1518
1519         if (!found) {
1520                 /* none of the attributes were indexed */
1521                 return LDB_ERR_OPERATIONS_ERROR;
1522         }
1523
1524         return LDB_SUCCESS;
1525 }
1526
1527 /*
1528   return a list of matching objects using a one-level index
1529  */
1530 static int ltdb_index_dn_attr(struct ldb_module *module,
1531                               struct ltdb_private *ltdb,
1532                               const char *attr,
1533                               struct ldb_dn *dn,
1534                               struct dn_list *list,
1535                               enum key_truncation *truncation)
1536 {
1537         struct ldb_context *ldb;
1538         struct ldb_dn *key;
1539         struct ldb_val val;
1540         int ret;
1541
1542         ldb = ldb_module_get_ctx(module);
1543
1544         /* work out the index key from the parent DN */
1545         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1546         val.length = strlen((char *)val.data);
1547         key = ltdb_index_key(ldb, ltdb, attr, &val, NULL, truncation);
1548         if (!key) {
1549                 ldb_oom(ldb);
1550                 return LDB_ERR_OPERATIONS_ERROR;
1551         }
1552
1553         ret = ltdb_dn_list_load(module, ltdb, key, list);
1554         talloc_free(key);
1555         if (ret != LDB_SUCCESS) {
1556                 return ret;
1557         }
1558
1559         if (list->count == 0) {
1560                 return LDB_ERR_NO_SUCH_OBJECT;
1561         }
1562
1563         return LDB_SUCCESS;
1564 }
1565
1566 /*
1567   return a list of matching objects using a one-level index
1568  */
1569 static int ltdb_index_dn_one(struct ldb_module *module,
1570                              struct ltdb_private *ltdb,
1571                              struct ldb_dn *parent_dn,
1572                              struct dn_list *list,
1573                              enum key_truncation *truncation)
1574 {
1575         /* Ensure we do not shortcut on intersection for this list */
1576         list->strict = true;
1577         return ltdb_index_dn_attr(module, ltdb,
1578                                   LTDB_IDXONE, parent_dn, list, truncation);
1579
1580 }
1581
1582 /*
1583   return a list of matching objects using the DN index
1584  */
1585 static int ltdb_index_dn_base_dn(struct ldb_module *module,
1586                                  struct ltdb_private *ltdb,
1587                                  struct ldb_dn *base_dn,
1588                                  struct dn_list *dn_list,
1589                                  enum key_truncation *truncation)
1590 {
1591         const struct ldb_val *guid_val = NULL;
1592         if (ltdb->cache->GUID_index_attribute == NULL) {
1593                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1594                 if (dn_list->dn == NULL) {
1595                         return ldb_module_oom(module);
1596                 }
1597                 dn_list->dn[0].data = discard_const_p(unsigned char,
1598                                                       ldb_dn_get_linearized(base_dn));
1599                 if (dn_list->dn[0].data == NULL) {
1600                         return ldb_module_oom(module);
1601                 }
1602                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1603                 dn_list->count = 1;
1604
1605                 return LDB_SUCCESS;
1606         }
1607
1608         if (ltdb->cache->GUID_index_dn_component != NULL) {
1609                 guid_val = ldb_dn_get_extended_component(base_dn,
1610                                                          ltdb->cache->GUID_index_dn_component);
1611         }
1612
1613         if (guid_val != NULL) {
1614                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1615                 if (dn_list->dn == NULL) {
1616                         return ldb_module_oom(module);
1617                 }
1618                 dn_list->dn[0].data = guid_val->data;
1619                 dn_list->dn[0].length = guid_val->length;
1620                 dn_list->count = 1;
1621
1622                 return LDB_SUCCESS;
1623         }
1624
1625         return ltdb_index_dn_attr(module, ltdb,
1626                                   LTDB_IDXDN, base_dn, dn_list, truncation);
1627 }
1628
1629 /*
1630   return a list of dn's that might match a indexed search or
1631   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1632  */
1633 static int ltdb_index_dn(struct ldb_module *module,
1634                          struct ltdb_private *ltdb,
1635                          const struct ldb_parse_tree *tree,
1636                          struct dn_list *list)
1637 {
1638         int ret = LDB_ERR_OPERATIONS_ERROR;
1639
1640         switch (tree->operation) {
1641         case LDB_OP_AND:
1642                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1643                 break;
1644
1645         case LDB_OP_OR:
1646                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1647                 break;
1648
1649         case LDB_OP_NOT:
1650                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1651                 break;
1652
1653         case LDB_OP_EQUALITY:
1654                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1655                 break;
1656
1657         case LDB_OP_SUBSTRING:
1658         case LDB_OP_GREATER:
1659         case LDB_OP_LESS:
1660         case LDB_OP_PRESENT:
1661         case LDB_OP_APPROX:
1662         case LDB_OP_EXTENDED:
1663                 /* we can't index with fancy bitops yet */
1664                 ret = LDB_ERR_OPERATIONS_ERROR;
1665                 break;
1666         }
1667
1668         return ret;
1669 }
1670
1671 /*
1672   filter a candidate dn_list from an indexed search into a set of results
1673   extracting just the given attributes
1674 */
1675 static int ltdb_index_filter(struct ltdb_private *ltdb,
1676                              const struct dn_list *dn_list,
1677                              struct ltdb_context *ac,
1678                              uint32_t *match_count,
1679                              enum key_truncation scope_one_truncation)
1680 {
1681         struct ldb_context *ldb;
1682         struct ldb_message *msg;
1683         struct ldb_message *filtered_msg;
1684         unsigned int i;
1685         uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
1686
1687         ldb = ldb_module_get_ctx(ac->module);
1688
1689         for (i = 0; i < dn_list->count; i++) {
1690                 uint8_t guid_key[LTDB_GUID_KEY_SIZE];
1691                 TDB_DATA tdb_key = {
1692                         .dptr = guid_key,
1693                         .dsize = sizeof(guid_key)
1694                 };
1695                 int ret;
1696                 bool matched;
1697
1698                 ret = ltdb_idx_to_key(ac->module, ltdb,
1699                                       ac, &dn_list->dn[i],
1700                                       &tdb_key);
1701                 if (ret != LDB_SUCCESS) {
1702                         return ret;
1703                 }
1704
1705                 if (ltdb->cache->GUID_index_attribute != NULL) {
1706                         /*
1707                          * If we are in GUID index mode, then the dn_list is
1708                          * sorted.  If we got a duplicate, forget about it, as
1709                          * otherwise we would send the same entry back more
1710                          * than once.
1711                          *
1712                          * This is needed in the truncated DN case, or if a
1713                          * duplicate was forced in via
1714                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1715                          */
1716
1717                         if (memcmp(previous_guid_key, tdb_key.dptr,
1718                                    sizeof(previous_guid_key)) == 0) {
1719                                 continue;
1720                         }
1721
1722                         memcpy(previous_guid_key, tdb_key.dptr,
1723                                sizeof(previous_guid_key));
1724                 }
1725
1726                 msg = ldb_msg_new(ac);
1727                 if (!msg) {
1728                         return LDB_ERR_OPERATIONS_ERROR;
1729                 }
1730
1731
1732                 ret = ltdb_search_key(ac->module, ltdb,
1733                                       tdb_key, msg,
1734                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1735                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1736                 if (tdb_key.dptr != guid_key) {
1737                         TALLOC_FREE(tdb_key.dptr);
1738                 }
1739                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1740                         /* the record has disappeared? yes, this can happen */
1741                         talloc_free(msg);
1742                         continue;
1743                 }
1744
1745                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1746                         /* an internal error */
1747                         talloc_free(msg);
1748                         return LDB_ERR_OPERATIONS_ERROR;
1749                 }
1750
1751                 /*
1752                  * We trust the index for LDB_SCOPE_ONELEVEL
1753                  * unless the index key has been truncated.
1754                  *
1755                  * LDB_SCOPE_BASE is not passed in by our only caller.
1756                  */
1757                 if (ac->scope == LDB_SCOPE_ONELEVEL
1758                     && ltdb->cache->one_level_indexes
1759                     && scope_one_truncation == KEY_NOT_TRUNCATED) {
1760                         ret = ldb_match_message(ldb, msg, ac->tree,
1761                                                 ac->scope, &matched);
1762                 } else {
1763                         ret = ldb_match_msg_error(ldb, msg,
1764                                                   ac->tree, ac->base,
1765                                                   ac->scope, &matched);
1766                 }
1767
1768                 if (ret != LDB_SUCCESS) {
1769                         talloc_free(msg);
1770                         return ret;
1771                 }
1772                 if (!matched) {
1773                         talloc_free(msg);
1774                         continue;
1775                 }
1776
1777                 /* filter the attributes that the user wants */
1778                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1779
1780                 talloc_free(msg);
1781
1782                 if (ret == -1) {
1783                         return LDB_ERR_OPERATIONS_ERROR;
1784                 }
1785
1786                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1787                 if (ret != LDB_SUCCESS) {
1788                         /* Regardless of success or failure, the msg
1789                          * is the callbacks responsiblity, and should
1790                          * not be talloc_free()'ed */
1791                         ac->request_terminated = true;
1792                         return ret;
1793                 }
1794
1795                 (*match_count)++;
1796         }
1797
1798         return LDB_SUCCESS;
1799 }
1800
1801 /*
1802   sort a DN list
1803  */
1804 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
1805                               struct dn_list *list)
1806 {
1807         if (list->count < 2) {
1808                 return;
1809         }
1810
1811         /* We know the list is sorted when using the GUID index */
1812         if (ltdb->cache->GUID_index_attribute != NULL) {
1813                 return;
1814         }
1815
1816         TYPESAFE_QSORT(list->dn, list->count,
1817                        ldb_val_equal_exact_for_qsort);
1818 }
1819
1820 /*
1821   search the database with a LDAP-like expression using indexes
1822   returns -1 if an indexed search is not possible, in which
1823   case the caller should call ltdb_search_full()
1824 */
1825 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1826 {
1827         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1828         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1829         struct dn_list *dn_list;
1830         int ret;
1831         enum ldb_scope index_scope;
1832         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1833
1834         /* see if indexing is enabled */
1835         if (!ltdb->cache->attribute_indexes &&
1836             !ltdb->cache->one_level_indexes &&
1837             ac->scope != LDB_SCOPE_BASE) {
1838                 /* fallback to a full search */
1839                 return LDB_ERR_OPERATIONS_ERROR;
1840         }
1841
1842         dn_list = talloc_zero(ac, struct dn_list);
1843         if (dn_list == NULL) {
1844                 return ldb_module_oom(ac->module);
1845         }
1846
1847         /*
1848          * For the purposes of selecting the switch arm below, if we
1849          * don't have a one-level index then treat it like a subtree
1850          * search
1851          */
1852         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1853             !ltdb->cache->one_level_indexes) {
1854                 index_scope = LDB_SCOPE_SUBTREE;
1855         } else {
1856                 index_scope = ac->scope;
1857         }
1858
1859         switch (index_scope) {
1860         case LDB_SCOPE_BASE:
1861                 /*
1862                  * The only caller will have filtered the operation out
1863                  * so we should never get here
1864                  */
1865                 return ldb_operr(ldb);
1866
1867         case LDB_SCOPE_ONELEVEL:
1868                 /*
1869                  * If we ever start to also load the index values for
1870                  * the tree, we must ensure we strictly intersect with
1871                  * this list, as we trust the ONELEVEL index
1872                  */
1873                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list,
1874                                         &scope_one_truncation);
1875                 if (ret != LDB_SUCCESS) {
1876                         talloc_free(dn_list);
1877                         return ret;
1878                 }
1879
1880                 /*
1881                  * If we have too many matches, running the filter
1882                  * tree over the SCOPE_ONELEVEL can be quite expensive
1883                  * so we now check the filter tree index as well.
1884                  *
1885                  * We only do this in the GUID index mode, which is
1886                  * O(n*log(m)) otherwise the intersection below will
1887                  * be too costly at O(n*m).
1888                  *
1889                  * We don't set a heuristic for 'too many' but instead
1890                  * do it always and rely on the index lookup being
1891                  * fast enough in the small case.
1892                  */
1893                 if (ltdb->cache->GUID_index_attribute != NULL) {
1894                         struct dn_list *idx_one_tree_list
1895                                 = talloc_zero(ac, struct dn_list);
1896                         if (idx_one_tree_list == NULL) {
1897                                 return ldb_module_oom(ac->module);
1898                         }
1899
1900                         if (!ltdb->cache->attribute_indexes) {
1901                                 talloc_free(idx_one_tree_list);
1902                                 talloc_free(dn_list);
1903                                 return LDB_ERR_OPERATIONS_ERROR;
1904                         }
1905                         /*
1906                          * Here we load the index for the tree.
1907                          */
1908                         ret = ltdb_index_dn(ac->module, ltdb, ac->tree,
1909                                             idx_one_tree_list);
1910                         if (ret != LDB_SUCCESS) {
1911                                 talloc_free(idx_one_tree_list);
1912                                 talloc_free(dn_list);
1913                                 return ret;
1914                         }
1915
1916                         if (!list_intersect(ldb, ltdb,
1917                                             dn_list, idx_one_tree_list)) {
1918                                 talloc_free(idx_one_tree_list);
1919                                 talloc_free(dn_list);
1920                                 return LDB_ERR_OPERATIONS_ERROR;
1921                         }
1922                 }
1923                 break;
1924
1925         case LDB_SCOPE_SUBTREE:
1926         case LDB_SCOPE_DEFAULT:
1927                 if (!ltdb->cache->attribute_indexes) {
1928                         talloc_free(dn_list);
1929                         return LDB_ERR_OPERATIONS_ERROR;
1930                 }
1931                 /*
1932                  * Here we load the index for the tree.  We have no
1933                  * index for the subtree.
1934                  */
1935                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
1936                 if (ret != LDB_SUCCESS) {
1937                         talloc_free(dn_list);
1938                         return ret;
1939                 }
1940                 break;
1941         }
1942
1943         /*
1944          * It is critical that this function do the re-filter even
1945          * on things found by the index as the index can over-match
1946          * in cases of truncation (as well as when it decides it is
1947          * not worth further filtering)
1948          *
1949          * If this changes, then the index code above would need to
1950          * pass up a flag to say if any index was truncated during
1951          * processing as the truncation here refers only to the
1952          * SCOPE_ONELEVEL index.
1953          */
1954         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count,
1955                                 scope_one_truncation);
1956         talloc_free(dn_list);
1957         return ret;
1958 }
1959
1960 /**
1961  * @brief Add a DN in the index list of a given attribute name/value pair
1962  *
1963  * This function will add the DN in the index list for the index for
1964  * the given attribute name and value.
1965  *
1966  * @param[in]  module       A ldb_module structure
1967  *
1968  * @param[in]  dn           The string representation of the DN as it
1969  *                          will be stored in the index entry
1970  *
1971  * @param[in]  el           A ldb_message_element array, one of the entry
1972  *                          referred by the v_idx is the attribute name and
1973  *                          value pair which will be used to construct the
1974  *                          index name
1975  *
1976  * @param[in]  v_idx        The index of element in the el array to use
1977  *
1978  * @return                  An ldb error code
1979  */
1980 static int ltdb_index_add1(struct ldb_module *module,
1981                            struct ltdb_private *ltdb,
1982                            const struct ldb_message *msg,
1983                            struct ldb_message_element *el, int v_idx)
1984 {
1985         struct ldb_context *ldb;
1986         struct ldb_dn *dn_key;
1987         int ret;
1988         const struct ldb_schema_attribute *a;
1989         struct dn_list *list;
1990         unsigned alloc_len;
1991         enum key_truncation truncation = KEY_TRUNCATED;
1992
1993
1994         ldb = ldb_module_get_ctx(module);
1995
1996         list = talloc_zero(module, struct dn_list);
1997         if (list == NULL) {
1998                 return LDB_ERR_OPERATIONS_ERROR;
1999         }
2000
2001         dn_key = ltdb_index_key(ldb, ltdb,
2002                                 el->name, &el->values[v_idx], &a, &truncation);
2003         if (!dn_key) {
2004                 talloc_free(list);
2005                 return LDB_ERR_OPERATIONS_ERROR;
2006         }
2007         /*
2008          * Samba only maintains unique indexes on the objectSID and objectGUID
2009          * so if a unique index key exceeds the maximum length there is a
2010          * problem.
2011          */
2012         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2013                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2014                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2015
2016                 ldb_asprintf_errstring(
2017                         ldb,
2018                         __location__ ": unique index key on %s in %s, "
2019                         "exceeds maximum key length of %u (encoded).",
2020                         el->name,
2021                         ldb_dn_get_linearized(msg->dn),
2022                         ltdb->max_key_length);
2023                 talloc_free(list);
2024                 return LDB_ERR_CONSTRAINT_VIOLATION;
2025         }
2026         talloc_steal(list, dn_key);
2027
2028         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2029         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2030                 talloc_free(list);
2031                 return ret;
2032         }
2033
2034         /*
2035          * Check for duplicates in the @IDXDN DN -> GUID record
2036          *
2037          * This is very normal, it just means a duplicate DN creation
2038          * was attempted, so don't set the error string or print scary
2039          * messages.
2040          */
2041         if (list->count > 0 &&
2042             ldb_attr_cmp(el->name, LTDB_IDXDN) == 0 &&
2043             truncation == KEY_NOT_TRUNCATED) {
2044
2045                 talloc_free(list);
2046                 return LDB_ERR_CONSTRAINT_VIOLATION;
2047
2048         } else if (list->count > 0
2049                    && ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
2050
2051                 /*
2052                  * At least one existing entry in the DN->GUID index, which
2053                  * arises when the DN indexes have been truncated
2054                  *
2055                  * So need to pull the DN's to check if it's really a duplicate
2056                  */
2057                 int i;
2058                 for (i=0; i < list->count; i++) {
2059                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
2060                         TDB_DATA key = {
2061                                 .dptr = guid_key,
2062                                 .dsize = sizeof(guid_key)
2063                         };
2064                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2065                         struct ldb_message *rec = ldb_msg_new(ldb);
2066                         if (rec == NULL) {
2067                                 return LDB_ERR_OPERATIONS_ERROR;
2068                         }
2069
2070                         ret = ltdb_idx_to_key(module, ltdb,
2071                                               ldb, &list->dn[i],
2072                                               &key);
2073                         if (ret != LDB_SUCCESS) {
2074                                 TALLOC_FREE(list);
2075                                 TALLOC_FREE(rec);
2076                                 return ret;
2077                         }
2078
2079                         ret = ltdb_search_key(module, ltdb, key,
2080                                               rec, flags);
2081                         if (key.dptr != guid_key) {
2082                                 TALLOC_FREE(key.dptr);
2083                         }
2084                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2085                                 /*
2086                                  * the record has disappeared?
2087                                  * yes, this can happen
2088                                  */
2089                                 talloc_free(rec);
2090                                 continue;
2091                         }
2092
2093                         if (ret != LDB_SUCCESS) {
2094                                 /* an internal error */
2095                                 TALLOC_FREE(rec);
2096                                 TALLOC_FREE(list);
2097                                 return LDB_ERR_OPERATIONS_ERROR;
2098                         }
2099                         /*
2100                          * The DN we are trying to add to the DB and index
2101                          * is already here, so we must deny the addition
2102                          */
2103                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2104                                 TALLOC_FREE(rec);
2105                                 TALLOC_FREE(list);
2106                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2107                         }
2108                 }
2109         }
2110
2111         /*
2112          * Check for duplicates in unique indexes
2113          *
2114          * We don't need to do a loop test like the @IDXDN case
2115          * above as we have a ban on long unique index values
2116          * at the start of this function.
2117          */
2118         if (list->count > 0 &&
2119             ((a != NULL
2120               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2121                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2122                 /*
2123                  * We do not want to print info about a possibly
2124                  * confidential DN that the conflict was with in the
2125                  * user-visible error string
2126                  */
2127
2128                 if (ltdb->cache->GUID_index_attribute == NULL) {
2129                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2130                                   __location__
2131                                   ": unique index violation on %s in %s, "
2132                                   "conficts with %*.*s in %s",
2133                                   el->name, ldb_dn_get_linearized(msg->dn),
2134                                   (int)list->dn[0].length,
2135                                   (int)list->dn[0].length,
2136                                   list->dn[0].data,
2137                                   ldb_dn_get_linearized(dn_key));
2138                 } else {
2139                         /* This can't fail, gives a default at worst */
2140                         const struct ldb_schema_attribute *attr
2141                                 = ldb_schema_attribute_by_name(
2142                                         ldb,
2143                                         ltdb->cache->GUID_index_attribute);
2144                         struct ldb_val v;
2145                         ret = attr->syntax->ldif_write_fn(ldb, list,
2146                                                           &list->dn[0], &v);
2147                         if (ret == LDB_SUCCESS) {
2148                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2149                                           __location__
2150                                           ": unique index violation on %s in "
2151                                           "%s, conficts with %s %*.*s in %s",
2152                                           el->name,
2153                                           ldb_dn_get_linearized(msg->dn),
2154                                           ltdb->cache->GUID_index_attribute,
2155                                           (int)v.length,
2156                                           (int)v.length,
2157                                           v.data,
2158                                           ldb_dn_get_linearized(dn_key));
2159                         }
2160                 }
2161                 ldb_asprintf_errstring(ldb,
2162                                        __location__ ": unique index violation "
2163                                        "on %s in %s",
2164                                        el->name,
2165                                        ldb_dn_get_linearized(msg->dn));
2166                 talloc_free(list);
2167                 return LDB_ERR_CONSTRAINT_VIOLATION;
2168         }
2169
2170         /* overallocate the list a bit, to reduce the number of
2171          * realloc trigered copies */
2172         alloc_len = ((list->count+1)+7) & ~7;
2173         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2174         if (list->dn == NULL) {
2175                 talloc_free(list);
2176                 return LDB_ERR_OPERATIONS_ERROR;
2177         }
2178
2179         if (ltdb->cache->GUID_index_attribute == NULL) {
2180                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2181                 list->dn[list->count].data
2182                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2183                 if (list->dn[list->count].data == NULL) {
2184                         talloc_free(list);
2185                         return LDB_ERR_OPERATIONS_ERROR;
2186                 }
2187                 list->dn[list->count].length = strlen(dn_str);
2188         } else {
2189                 const struct ldb_val *key_val;
2190                 struct ldb_val *exact = NULL, *next = NULL;
2191                 key_val = ldb_msg_find_ldb_val(msg,
2192                                                ltdb->cache->GUID_index_attribute);
2193                 if (key_val == NULL) {
2194                         talloc_free(list);
2195                         return ldb_module_operr(module);
2196                 }
2197
2198                 if (key_val->length != LTDB_GUID_SIZE) {
2199                         talloc_free(list);
2200                         return ldb_module_operr(module);
2201                 }
2202
2203                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2204                                         *key_val, ldb_val_equal_exact_ordered,
2205                                         exact, next);
2206
2207                 /*
2208                  * Give a warning rather than fail, this could be a
2209                  * duplicate value in the record allowed by a caller
2210                  * forcing in the value with
2211                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2212                  */
2213                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2214                         /* This can't fail, gives a default at worst */
2215                         const struct ldb_schema_attribute *attr
2216                                 = ldb_schema_attribute_by_name(
2217                                         ldb,
2218                                         ltdb->cache->GUID_index_attribute);
2219                         struct ldb_val v;
2220                         ret = attr->syntax->ldif_write_fn(ldb, list,
2221                                                           exact, &v);
2222                         if (ret == LDB_SUCCESS) {
2223                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2224                                           __location__
2225                                           ": duplicate attribute value in %s "
2226                                           "for index on %s, "
2227                                           "duplicate of %s %*.*s in %s",
2228                                           ldb_dn_get_linearized(msg->dn),
2229                                           el->name,
2230                                           ltdb->cache->GUID_index_attribute,
2231                                           (int)v.length,
2232                                           (int)v.length,
2233                                           v.data,
2234                                           ldb_dn_get_linearized(dn_key));
2235                         }
2236                 }
2237
2238                 if (next == NULL) {
2239                         next = &list->dn[list->count];
2240                 } else {
2241                         memmove(&next[1], next,
2242                                 sizeof(*next) * (list->count - (next - list->dn)));
2243                 }
2244                 *next = ldb_val_dup(list->dn, key_val);
2245                 if (next->data == NULL) {
2246                         talloc_free(list);
2247                         return ldb_module_operr(module);
2248                 }
2249         }
2250         list->count++;
2251
2252         ret = ltdb_dn_list_store(module, dn_key, list);
2253
2254         talloc_free(list);
2255
2256         return ret;
2257 }
2258
2259 /*
2260   add index entries for one elements in a message
2261  */
2262 static int ltdb_index_add_el(struct ldb_module *module,
2263                              struct ltdb_private *ltdb,
2264                              const struct ldb_message *msg,
2265                              struct ldb_message_element *el)
2266 {
2267         unsigned int i;
2268         for (i = 0; i < el->num_values; i++) {
2269                 int ret = ltdb_index_add1(module, ltdb,
2270                                           msg, el, i);
2271                 if (ret != LDB_SUCCESS) {
2272                         return ret;
2273                 }
2274         }
2275
2276         return LDB_SUCCESS;
2277 }
2278
2279 /*
2280   add index entries for all elements in a message
2281  */
2282 static int ltdb_index_add_all(struct ldb_module *module,
2283                               struct ltdb_private *ltdb,
2284                               const struct ldb_message *msg)
2285 {
2286         struct ldb_message_element *elements = msg->elements;
2287         unsigned int i;
2288         const char *dn_str;
2289         int ret;
2290
2291         if (ldb_dn_is_special(msg->dn)) {
2292                 return LDB_SUCCESS;
2293         }
2294
2295         dn_str = ldb_dn_get_linearized(msg->dn);
2296         if (dn_str == NULL) {
2297                 return LDB_ERR_OPERATIONS_ERROR;
2298         }
2299
2300         ret = ltdb_write_index_dn_guid(module, msg, 1);
2301         if (ret != LDB_SUCCESS) {
2302                 return ret;
2303         }
2304
2305         if (!ltdb->cache->attribute_indexes) {
2306                 /* no indexed fields */
2307                 return LDB_SUCCESS;
2308         }
2309
2310         for (i = 0; i < msg->num_elements; i++) {
2311                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
2312                         continue;
2313                 }
2314                 ret = ltdb_index_add_el(module, ltdb,
2315                                         msg, &elements[i]);
2316                 if (ret != LDB_SUCCESS) {
2317                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2318                         ldb_asprintf_errstring(ldb,
2319                                                __location__ ": Failed to re-index %s in %s - %s",
2320                                                elements[i].name, dn_str,
2321                                                ldb_errstring(ldb));
2322                         return ret;
2323                 }
2324         }
2325
2326         return LDB_SUCCESS;
2327 }
2328
2329
2330 /*
2331   insert a DN index for a message
2332 */
2333 static int ltdb_modify_index_dn(struct ldb_module *module,
2334                                 struct ltdb_private *ltdb,
2335                                 const struct ldb_message *msg,
2336                                 struct ldb_dn *dn,
2337                                 const char *index, int add)
2338 {
2339         struct ldb_message_element el;
2340         struct ldb_val val;
2341         int ret;
2342
2343         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2344         if (val.data == NULL) {
2345                 const char *dn_str = ldb_dn_get_linearized(dn);
2346                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2347                                        __location__
2348                                        ": Failed to modify %s "
2349                                        "against %s in %s: failed "
2350                                        "to get casefold DN",
2351                                        index,
2352                                        ltdb->cache->GUID_index_attribute,
2353                                        dn_str);
2354                 return LDB_ERR_OPERATIONS_ERROR;
2355         }
2356
2357         val.length = strlen((char *)val.data);
2358         el.name = index;
2359         el.values = &val;
2360         el.num_values = 1;
2361
2362         if (add) {
2363                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
2364         } else { /* delete */
2365                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
2366         }
2367
2368         if (ret != LDB_SUCCESS) {
2369                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2370                 const char *dn_str = ldb_dn_get_linearized(dn);
2371                 ldb_asprintf_errstring(ldb,
2372                                        __location__
2373                                        ": Failed to modify %s "
2374                                        "against %s in %s - %s",
2375                                        index,
2376                                        ltdb->cache->GUID_index_attribute,
2377                                        dn_str, ldb_errstring(ldb));
2378                 return ret;
2379         }
2380         return ret;
2381 }
2382
2383 /*
2384   insert a one level index for a message
2385 */
2386 static int ltdb_index_onelevel(struct ldb_module *module,
2387                                const struct ldb_message *msg, int add)
2388 {
2389         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2390                                                     struct ltdb_private);
2391         struct ldb_dn *pdn;
2392         int ret;
2393
2394         /* We index for ONE Level only if requested */
2395         if (!ltdb->cache->one_level_indexes) {
2396                 return LDB_SUCCESS;
2397         }
2398
2399         pdn = ldb_dn_get_parent(module, msg->dn);
2400         if (pdn == NULL) {
2401                 return LDB_ERR_OPERATIONS_ERROR;
2402         }
2403         ret = ltdb_modify_index_dn(module, ltdb,
2404                                    msg, pdn, LTDB_IDXONE, add);
2405
2406         talloc_free(pdn);
2407
2408         return ret;
2409 }
2410
2411 /*
2412   insert a one level index for a message
2413 */
2414 static int ltdb_write_index_dn_guid(struct ldb_module *module,
2415                                     const struct ldb_message *msg,
2416                                     int add)
2417 {
2418         int ret;
2419         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2420                                                     struct ltdb_private);
2421
2422         /* We index for DN only if using a GUID index */
2423         if (ltdb->cache->GUID_index_attribute == NULL) {
2424                 return LDB_SUCCESS;
2425         }
2426
2427         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
2428                                    LTDB_IDXDN, add);
2429
2430         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2431                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2432                                        "Entry %s already exists",
2433                                        ldb_dn_get_linearized(msg->dn));
2434                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2435         }
2436         return ret;
2437 }
2438
2439 /*
2440   add the index entries for a new element in a record
2441   The caller guarantees that these element values are not yet indexed
2442 */
2443 int ltdb_index_add_element(struct ldb_module *module,
2444                            struct ltdb_private *ltdb,
2445                            const struct ldb_message *msg,
2446                            struct ldb_message_element *el)
2447 {
2448         if (ldb_dn_is_special(msg->dn)) {
2449                 return LDB_SUCCESS;
2450         }
2451         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2452                 return LDB_SUCCESS;
2453         }
2454         return ltdb_index_add_el(module, ltdb, msg, el);
2455 }
2456
2457 /*
2458   add the index entries for a new record
2459 */
2460 int ltdb_index_add_new(struct ldb_module *module,
2461                        struct ltdb_private *ltdb,
2462                        const struct ldb_message *msg)
2463 {
2464         int ret;
2465
2466         if (ldb_dn_is_special(msg->dn)) {
2467                 return LDB_SUCCESS;
2468         }
2469
2470         ret = ltdb_index_add_all(module, ltdb, msg);
2471         if (ret != LDB_SUCCESS) {
2472                 /*
2473                  * Because we can't trust the caller to be doing
2474                  * transactions properly, clean up any index for this
2475                  * entry rather than relying on a transaction
2476                  * cleanup
2477                  */
2478
2479                 ltdb_index_delete(module, msg);
2480                 return ret;
2481         }
2482
2483         ret = ltdb_index_onelevel(module, msg, 1);
2484         if (ret != LDB_SUCCESS) {
2485                 /*
2486                  * Because we can't trust the caller to be doing
2487                  * transactions properly, clean up any index for this
2488                  * entry rather than relying on a transaction
2489                  * cleanup
2490                  */
2491                 ltdb_index_delete(module, msg);
2492                 return ret;
2493         }
2494         return ret;
2495 }
2496
2497
2498 /*
2499   delete an index entry for one message element
2500 */
2501 int ltdb_index_del_value(struct ldb_module *module,
2502                          struct ltdb_private *ltdb,
2503                          const struct ldb_message *msg,
2504                          struct ldb_message_element *el, unsigned int v_idx)
2505 {
2506         struct ldb_context *ldb;
2507         struct ldb_dn *dn_key;
2508         const char *dn_str;
2509         int ret, i;
2510         unsigned int j;
2511         struct dn_list *list;
2512         struct ldb_dn *dn = msg->dn;
2513         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2514
2515         ldb = ldb_module_get_ctx(module);
2516
2517         dn_str = ldb_dn_get_linearized(dn);
2518         if (dn_str == NULL) {
2519                 return LDB_ERR_OPERATIONS_ERROR;
2520         }
2521
2522         if (dn_str[0] == '@') {
2523                 return LDB_SUCCESS;
2524         }
2525
2526         dn_key = ltdb_index_key(ldb, ltdb,
2527                                 el->name, &el->values[v_idx],
2528                                 NULL, &truncation);
2529         /*
2530          * We ignore key truncation in ltdb_index_add1() so
2531          * match that by ignoring it here as well
2532          *
2533          * Multiple values are legitimate and accepted
2534          */
2535         if (!dn_key) {
2536                 return LDB_ERR_OPERATIONS_ERROR;
2537         }
2538
2539         list = talloc_zero(dn_key, struct dn_list);
2540         if (list == NULL) {
2541                 talloc_free(dn_key);
2542                 return LDB_ERR_OPERATIONS_ERROR;
2543         }
2544
2545         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2546         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2547                 /* it wasn't indexed. Did we have an earlier error? If we did then
2548                    its gone now */
2549                 talloc_free(dn_key);
2550                 return LDB_SUCCESS;
2551         }
2552
2553         if (ret != LDB_SUCCESS) {
2554                 talloc_free(dn_key);
2555                 return ret;
2556         }
2557
2558         /*
2559          * Find one of the values matching this message to remove
2560          */
2561         i = ltdb_dn_list_find_msg(ltdb, list, msg);
2562         if (i == -1) {
2563                 /* nothing to delete */
2564                 talloc_free(dn_key);
2565                 return LDB_SUCCESS;
2566         }
2567
2568         j = (unsigned int) i;
2569         if (j != list->count - 1) {
2570                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2571         }
2572         list->count--;
2573         if (list->count == 0) {
2574                 talloc_free(list->dn);
2575                 list->dn = NULL;
2576         } else {
2577                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2578         }
2579
2580         ret = ltdb_dn_list_store(module, dn_key, list);
2581
2582         talloc_free(dn_key);
2583
2584         return ret;
2585 }
2586
2587 /*
2588   delete the index entries for a element
2589   return -1 on failure
2590 */
2591 int ltdb_index_del_element(struct ldb_module *module,
2592                            struct ltdb_private *ltdb,
2593                            const struct ldb_message *msg,
2594                            struct ldb_message_element *el)
2595 {
2596         const char *dn_str;
2597         int ret;
2598         unsigned int i;
2599
2600         if (!ltdb->cache->attribute_indexes) {
2601                 /* no indexed fields */
2602                 return LDB_SUCCESS;
2603         }
2604
2605         dn_str = ldb_dn_get_linearized(msg->dn);
2606         if (dn_str == NULL) {
2607                 return LDB_ERR_OPERATIONS_ERROR;
2608         }
2609
2610         if (dn_str[0] == '@') {
2611                 return LDB_SUCCESS;
2612         }
2613
2614         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2615                 return LDB_SUCCESS;
2616         }
2617         for (i = 0; i < el->num_values; i++) {
2618                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
2619                 if (ret != LDB_SUCCESS) {
2620                         return ret;
2621                 }
2622         }
2623
2624         return LDB_SUCCESS;
2625 }
2626
2627 /*
2628   delete the index entries for a record
2629   return -1 on failure
2630 */
2631 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
2632 {
2633         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2634         int ret;
2635         unsigned int i;
2636
2637         if (ldb_dn_is_special(msg->dn)) {
2638                 return LDB_SUCCESS;
2639         }
2640
2641         ret = ltdb_index_onelevel(module, msg, 0);
2642         if (ret != LDB_SUCCESS) {
2643                 return ret;
2644         }
2645
2646         ret = ltdb_write_index_dn_guid(module, msg, 0);
2647         if (ret != LDB_SUCCESS) {
2648                 return ret;
2649         }
2650
2651         if (!ltdb->cache->attribute_indexes) {
2652                 /* no indexed fields */
2653                 return LDB_SUCCESS;
2654         }
2655
2656         for (i = 0; i < msg->num_elements; i++) {
2657                 ret = ltdb_index_del_element(module, ltdb,
2658                                              msg, &msg->elements[i]);
2659                 if (ret != LDB_SUCCESS) {
2660                         return ret;
2661                 }
2662         }
2663
2664         return LDB_SUCCESS;
2665 }
2666
2667
2668 /*
2669   traversal function that deletes all @INDEX records in the in-memory
2670   TDB.
2671
2672   This does not touch the actual DB, that is done at transaction
2673   commit, which in turn greatly reduces DB churn as we will likely
2674   be able to do a direct update into the old record.
2675 */
2676 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
2677 {
2678         struct ldb_module *module = state;
2679         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2680         const char *dnstr = "DN=" LTDB_INDEX ":";
2681         struct dn_list list;
2682         struct ldb_dn *dn;
2683         struct ldb_val v;
2684         int ret;
2685
2686         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
2687                 return 0;
2688         }
2689         /* we need to put a empty list in the internal tdb for this
2690          * index entry */
2691         list.dn = NULL;
2692         list.count = 0;
2693
2694         /* the offset of 3 is to remove the DN= prefix. */
2695         v.data = key.dptr + 3;
2696         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
2697
2698         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
2699
2700         /*
2701          * This does not actually touch the DB quite yet, just
2702          * the in-memory index cache
2703          */
2704         ret = ltdb_dn_list_store(module, dn, &list);
2705         if (ret != LDB_SUCCESS) {
2706                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2707                                        "Unable to store null index for %s\n",
2708                                                 ldb_dn_get_linearized(dn));
2709                 talloc_free(dn);
2710                 return -1;
2711         }
2712         talloc_free(dn);
2713         return 0;
2714 }
2715
2716 struct ltdb_reindex_context {
2717         struct ldb_module *module;
2718         int error;
2719         uint32_t count;
2720 };
2721
2722 /*
2723   traversal function that adds @INDEX records during a re index
2724 */
2725 static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
2726 {
2727         struct ldb_context *ldb;
2728         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2729         struct ldb_module *module = ctx->module;
2730         struct ldb_message *msg;
2731         unsigned int nb_elements_in_db;
2732         const struct ldb_val val = {
2733                 .data = data.dptr,
2734                 .length = data.dsize,
2735         };
2736         int ret;
2737         TDB_DATA key2;
2738         bool is_record;
2739         
2740         ldb = ldb_module_get_ctx(module);
2741
2742         if (key.dsize > 4 &&
2743             memcmp(key.dptr, "DN=@", 4) == 0) {
2744                 return 0;
2745         }
2746
2747         is_record = ltdb_key_is_record(key);
2748         if (is_record == false) {
2749                 return 0;
2750         }
2751         
2752         msg = ldb_msg_new(module);
2753         if (msg == NULL) {
2754                 return -1;
2755         }
2756
2757         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2758                                                    msg,
2759                                                    NULL, 0,
2760                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2761                                                    &nb_elements_in_db);
2762         if (ret != 0) {
2763                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2764                                                 ldb_dn_get_linearized(msg->dn));
2765                 ctx->error = ret;
2766                 talloc_free(msg);
2767                 return -1;
2768         }
2769
2770         if (msg->dn == NULL) {
2771                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2772                           "Refusing to re-index as GUID "
2773                           "key %*.*s with no DN\n",
2774                           (int)key.dsize, (int)key.dsize,
2775                           (char *)key.dptr);
2776                 talloc_free(msg);
2777                 return -1;
2778         }
2779         
2780         /* check if the DN key has changed, perhaps due to the case
2781            insensitivity of an element changing, or a change from DN
2782            to GUID keys */
2783         key2 = ltdb_key_msg(module, msg, msg);
2784         if (key2.dptr == NULL) {
2785                 /* probably a corrupt record ... darn */
2786                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2787                                                 ldb_dn_get_linearized(msg->dn));
2788                 talloc_free(msg);
2789                 return 0;
2790         }
2791         if (key.dsize != key2.dsize ||
2792             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
2793                 int tdb_ret;
2794                 tdb_ret = tdb_delete(tdb, key);
2795                 if (tdb_ret != 0) {
2796                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2797                                   "Failed to delete %*.*s "
2798                                   "for rekey as %*.*s: %s",
2799                                   (int)key.dsize, (int)key.dsize,
2800                                   (const char *)key.dptr,
2801                                   (int)key2.dsize, (int)key2.dsize,
2802                                   (const char *)key.dptr,
2803                                   tdb_errorstr(tdb));
2804                         ctx->error = ltdb_err_map(tdb_error(tdb));
2805                         return -1;
2806                 }
2807                 tdb_ret = tdb_store(tdb, key2, data, 0);
2808                 if (tdb_ret != 0) {
2809                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2810                                   "Failed to rekey %*.*s as %*.*s: %s",
2811                                   (int)key.dsize, (int)key.dsize,
2812                                   (const char *)key.dptr,
2813                                   (int)key2.dsize, (int)key2.dsize,
2814                                   (const char *)key.dptr,
2815                                   tdb_errorstr(tdb));
2816                         ctx->error = ltdb_err_map(tdb_error(tdb));
2817                         return -1;
2818                 }
2819         }
2820         talloc_free(key2.dptr);
2821
2822         talloc_free(msg);
2823
2824         ctx->count++;
2825         if (ctx->count % 10000 == 0) {
2826                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2827                           "Reindexing: re-keyed %u records so far",
2828                           ctx->count);
2829         }
2830
2831         return 0;
2832 }
2833
2834 /*
2835   traversal function that adds @INDEX records during a re index
2836 */
2837 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
2838 {
2839         struct ldb_context *ldb;
2840         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2841         struct ldb_module *module = ctx->module;
2842         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2843                                                     struct ltdb_private);
2844         struct ldb_message *msg;
2845         unsigned int nb_elements_in_db;
2846         const struct ldb_val val = {
2847                 .data = data.dptr,
2848                 .length = data.dsize,
2849         };
2850         int ret;
2851         bool is_record;
2852         
2853         ldb = ldb_module_get_ctx(module);
2854
2855         if (key.dsize > 4 &&
2856             memcmp(key.dptr, "DN=@", 4) == 0) {
2857                 return 0;
2858         }
2859
2860         is_record = ltdb_key_is_record(key);
2861         if (is_record == false) {
2862                 return 0;
2863         }
2864         
2865         msg = ldb_msg_new(module);
2866         if (msg == NULL) {
2867                 return -1;
2868         }
2869
2870         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2871                                                    msg,
2872                                                    NULL, 0,
2873                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2874                                                    &nb_elements_in_db);
2875         if (ret != 0) {
2876                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2877                                                 ldb_dn_get_linearized(msg->dn));
2878                 ctx->error = ret;
2879                 talloc_free(msg);
2880                 return -1;
2881         }
2882
2883         if (msg->dn == NULL) {
2884                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2885                           "Refusing to re-index as GUID "
2886                           "key %*.*s with no DN\n",
2887                           (int)key.dsize, (int)key.dsize,
2888                           (char *)key.dptr);
2889                 talloc_free(msg);
2890                 return -1;
2891         }
2892
2893         ret = ltdb_index_onelevel(module, msg, 1);
2894         if (ret != LDB_SUCCESS) {
2895                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2896                           "Adding special ONE LEVEL index failed (%s)!",
2897                                                 ldb_dn_get_linearized(msg->dn));
2898                 talloc_free(msg);
2899                 return -1;
2900         }
2901
2902         ret = ltdb_index_add_all(module, ltdb, msg);
2903
2904         if (ret != LDB_SUCCESS) {
2905                 ctx->error = ret;
2906                 talloc_free(msg);
2907                 return -1;
2908         }
2909
2910         talloc_free(msg);
2911
2912         ctx->count++;
2913         if (ctx->count % 10000 == 0) {
2914                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2915                           "Reindexing: re-indexed %u records so far",
2916                           ctx->count);
2917         }
2918
2919         return 0;
2920 }
2921
2922 /*
2923   force a complete reindex of the database
2924 */
2925 int ltdb_reindex(struct ldb_module *module)
2926 {
2927         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2928         int ret;
2929         struct ltdb_reindex_context ctx;
2930
2931         /*
2932          * Only triggered after a modification, but make clear we do
2933          * not re-index a read-only DB
2934          */
2935         if (ltdb->read_only) {
2936                 return LDB_ERR_UNWILLING_TO_PERFORM;
2937         }
2938
2939         if (ltdb_cache_reload(module) != 0) {
2940                 return LDB_ERR_OPERATIONS_ERROR;
2941         }
2942
2943         /*
2944          * Ensure we read (and so remove) the entries from the real
2945          * DB, no values stored so far are any use as we want to do a
2946          * re-index
2947          */
2948         ltdb_index_transaction_cancel(module);
2949
2950         ret = ltdb_index_transaction_start(module);
2951         if (ret != LDB_SUCCESS) {
2952                 return ret;
2953         }
2954
2955         /* first traverse the database deleting any @INDEX records by
2956          * putting NULL entries in the in-memory tdb
2957          */
2958         ret = tdb_traverse(ltdb->tdb, delete_index, module);
2959         if (ret < 0) {
2960                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2961                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
2962                                        ldb_errstring(ldb));
2963                 return LDB_ERR_OPERATIONS_ERROR;
2964         }
2965
2966         ctx.module = module;
2967         ctx.error = 0;
2968         ctx.count = 0;
2969
2970         /* now traverse adding any indexes for normal LDB records */
2971         ret = tdb_traverse(ltdb->tdb, re_key, &ctx);
2972         if (ret < 0) {
2973                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2974                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
2975                                        ldb_errstring(ldb));
2976                 return LDB_ERR_OPERATIONS_ERROR;
2977         }
2978
2979         if (ctx.error != LDB_SUCCESS) {
2980                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2981                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
2982                 return ctx.error;
2983         }
2984
2985         ctx.error = 0;
2986         ctx.count = 0;
2987
2988         /* now traverse adding any indexes for normal LDB records */
2989         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
2990         if (ret < 0) {
2991                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2992                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
2993                                        ldb_errstring(ldb));
2994                 return LDB_ERR_OPERATIONS_ERROR;
2995         }
2996
2997         if (ctx.error != LDB_SUCCESS) {
2998                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2999                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3000                 return ctx.error;
3001         }
3002
3003         if (ctx.count > 10000) {
3004                 ldb_debug(ldb_module_get_ctx(module),
3005                           LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
3006                           "final index write-out will be in transaction commit",
3007                           ltdb->kv_ops->name(ltdb));
3008         }
3009         return LDB_SUCCESS;
3010 }