ldb: check return values
[samba.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of TDB key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_tdb.h"
148 #include "ldb_private.h"
149 #include "lib/util/binsearch.h"
150
151 struct dn_list {
152         unsigned int count;
153         struct ldb_val *dn;
154         /*
155          * Do not optimise the intersection of this list,
156          * we must never return an entry not in this
157          * list.  This allows the index for
158          * SCOPE_ONELEVEL to be trusted.
159          */
160         bool strict;
161 };
162
163 struct ltdb_idxptr {
164         struct tdb_context *itdb;
165         int error;
166 };
167
168 enum key_truncation {
169         KEY_NOT_TRUNCATED,
170         KEY_TRUNCATED,
171 };
172
173 static int ltdb_write_index_dn_guid(struct ldb_module *module,
174                                     const struct ldb_message *msg,
175                                     int add);
176 static int ltdb_index_dn_base_dn(struct ldb_module *module,
177                                  struct ltdb_private *ltdb,
178                                  struct ldb_dn *base_dn,
179                                  struct dn_list *dn_list,
180                                  enum key_truncation *truncation);
181
182 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
183                               struct dn_list *list);
184
185 /* we put a @IDXVERSION attribute on index entries. This
186    allows us to tell if it was written by an older version
187 */
188 #define LTDB_INDEXING_VERSION 2
189
190 #define LTDB_GUID_INDEXING_VERSION 3
191
192 static unsigned ltdb_max_key_length(struct ltdb_private *ltdb) {
193         if (ltdb->max_key_length == 0){
194                 return UINT_MAX;
195         }
196         return ltdb->max_key_length;
197 }
198
199 /* enable the idxptr mode when transactions start */
200 int ltdb_index_transaction_start(struct ldb_module *module)
201 {
202         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
203         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
204         if (ltdb->idxptr == NULL) {
205                 return ldb_oom(ldb_module_get_ctx(module));
206         }
207
208         return LDB_SUCCESS;
209 }
210
211 /*
212   see if two ldb_val structures contain exactly the same data
213   return -1 or 1 for a mismatch, 0 for match
214 */
215 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
216                                          const struct ldb_val *v2)
217 {
218         if (v1->length > v2->length) {
219                 return -1;
220         }
221         if (v1->length < v2->length) {
222                 return 1;
223         }
224         return memcmp(v1->data, v2->data, v1->length);
225 }
226
227 /*
228   see if two ldb_val structures contain exactly the same data
229   return -1 or 1 for a mismatch, 0 for match
230 */
231 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
232                                        const struct ldb_val *v2)
233 {
234         if (v1.length > v2->length) {
235                 return -1;
236         }
237         if (v1.length < v2->length) {
238                 return 1;
239         }
240         return memcmp(v1.data, v2->data, v1.length);
241 }
242
243
244 /*
245   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
246   binary-safe comparison for the 'dn' returns -1 if not found
247
248   This is therefore safe when the value is a GUID in the future
249  */
250 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
251                                  const struct dn_list *list,
252                                  const struct ldb_val *v)
253 {
254         unsigned int i;
255         struct ldb_val *exact = NULL, *next = NULL;
256
257         if (ltdb->cache->GUID_index_attribute == NULL) {
258                 for (i=0; i<list->count; i++) {
259                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
260                                 return i;
261                         }
262                 }
263                 return -1;
264         }
265
266         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
267                                 *v, ldb_val_equal_exact_ordered,
268                                 exact, next);
269         if (exact == NULL) {
270                 return -1;
271         }
272         /* Not required, but keeps the compiler quiet */
273         if (next != NULL) {
274                 return -1;
275         }
276
277         i = exact - list->dn;
278         return i;
279 }
280
281 /*
282   find a entry in a dn_list. Uses a case sensitive comparison with the dn
283   returns -1 if not found
284  */
285 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
286                                  struct dn_list *list,
287                                  const struct ldb_message *msg)
288 {
289         struct ldb_val v;
290         const struct ldb_val *key_val;
291         if (ltdb->cache->GUID_index_attribute == NULL) {
292                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
293                 v.data = discard_const_p(unsigned char, dn_str);
294                 v.length = strlen(dn_str);
295         } else {
296                 key_val = ldb_msg_find_ldb_val(msg,
297                                                ltdb->cache->GUID_index_attribute);
298                 if (key_val == NULL) {
299                         return -1;
300                 }
301                 v = *key_val;
302         }
303         return ltdb_dn_list_find_val(ltdb, list, &v);
304 }
305
306 /*
307   this is effectively a cast function, but with lots of paranoia
308   checks and also copes with CPUs that are fussy about pointer
309   alignment
310  */
311 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
312 {
313         struct dn_list *list;
314         if (rec.dsize != sizeof(void *)) {
315                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
316                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
317                 return NULL;
318         }
319         /* note that we can't just use a cast here, as rec.dptr may
320            not be aligned sufficiently for a pointer. A cast would cause
321            platforms like some ARM CPUs to crash */
322         memcpy(&list, rec.dptr, sizeof(void *));
323         list = talloc_get_type(list, struct dn_list);
324         if (list == NULL) {
325                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
326                                        "Bad type '%s' for idxptr",
327                                        talloc_get_name(list));
328                 return NULL;
329         }
330         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
331                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
332                                        "Bad parent '%s' for idxptr",
333                                        talloc_get_name(talloc_parent(list->dn)));
334                 return NULL;
335         }
336         return list;
337 }
338
339 /*
340   return the @IDX list in an index entry for a dn as a
341   struct dn_list
342  */
343 static int ltdb_dn_list_load(struct ldb_module *module,
344                              struct ltdb_private *ltdb,
345                              struct ldb_dn *dn, struct dn_list *list)
346 {
347         struct ldb_message *msg;
348         int ret, version;
349         struct ldb_message_element *el;
350         TDB_DATA rec;
351         struct dn_list *list2;
352         TDB_DATA key;
353
354         list->dn = NULL;
355         list->count = 0;
356
357         /* see if we have any in-memory index entries */
358         if (ltdb->idxptr == NULL ||
359             ltdb->idxptr->itdb == NULL) {
360                 goto normal_index;
361         }
362
363         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
364         key.dsize = strlen((char *)key.dptr);
365
366         rec = tdb_fetch(ltdb->idxptr->itdb, key);
367         if (rec.dptr == NULL) {
368                 goto normal_index;
369         }
370
371         /* we've found an in-memory index entry */
372         list2 = ltdb_index_idxptr(module, rec, true);
373         if (list2 == NULL) {
374                 free(rec.dptr);
375                 return LDB_ERR_OPERATIONS_ERROR;
376         }
377         free(rec.dptr);
378
379         *list = *list2;
380         return LDB_SUCCESS;
381
382 normal_index:
383         msg = ldb_msg_new(list);
384         if (msg == NULL) {
385                 return LDB_ERR_OPERATIONS_ERROR;
386         }
387
388         ret = ltdb_search_dn1(module, dn, msg,
389                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
390                               |LDB_UNPACK_DATA_FLAG_NO_DN);
391         if (ret != LDB_SUCCESS) {
392                 talloc_free(msg);
393                 return ret;
394         }
395
396         el = ldb_msg_find_element(msg, LTDB_IDX);
397         if (!el) {
398                 talloc_free(msg);
399                 return LDB_SUCCESS;
400         }
401
402         version = ldb_msg_find_attr_as_int(msg, LTDB_IDXVERSION, 0);
403
404         /*
405          * we avoid copying the strings by stealing the list.  We have
406          * to steal msg onto el->values (which looks odd) because we
407          * asked for the memory to be allocated on msg, not on each
408          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
409          */
410         if (ltdb->cache->GUID_index_attribute == NULL) {
411                 /* check indexing version number */
412                 if (version != LTDB_INDEXING_VERSION) {
413                         ldb_debug_set(ldb_module_get_ctx(module),
414                                       LDB_DEBUG_ERROR,
415                                       "Wrong DN index version %d "
416                                       "expected %d for %s",
417                                       version, LTDB_INDEXING_VERSION,
418                                       ldb_dn_get_linearized(dn));
419                         talloc_free(msg);
420                         return LDB_ERR_OPERATIONS_ERROR;
421                 }
422
423                 talloc_steal(el->values, msg);
424                 list->dn = talloc_steal(list, el->values);
425                 list->count = el->num_values;
426         } else {
427                 unsigned int i;
428                 if (version != LTDB_GUID_INDEXING_VERSION) {
429                         /* This is quite likely during the DB startup
430                            on first upgrade to using a GUID index */
431                         ldb_debug_set(ldb_module_get_ctx(module),
432                                       LDB_DEBUG_ERROR,
433                                       "Wrong GUID index version %d "
434                                       "expected %d for %s",
435                                       version, LTDB_GUID_INDEXING_VERSION,
436                                       ldb_dn_get_linearized(dn));
437                         talloc_free(msg);
438                         return LDB_ERR_OPERATIONS_ERROR;
439                 }
440
441                 if (el->num_values == 0) {
442                         talloc_free(msg);
443                         return LDB_ERR_OPERATIONS_ERROR;
444                 }
445
446                 if ((el->values[0].length % LTDB_GUID_SIZE) != 0) {
447                         talloc_free(msg);
448                         return LDB_ERR_OPERATIONS_ERROR;
449                 }
450
451                 list->count = el->values[0].length / LTDB_GUID_SIZE;
452                 list->dn = talloc_array(list, struct ldb_val, list->count);
453                 if (list->dn == NULL) {
454                         talloc_free(msg);
455                         return LDB_ERR_OPERATIONS_ERROR;
456                 }
457
458                 /*
459                  * The actual data is on msg, due to
460                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
461                  */
462                 talloc_steal(list->dn, msg);
463                 for (i = 0; i < list->count; i++) {
464                         list->dn[i].data
465                                 = &el->values[0].data[i * LTDB_GUID_SIZE];
466                         list->dn[i].length = LTDB_GUID_SIZE;
467                 }
468         }
469
470         /* We don't need msg->elements any more */
471         talloc_free(msg->elements);
472         return LDB_SUCCESS;
473 }
474
475 int ltdb_key_dn_from_idx(struct ldb_module *module,
476                          struct ltdb_private *ltdb,
477                          TALLOC_CTX *mem_ctx,
478                          struct ldb_dn *dn,
479                          TDB_DATA *tdb_key)
480 {
481         struct ldb_context *ldb = ldb_module_get_ctx(module);
482         int ret;
483         int index = 0;
484         enum key_truncation truncation = KEY_NOT_TRUNCATED;
485         struct dn_list *list = talloc(mem_ctx, struct dn_list);
486         if (list == NULL) {
487                 ldb_oom(ldb);
488                 return LDB_ERR_OPERATIONS_ERROR;
489         }
490
491
492         ret = ltdb_index_dn_base_dn(module, ltdb, dn, list, &truncation);
493         if (ret != LDB_SUCCESS) {
494                 TALLOC_FREE(list);
495                 return ret;
496         }
497
498         if (list->count == 0) {
499                 TALLOC_FREE(list);
500                 return LDB_ERR_NO_SUCH_OBJECT;
501         }
502
503         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
504                 const char *dn_str = ldb_dn_get_linearized(dn);
505                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
506                                        __location__
507                                        ": Failed to read DN index "
508                                        "against %s for %s: too many "
509                                        "values (%u > 1)",
510                                        ltdb->cache->GUID_index_attribute,
511                                        dn_str, list->count);
512                 TALLOC_FREE(list);
513                 return LDB_ERR_CONSTRAINT_VIOLATION;
514         }
515
516         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
517                 /*
518                  * DN key has been truncated, need to inspect the actual
519                  * records to locate the actual DN
520                  */
521                 int i;
522                 index = -1;
523                 for (i=0; i < list->count; i++) {
524                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
525                         TDB_DATA key = {
526                                 .dptr = guid_key,
527                                 .dsize = sizeof(guid_key)
528                         };
529                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
530                         struct ldb_message *rec = ldb_msg_new(ldb);
531                         if (rec == NULL) {
532                                 TALLOC_FREE(list);
533                                 return LDB_ERR_OPERATIONS_ERROR;
534                         }
535
536                         ret = ltdb_idx_to_key(module, ltdb,
537                                               ldb, &list->dn[i],
538                                               &key);
539                         if (ret != LDB_SUCCESS) {
540                                 TALLOC_FREE(list);
541                                 TALLOC_FREE(rec);
542                                 return ret;
543                         }
544
545                         ret = ltdb_search_key(module, ltdb, key,
546                                               rec, flags);
547                         if (key.dptr != guid_key) {
548                                 TALLOC_FREE(key.dptr);
549                         }
550                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
551                                 /*
552                                  * the record has disappeared?
553                                  * yes, this can happen
554                                  */
555                                 TALLOC_FREE(rec);
556                                 continue;
557                         }
558
559                         if (ret != LDB_SUCCESS) {
560                                 /* an internal error */
561                                 TALLOC_FREE(rec);
562                                 TALLOC_FREE(list);
563                                 return LDB_ERR_OPERATIONS_ERROR;
564                         }
565
566                         /*
567                          * We found the actual DN that we wanted from in the
568                          * multiple values that matched the index
569                          * (due to truncation), so return that.
570                          *
571                          */
572                         if (ldb_dn_compare(dn, rec->dn) == 0) {
573                                 index = i;
574                                 TALLOC_FREE(rec);
575                                 break;
576                         }
577                 }
578
579                 /*
580                  * We matched the index but the actual DN we wanted
581                  * was not here.
582                  */
583                 if (index == -1) {
584                         TALLOC_FREE(list);
585                         return LDB_ERR_NO_SUCH_OBJECT;
586                 }
587         }
588
589         /* The tdb_key memory is allocated by the caller */
590         ret = ltdb_guid_to_key(module, ltdb,
591                                &list->dn[index], tdb_key);
592         TALLOC_FREE(list);
593
594         if (ret != LDB_SUCCESS) {
595                 return LDB_ERR_OPERATIONS_ERROR;
596         }
597
598         return LDB_SUCCESS;
599 }
600
601
602
603 /*
604   save a dn_list into a full @IDX style record
605  */
606 static int ltdb_dn_list_store_full(struct ldb_module *module,
607                                    struct ltdb_private *ltdb,
608                                    struct ldb_dn *dn,
609                                    struct dn_list *list)
610 {
611         struct ldb_message *msg;
612         int ret;
613
614         msg = ldb_msg_new(module);
615         if (!msg) {
616                 return ldb_module_oom(module);
617         }
618
619         msg->dn = dn;
620
621         if (list->count == 0) {
622                 ret = ltdb_delete_noindex(module, msg);
623                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
624                         ret = LDB_SUCCESS;
625                 }
626                 talloc_free(msg);
627                 return ret;
628         }
629
630         if (ltdb->cache->GUID_index_attribute == NULL) {
631                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
632                                       LTDB_INDEXING_VERSION);
633                 if (ret != LDB_SUCCESS) {
634                         talloc_free(msg);
635                         return ldb_module_oom(module);
636                 }
637         } else {
638                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
639                                       LTDB_GUID_INDEXING_VERSION);
640                 if (ret != LDB_SUCCESS) {
641                         talloc_free(msg);
642                         return ldb_module_oom(module);
643                 }
644         }
645
646         if (list->count > 0) {
647                 struct ldb_message_element *el;
648
649                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
650                 if (ret != LDB_SUCCESS) {
651                         talloc_free(msg);
652                         return ldb_module_oom(module);
653                 }
654
655                 if (ltdb->cache->GUID_index_attribute == NULL) {
656                         el->values = list->dn;
657                         el->num_values = list->count;
658                 } else {
659                         struct ldb_val v;
660                         unsigned int i;
661                         el->values = talloc_array(msg,
662                                                   struct ldb_val, 1);
663                         if (el->values == NULL) {
664                                 talloc_free(msg);
665                                 return ldb_module_oom(module);
666                         }
667
668                         v.data = talloc_array_size(el->values,
669                                                    list->count,
670                                                    LTDB_GUID_SIZE);
671                         if (v.data == NULL) {
672                                 talloc_free(msg);
673                                 return ldb_module_oom(module);
674                         }
675
676                         v.length = talloc_get_size(v.data);
677
678                         for (i = 0; i < list->count; i++) {
679                                 if (list->dn[i].length !=
680                                     LTDB_GUID_SIZE) {
681                                         talloc_free(msg);
682                                         return ldb_module_operr(module);
683                                 }
684                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
685                                        list->dn[i].data,
686                                        LTDB_GUID_SIZE);
687                         }
688                         el->values[0] = v;
689                         el->num_values = 1;
690                 }
691         }
692
693         ret = ltdb_store(module, msg, TDB_REPLACE);
694         talloc_free(msg);
695         return ret;
696 }
697
698 /*
699   save a dn_list into the database, in either @IDX or internal format
700  */
701 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
702                               struct dn_list *list)
703 {
704         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
705         TDB_DATA rec, key;
706         int ret;
707         struct dn_list *list2;
708
709         if (ltdb->idxptr == NULL) {
710                 return ltdb_dn_list_store_full(module, ltdb,
711                                                dn, list);
712         }
713
714         if (ltdb->idxptr->itdb == NULL) {
715                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
716                 if (ltdb->idxptr->itdb == NULL) {
717                         return LDB_ERR_OPERATIONS_ERROR;
718                 }
719         }
720
721         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
722         if (key.dptr == NULL) {
723                 return LDB_ERR_OPERATIONS_ERROR;
724         }
725         key.dsize = strlen((char *)key.dptr);
726
727         rec = tdb_fetch(ltdb->idxptr->itdb, key);
728         if (rec.dptr != NULL) {
729                 list2 = ltdb_index_idxptr(module, rec, false);
730                 if (list2 == NULL) {
731                         free(rec.dptr);
732                         return LDB_ERR_OPERATIONS_ERROR;
733                 }
734                 free(rec.dptr);
735                 list2->dn = talloc_steal(list2, list->dn);
736                 list2->count = list->count;
737                 return LDB_SUCCESS;
738         }
739
740         list2 = talloc(ltdb->idxptr, struct dn_list);
741         if (list2 == NULL) {
742                 return LDB_ERR_OPERATIONS_ERROR;
743         }
744         list2->dn = talloc_steal(list2, list->dn);
745         list2->count = list->count;
746
747         rec.dptr = (uint8_t *)&list2;
748         rec.dsize = sizeof(void *);
749
750
751         /*
752          * This is not a store into the main DB, but into an in-memory
753          * TDB, so we don't need a guard on ltdb->read_only
754          */
755         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
756         if (ret != 0) {
757                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
758         }
759         return LDB_SUCCESS;
760 }
761
762 /*
763   traverse function for storing the in-memory index entries on disk
764  */
765 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
766 {
767         struct ldb_module *module = state;
768         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
769         struct ldb_dn *dn;
770         struct ldb_context *ldb = ldb_module_get_ctx(module);
771         struct ldb_val v;
772         struct dn_list *list;
773
774         list = ltdb_index_idxptr(module, data, true);
775         if (list == NULL) {
776                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
777                 return -1;
778         }
779
780         v.data = key.dptr;
781         v.length = strnlen((char *)key.dptr, key.dsize);
782
783         dn = ldb_dn_from_ldb_val(module, ldb, &v);
784         if (dn == NULL) {
785                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
786                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
787                 return -1;
788         }
789
790         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
791                                                       dn, list);
792         talloc_free(dn);
793         if (ltdb->idxptr->error != 0) {
794                 return -1;
795         }
796         return 0;
797 }
798
799 /* cleanup the idxptr mode when transaction commits */
800 int ltdb_index_transaction_commit(struct ldb_module *module)
801 {
802         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
803         int ret;
804
805         struct ldb_context *ldb = ldb_module_get_ctx(module);
806
807         ldb_reset_err_string(ldb);
808
809         if (ltdb->idxptr->itdb) {
810                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
811                 tdb_close(ltdb->idxptr->itdb);
812         }
813
814         ret = ltdb->idxptr->error;
815         if (ret != LDB_SUCCESS) {
816                 if (!ldb_errstring(ldb)) {
817                         ldb_set_errstring(ldb, ldb_strerror(ret));
818                 }
819                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
820         }
821
822         talloc_free(ltdb->idxptr);
823         ltdb->idxptr = NULL;
824         return ret;
825 }
826
827 /* cleanup the idxptr mode when transaction cancels */
828 int ltdb_index_transaction_cancel(struct ldb_module *module)
829 {
830         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
831         if (ltdb->idxptr && ltdb->idxptr->itdb) {
832                 tdb_close(ltdb->idxptr->itdb);
833         }
834         talloc_free(ltdb->idxptr);
835         ltdb->idxptr = NULL;
836         return LDB_SUCCESS;
837 }
838
839
840 /*
841   return the dn key to be used for an index
842   the caller is responsible for freeing
843 */
844 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
845                                      struct ltdb_private *ltdb,
846                                      const char *attr, const struct ldb_val *value,
847                                      const struct ldb_schema_attribute **ap,
848                                      enum key_truncation *truncation)
849 {
850         struct ldb_dn *ret;
851         struct ldb_val v;
852         const struct ldb_schema_attribute *a = NULL;
853         char *attr_folded = NULL;
854         const char *attr_for_dn = NULL;
855         int r;
856         bool should_b64_encode;
857
858         unsigned int max_key_length = ltdb_max_key_length(ltdb);
859         size_t key_len = 0;
860         size_t attr_len = 0;
861         const size_t indx_len = sizeof(LTDB_INDEX) - 1;
862         unsigned frmt_len = 0;
863         const size_t additional_key_length = 4;
864         unsigned int num_separators = 3; /* Estimate for overflow check */
865         const size_t min_data = 1;
866         const size_t min_key_length = additional_key_length
867                 + indx_len + num_separators + min_data;
868
869         if (attr[0] == '@') {
870                 attr_for_dn = attr;
871                 v = *value;
872                 if (ap != NULL) {
873                         *ap = NULL;
874                 }
875         } else {
876                 attr_folded = ldb_attr_casefold(ldb, attr);
877                 if (!attr_folded) {
878                         return NULL;
879                 }
880
881                 attr_for_dn = attr_folded;
882
883                 a = ldb_schema_attribute_by_name(ldb, attr);
884                 if (ap) {
885                         *ap = a;
886                 }
887                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
888                 if (r != LDB_SUCCESS) {
889                         const char *errstr = ldb_errstring(ldb);
890                         /* canonicalisation can be refused. For
891                            example, a attribute that takes wildcards
892                            will refuse to canonicalise if the value
893                            contains a wildcard */
894                         ldb_asprintf_errstring(ldb,
895                                                "Failed to create index "
896                                                "key for attribute '%s':%s%s%s",
897                                                attr, ldb_strerror(r),
898                                                (errstr?":":""),
899                                                (errstr?errstr:""));
900                         talloc_free(attr_folded);
901                         return NULL;
902                 }
903         }
904         attr_len = strlen(attr_for_dn);
905
906         /*
907          * Check if there is any hope this will fit into the DB.
908          * Overflow here is not actually critical the code below
909          * checks again to make the printf and the DB does another
910          * check for too long keys
911          */
912         if (max_key_length - attr_len < min_key_length) {
913                 ldb_asprintf_errstring(
914                         ldb,
915                         __location__ ": max_key_length "
916                         "is too small (%u) < (%u)",
917                         max_key_length,
918                         (unsigned)(min_key_length + attr_len));
919                 talloc_free(attr_folded);
920                 return NULL;
921         }
922
923         /*
924          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
925          * "DN=" and a trailing string terminator
926          */
927         max_key_length -= additional_key_length;
928
929         /*
930          * We do not base 64 encode a DN in a key, it has already been
931          * casefold and lineraized, that is good enough.  That already
932          * avoids embedded NUL etc.
933          */
934         if (ltdb->cache->GUID_index_attribute != NULL) {
935                 if (strcmp(attr, LTDB_IDXDN) == 0) {
936                         should_b64_encode = false;
937                 } else if (strcmp(attr, LTDB_IDXONE) == 0) {
938                         /*
939                          * We can only change the behaviour for IDXONE
940                          * when the GUID index is enabled
941                          */
942                         should_b64_encode = false;
943                 } else {
944                         should_b64_encode
945                                 = ldb_should_b64_encode(ldb, &v);
946                 }
947         } else {
948                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
949         }
950
951         if (should_b64_encode) {
952                 size_t vstr_len = 0;
953                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
954                 if (!vstr) {
955                         talloc_free(attr_folded);
956                         return NULL;
957                 }
958                 vstr_len = strlen(vstr);
959                 /* 
960                  * Overflow here is not critical as we only use this
961                  * to choose the printf truncation
962                  */
963                 key_len = num_separators + indx_len + attr_len + vstr_len;
964                 if (key_len > max_key_length) {
965                         size_t excess = key_len - max_key_length;
966                         frmt_len = vstr_len - excess;
967                         *truncation = KEY_TRUNCATED;
968                         /*
969                         * Truncated keys are placed in a separate key space
970                         * from the non truncated keys
971                         * Note: the double hash "##" is not a typo and
972                         * indicates that the following value is base64 encoded
973                         */
974                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
975                                              LTDB_INDEX, attr_for_dn,
976                                              frmt_len, vstr);
977                 } else {
978                         frmt_len = vstr_len;
979                         *truncation = KEY_NOT_TRUNCATED;
980                         /*
981                          * Note: the double colon "::" is not a typo and
982                          * indicates that the following value is base64 encoded
983                          */
984                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
985                                              LTDB_INDEX, attr_for_dn,
986                                              frmt_len, vstr);
987                 }
988                 talloc_free(vstr);
989         } else {
990                 /* Only need two seperators */
991                 num_separators = 2;
992
993                 /*
994                  * Overflow here is not critical as we only use this
995                  * to choose the printf truncation
996                  */
997                 key_len = num_separators + indx_len + attr_len + (int)v.length;
998                 if (key_len > max_key_length) {
999                         size_t excess = key_len - max_key_length;
1000                         frmt_len = v.length - excess;
1001                         *truncation = KEY_TRUNCATED;
1002                         /*
1003                          * Truncated keys are placed in a separate key space
1004                          * from the non truncated keys
1005                          */
1006                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1007                                              LTDB_INDEX, attr_for_dn,
1008                                              frmt_len, (char *)v.data);
1009                 } else {
1010                         frmt_len = v.length;
1011                         *truncation = KEY_NOT_TRUNCATED;
1012                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1013                                              LTDB_INDEX, attr_for_dn,
1014                                              frmt_len, (char *)v.data);
1015                 }
1016         }
1017
1018         if (v.data != value->data) {
1019                 talloc_free(v.data);
1020         }
1021         talloc_free(attr_folded);
1022
1023         return ret;
1024 }
1025
1026 /*
1027   see if a attribute value is in the list of indexed attributes
1028 */
1029 static bool ltdb_is_indexed(struct ldb_module *module,
1030                             struct ltdb_private *ltdb,
1031                             const char *attr)
1032 {
1033         struct ldb_context *ldb = ldb_module_get_ctx(module);
1034         unsigned int i;
1035         struct ldb_message_element *el;
1036
1037         if ((ltdb->cache->GUID_index_attribute != NULL) &&
1038             (ldb_attr_cmp(attr,
1039                           ltdb->cache->GUID_index_attribute) == 0)) {
1040                 /* Implicity covered, this is the index key */
1041                 return false;
1042         }
1043         if (ldb->schema.index_handler_override) {
1044                 const struct ldb_schema_attribute *a
1045                         = ldb_schema_attribute_by_name(ldb, attr);
1046
1047                 if (a == NULL) {
1048                         return false;
1049                 }
1050
1051                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1052                         return true;
1053                 } else {
1054                         return false;
1055                 }
1056         }
1057
1058         if (!ltdb->cache->attribute_indexes) {
1059                 return false;
1060         }
1061
1062         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
1063         if (el == NULL) {
1064                 return false;
1065         }
1066
1067         /* TODO: this is too expensive! At least use a binary search */
1068         for (i=0; i<el->num_values; i++) {
1069                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1070                         return true;
1071                 }
1072         }
1073         return false;
1074 }
1075
1076 /*
1077   in the following logic functions, the return value is treated as
1078   follows:
1079
1080      LDB_SUCCESS: we found some matching index values
1081
1082      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1083
1084      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1085                                we'll need a full search
1086  */
1087
1088 /*
1089   return a list of dn's that might match a simple indexed search (an
1090   equality search only)
1091  */
1092 static int ltdb_index_dn_simple(struct ldb_module *module,
1093                                 struct ltdb_private *ltdb,
1094                                 const struct ldb_parse_tree *tree,
1095                                 struct dn_list *list)
1096 {
1097         struct ldb_context *ldb;
1098         struct ldb_dn *dn;
1099         int ret;
1100         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1101
1102         ldb = ldb_module_get_ctx(module);
1103
1104         list->count = 0;
1105         list->dn = NULL;
1106
1107         /* if the attribute isn't in the list of indexed attributes then
1108            this node needs a full search */
1109         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
1110                 return LDB_ERR_OPERATIONS_ERROR;
1111         }
1112
1113         /* the attribute is indexed. Pull the list of DNs that match the
1114            search criterion */
1115         dn = ltdb_index_key(ldb, ltdb,
1116                             tree->u.equality.attr,
1117                             &tree->u.equality.value, NULL, &truncation);
1118         /*
1119          * We ignore truncation here and allow multi-valued matches
1120          * as ltdb_search_indexed will filter out the wrong one in
1121          * ltdb_index_filter() which calls ldb_match_message().
1122          */
1123         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1124
1125         ret = ltdb_dn_list_load(module, ltdb, dn, list);
1126         talloc_free(dn);
1127         return ret;
1128 }
1129
1130
1131 static bool list_union(struct ldb_context *ldb,
1132                        struct ltdb_private *ltdb,
1133                        struct dn_list *list, struct dn_list *list2);
1134
1135 /*
1136   return a list of dn's that might match a leaf indexed search
1137  */
1138 static int ltdb_index_dn_leaf(struct ldb_module *module,
1139                               struct ltdb_private *ltdb,
1140                               const struct ldb_parse_tree *tree,
1141                               struct dn_list *list)
1142 {
1143         if (ltdb->disallow_dn_filter &&
1144             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1145                 /* in AD mode we do not support "(dn=...)" search filters */
1146                 list->dn = NULL;
1147                 list->count = 0;
1148                 return LDB_SUCCESS;
1149         }
1150         if (tree->u.equality.attr[0] == '@') {
1151                 /* Do not allow a indexed search against an @ */
1152                 list->dn = NULL;
1153                 list->count = 0;
1154                 return LDB_SUCCESS;
1155         }
1156         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1157                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1158                 struct ldb_dn *dn
1159                         = ldb_dn_from_ldb_val(list,
1160                                               ldb_module_get_ctx(module),
1161                                               &tree->u.equality.value);
1162                 if (dn == NULL) {
1163                         /* If we can't parse it, no match */
1164                         list->dn = NULL;
1165                         list->count = 0;
1166                         return LDB_SUCCESS;
1167                 }
1168
1169                 /*
1170                  * Re-use the same code we use for a SCOPE_BASE
1171                  * search
1172                  *
1173                  * We can't call TALLOC_FREE(dn) as this must belong
1174                  * to list for the memory to remain valid.
1175                  */
1176                 return ltdb_index_dn_base_dn(module, ltdb, dn, list,
1177                                              &truncation);
1178                 /*
1179                  * We ignore truncation here and allow multi-valued matches
1180                  * as ltdb_search_indexed will filter out the wrong one in
1181                  * ltdb_index_filter() which calls ldb_match_message().
1182                  */
1183
1184         } else if ((ltdb->cache->GUID_index_attribute != NULL) &&
1185                    (ldb_attr_cmp(tree->u.equality.attr,
1186                                  ltdb->cache->GUID_index_attribute) == 0)) {
1187                 int ret;
1188                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1189                 list->dn = talloc_array(list, struct ldb_val, 1);
1190                 if (list->dn == NULL) {
1191                         ldb_module_oom(module);
1192                         return LDB_ERR_OPERATIONS_ERROR;
1193                 }
1194                 /*
1195                  * We need to go via the canonicalise_fn() to
1196                  * ensure we get the index in binary, rather
1197                  * than a string
1198                  */
1199                 ret = ltdb->GUID_index_syntax->canonicalise_fn(ldb,
1200                                                                list->dn,
1201                                                                &tree->u.equality.value,
1202                                                                &list->dn[0]);
1203                 if (ret != LDB_SUCCESS) {
1204                         return LDB_ERR_OPERATIONS_ERROR;
1205                 }
1206                 list->count = 1;
1207                 return LDB_SUCCESS;
1208         }
1209
1210         return ltdb_index_dn_simple(module, ltdb, tree, list);
1211 }
1212
1213
1214 /*
1215   list intersection
1216   list = list & list2
1217 */
1218 static bool list_intersect(struct ldb_context *ldb,
1219                            struct ltdb_private *ltdb,
1220                            struct dn_list *list, const struct dn_list *list2)
1221 {
1222         const struct dn_list *short_list, *long_list;
1223         struct dn_list *list3;
1224         unsigned int i;
1225
1226         if (list->count == 0) {
1227                 /* 0 & X == 0 */
1228                 return true;
1229         }
1230         if (list2->count == 0) {
1231                 /* X & 0 == 0 */
1232                 list->count = 0;
1233                 list->dn = NULL;
1234                 return true;
1235         }
1236
1237         /* the indexing code is allowed to return a longer list than
1238            what really matches, as all results are filtered by the
1239            full expression at the end - this shortcut avoids a lot of
1240            work in some cases */
1241         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1242                 return true;
1243         }
1244         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1245                 list->count = list2->count;
1246                 list->dn = list2->dn;
1247                 /* note that list2 may not be the parent of list2->dn,
1248                    as list2->dn may be owned by ltdb->idxptr. In that
1249                    case we expect this reparent call to fail, which is
1250                    OK */
1251                 talloc_reparent(list2, list, list2->dn);
1252                 return true;
1253         }
1254
1255         if (list->count > list2->count) {
1256                 short_list = list2;
1257                 long_list = list;
1258         } else {
1259                 short_list = list;
1260                 long_list = list2;
1261         }
1262
1263         list3 = talloc_zero(list, struct dn_list);
1264         if (list3 == NULL) {
1265                 return false;
1266         }
1267
1268         list3->dn = talloc_array(list3, struct ldb_val,
1269                                  MIN(list->count, list2->count));
1270         if (!list3->dn) {
1271                 talloc_free(list3);
1272                 return false;
1273         }
1274         list3->count = 0;
1275
1276         for (i=0;i<short_list->count;i++) {
1277                 /* For the GUID index case, this is a binary search */
1278                 if (ltdb_dn_list_find_val(ltdb, long_list,
1279                                           &short_list->dn[i]) != -1) {
1280                         list3->dn[list3->count] = short_list->dn[i];
1281                         list3->count++;
1282                 }
1283         }
1284
1285         list->strict |= list2->strict;
1286         list->dn = talloc_steal(list, list3->dn);
1287         list->count = list3->count;
1288         talloc_free(list3);
1289
1290         return true;
1291 }
1292
1293
1294 /*
1295   list union
1296   list = list | list2
1297 */
1298 static bool list_union(struct ldb_context *ldb,
1299                        struct ltdb_private *ltdb,
1300                        struct dn_list *list, struct dn_list *list2)
1301 {
1302         struct ldb_val *dn3;
1303         unsigned int i = 0, j = 0, k = 0;
1304
1305         if (list2->count == 0) {
1306                 /* X | 0 == X */
1307                 return true;
1308         }
1309
1310         if (list->count == 0) {
1311                 /* 0 | X == X */
1312                 list->count = list2->count;
1313                 list->dn = list2->dn;
1314                 /* note that list2 may not be the parent of list2->dn,
1315                    as list2->dn may be owned by ltdb->idxptr. In that
1316                    case we expect this reparent call to fail, which is
1317                    OK */
1318                 talloc_reparent(list2, list, list2->dn);
1319                 return true;
1320         }
1321
1322         /*
1323          * Sort the lists (if not in GUID DN mode) so we can do
1324          * the de-duplication during the merge
1325          *
1326          * NOTE: This can sort the in-memory index values, as list or
1327          * list2 might not be a copy!
1328          */
1329         ltdb_dn_list_sort(ltdb, list);
1330         ltdb_dn_list_sort(ltdb, list2);
1331
1332         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1333         if (!dn3) {
1334                 ldb_oom(ldb);
1335                 return false;
1336         }
1337
1338         while (i < list->count || j < list2->count) {
1339                 int cmp;
1340                 if (i >= list->count) {
1341                         cmp = 1;
1342                 } else if (j >= list2->count) {
1343                         cmp = -1;
1344                 } else {
1345                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1346                                                           &list2->dn[j]);
1347                 }
1348
1349                 if (cmp < 0) {
1350                         /* Take list */
1351                         dn3[k] = list->dn[i];
1352                         i++;
1353                         k++;
1354                 } else if (cmp > 0) {
1355                         /* Take list2 */
1356                         dn3[k] = list2->dn[j];
1357                         j++;
1358                         k++;
1359                 } else {
1360                         /* Equal, take list */
1361                         dn3[k] = list->dn[i];
1362                         i++;
1363                         j++;
1364                         k++;
1365                 }
1366         }
1367
1368         list->dn = dn3;
1369         list->count = k;
1370
1371         return true;
1372 }
1373
1374 static int ltdb_index_dn(struct ldb_module *module,
1375                          struct ltdb_private *ltdb,
1376                          const struct ldb_parse_tree *tree,
1377                          struct dn_list *list);
1378
1379
1380 /*
1381   process an OR list (a union)
1382  */
1383 static int ltdb_index_dn_or(struct ldb_module *module,
1384                             struct ltdb_private *ltdb,
1385                             const struct ldb_parse_tree *tree,
1386                             struct dn_list *list)
1387 {
1388         struct ldb_context *ldb;
1389         unsigned int i;
1390
1391         ldb = ldb_module_get_ctx(module);
1392
1393         list->dn = NULL;
1394         list->count = 0;
1395
1396         for (i=0; i<tree->u.list.num_elements; i++) {
1397                 struct dn_list *list2;
1398                 int ret;
1399
1400                 list2 = talloc_zero(list, struct dn_list);
1401                 if (list2 == NULL) {
1402                         return LDB_ERR_OPERATIONS_ERROR;
1403                 }
1404
1405                 ret = ltdb_index_dn(module, ltdb,
1406                                     tree->u.list.elements[i], list2);
1407
1408                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1409                         /* X || 0 == X */
1410                         talloc_free(list2);
1411                         continue;
1412                 }
1413
1414                 if (ret != LDB_SUCCESS) {
1415                         /* X || * == * */
1416                         talloc_free(list2);
1417                         return ret;
1418                 }
1419
1420                 if (!list_union(ldb, ltdb, list, list2)) {
1421                         talloc_free(list2);
1422                         return LDB_ERR_OPERATIONS_ERROR;
1423                 }
1424         }
1425
1426         if (list->count == 0) {
1427                 return LDB_ERR_NO_SUCH_OBJECT;
1428         }
1429
1430         return LDB_SUCCESS;
1431 }
1432
1433
1434 /*
1435   NOT an index results
1436  */
1437 static int ltdb_index_dn_not(struct ldb_module *module,
1438                              struct ltdb_private *ltdb,
1439                              const struct ldb_parse_tree *tree,
1440                              struct dn_list *list)
1441 {
1442         /* the only way to do an indexed not would be if we could
1443            negate the not via another not or if we knew the total
1444            number of database elements so we could know that the
1445            existing expression covered the whole database.
1446
1447            instead, we just give up, and rely on a full index scan
1448            (unless an outer & manages to reduce the list)
1449         */
1450         return LDB_ERR_OPERATIONS_ERROR;
1451 }
1452
1453 /*
1454  * These things are unique, so avoid a full scan if this is a search
1455  * by GUID, DN or a unique attribute
1456  */
1457 static bool ltdb_index_unique(struct ldb_context *ldb,
1458                               struct ltdb_private *ltdb,
1459                               const char *attr)
1460 {
1461         const struct ldb_schema_attribute *a;
1462         if (ltdb->cache->GUID_index_attribute != NULL) {
1463                 if (ldb_attr_cmp(attr, ltdb->cache->GUID_index_attribute) == 0) {
1464                         return true;
1465                 }
1466         }
1467         if (ldb_attr_dn(attr) == 0) {
1468                 return true;
1469         }
1470
1471         a = ldb_schema_attribute_by_name(ldb, attr);
1472         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1473                 return true;
1474         }
1475         return false;
1476 }
1477
1478 /*
1479   process an AND expression (intersection)
1480  */
1481 static int ltdb_index_dn_and(struct ldb_module *module,
1482                              struct ltdb_private *ltdb,
1483                              const struct ldb_parse_tree *tree,
1484                              struct dn_list *list)
1485 {
1486         struct ldb_context *ldb;
1487         unsigned int i;
1488         bool found;
1489
1490         ldb = ldb_module_get_ctx(module);
1491
1492         list->dn = NULL;
1493         list->count = 0;
1494
1495         /* in the first pass we only look for unique simple
1496            equality tests, in the hope of avoiding having to look
1497            at any others */
1498         for (i=0; i<tree->u.list.num_elements; i++) {
1499                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1500                 int ret;
1501
1502                 if (subtree->operation != LDB_OP_EQUALITY ||
1503                     !ltdb_index_unique(ldb, ltdb,
1504                                        subtree->u.equality.attr)) {
1505                         continue;
1506                 }
1507
1508                 ret = ltdb_index_dn(module, ltdb, subtree, list);
1509                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1510                         /* 0 && X == 0 */
1511                         return LDB_ERR_NO_SUCH_OBJECT;
1512                 }
1513                 if (ret == LDB_SUCCESS) {
1514                         /* a unique index match means we can
1515                          * stop. Note that we don't care if we return
1516                          * a few too many objects, due to later
1517                          * filtering */
1518                         return LDB_SUCCESS;
1519                 }
1520         }
1521
1522         /* now do a full intersection */
1523         found = false;
1524
1525         for (i=0; i<tree->u.list.num_elements; i++) {
1526                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1527                 struct dn_list *list2;
1528                 int ret;
1529
1530                 list2 = talloc_zero(list, struct dn_list);
1531                 if (list2 == NULL) {
1532                         return ldb_module_oom(module);
1533                 }
1534
1535                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
1536
1537                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1538                         /* X && 0 == 0 */
1539                         list->dn = NULL;
1540                         list->count = 0;
1541                         talloc_free(list2);
1542                         return LDB_ERR_NO_SUCH_OBJECT;
1543                 }
1544
1545                 if (ret != LDB_SUCCESS) {
1546                         /* this didn't adding anything */
1547                         talloc_free(list2);
1548                         continue;
1549                 }
1550
1551                 if (!found) {
1552                         talloc_reparent(list2, list, list->dn);
1553                         list->dn = list2->dn;
1554                         list->count = list2->count;
1555                         found = true;
1556                 } else if (!list_intersect(ldb, ltdb,
1557                                            list, list2)) {
1558                         talloc_free(list2);
1559                         return LDB_ERR_OPERATIONS_ERROR;
1560                 }
1561
1562                 if (list->count == 0) {
1563                         list->dn = NULL;
1564                         return LDB_ERR_NO_SUCH_OBJECT;
1565                 }
1566
1567                 if (list->count < 2) {
1568                         /* it isn't worth loading the next part of the tree */
1569                         return LDB_SUCCESS;
1570                 }
1571         }
1572
1573         if (!found) {
1574                 /* none of the attributes were indexed */
1575                 return LDB_ERR_OPERATIONS_ERROR;
1576         }
1577
1578         return LDB_SUCCESS;
1579 }
1580
1581 /*
1582   return a list of matching objects using a one-level index
1583  */
1584 static int ltdb_index_dn_attr(struct ldb_module *module,
1585                               struct ltdb_private *ltdb,
1586                               const char *attr,
1587                               struct ldb_dn *dn,
1588                               struct dn_list *list,
1589                               enum key_truncation *truncation)
1590 {
1591         struct ldb_context *ldb;
1592         struct ldb_dn *key;
1593         struct ldb_val val;
1594         int ret;
1595
1596         ldb = ldb_module_get_ctx(module);
1597
1598         /* work out the index key from the parent DN */
1599         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1600         val.length = strlen((char *)val.data);
1601         key = ltdb_index_key(ldb, ltdb, attr, &val, NULL, truncation);
1602         if (!key) {
1603                 ldb_oom(ldb);
1604                 return LDB_ERR_OPERATIONS_ERROR;
1605         }
1606
1607         ret = ltdb_dn_list_load(module, ltdb, key, list);
1608         talloc_free(key);
1609         if (ret != LDB_SUCCESS) {
1610                 return ret;
1611         }
1612
1613         if (list->count == 0) {
1614                 return LDB_ERR_NO_SUCH_OBJECT;
1615         }
1616
1617         return LDB_SUCCESS;
1618 }
1619
1620 /*
1621   return a list of matching objects using a one-level index
1622  */
1623 static int ltdb_index_dn_one(struct ldb_module *module,
1624                              struct ltdb_private *ltdb,
1625                              struct ldb_dn *parent_dn,
1626                              struct dn_list *list,
1627                              enum key_truncation *truncation)
1628 {
1629         /* Ensure we do not shortcut on intersection for this list */
1630         list->strict = true;
1631         return ltdb_index_dn_attr(module, ltdb,
1632                                   LTDB_IDXONE, parent_dn, list, truncation);
1633
1634 }
1635
1636 /*
1637   return a list of matching objects using the DN index
1638  */
1639 static int ltdb_index_dn_base_dn(struct ldb_module *module,
1640                                  struct ltdb_private *ltdb,
1641                                  struct ldb_dn *base_dn,
1642                                  struct dn_list *dn_list,
1643                                  enum key_truncation *truncation)
1644 {
1645         const struct ldb_val *guid_val = NULL;
1646         if (ltdb->cache->GUID_index_attribute == NULL) {
1647                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1648                 if (dn_list->dn == NULL) {
1649                         return ldb_module_oom(module);
1650                 }
1651                 dn_list->dn[0].data = discard_const_p(unsigned char,
1652                                                       ldb_dn_get_linearized(base_dn));
1653                 if (dn_list->dn[0].data == NULL) {
1654                         return ldb_module_oom(module);
1655                 }
1656                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1657                 dn_list->count = 1;
1658
1659                 return LDB_SUCCESS;
1660         }
1661
1662         if (ltdb->cache->GUID_index_dn_component != NULL) {
1663                 guid_val = ldb_dn_get_extended_component(base_dn,
1664                                                          ltdb->cache->GUID_index_dn_component);
1665         }
1666
1667         if (guid_val != NULL) {
1668                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1669                 if (dn_list->dn == NULL) {
1670                         return ldb_module_oom(module);
1671                 }
1672                 dn_list->dn[0].data = guid_val->data;
1673                 dn_list->dn[0].length = guid_val->length;
1674                 dn_list->count = 1;
1675
1676                 return LDB_SUCCESS;
1677         }
1678
1679         return ltdb_index_dn_attr(module, ltdb,
1680                                   LTDB_IDXDN, base_dn, dn_list, truncation);
1681 }
1682
1683 /*
1684   return a list of dn's that might match a indexed search or
1685   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1686  */
1687 static int ltdb_index_dn(struct ldb_module *module,
1688                          struct ltdb_private *ltdb,
1689                          const struct ldb_parse_tree *tree,
1690                          struct dn_list *list)
1691 {
1692         int ret = LDB_ERR_OPERATIONS_ERROR;
1693
1694         switch (tree->operation) {
1695         case LDB_OP_AND:
1696                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1697                 break;
1698
1699         case LDB_OP_OR:
1700                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1701                 break;
1702
1703         case LDB_OP_NOT:
1704                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1705                 break;
1706
1707         case LDB_OP_EQUALITY:
1708                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1709                 break;
1710
1711         case LDB_OP_SUBSTRING:
1712         case LDB_OP_GREATER:
1713         case LDB_OP_LESS:
1714         case LDB_OP_PRESENT:
1715         case LDB_OP_APPROX:
1716         case LDB_OP_EXTENDED:
1717                 /* we can't index with fancy bitops yet */
1718                 ret = LDB_ERR_OPERATIONS_ERROR;
1719                 break;
1720         }
1721
1722         return ret;
1723 }
1724
1725 /*
1726   filter a candidate dn_list from an indexed search into a set of results
1727   extracting just the given attributes
1728 */
1729 static int ltdb_index_filter(struct ltdb_private *ltdb,
1730                              const struct dn_list *dn_list,
1731                              struct ltdb_context *ac,
1732                              uint32_t *match_count,
1733                              enum key_truncation scope_one_truncation)
1734 {
1735         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1736         struct ldb_message *msg;
1737         struct ldb_message *filtered_msg;
1738         unsigned int i;
1739         unsigned int num_keys = 0;
1740         uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
1741         TDB_DATA *keys = NULL;
1742
1743         /*
1744          * We have to allocate the key list (rather than just walk the
1745          * caller supplied list) as the callback could change the list
1746          * (by modifying an indexed attribute hosted in the in-memory
1747          * index cache!)
1748          */
1749         keys = talloc_array(ac, TDB_DATA, dn_list->count);
1750         if (keys == NULL) {
1751                 return ldb_module_oom(ac->module);
1752         }
1753
1754         if (ltdb->cache->GUID_index_attribute != NULL) {
1755                 /*
1756                  * We speculate that the keys will be GUID based and so
1757                  * pre-fill in enough space for a GUID (avoiding a pile of
1758                  * small allocations)
1759                  */
1760                 struct guid_tdb_key {
1761                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
1762                 } *key_values = NULL;
1763
1764                 key_values = talloc_array(keys,
1765                                           struct guid_tdb_key,
1766                                           dn_list->count);
1767
1768                 for (i = 0; i < dn_list->count; i++) {
1769                         keys[i].dptr = key_values[i].guid_key;
1770                         keys[i].dsize = sizeof(key_values[i].guid_key);
1771                 }
1772                 if (key_values == NULL) {
1773                         return ldb_module_oom(ac->module);
1774                 }
1775         } else {
1776                 for (i = 0; i < dn_list->count; i++) {
1777                         keys[i].dptr = NULL;
1778                         keys[i].dsize = 0;
1779                 }
1780         }
1781
1782         for (i = 0; i < dn_list->count; i++) {
1783                 int ret;
1784
1785                 ret = ltdb_idx_to_key(ac->module,
1786                                       ltdb,
1787                                       keys,
1788                                       &dn_list->dn[i],
1789                                       &keys[num_keys]);
1790                 if (ret != LDB_SUCCESS) {
1791                         return ret;
1792                 }
1793
1794                 if (ltdb->cache->GUID_index_attribute != NULL) {
1795                         /*
1796                          * If we are in GUID index mode, then the dn_list is
1797                          * sorted.  If we got a duplicate, forget about it, as
1798                          * otherwise we would send the same entry back more
1799                          * than once.
1800                          *
1801                          * This is needed in the truncated DN case, or if a
1802                          * duplicate was forced in via
1803                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1804                          */
1805
1806                         if (memcmp(previous_guid_key,
1807                                    keys[num_keys].dptr,
1808                                    sizeof(previous_guid_key)) == 0) {
1809                                 continue;
1810                         }
1811
1812                         memcpy(previous_guid_key,
1813                                keys[num_keys].dptr,
1814                                sizeof(previous_guid_key));
1815                 }
1816                 num_keys++;
1817         }
1818
1819
1820         /*
1821          * Now that the list is a safe copy, send the callbacks
1822          */
1823         for (i = 0; i < num_keys; i++) {
1824                 int ret;
1825                 bool matched;
1826                 msg = ldb_msg_new(ac);
1827                 if (!msg) {
1828                         return LDB_ERR_OPERATIONS_ERROR;
1829                 }
1830
1831                 ret = ltdb_search_key(ac->module, ltdb,
1832                                       keys[i], msg,
1833                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1834                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1835                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1836                         /*
1837                          * the record has disappeared? yes, this can
1838                          * happen if the entry is deleted by something
1839                          * operating in the callback (not another
1840                          * process, as we have a read lock)
1841                          */
1842                         talloc_free(msg);
1843                         continue;
1844                 }
1845
1846                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1847                         /* an internal error */
1848                         talloc_free(msg);
1849                         return LDB_ERR_OPERATIONS_ERROR;
1850                 }
1851
1852                 /*
1853                  * We trust the index for LDB_SCOPE_ONELEVEL
1854                  * unless the index key has been truncated.
1855                  *
1856                  * LDB_SCOPE_BASE is not passed in by our only caller.
1857                  */
1858                 if (ac->scope == LDB_SCOPE_ONELEVEL
1859                     && ltdb->cache->one_level_indexes
1860                     && scope_one_truncation == KEY_NOT_TRUNCATED) {
1861                         ret = ldb_match_message(ldb, msg, ac->tree,
1862                                                 ac->scope, &matched);
1863                 } else {
1864                         ret = ldb_match_msg_error(ldb, msg,
1865                                                   ac->tree, ac->base,
1866                                                   ac->scope, &matched);
1867                 }
1868
1869                 if (ret != LDB_SUCCESS) {
1870                         talloc_free(msg);
1871                         return ret;
1872                 }
1873                 if (!matched) {
1874                         talloc_free(msg);
1875                         continue;
1876                 }
1877
1878                 /* filter the attributes that the user wants */
1879                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1880
1881                 talloc_free(msg);
1882
1883                 if (ret == -1) {
1884                         return LDB_ERR_OPERATIONS_ERROR;
1885                 }
1886
1887                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1888                 if (ret != LDB_SUCCESS) {
1889                         /* Regardless of success or failure, the msg
1890                          * is the callbacks responsiblity, and should
1891                          * not be talloc_free()'ed */
1892                         ac->request_terminated = true;
1893                         return ret;
1894                 }
1895
1896                 (*match_count)++;
1897         }
1898
1899         TALLOC_FREE(keys);
1900         return LDB_SUCCESS;
1901 }
1902
1903 /*
1904   sort a DN list
1905  */
1906 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
1907                               struct dn_list *list)
1908 {
1909         if (list->count < 2) {
1910                 return;
1911         }
1912
1913         /* We know the list is sorted when using the GUID index */
1914         if (ltdb->cache->GUID_index_attribute != NULL) {
1915                 return;
1916         }
1917
1918         TYPESAFE_QSORT(list->dn, list->count,
1919                        ldb_val_equal_exact_for_qsort);
1920 }
1921
1922 /*
1923   search the database with a LDAP-like expression using indexes
1924   returns -1 if an indexed search is not possible, in which
1925   case the caller should call ltdb_search_full()
1926 */
1927 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1928 {
1929         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1930         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1931         struct dn_list *dn_list;
1932         int ret;
1933         enum ldb_scope index_scope;
1934         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1935
1936         /* see if indexing is enabled */
1937         if (!ltdb->cache->attribute_indexes &&
1938             !ltdb->cache->one_level_indexes &&
1939             ac->scope != LDB_SCOPE_BASE) {
1940                 /* fallback to a full search */
1941                 return LDB_ERR_OPERATIONS_ERROR;
1942         }
1943
1944         dn_list = talloc_zero(ac, struct dn_list);
1945         if (dn_list == NULL) {
1946                 return ldb_module_oom(ac->module);
1947         }
1948
1949         /*
1950          * For the purposes of selecting the switch arm below, if we
1951          * don't have a one-level index then treat it like a subtree
1952          * search
1953          */
1954         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1955             !ltdb->cache->one_level_indexes) {
1956                 index_scope = LDB_SCOPE_SUBTREE;
1957         } else {
1958                 index_scope = ac->scope;
1959         }
1960
1961         switch (index_scope) {
1962         case LDB_SCOPE_BASE:
1963                 /*
1964                  * The only caller will have filtered the operation out
1965                  * so we should never get here
1966                  */
1967                 return ldb_operr(ldb);
1968
1969         case LDB_SCOPE_ONELEVEL:
1970                 /*
1971                  * If we ever start to also load the index values for
1972                  * the tree, we must ensure we strictly intersect with
1973                  * this list, as we trust the ONELEVEL index
1974                  */
1975                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list,
1976                                         &scope_one_truncation);
1977                 if (ret != LDB_SUCCESS) {
1978                         talloc_free(dn_list);
1979                         return ret;
1980                 }
1981
1982                 /*
1983                  * If we have too many matches, running the filter
1984                  * tree over the SCOPE_ONELEVEL can be quite expensive
1985                  * so we now check the filter tree index as well.
1986                  *
1987                  * We only do this in the GUID index mode, which is
1988                  * O(n*log(m)) otherwise the intersection below will
1989                  * be too costly at O(n*m).
1990                  *
1991                  * We don't set a heuristic for 'too many' but instead
1992                  * do it always and rely on the index lookup being
1993                  * fast enough in the small case.
1994                  */
1995                 if (ltdb->cache->GUID_index_attribute != NULL) {
1996                         struct dn_list *idx_one_tree_list
1997                                 = talloc_zero(ac, struct dn_list);
1998                         if (idx_one_tree_list == NULL) {
1999                                 return ldb_module_oom(ac->module);
2000                         }
2001
2002                         if (!ltdb->cache->attribute_indexes) {
2003                                 talloc_free(idx_one_tree_list);
2004                                 talloc_free(dn_list);
2005                                 return LDB_ERR_OPERATIONS_ERROR;
2006                         }
2007                         /*
2008                          * Here we load the index for the tree.
2009                          *
2010                          * We only care if this is successful, if the
2011                          * index can't trim the result list down then
2012                          * the ONELEVEL index is still good enough.
2013                          */
2014                         ret = ltdb_index_dn(ac->module, ltdb, ac->tree,
2015                                             idx_one_tree_list);
2016                         if (ret == LDB_SUCCESS) {
2017                                 if (!list_intersect(ldb, ltdb,
2018                                                     dn_list,
2019                                                     idx_one_tree_list)) {
2020                                         talloc_free(idx_one_tree_list);
2021                                         talloc_free(dn_list);
2022                                         return LDB_ERR_OPERATIONS_ERROR;
2023                                 }
2024                         }
2025                 }
2026                 break;
2027
2028         case LDB_SCOPE_SUBTREE:
2029         case LDB_SCOPE_DEFAULT:
2030                 if (!ltdb->cache->attribute_indexes) {
2031                         talloc_free(dn_list);
2032                         return LDB_ERR_OPERATIONS_ERROR;
2033                 }
2034                 /*
2035                  * Here we load the index for the tree.  We have no
2036                  * index for the subtree.
2037                  */
2038                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
2039                 if (ret != LDB_SUCCESS) {
2040                         talloc_free(dn_list);
2041                         return ret;
2042                 }
2043                 break;
2044         }
2045
2046         /*
2047          * It is critical that this function do the re-filter even
2048          * on things found by the index as the index can over-match
2049          * in cases of truncation (as well as when it decides it is
2050          * not worth further filtering)
2051          *
2052          * If this changes, then the index code above would need to
2053          * pass up a flag to say if any index was truncated during
2054          * processing as the truncation here refers only to the
2055          * SCOPE_ONELEVEL index.
2056          */
2057         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count,
2058                                 scope_one_truncation);
2059         talloc_free(dn_list);
2060         return ret;
2061 }
2062
2063 /**
2064  * @brief Add a DN in the index list of a given attribute name/value pair
2065  *
2066  * This function will add the DN in the index list for the index for
2067  * the given attribute name and value.
2068  *
2069  * @param[in]  module       A ldb_module structure
2070  *
2071  * @param[in]  dn           The string representation of the DN as it
2072  *                          will be stored in the index entry
2073  *
2074  * @param[in]  el           A ldb_message_element array, one of the entry
2075  *                          referred by the v_idx is the attribute name and
2076  *                          value pair which will be used to construct the
2077  *                          index name
2078  *
2079  * @param[in]  v_idx        The index of element in the el array to use
2080  *
2081  * @return                  An ldb error code
2082  */
2083 static int ltdb_index_add1(struct ldb_module *module,
2084                            struct ltdb_private *ltdb,
2085                            const struct ldb_message *msg,
2086                            struct ldb_message_element *el, int v_idx)
2087 {
2088         struct ldb_context *ldb;
2089         struct ldb_dn *dn_key;
2090         int ret;
2091         const struct ldb_schema_attribute *a;
2092         struct dn_list *list;
2093         unsigned alloc_len;
2094         enum key_truncation truncation = KEY_TRUNCATED;
2095
2096
2097         ldb = ldb_module_get_ctx(module);
2098
2099         list = talloc_zero(module, struct dn_list);
2100         if (list == NULL) {
2101                 return LDB_ERR_OPERATIONS_ERROR;
2102         }
2103
2104         dn_key = ltdb_index_key(ldb, ltdb,
2105                                 el->name, &el->values[v_idx], &a, &truncation);
2106         if (!dn_key) {
2107                 talloc_free(list);
2108                 return LDB_ERR_OPERATIONS_ERROR;
2109         }
2110         /*
2111          * Samba only maintains unique indexes on the objectSID and objectGUID
2112          * so if a unique index key exceeds the maximum length there is a
2113          * problem.
2114          */
2115         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2116                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2117                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2118
2119                 ldb_asprintf_errstring(
2120                         ldb,
2121                         __location__ ": unique index key on %s in %s, "
2122                         "exceeds maximum key length of %u (encoded).",
2123                         el->name,
2124                         ldb_dn_get_linearized(msg->dn),
2125                         ltdb->max_key_length);
2126                 talloc_free(list);
2127                 return LDB_ERR_CONSTRAINT_VIOLATION;
2128         }
2129         talloc_steal(list, dn_key);
2130
2131         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2132         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2133                 talloc_free(list);
2134                 return ret;
2135         }
2136
2137         /*
2138          * Check for duplicates in the @IDXDN DN -> GUID record
2139          *
2140          * This is very normal, it just means a duplicate DN creation
2141          * was attempted, so don't set the error string or print scary
2142          * messages.
2143          */
2144         if (list->count > 0 &&
2145             ldb_attr_cmp(el->name, LTDB_IDXDN) == 0 &&
2146             truncation == KEY_NOT_TRUNCATED) {
2147
2148                 talloc_free(list);
2149                 return LDB_ERR_CONSTRAINT_VIOLATION;
2150
2151         } else if (list->count > 0
2152                    && ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
2153
2154                 /*
2155                  * At least one existing entry in the DN->GUID index, which
2156                  * arises when the DN indexes have been truncated
2157                  *
2158                  * So need to pull the DN's to check if it's really a duplicate
2159                  */
2160                 int i;
2161                 for (i=0; i < list->count; i++) {
2162                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
2163                         TDB_DATA key = {
2164                                 .dptr = guid_key,
2165                                 .dsize = sizeof(guid_key)
2166                         };
2167                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2168                         struct ldb_message *rec = ldb_msg_new(ldb);
2169                         if (rec == NULL) {
2170                                 return LDB_ERR_OPERATIONS_ERROR;
2171                         }
2172
2173                         ret = ltdb_idx_to_key(module, ltdb,
2174                                               ldb, &list->dn[i],
2175                                               &key);
2176                         if (ret != LDB_SUCCESS) {
2177                                 TALLOC_FREE(list);
2178                                 TALLOC_FREE(rec);
2179                                 return ret;
2180                         }
2181
2182                         ret = ltdb_search_key(module, ltdb, key,
2183                                               rec, flags);
2184                         if (key.dptr != guid_key) {
2185                                 TALLOC_FREE(key.dptr);
2186                         }
2187                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2188                                 /*
2189                                  * the record has disappeared?
2190                                  * yes, this can happen
2191                                  */
2192                                 talloc_free(rec);
2193                                 continue;
2194                         }
2195
2196                         if (ret != LDB_SUCCESS) {
2197                                 /* an internal error */
2198                                 TALLOC_FREE(rec);
2199                                 TALLOC_FREE(list);
2200                                 return LDB_ERR_OPERATIONS_ERROR;
2201                         }
2202                         /*
2203                          * The DN we are trying to add to the DB and index
2204                          * is already here, so we must deny the addition
2205                          */
2206                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2207                                 TALLOC_FREE(rec);
2208                                 TALLOC_FREE(list);
2209                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2210                         }
2211                 }
2212         }
2213
2214         /*
2215          * Check for duplicates in unique indexes
2216          *
2217          * We don't need to do a loop test like the @IDXDN case
2218          * above as we have a ban on long unique index values
2219          * at the start of this function.
2220          */
2221         if (list->count > 0 &&
2222             ((a != NULL
2223               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2224                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2225                 /*
2226                  * We do not want to print info about a possibly
2227                  * confidential DN that the conflict was with in the
2228                  * user-visible error string
2229                  */
2230
2231                 if (ltdb->cache->GUID_index_attribute == NULL) {
2232                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2233                                   __location__
2234                                   ": unique index violation on %s in %s, "
2235                                   "conficts with %*.*s in %s",
2236                                   el->name, ldb_dn_get_linearized(msg->dn),
2237                                   (int)list->dn[0].length,
2238                                   (int)list->dn[0].length,
2239                                   list->dn[0].data,
2240                                   ldb_dn_get_linearized(dn_key));
2241                 } else {
2242                         /* This can't fail, gives a default at worst */
2243                         const struct ldb_schema_attribute *attr
2244                                 = ldb_schema_attribute_by_name(
2245                                         ldb,
2246                                         ltdb->cache->GUID_index_attribute);
2247                         struct ldb_val v;
2248                         ret = attr->syntax->ldif_write_fn(ldb, list,
2249                                                           &list->dn[0], &v);
2250                         if (ret == LDB_SUCCESS) {
2251                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2252                                           __location__
2253                                           ": unique index violation on %s in "
2254                                           "%s, conficts with %s %*.*s in %s",
2255                                           el->name,
2256                                           ldb_dn_get_linearized(msg->dn),
2257                                           ltdb->cache->GUID_index_attribute,
2258                                           (int)v.length,
2259                                           (int)v.length,
2260                                           v.data,
2261                                           ldb_dn_get_linearized(dn_key));
2262                         }
2263                 }
2264                 ldb_asprintf_errstring(ldb,
2265                                        __location__ ": unique index violation "
2266                                        "on %s in %s",
2267                                        el->name,
2268                                        ldb_dn_get_linearized(msg->dn));
2269                 talloc_free(list);
2270                 return LDB_ERR_CONSTRAINT_VIOLATION;
2271         }
2272
2273         /* overallocate the list a bit, to reduce the number of
2274          * realloc trigered copies */
2275         alloc_len = ((list->count+1)+7) & ~7;
2276         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2277         if (list->dn == NULL) {
2278                 talloc_free(list);
2279                 return LDB_ERR_OPERATIONS_ERROR;
2280         }
2281
2282         if (ltdb->cache->GUID_index_attribute == NULL) {
2283                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2284                 list->dn[list->count].data
2285                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2286                 if (list->dn[list->count].data == NULL) {
2287                         talloc_free(list);
2288                         return LDB_ERR_OPERATIONS_ERROR;
2289                 }
2290                 list->dn[list->count].length = strlen(dn_str);
2291         } else {
2292                 const struct ldb_val *key_val;
2293                 struct ldb_val *exact = NULL, *next = NULL;
2294                 key_val = ldb_msg_find_ldb_val(msg,
2295                                                ltdb->cache->GUID_index_attribute);
2296                 if (key_val == NULL) {
2297                         talloc_free(list);
2298                         return ldb_module_operr(module);
2299                 }
2300
2301                 if (key_val->length != LTDB_GUID_SIZE) {
2302                         talloc_free(list);
2303                         return ldb_module_operr(module);
2304                 }
2305
2306                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2307                                         *key_val, ldb_val_equal_exact_ordered,
2308                                         exact, next);
2309
2310                 /*
2311                  * Give a warning rather than fail, this could be a
2312                  * duplicate value in the record allowed by a caller
2313                  * forcing in the value with
2314                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2315                  */
2316                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2317                         /* This can't fail, gives a default at worst */
2318                         const struct ldb_schema_attribute *attr
2319                                 = ldb_schema_attribute_by_name(
2320                                         ldb,
2321                                         ltdb->cache->GUID_index_attribute);
2322                         struct ldb_val v;
2323                         ret = attr->syntax->ldif_write_fn(ldb, list,
2324                                                           exact, &v);
2325                         if (ret == LDB_SUCCESS) {
2326                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2327                                           __location__
2328                                           ": duplicate attribute value in %s "
2329                                           "for index on %s, "
2330                                           "duplicate of %s %*.*s in %s",
2331                                           ldb_dn_get_linearized(msg->dn),
2332                                           el->name,
2333                                           ltdb->cache->GUID_index_attribute,
2334                                           (int)v.length,
2335                                           (int)v.length,
2336                                           v.data,
2337                                           ldb_dn_get_linearized(dn_key));
2338                         }
2339                 }
2340
2341                 if (next == NULL) {
2342                         next = &list->dn[list->count];
2343                 } else {
2344                         memmove(&next[1], next,
2345                                 sizeof(*next) * (list->count - (next - list->dn)));
2346                 }
2347                 *next = ldb_val_dup(list->dn, key_val);
2348                 if (next->data == NULL) {
2349                         talloc_free(list);
2350                         return ldb_module_operr(module);
2351                 }
2352         }
2353         list->count++;
2354
2355         ret = ltdb_dn_list_store(module, dn_key, list);
2356
2357         talloc_free(list);
2358
2359         return ret;
2360 }
2361
2362 /*
2363   add index entries for one elements in a message
2364  */
2365 static int ltdb_index_add_el(struct ldb_module *module,
2366                              struct ltdb_private *ltdb,
2367                              const struct ldb_message *msg,
2368                              struct ldb_message_element *el)
2369 {
2370         unsigned int i;
2371         for (i = 0; i < el->num_values; i++) {
2372                 int ret = ltdb_index_add1(module, ltdb,
2373                                           msg, el, i);
2374                 if (ret != LDB_SUCCESS) {
2375                         return ret;
2376                 }
2377         }
2378
2379         return LDB_SUCCESS;
2380 }
2381
2382 /*
2383   add index entries for all elements in a message
2384  */
2385 static int ltdb_index_add_all(struct ldb_module *module,
2386                               struct ltdb_private *ltdb,
2387                               const struct ldb_message *msg)
2388 {
2389         struct ldb_message_element *elements = msg->elements;
2390         unsigned int i;
2391         const char *dn_str;
2392         int ret;
2393
2394         if (ldb_dn_is_special(msg->dn)) {
2395                 return LDB_SUCCESS;
2396         }
2397
2398         dn_str = ldb_dn_get_linearized(msg->dn);
2399         if (dn_str == NULL) {
2400                 return LDB_ERR_OPERATIONS_ERROR;
2401         }
2402
2403         ret = ltdb_write_index_dn_guid(module, msg, 1);
2404         if (ret != LDB_SUCCESS) {
2405                 return ret;
2406         }
2407
2408         if (!ltdb->cache->attribute_indexes) {
2409                 /* no indexed fields */
2410                 return LDB_SUCCESS;
2411         }
2412
2413         for (i = 0; i < msg->num_elements; i++) {
2414                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
2415                         continue;
2416                 }
2417                 ret = ltdb_index_add_el(module, ltdb,
2418                                         msg, &elements[i]);
2419                 if (ret != LDB_SUCCESS) {
2420                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2421                         ldb_asprintf_errstring(ldb,
2422                                                __location__ ": Failed to re-index %s in %s - %s",
2423                                                elements[i].name, dn_str,
2424                                                ldb_errstring(ldb));
2425                         return ret;
2426                 }
2427         }
2428
2429         return LDB_SUCCESS;
2430 }
2431
2432
2433 /*
2434   insert a DN index for a message
2435 */
2436 static int ltdb_modify_index_dn(struct ldb_module *module,
2437                                 struct ltdb_private *ltdb,
2438                                 const struct ldb_message *msg,
2439                                 struct ldb_dn *dn,
2440                                 const char *index, int add)
2441 {
2442         struct ldb_message_element el;
2443         struct ldb_val val;
2444         int ret;
2445
2446         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2447         if (val.data == NULL) {
2448                 const char *dn_str = ldb_dn_get_linearized(dn);
2449                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2450                                        __location__
2451                                        ": Failed to modify %s "
2452                                        "against %s in %s: failed "
2453                                        "to get casefold DN",
2454                                        index,
2455                                        ltdb->cache->GUID_index_attribute,
2456                                        dn_str);
2457                 return LDB_ERR_OPERATIONS_ERROR;
2458         }
2459
2460         val.length = strlen((char *)val.data);
2461         el.name = index;
2462         el.values = &val;
2463         el.num_values = 1;
2464
2465         if (add) {
2466                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
2467         } else { /* delete */
2468                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
2469         }
2470
2471         if (ret != LDB_SUCCESS) {
2472                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2473                 const char *dn_str = ldb_dn_get_linearized(dn);
2474                 ldb_asprintf_errstring(ldb,
2475                                        __location__
2476                                        ": Failed to modify %s "
2477                                        "against %s in %s - %s",
2478                                        index,
2479                                        ltdb->cache->GUID_index_attribute,
2480                                        dn_str, ldb_errstring(ldb));
2481                 return ret;
2482         }
2483         return ret;
2484 }
2485
2486 /*
2487   insert a one level index for a message
2488 */
2489 static int ltdb_index_onelevel(struct ldb_module *module,
2490                                const struct ldb_message *msg, int add)
2491 {
2492         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2493                                                     struct ltdb_private);
2494         struct ldb_dn *pdn;
2495         int ret;
2496
2497         /* We index for ONE Level only if requested */
2498         if (!ltdb->cache->one_level_indexes) {
2499                 return LDB_SUCCESS;
2500         }
2501
2502         pdn = ldb_dn_get_parent(module, msg->dn);
2503         if (pdn == NULL) {
2504                 return LDB_ERR_OPERATIONS_ERROR;
2505         }
2506         ret = ltdb_modify_index_dn(module, ltdb,
2507                                    msg, pdn, LTDB_IDXONE, add);
2508
2509         talloc_free(pdn);
2510
2511         return ret;
2512 }
2513
2514 /*
2515   insert a one level index for a message
2516 */
2517 static int ltdb_write_index_dn_guid(struct ldb_module *module,
2518                                     const struct ldb_message *msg,
2519                                     int add)
2520 {
2521         int ret;
2522         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2523                                                     struct ltdb_private);
2524
2525         /* We index for DN only if using a GUID index */
2526         if (ltdb->cache->GUID_index_attribute == NULL) {
2527                 return LDB_SUCCESS;
2528         }
2529
2530         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
2531                                    LTDB_IDXDN, add);
2532
2533         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2534                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2535                                        "Entry %s already exists",
2536                                        ldb_dn_get_linearized(msg->dn));
2537                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2538         }
2539         return ret;
2540 }
2541
2542 /*
2543   add the index entries for a new element in a record
2544   The caller guarantees that these element values are not yet indexed
2545 */
2546 int ltdb_index_add_element(struct ldb_module *module,
2547                            struct ltdb_private *ltdb,
2548                            const struct ldb_message *msg,
2549                            struct ldb_message_element *el)
2550 {
2551         if (ldb_dn_is_special(msg->dn)) {
2552                 return LDB_SUCCESS;
2553         }
2554         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2555                 return LDB_SUCCESS;
2556         }
2557         return ltdb_index_add_el(module, ltdb, msg, el);
2558 }
2559
2560 /*
2561   add the index entries for a new record
2562 */
2563 int ltdb_index_add_new(struct ldb_module *module,
2564                        struct ltdb_private *ltdb,
2565                        const struct ldb_message *msg)
2566 {
2567         int ret;
2568
2569         if (ldb_dn_is_special(msg->dn)) {
2570                 return LDB_SUCCESS;
2571         }
2572
2573         ret = ltdb_index_add_all(module, ltdb, msg);
2574         if (ret != LDB_SUCCESS) {
2575                 /*
2576                  * Because we can't trust the caller to be doing
2577                  * transactions properly, clean up any index for this
2578                  * entry rather than relying on a transaction
2579                  * cleanup
2580                  */
2581
2582                 ltdb_index_delete(module, msg);
2583                 return ret;
2584         }
2585
2586         ret = ltdb_index_onelevel(module, msg, 1);
2587         if (ret != LDB_SUCCESS) {
2588                 /*
2589                  * Because we can't trust the caller to be doing
2590                  * transactions properly, clean up any index for this
2591                  * entry rather than relying on a transaction
2592                  * cleanup
2593                  */
2594                 ltdb_index_delete(module, msg);
2595                 return ret;
2596         }
2597         return ret;
2598 }
2599
2600
2601 /*
2602   delete an index entry for one message element
2603 */
2604 int ltdb_index_del_value(struct ldb_module *module,
2605                          struct ltdb_private *ltdb,
2606                          const struct ldb_message *msg,
2607                          struct ldb_message_element *el, unsigned int v_idx)
2608 {
2609         struct ldb_context *ldb;
2610         struct ldb_dn *dn_key;
2611         const char *dn_str;
2612         int ret, i;
2613         unsigned int j;
2614         struct dn_list *list;
2615         struct ldb_dn *dn = msg->dn;
2616         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2617
2618         ldb = ldb_module_get_ctx(module);
2619
2620         dn_str = ldb_dn_get_linearized(dn);
2621         if (dn_str == NULL) {
2622                 return LDB_ERR_OPERATIONS_ERROR;
2623         }
2624
2625         if (dn_str[0] == '@') {
2626                 return LDB_SUCCESS;
2627         }
2628
2629         dn_key = ltdb_index_key(ldb, ltdb,
2630                                 el->name, &el->values[v_idx],
2631                                 NULL, &truncation);
2632         /*
2633          * We ignore key truncation in ltdb_index_add1() so
2634          * match that by ignoring it here as well
2635          *
2636          * Multiple values are legitimate and accepted
2637          */
2638         if (!dn_key) {
2639                 return LDB_ERR_OPERATIONS_ERROR;
2640         }
2641
2642         list = talloc_zero(dn_key, struct dn_list);
2643         if (list == NULL) {
2644                 talloc_free(dn_key);
2645                 return LDB_ERR_OPERATIONS_ERROR;
2646         }
2647
2648         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2649         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2650                 /* it wasn't indexed. Did we have an earlier error? If we did then
2651                    its gone now */
2652                 talloc_free(dn_key);
2653                 return LDB_SUCCESS;
2654         }
2655
2656         if (ret != LDB_SUCCESS) {
2657                 talloc_free(dn_key);
2658                 return ret;
2659         }
2660
2661         /*
2662          * Find one of the values matching this message to remove
2663          */
2664         i = ltdb_dn_list_find_msg(ltdb, list, msg);
2665         if (i == -1) {
2666                 /* nothing to delete */
2667                 talloc_free(dn_key);
2668                 return LDB_SUCCESS;
2669         }
2670
2671         j = (unsigned int) i;
2672         if (j != list->count - 1) {
2673                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2674         }
2675         list->count--;
2676         if (list->count == 0) {
2677                 talloc_free(list->dn);
2678                 list->dn = NULL;
2679         } else {
2680                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2681         }
2682
2683         ret = ltdb_dn_list_store(module, dn_key, list);
2684
2685         talloc_free(dn_key);
2686
2687         return ret;
2688 }
2689
2690 /*
2691   delete the index entries for a element
2692   return -1 on failure
2693 */
2694 int ltdb_index_del_element(struct ldb_module *module,
2695                            struct ltdb_private *ltdb,
2696                            const struct ldb_message *msg,
2697                            struct ldb_message_element *el)
2698 {
2699         const char *dn_str;
2700         int ret;
2701         unsigned int i;
2702
2703         if (!ltdb->cache->attribute_indexes) {
2704                 /* no indexed fields */
2705                 return LDB_SUCCESS;
2706         }
2707
2708         dn_str = ldb_dn_get_linearized(msg->dn);
2709         if (dn_str == NULL) {
2710                 return LDB_ERR_OPERATIONS_ERROR;
2711         }
2712
2713         if (dn_str[0] == '@') {
2714                 return LDB_SUCCESS;
2715         }
2716
2717         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2718                 return LDB_SUCCESS;
2719         }
2720         for (i = 0; i < el->num_values; i++) {
2721                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
2722                 if (ret != LDB_SUCCESS) {
2723                         return ret;
2724                 }
2725         }
2726
2727         return LDB_SUCCESS;
2728 }
2729
2730 /*
2731   delete the index entries for a record
2732   return -1 on failure
2733 */
2734 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
2735 {
2736         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2737         int ret;
2738         unsigned int i;
2739
2740         if (ldb_dn_is_special(msg->dn)) {
2741                 return LDB_SUCCESS;
2742         }
2743
2744         ret = ltdb_index_onelevel(module, msg, 0);
2745         if (ret != LDB_SUCCESS) {
2746                 return ret;
2747         }
2748
2749         ret = ltdb_write_index_dn_guid(module, msg, 0);
2750         if (ret != LDB_SUCCESS) {
2751                 return ret;
2752         }
2753
2754         if (!ltdb->cache->attribute_indexes) {
2755                 /* no indexed fields */
2756                 return LDB_SUCCESS;
2757         }
2758
2759         for (i = 0; i < msg->num_elements; i++) {
2760                 ret = ltdb_index_del_element(module, ltdb,
2761                                              msg, &msg->elements[i]);
2762                 if (ret != LDB_SUCCESS) {
2763                         return ret;
2764                 }
2765         }
2766
2767         return LDB_SUCCESS;
2768 }
2769
2770
2771 /*
2772   traversal function that deletes all @INDEX records in the in-memory
2773   TDB.
2774
2775   This does not touch the actual DB, that is done at transaction
2776   commit, which in turn greatly reduces DB churn as we will likely
2777   be able to do a direct update into the old record.
2778 */
2779 static int delete_index(struct ltdb_private *ltdb, struct ldb_val key, struct ldb_val data, void *state)
2780 {
2781         struct ldb_module *module = state;
2782         const char *dnstr = "DN=" LTDB_INDEX ":";
2783         struct dn_list list;
2784         struct ldb_dn *dn;
2785         struct ldb_val v;
2786         int ret;
2787
2788         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2789                 return 0;
2790         }
2791         /* we need to put a empty list in the internal tdb for this
2792          * index entry */
2793         list.dn = NULL;
2794         list.count = 0;
2795
2796         /* the offset of 3 is to remove the DN= prefix. */
2797         v.data = key.data + 3;
2798         v.length = strnlen((char *)key.data, key.length) - 3;
2799
2800         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
2801
2802         /*
2803          * This does not actually touch the DB quite yet, just
2804          * the in-memory index cache
2805          */
2806         ret = ltdb_dn_list_store(module, dn, &list);
2807         if (ret != LDB_SUCCESS) {
2808                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2809                                        "Unable to store null index for %s\n",
2810                                                 ldb_dn_get_linearized(dn));
2811                 talloc_free(dn);
2812                 return -1;
2813         }
2814         talloc_free(dn);
2815         return 0;
2816 }
2817
2818 /*
2819   traversal function that adds @INDEX records during a re index TODO wrong comment
2820 */
2821 static int re_key(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2822 {
2823         struct ldb_context *ldb;
2824         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2825         struct ldb_module *module = ctx->module;
2826         struct ldb_message *msg;
2827         unsigned int nb_elements_in_db;
2828         int ret;
2829         TDB_DATA key2;
2830         bool is_record;
2831         TDB_DATA key = {
2832                 .dptr = ldb_key.data,
2833                 .dsize = ldb_key.length
2834         };
2835         
2836         ldb = ldb_module_get_ctx(module);
2837
2838         if (key.dsize > 4 &&
2839             memcmp(key.dptr, "DN=@", 4) == 0) {
2840                 return 0;
2841         }
2842
2843         is_record = ltdb_key_is_record(key);
2844         if (is_record == false) {
2845                 return 0;
2846         }
2847         
2848         msg = ldb_msg_new(module);
2849         if (msg == NULL) {
2850                 return -1;
2851         }
2852
2853         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2854                                                    msg,
2855                                                    NULL, 0,
2856                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2857                                                    &nb_elements_in_db);
2858         if (ret != 0) {
2859                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2860                                                 ldb_dn_get_linearized(msg->dn));
2861                 ctx->error = ret;
2862                 talloc_free(msg);
2863                 return -1;
2864         }
2865
2866         if (msg->dn == NULL) {
2867                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2868                           "Refusing to re-index as GUID "
2869                           "key %*.*s with no DN\n",
2870                           (int)key.dsize, (int)key.dsize,
2871                           (char *)key.dptr);
2872                 talloc_free(msg);
2873                 return -1;
2874         }
2875         
2876         /* check if the DN key has changed, perhaps due to the case
2877            insensitivity of an element changing, or a change from DN
2878            to GUID keys */
2879         key2 = ltdb_key_msg(module, msg, msg);
2880         if (key2.dptr == NULL) {
2881                 /* probably a corrupt record ... darn */
2882                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2883                                                 ldb_dn_get_linearized(msg->dn));
2884                 talloc_free(msg);
2885                 return 0;
2886         }
2887         if (key.dsize != key2.dsize ||
2888             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
2889                 struct ldb_val ldb_key2 = {
2890                         .data = key2.dptr,
2891                         .length = key2.dsize
2892                 };
2893                 ltdb->kv_ops->update_in_iterate(ltdb, ldb_key, ldb_key2, val, ctx);
2894         }
2895         talloc_free(key2.dptr);
2896
2897         talloc_free(msg);
2898
2899         ctx->count++;
2900         if (ctx->count % 10000 == 0) {
2901                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2902                           "Reindexing: re-keyed %u records so far",
2903                           ctx->count);
2904         }
2905
2906         return 0;
2907 }
2908
2909 /*
2910   traversal function that adds @INDEX records during a re index
2911 */
2912 static int re_index(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2913 {
2914         struct ldb_context *ldb;
2915         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2916         struct ldb_module *module = ctx->module;
2917         struct ldb_message *msg;
2918         unsigned int nb_elements_in_db;
2919         TDB_DATA key = {
2920                 .dptr = ldb_key.data,
2921                 .dsize = ldb_key.length
2922         };
2923         int ret;
2924         bool is_record;
2925         
2926         ldb = ldb_module_get_ctx(module);
2927
2928         if (key.dsize > 4 &&
2929             memcmp(key.dptr, "DN=@", 4) == 0) {
2930                 return 0;
2931         }
2932
2933         is_record = ltdb_key_is_record(key);
2934         if (is_record == false) {
2935                 return 0;
2936         }
2937         
2938         msg = ldb_msg_new(module);
2939         if (msg == NULL) {
2940                 return -1;
2941         }
2942
2943         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2944                                                    msg,
2945                                                    NULL, 0,
2946                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2947                                                    &nb_elements_in_db);
2948         if (ret != 0) {
2949                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2950                                                 ldb_dn_get_linearized(msg->dn));
2951                 ctx->error = ret;
2952                 talloc_free(msg);
2953                 return -1;
2954         }
2955
2956         if (msg->dn == NULL) {
2957                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2958                           "Refusing to re-index as GUID "
2959                           "key %*.*s with no DN\n",
2960                           (int)key.dsize, (int)key.dsize,
2961                           (char *)key.dptr);
2962                 talloc_free(msg);
2963                 return -1;
2964         }
2965
2966         ret = ltdb_index_onelevel(module, msg, 1);
2967         if (ret != LDB_SUCCESS) {
2968                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2969                           "Adding special ONE LEVEL index failed (%s)!",
2970                                                 ldb_dn_get_linearized(msg->dn));
2971                 talloc_free(msg);
2972                 return -1;
2973         }
2974
2975         ret = ltdb_index_add_all(module, ltdb, msg);
2976
2977         if (ret != LDB_SUCCESS) {
2978                 ctx->error = ret;
2979                 talloc_free(msg);
2980                 return -1;
2981         }
2982
2983         talloc_free(msg);
2984
2985         ctx->count++;
2986         if (ctx->count % 10000 == 0) {
2987                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2988                           "Reindexing: re-indexed %u records so far",
2989                           ctx->count);
2990         }
2991
2992         return 0;
2993 }
2994
2995 /*
2996   force a complete reindex of the database
2997 */
2998 int ltdb_reindex(struct ldb_module *module)
2999 {
3000         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
3001         int ret;
3002         struct ltdb_reindex_context ctx;
3003
3004         /*
3005          * Only triggered after a modification, but make clear we do
3006          * not re-index a read-only DB
3007          */
3008         if (ltdb->read_only) {
3009                 return LDB_ERR_UNWILLING_TO_PERFORM;
3010         }
3011
3012         if (ltdb_cache_reload(module) != 0) {
3013                 return LDB_ERR_OPERATIONS_ERROR;
3014         }
3015
3016         /*
3017          * Ensure we read (and so remove) the entries from the real
3018          * DB, no values stored so far are any use as we want to do a
3019          * re-index
3020          */
3021         ltdb_index_transaction_cancel(module);
3022
3023         ret = ltdb_index_transaction_start(module);
3024         if (ret != LDB_SUCCESS) {
3025                 return ret;
3026         }
3027
3028         /* first traverse the database deleting any @INDEX records by
3029          * putting NULL entries in the in-memory tdb
3030          */
3031         ret = ltdb->kv_ops->iterate(ltdb, delete_index, module);
3032         if (ret < 0) {
3033                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3034                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3035                                        ldb_errstring(ldb));
3036                 return LDB_ERR_OPERATIONS_ERROR;
3037         }
3038
3039         ctx.module = module;
3040         ctx.error = 0;
3041         ctx.count = 0;
3042
3043         ret = ltdb->kv_ops->iterate(ltdb, re_key, &ctx);
3044         if (ret < 0) {
3045                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3046                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3047                                        ldb_errstring(ldb));
3048                 return LDB_ERR_OPERATIONS_ERROR;
3049         }
3050
3051         if (ctx.error != LDB_SUCCESS) {
3052                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3053                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3054                 return ctx.error;
3055         }
3056
3057         ctx.error = 0;
3058         ctx.count = 0;
3059
3060         /* now traverse adding any indexes for normal LDB records */
3061         ret = ltdb->kv_ops->iterate(ltdb, re_index, &ctx);
3062         if (ret < 0) {
3063                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3064                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3065                                        ldb_errstring(ldb));
3066                 return LDB_ERR_OPERATIONS_ERROR;
3067         }
3068
3069         if (ctx.error != LDB_SUCCESS) {
3070                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3071                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3072                 return ctx.error;
3073         }
3074
3075         if (ctx.count > 10000) {
3076                 ldb_debug(ldb_module_get_ctx(module),
3077                           LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
3078                           "final index write-out will be in transaction commit",
3079                           ltdb->kv_ops->name(ltdb));
3080         }
3081         return LDB_SUCCESS;
3082 }