ldb: Save a copy of the index result before calling the callbacks.
[kai/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of TDB key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_tdb.h"
148 #include "ldb_private.h"
149 #include "lib/util/binsearch.h"
150
151 struct dn_list {
152         unsigned int count;
153         struct ldb_val *dn;
154         /*
155          * Do not optimise the intersection of this list,
156          * we must never return an entry not in this
157          * list.  This allows the index for
158          * SCOPE_ONELEVEL to be trusted.
159          */
160         bool strict;
161 };
162
163 struct ltdb_idxptr {
164         struct tdb_context *itdb;
165         int error;
166 };
167
168 enum key_truncation {
169         KEY_NOT_TRUNCATED,
170         KEY_TRUNCATED,
171 };
172
173 static int ltdb_write_index_dn_guid(struct ldb_module *module,
174                                     const struct ldb_message *msg,
175                                     int add);
176 static int ltdb_index_dn_base_dn(struct ldb_module *module,
177                                  struct ltdb_private *ltdb,
178                                  struct ldb_dn *base_dn,
179                                  struct dn_list *dn_list,
180                                  enum key_truncation *truncation);
181
182 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
183                               struct dn_list *list);
184
185 /* we put a @IDXVERSION attribute on index entries. This
186    allows us to tell if it was written by an older version
187 */
188 #define LTDB_INDEXING_VERSION 2
189
190 #define LTDB_GUID_INDEXING_VERSION 3
191
192 static unsigned ltdb_max_key_length(struct ltdb_private *ltdb) {
193         if (ltdb->max_key_length == 0){
194                 return UINT_MAX;
195         }
196         return ltdb->max_key_length;
197 }
198
199 /* enable the idxptr mode when transactions start */
200 int ltdb_index_transaction_start(struct ldb_module *module)
201 {
202         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
203         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
204         if (ltdb->idxptr == NULL) {
205                 return ldb_oom(ldb_module_get_ctx(module));
206         }
207
208         return LDB_SUCCESS;
209 }
210
211 /*
212   see if two ldb_val structures contain exactly the same data
213   return -1 or 1 for a mismatch, 0 for match
214 */
215 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
216                                          const struct ldb_val *v2)
217 {
218         if (v1->length > v2->length) {
219                 return -1;
220         }
221         if (v1->length < v2->length) {
222                 return 1;
223         }
224         return memcmp(v1->data, v2->data, v1->length);
225 }
226
227 /*
228   see if two ldb_val structures contain exactly the same data
229   return -1 or 1 for a mismatch, 0 for match
230 */
231 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
232                                        const struct ldb_val *v2)
233 {
234         if (v1.length > v2->length) {
235                 return -1;
236         }
237         if (v1.length < v2->length) {
238                 return 1;
239         }
240         return memcmp(v1.data, v2->data, v1.length);
241 }
242
243
244 /*
245   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
246   binary-safe comparison for the 'dn' returns -1 if not found
247
248   This is therefore safe when the value is a GUID in the future
249  */
250 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
251                                  const struct dn_list *list,
252                                  const struct ldb_val *v)
253 {
254         unsigned int i;
255         struct ldb_val *exact = NULL, *next = NULL;
256
257         if (ltdb->cache->GUID_index_attribute == NULL) {
258                 for (i=0; i<list->count; i++) {
259                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
260                                 return i;
261                         }
262                 }
263                 return -1;
264         }
265
266         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
267                                 *v, ldb_val_equal_exact_ordered,
268                                 exact, next);
269         if (exact == NULL) {
270                 return -1;
271         }
272         /* Not required, but keeps the compiler quiet */
273         if (next != NULL) {
274                 return -1;
275         }
276
277         i = exact - list->dn;
278         return i;
279 }
280
281 /*
282   find a entry in a dn_list. Uses a case sensitive comparison with the dn
283   returns -1 if not found
284  */
285 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
286                                  struct dn_list *list,
287                                  const struct ldb_message *msg)
288 {
289         struct ldb_val v;
290         const struct ldb_val *key_val;
291         if (ltdb->cache->GUID_index_attribute == NULL) {
292                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
293                 v.data = discard_const_p(unsigned char, dn_str);
294                 v.length = strlen(dn_str);
295         } else {
296                 key_val = ldb_msg_find_ldb_val(msg,
297                                                ltdb->cache->GUID_index_attribute);
298                 if (key_val == NULL) {
299                         return -1;
300                 }
301                 v = *key_val;
302         }
303         return ltdb_dn_list_find_val(ltdb, list, &v);
304 }
305
306 /*
307   this is effectively a cast function, but with lots of paranoia
308   checks and also copes with CPUs that are fussy about pointer
309   alignment
310  */
311 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
312 {
313         struct dn_list *list;
314         if (rec.dsize != sizeof(void *)) {
315                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
316                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
317                 return NULL;
318         }
319         /* note that we can't just use a cast here, as rec.dptr may
320            not be aligned sufficiently for a pointer. A cast would cause
321            platforms like some ARM CPUs to crash */
322         memcpy(&list, rec.dptr, sizeof(void *));
323         list = talloc_get_type(list, struct dn_list);
324         if (list == NULL) {
325                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
326                                        "Bad type '%s' for idxptr",
327                                        talloc_get_name(list));
328                 return NULL;
329         }
330         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
331                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
332                                        "Bad parent '%s' for idxptr",
333                                        talloc_get_name(talloc_parent(list->dn)));
334                 return NULL;
335         }
336         return list;
337 }
338
339 /*
340   return the @IDX list in an index entry for a dn as a
341   struct dn_list
342  */
343 static int ltdb_dn_list_load(struct ldb_module *module,
344                              struct ltdb_private *ltdb,
345                              struct ldb_dn *dn, struct dn_list *list)
346 {
347         struct ldb_message *msg;
348         int ret, version;
349         struct ldb_message_element *el;
350         TDB_DATA rec;
351         struct dn_list *list2;
352         TDB_DATA key;
353
354         list->dn = NULL;
355         list->count = 0;
356
357         /* see if we have any in-memory index entries */
358         if (ltdb->idxptr == NULL ||
359             ltdb->idxptr->itdb == NULL) {
360                 goto normal_index;
361         }
362
363         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
364         key.dsize = strlen((char *)key.dptr);
365
366         rec = tdb_fetch(ltdb->idxptr->itdb, key);
367         if (rec.dptr == NULL) {
368                 goto normal_index;
369         }
370
371         /* we've found an in-memory index entry */
372         list2 = ltdb_index_idxptr(module, rec, true);
373         if (list2 == NULL) {
374                 free(rec.dptr);
375                 return LDB_ERR_OPERATIONS_ERROR;
376         }
377         free(rec.dptr);
378
379         *list = *list2;
380         return LDB_SUCCESS;
381
382 normal_index:
383         msg = ldb_msg_new(list);
384         if (msg == NULL) {
385                 return LDB_ERR_OPERATIONS_ERROR;
386         }
387
388         ret = ltdb_search_dn1(module, dn, msg,
389                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
390                               |LDB_UNPACK_DATA_FLAG_NO_DN);
391         if (ret != LDB_SUCCESS) {
392                 talloc_free(msg);
393                 return ret;
394         }
395
396         el = ldb_msg_find_element(msg, LTDB_IDX);
397         if (!el) {
398                 talloc_free(msg);
399                 return LDB_SUCCESS;
400         }
401
402         version = ldb_msg_find_attr_as_int(msg, LTDB_IDXVERSION, 0);
403
404         /*
405          * we avoid copying the strings by stealing the list.  We have
406          * to steal msg onto el->values (which looks odd) because we
407          * asked for the memory to be allocated on msg, not on each
408          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
409          */
410         if (ltdb->cache->GUID_index_attribute == NULL) {
411                 /* check indexing version number */
412                 if (version != LTDB_INDEXING_VERSION) {
413                         ldb_debug_set(ldb_module_get_ctx(module),
414                                       LDB_DEBUG_ERROR,
415                                       "Wrong DN index version %d "
416                                       "expected %d for %s",
417                                       version, LTDB_INDEXING_VERSION,
418                                       ldb_dn_get_linearized(dn));
419                         return LDB_ERR_OPERATIONS_ERROR;
420                 }
421
422                 talloc_steal(el->values, msg);
423                 list->dn = talloc_steal(list, el->values);
424                 list->count = el->num_values;
425         } else {
426                 unsigned int i;
427                 if (version != LTDB_GUID_INDEXING_VERSION) {
428                         /* This is quite likely during the DB startup
429                            on first upgrade to using a GUID index */
430                         ldb_debug_set(ldb_module_get_ctx(module),
431                                       LDB_DEBUG_ERROR,
432                                       "Wrong GUID index version %d "
433                                       "expected %d for %s",
434                                       version, LTDB_GUID_INDEXING_VERSION,
435                                       ldb_dn_get_linearized(dn));
436                         return LDB_ERR_OPERATIONS_ERROR;
437                 }
438
439                 if (el->num_values == 0) {
440                         return LDB_ERR_OPERATIONS_ERROR;
441                 }
442
443                 if ((el->values[0].length % LTDB_GUID_SIZE) != 0) {
444                         return LDB_ERR_OPERATIONS_ERROR;
445                 }
446
447                 list->count = el->values[0].length / LTDB_GUID_SIZE;
448                 list->dn = talloc_array(list, struct ldb_val, list->count);
449
450                 /*
451                  * The actual data is on msg, due to
452                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
453                  */
454                 talloc_steal(list->dn, msg);
455                 for (i = 0; i < list->count; i++) {
456                         list->dn[i].data
457                                 = &el->values[0].data[i * LTDB_GUID_SIZE];
458                         list->dn[i].length = LTDB_GUID_SIZE;
459                 }
460         }
461
462         /* We don't need msg->elements any more */
463         talloc_free(msg->elements);
464         return LDB_SUCCESS;
465 }
466
467 int ltdb_key_dn_from_idx(struct ldb_module *module,
468                          struct ltdb_private *ltdb,
469                          TALLOC_CTX *mem_ctx,
470                          struct ldb_dn *dn,
471                          TDB_DATA *tdb_key)
472 {
473         struct ldb_context *ldb = ldb_module_get_ctx(module);
474         int ret;
475         int index = 0;
476         enum key_truncation truncation = KEY_NOT_TRUNCATED;
477         struct dn_list *list = talloc(mem_ctx, struct dn_list);
478         if (list == NULL) {
479                 ldb_oom(ldb);
480                 return LDB_ERR_OPERATIONS_ERROR;
481         }
482
483
484         ret = ltdb_index_dn_base_dn(module, ltdb, dn, list, &truncation);
485         if (ret != LDB_SUCCESS) {
486                 TALLOC_FREE(list);
487                 return ret;
488         }
489
490         if (list->count == 0) {
491                 TALLOC_FREE(list);
492                 return LDB_ERR_NO_SUCH_OBJECT;
493         }
494
495         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
496                 const char *dn_str = ldb_dn_get_linearized(dn);
497                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
498                                        __location__
499                                        ": Failed to read DN index "
500                                        "against %s for %s: too many "
501                                        "values (%u > 1)",
502                                        ltdb->cache->GUID_index_attribute,
503                                        dn_str, list->count);
504                 TALLOC_FREE(list);
505                 return LDB_ERR_CONSTRAINT_VIOLATION;
506         }
507
508         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
509                 /*
510                  * DN key has been truncated, need to inspect the actual
511                  * records to locate the actual DN
512                  */
513                 int i;
514                 index = -1;
515                 for (i=0; i < list->count; i++) {
516                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
517                         TDB_DATA key = {
518                                 .dptr = guid_key,
519                                 .dsize = sizeof(guid_key)
520                         };
521                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
522                         struct ldb_message *rec = ldb_msg_new(ldb);
523                         if (rec == NULL) {
524                                 return LDB_ERR_OPERATIONS_ERROR;
525                         }
526
527                         ret = ltdb_idx_to_key(module, ltdb,
528                                               ldb, &list->dn[i],
529                                               &key);
530                         if (ret != LDB_SUCCESS) {
531                                 TALLOC_FREE(list);
532                                 TALLOC_FREE(rec);
533                                 return ret;
534                         }
535
536                         ret = ltdb_search_key(module, ltdb, key,
537                                               rec, flags);
538                         if (key.dptr != guid_key) {
539                                 TALLOC_FREE(key.dptr);
540                         }
541                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
542                                 /*
543                                  * the record has disappeared?
544                                  * yes, this can happen
545                                  */
546                                 TALLOC_FREE(rec);
547                                 continue;
548                         }
549
550                         if (ret != LDB_SUCCESS) {
551                                 /* an internal error */
552                                 TALLOC_FREE(rec);
553                                 TALLOC_FREE(list);
554                                 return LDB_ERR_OPERATIONS_ERROR;
555                         }
556
557                         /*
558                          * We found the actual DN that we wanted from in the
559                          * multiple values that matched the index
560                          * (due to truncation), so return that.
561                          *
562                          */
563                         if (ldb_dn_compare(dn, rec->dn) == 0) {
564                                 index = i;
565                                 TALLOC_FREE(rec);
566                                 break;
567                         }
568                 }
569
570                 /*
571                  * We matched the index but the actual DN we wanted
572                  * was not here.
573                  */
574                 if (index == -1) {
575                         TALLOC_FREE(list);
576                         return LDB_ERR_NO_SUCH_OBJECT;
577                 }
578         }
579
580         /* The tdb_key memory is allocated by the caller */
581         ret = ltdb_guid_to_key(module, ltdb,
582                                &list->dn[index], tdb_key);
583         TALLOC_FREE(list);
584
585         if (ret != LDB_SUCCESS) {
586                 return LDB_ERR_OPERATIONS_ERROR;
587         }
588
589         return LDB_SUCCESS;
590 }
591
592
593
594 /*
595   save a dn_list into a full @IDX style record
596  */
597 static int ltdb_dn_list_store_full(struct ldb_module *module,
598                                    struct ltdb_private *ltdb,
599                                    struct ldb_dn *dn,
600                                    struct dn_list *list)
601 {
602         struct ldb_message *msg;
603         int ret;
604
605         msg = ldb_msg_new(module);
606         if (!msg) {
607                 return ldb_module_oom(module);
608         }
609
610         msg->dn = dn;
611
612         if (list->count == 0) {
613                 ret = ltdb_delete_noindex(module, msg);
614                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
615                         talloc_free(msg);
616                         return LDB_SUCCESS;
617                 }
618                 return ret;
619         }
620
621         if (ltdb->cache->GUID_index_attribute == NULL) {
622                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
623                                       LTDB_INDEXING_VERSION);
624                 if (ret != LDB_SUCCESS) {
625                         talloc_free(msg);
626                         return ldb_module_oom(module);
627                 }
628         } else {
629                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
630                                       LTDB_GUID_INDEXING_VERSION);
631                 if (ret != LDB_SUCCESS) {
632                         talloc_free(msg);
633                         return ldb_module_oom(module);
634                 }
635         }
636
637         if (list->count > 0) {
638                 struct ldb_message_element *el;
639
640                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
641                 if (ret != LDB_SUCCESS) {
642                         talloc_free(msg);
643                         return ldb_module_oom(module);
644                 }
645
646                 if (ltdb->cache->GUID_index_attribute == NULL) {
647                         el->values = list->dn;
648                         el->num_values = list->count;
649                 } else {
650                         struct ldb_val v;
651                         unsigned int i;
652                         el->values = talloc_array(msg,
653                                                   struct ldb_val, 1);
654                         if (el->values == NULL) {
655                                 talloc_free(msg);
656                                 return ldb_module_oom(module);
657                         }
658
659                         v.data = talloc_array_size(el->values,
660                                                    list->count,
661                                                    LTDB_GUID_SIZE);
662                         if (v.data == NULL) {
663                                 talloc_free(msg);
664                                 return ldb_module_oom(module);
665                         }
666
667                         v.length = talloc_get_size(v.data);
668
669                         for (i = 0; i < list->count; i++) {
670                                 if (list->dn[i].length !=
671                                     LTDB_GUID_SIZE) {
672                                         talloc_free(msg);
673                                         return ldb_module_operr(module);
674                                 }
675                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
676                                        list->dn[i].data,
677                                        LTDB_GUID_SIZE);
678                         }
679                         el->values[0] = v;
680                         el->num_values = 1;
681                 }
682         }
683
684         ret = ltdb_store(module, msg, TDB_REPLACE);
685         talloc_free(msg);
686         return ret;
687 }
688
689 /*
690   save a dn_list into the database, in either @IDX or internal format
691  */
692 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
693                               struct dn_list *list)
694 {
695         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
696         TDB_DATA rec, key;
697         int ret;
698         struct dn_list *list2;
699
700         if (ltdb->idxptr == NULL) {
701                 return ltdb_dn_list_store_full(module, ltdb,
702                                                dn, list);
703         }
704
705         if (ltdb->idxptr->itdb == NULL) {
706                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
707                 if (ltdb->idxptr->itdb == NULL) {
708                         return LDB_ERR_OPERATIONS_ERROR;
709                 }
710         }
711
712         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
713         key.dsize = strlen((char *)key.dptr);
714
715         rec = tdb_fetch(ltdb->idxptr->itdb, key);
716         if (rec.dptr != NULL) {
717                 list2 = ltdb_index_idxptr(module, rec, false);
718                 if (list2 == NULL) {
719                         free(rec.dptr);
720                         return LDB_ERR_OPERATIONS_ERROR;
721                 }
722                 free(rec.dptr);
723                 list2->dn = talloc_steal(list2, list->dn);
724                 list2->count = list->count;
725                 return LDB_SUCCESS;
726         }
727
728         list2 = talloc(ltdb->idxptr, struct dn_list);
729         if (list2 == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         list2->dn = talloc_steal(list2, list->dn);
733         list2->count = list->count;
734
735         rec.dptr = (uint8_t *)&list2;
736         rec.dsize = sizeof(void *);
737
738
739         /*
740          * This is not a store into the main DB, but into an in-memory
741          * TDB, so we don't need a guard on ltdb->read_only
742          */
743         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
744         if (ret != 0) {
745                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
746         }
747         return LDB_SUCCESS;
748 }
749
750 /*
751   traverse function for storing the in-memory index entries on disk
752  */
753 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
754 {
755         struct ldb_module *module = state;
756         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
757         struct ldb_dn *dn;
758         struct ldb_context *ldb = ldb_module_get_ctx(module);
759         struct ldb_val v;
760         struct dn_list *list;
761
762         list = ltdb_index_idxptr(module, data, true);
763         if (list == NULL) {
764                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
765                 return -1;
766         }
767
768         v.data = key.dptr;
769         v.length = strnlen((char *)key.dptr, key.dsize);
770
771         dn = ldb_dn_from_ldb_val(module, ldb, &v);
772         if (dn == NULL) {
773                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
774                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
775                 return -1;
776         }
777
778         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
779                                                       dn, list);
780         talloc_free(dn);
781         if (ltdb->idxptr->error != 0) {
782                 return -1;
783         }
784         return 0;
785 }
786
787 /* cleanup the idxptr mode when transaction commits */
788 int ltdb_index_transaction_commit(struct ldb_module *module)
789 {
790         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
791         int ret;
792
793         struct ldb_context *ldb = ldb_module_get_ctx(module);
794
795         ldb_reset_err_string(ldb);
796
797         if (ltdb->idxptr->itdb) {
798                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
799                 tdb_close(ltdb->idxptr->itdb);
800         }
801
802         ret = ltdb->idxptr->error;
803         if (ret != LDB_SUCCESS) {
804                 if (!ldb_errstring(ldb)) {
805                         ldb_set_errstring(ldb, ldb_strerror(ret));
806                 }
807                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
808         }
809
810         talloc_free(ltdb->idxptr);
811         ltdb->idxptr = NULL;
812         return ret;
813 }
814
815 /* cleanup the idxptr mode when transaction cancels */
816 int ltdb_index_transaction_cancel(struct ldb_module *module)
817 {
818         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
819         if (ltdb->idxptr && ltdb->idxptr->itdb) {
820                 tdb_close(ltdb->idxptr->itdb);
821         }
822         talloc_free(ltdb->idxptr);
823         ltdb->idxptr = NULL;
824         return LDB_SUCCESS;
825 }
826
827
828 /*
829   return the dn key to be used for an index
830   the caller is responsible for freeing
831 */
832 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
833                                      struct ltdb_private *ltdb,
834                                      const char *attr, const struct ldb_val *value,
835                                      const struct ldb_schema_attribute **ap,
836                                      enum key_truncation *truncation)
837 {
838         struct ldb_dn *ret;
839         struct ldb_val v;
840         const struct ldb_schema_attribute *a = NULL;
841         char *attr_folded = NULL;
842         const char *attr_for_dn = NULL;
843         int r;
844         bool should_b64_encode;
845
846         unsigned int max_key_length = ltdb_max_key_length(ltdb);
847         size_t key_len = 0;
848         size_t attr_len = 0;
849         const size_t indx_len = sizeof(LTDB_INDEX) - 1;
850         unsigned frmt_len = 0;
851         const size_t additional_key_length = 4;
852         unsigned int num_separators = 3; /* Estimate for overflow check */
853         const size_t min_data = 1;
854         const size_t min_key_length = additional_key_length
855                 + indx_len + num_separators + min_data;
856
857         if (attr[0] == '@') {
858                 attr_for_dn = attr;
859                 v = *value;
860                 if (ap != NULL) {
861                         *ap = NULL;
862                 }
863         } else {
864                 attr_folded = ldb_attr_casefold(ldb, attr);
865                 if (!attr_folded) {
866                         return NULL;
867                 }
868
869                 attr_for_dn = attr_folded;
870
871                 a = ldb_schema_attribute_by_name(ldb, attr);
872                 if (ap) {
873                         *ap = a;
874                 }
875                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
876                 if (r != LDB_SUCCESS) {
877                         const char *errstr = ldb_errstring(ldb);
878                         /* canonicalisation can be refused. For
879                            example, a attribute that takes wildcards
880                            will refuse to canonicalise if the value
881                            contains a wildcard */
882                         ldb_asprintf_errstring(ldb,
883                                                "Failed to create index "
884                                                "key for attribute '%s':%s%s%s",
885                                                attr, ldb_strerror(r),
886                                                (errstr?":":""),
887                                                (errstr?errstr:""));
888                         talloc_free(attr_folded);
889                         return NULL;
890                 }
891         }
892         attr_len = strlen(attr_for_dn);
893
894         /*
895          * Check if there is any hope this will fit into the DB.
896          * Overflow here is not actually critical the code below
897          * checks again to make the printf and the DB does another
898          * check for too long keys
899          */
900         if (max_key_length - attr_len < min_key_length) {
901                 ldb_asprintf_errstring(
902                         ldb,
903                         __location__ ": max_key_length "
904                         "is too small (%u) < (%u)",
905                         max_key_length,
906                         (unsigned)(min_key_length + attr_len));
907                 talloc_free(attr_folded);
908                 return NULL;
909         }
910
911         /*
912          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
913          * "DN=" and a trailing string terminator
914          */
915         max_key_length -= additional_key_length;
916
917         /*
918          * We do not base 64 encode a DN in a key, it has already been
919          * casefold and lineraized, that is good enough.  That already
920          * avoids embedded NUL etc.
921          */
922         if (ltdb->cache->GUID_index_attribute != NULL) {
923                 if (strcmp(attr, LTDB_IDXDN) == 0) {
924                         should_b64_encode = false;
925                 } else if (strcmp(attr, LTDB_IDXONE) == 0) {
926                         /*
927                          * We can only change the behaviour for IDXONE
928                          * when the GUID index is enabled
929                          */
930                         should_b64_encode = false;
931                 } else {
932                         should_b64_encode
933                                 = ldb_should_b64_encode(ldb, &v);
934                 }
935         } else {
936                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
937         }
938
939         if (should_b64_encode) {
940                 size_t vstr_len = 0;
941                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
942                 if (!vstr) {
943                         talloc_free(attr_folded);
944                         return NULL;
945                 }
946                 vstr_len = strlen(vstr);
947                 /* 
948                  * Overflow here is not critical as we only use this
949                  * to choose the printf truncation
950                  */
951                 key_len = num_separators + indx_len + attr_len + vstr_len;
952                 if (key_len > max_key_length) {
953                         size_t excess = key_len - max_key_length;
954                         frmt_len = vstr_len - excess;
955                         *truncation = KEY_TRUNCATED;
956                         /*
957                         * Truncated keys are placed in a separate key space
958                         * from the non truncated keys
959                         * Note: the double hash "##" is not a typo and
960                         * indicates that the following value is base64 encoded
961                         */
962                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
963                                              LTDB_INDEX, attr_for_dn,
964                                              frmt_len, vstr);
965                 } else {
966                         frmt_len = vstr_len;
967                         *truncation = KEY_NOT_TRUNCATED;
968                         /*
969                          * Note: the double colon "::" is not a typo and
970                          * indicates that the following value is base64 encoded
971                          */
972                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
973                                              LTDB_INDEX, attr_for_dn,
974                                              frmt_len, vstr);
975                 }
976                 talloc_free(vstr);
977         } else {
978                 /* Only need two seperators */
979                 num_separators = 2;
980
981                 /*
982                  * Overflow here is not critical as we only use this
983                  * to choose the printf truncation
984                  */
985                 key_len = num_separators + indx_len + attr_len + (int)v.length;
986                 if (key_len > max_key_length) {
987                         size_t excess = key_len - max_key_length;
988                         frmt_len = v.length - excess;
989                         *truncation = KEY_TRUNCATED;
990                         /*
991                          * Truncated keys are placed in a separate key space
992                          * from the non truncated keys
993                          */
994                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
995                                              LTDB_INDEX, attr_for_dn,
996                                              frmt_len, (char *)v.data);
997                 } else {
998                         frmt_len = v.length;
999                         *truncation = KEY_NOT_TRUNCATED;
1000                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1001                                              LTDB_INDEX, attr_for_dn,
1002                                              frmt_len, (char *)v.data);
1003                 }
1004         }
1005
1006         if (v.data != value->data) {
1007                 talloc_free(v.data);
1008         }
1009         talloc_free(attr_folded);
1010
1011         return ret;
1012 }
1013
1014 /*
1015   see if a attribute value is in the list of indexed attributes
1016 */
1017 static bool ltdb_is_indexed(struct ldb_module *module,
1018                             struct ltdb_private *ltdb,
1019                             const char *attr)
1020 {
1021         struct ldb_context *ldb = ldb_module_get_ctx(module);
1022         unsigned int i;
1023         struct ldb_message_element *el;
1024
1025         if ((ltdb->cache->GUID_index_attribute != NULL) &&
1026             (ldb_attr_cmp(attr,
1027                           ltdb->cache->GUID_index_attribute) == 0)) {
1028                 /* Implicity covered, this is the index key */
1029                 return false;
1030         }
1031         if (ldb->schema.index_handler_override) {
1032                 const struct ldb_schema_attribute *a
1033                         = ldb_schema_attribute_by_name(ldb, attr);
1034
1035                 if (a == NULL) {
1036                         return false;
1037                 }
1038
1039                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1040                         return true;
1041                 } else {
1042                         return false;
1043                 }
1044         }
1045
1046         if (!ltdb->cache->attribute_indexes) {
1047                 return false;
1048         }
1049
1050         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
1051         if (el == NULL) {
1052                 return false;
1053         }
1054
1055         /* TODO: this is too expensive! At least use a binary search */
1056         for (i=0; i<el->num_values; i++) {
1057                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1058                         return true;
1059                 }
1060         }
1061         return false;
1062 }
1063
1064 /*
1065   in the following logic functions, the return value is treated as
1066   follows:
1067
1068      LDB_SUCCESS: we found some matching index values
1069
1070      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1071
1072      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1073                                we'll need a full search
1074  */
1075
1076 /*
1077   return a list of dn's that might match a simple indexed search (an
1078   equality search only)
1079  */
1080 static int ltdb_index_dn_simple(struct ldb_module *module,
1081                                 struct ltdb_private *ltdb,
1082                                 const struct ldb_parse_tree *tree,
1083                                 struct dn_list *list)
1084 {
1085         struct ldb_context *ldb;
1086         struct ldb_dn *dn;
1087         int ret;
1088         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1089
1090         ldb = ldb_module_get_ctx(module);
1091
1092         list->count = 0;
1093         list->dn = NULL;
1094
1095         /* if the attribute isn't in the list of indexed attributes then
1096            this node needs a full search */
1097         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
1098                 return LDB_ERR_OPERATIONS_ERROR;
1099         }
1100
1101         /* the attribute is indexed. Pull the list of DNs that match the
1102            search criterion */
1103         dn = ltdb_index_key(ldb, ltdb,
1104                             tree->u.equality.attr,
1105                             &tree->u.equality.value, NULL, &truncation);
1106         /*
1107          * We ignore truncation here and allow multi-valued matches
1108          * as ltdb_search_indexed will filter out the wrong one in
1109          * ltdb_index_filter() which calls ldb_match_message().
1110          */
1111         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1112
1113         ret = ltdb_dn_list_load(module, ltdb, dn, list);
1114         talloc_free(dn);
1115         return ret;
1116 }
1117
1118
1119 static bool list_union(struct ldb_context *ldb,
1120                        struct ltdb_private *ltdb,
1121                        struct dn_list *list, struct dn_list *list2);
1122
1123 /*
1124   return a list of dn's that might match a leaf indexed search
1125  */
1126 static int ltdb_index_dn_leaf(struct ldb_module *module,
1127                               struct ltdb_private *ltdb,
1128                               const struct ldb_parse_tree *tree,
1129                               struct dn_list *list)
1130 {
1131         if (ltdb->disallow_dn_filter &&
1132             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1133                 /* in AD mode we do not support "(dn=...)" search filters */
1134                 list->dn = NULL;
1135                 list->count = 0;
1136                 return LDB_SUCCESS;
1137         }
1138         if (tree->u.equality.attr[0] == '@') {
1139                 /* Do not allow a indexed search against an @ */
1140                 list->dn = NULL;
1141                 list->count = 0;
1142                 return LDB_SUCCESS;
1143         }
1144         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1145                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1146                 struct ldb_dn *dn
1147                         = ldb_dn_from_ldb_val(list,
1148                                               ldb_module_get_ctx(module),
1149                                               &tree->u.equality.value);
1150                 if (dn == NULL) {
1151                         /* If we can't parse it, no match */
1152                         list->dn = NULL;
1153                         list->count = 0;
1154                         return LDB_SUCCESS;
1155                 }
1156
1157                 /*
1158                  * Re-use the same code we use for a SCOPE_BASE
1159                  * search
1160                  *
1161                  * We can't call TALLOC_FREE(dn) as this must belong
1162                  * to list for the memory to remain valid.
1163                  */
1164                 return ltdb_index_dn_base_dn(module, ltdb, dn, list,
1165                                              &truncation);
1166                 /*
1167                  * We ignore truncation here and allow multi-valued matches
1168                  * as ltdb_search_indexed will filter out the wrong one in
1169                  * ltdb_index_filter() which calls ldb_match_message().
1170                  */
1171
1172         } else if ((ltdb->cache->GUID_index_attribute != NULL) &&
1173                    (ldb_attr_cmp(tree->u.equality.attr,
1174                                  ltdb->cache->GUID_index_attribute) == 0)) {
1175                 int ret;
1176                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1177                 list->dn = talloc_array(list, struct ldb_val, 1);
1178                 if (list->dn == NULL) {
1179                         ldb_module_oom(module);
1180                         return LDB_ERR_OPERATIONS_ERROR;
1181                 }
1182                 /*
1183                  * We need to go via the canonicalise_fn() to
1184                  * ensure we get the index in binary, rather
1185                  * than a string
1186                  */
1187                 ret = ltdb->GUID_index_syntax->canonicalise_fn(ldb,
1188                                                                list->dn,
1189                                                                &tree->u.equality.value,
1190                                                                &list->dn[0]);
1191                 if (ret != LDB_SUCCESS) {
1192                         return LDB_ERR_OPERATIONS_ERROR;
1193                 }
1194                 list->count = 1;
1195                 return LDB_SUCCESS;
1196         }
1197
1198         return ltdb_index_dn_simple(module, ltdb, tree, list);
1199 }
1200
1201
1202 /*
1203   list intersection
1204   list = list & list2
1205 */
1206 static bool list_intersect(struct ldb_context *ldb,
1207                            struct ltdb_private *ltdb,
1208                            struct dn_list *list, const struct dn_list *list2)
1209 {
1210         const struct dn_list *short_list, *long_list;
1211         struct dn_list *list3;
1212         unsigned int i;
1213
1214         if (list->count == 0) {
1215                 /* 0 & X == 0 */
1216                 return true;
1217         }
1218         if (list2->count == 0) {
1219                 /* X & 0 == 0 */
1220                 list->count = 0;
1221                 list->dn = NULL;
1222                 return true;
1223         }
1224
1225         /* the indexing code is allowed to return a longer list than
1226            what really matches, as all results are filtered by the
1227            full expression at the end - this shortcut avoids a lot of
1228            work in some cases */
1229         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1230                 return true;
1231         }
1232         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1233                 list->count = list2->count;
1234                 list->dn = list2->dn;
1235                 /* note that list2 may not be the parent of list2->dn,
1236                    as list2->dn may be owned by ltdb->idxptr. In that
1237                    case we expect this reparent call to fail, which is
1238                    OK */
1239                 talloc_reparent(list2, list, list2->dn);
1240                 return true;
1241         }
1242
1243         if (list->count > list2->count) {
1244                 short_list = list2;
1245                 long_list = list;
1246         } else {
1247                 short_list = list;
1248                 long_list = list2;
1249         }
1250
1251         list3 = talloc_zero(list, struct dn_list);
1252         if (list3 == NULL) {
1253                 return false;
1254         }
1255
1256         list3->dn = talloc_array(list3, struct ldb_val,
1257                                  MIN(list->count, list2->count));
1258         if (!list3->dn) {
1259                 talloc_free(list3);
1260                 return false;
1261         }
1262         list3->count = 0;
1263
1264         for (i=0;i<short_list->count;i++) {
1265                 /* For the GUID index case, this is a binary search */
1266                 if (ltdb_dn_list_find_val(ltdb, long_list,
1267                                           &short_list->dn[i]) != -1) {
1268                         list3->dn[list3->count] = short_list->dn[i];
1269                         list3->count++;
1270                 }
1271         }
1272
1273         list->strict |= list2->strict;
1274         list->dn = talloc_steal(list, list3->dn);
1275         list->count = list3->count;
1276         talloc_free(list3);
1277
1278         return true;
1279 }
1280
1281
1282 /*
1283   list union
1284   list = list | list2
1285 */
1286 static bool list_union(struct ldb_context *ldb,
1287                        struct ltdb_private *ltdb,
1288                        struct dn_list *list, struct dn_list *list2)
1289 {
1290         struct ldb_val *dn3;
1291         unsigned int i = 0, j = 0, k = 0;
1292
1293         if (list2->count == 0) {
1294                 /* X | 0 == X */
1295                 return true;
1296         }
1297
1298         if (list->count == 0) {
1299                 /* 0 | X == X */
1300                 list->count = list2->count;
1301                 list->dn = list2->dn;
1302                 /* note that list2 may not be the parent of list2->dn,
1303                    as list2->dn may be owned by ltdb->idxptr. In that
1304                    case we expect this reparent call to fail, which is
1305                    OK */
1306                 talloc_reparent(list2, list, list2->dn);
1307                 return true;
1308         }
1309
1310         /*
1311          * Sort the lists (if not in GUID DN mode) so we can do
1312          * the de-duplication during the merge
1313          */
1314         ltdb_dn_list_sort(ltdb, list);
1315         ltdb_dn_list_sort(ltdb, list2);
1316
1317         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1318         if (!dn3) {
1319                 ldb_oom(ldb);
1320                 return false;
1321         }
1322
1323         while (i < list->count || j < list2->count) {
1324                 int cmp;
1325                 if (i >= list->count) {
1326                         cmp = 1;
1327                 } else if (j >= list2->count) {
1328                         cmp = -1;
1329                 } else {
1330                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1331                                                           &list2->dn[j]);
1332                 }
1333
1334                 if (cmp < 0) {
1335                         /* Take list */
1336                         dn3[k] = list->dn[i];
1337                         i++;
1338                         k++;
1339                 } else if (cmp > 0) {
1340                         /* Take list2 */
1341                         dn3[k] = list2->dn[j];
1342                         j++;
1343                         k++;
1344                 } else {
1345                         /* Equal, take list */
1346                         dn3[k] = list->dn[i];
1347                         i++;
1348                         j++;
1349                         k++;
1350                 }
1351         }
1352
1353         list->dn = dn3;
1354         list->count = k;
1355
1356         return true;
1357 }
1358
1359 static int ltdb_index_dn(struct ldb_module *module,
1360                          struct ltdb_private *ltdb,
1361                          const struct ldb_parse_tree *tree,
1362                          struct dn_list *list);
1363
1364
1365 /*
1366   process an OR list (a union)
1367  */
1368 static int ltdb_index_dn_or(struct ldb_module *module,
1369                             struct ltdb_private *ltdb,
1370                             const struct ldb_parse_tree *tree,
1371                             struct dn_list *list)
1372 {
1373         struct ldb_context *ldb;
1374         unsigned int i;
1375
1376         ldb = ldb_module_get_ctx(module);
1377
1378         list->dn = NULL;
1379         list->count = 0;
1380
1381         for (i=0; i<tree->u.list.num_elements; i++) {
1382                 struct dn_list *list2;
1383                 int ret;
1384
1385                 list2 = talloc_zero(list, struct dn_list);
1386                 if (list2 == NULL) {
1387                         return LDB_ERR_OPERATIONS_ERROR;
1388                 }
1389
1390                 ret = ltdb_index_dn(module, ltdb,
1391                                     tree->u.list.elements[i], list2);
1392
1393                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1394                         /* X || 0 == X */
1395                         talloc_free(list2);
1396                         continue;
1397                 }
1398
1399                 if (ret != LDB_SUCCESS) {
1400                         /* X || * == * */
1401                         talloc_free(list2);
1402                         return ret;
1403                 }
1404
1405                 if (!list_union(ldb, ltdb, list, list2)) {
1406                         talloc_free(list2);
1407                         return LDB_ERR_OPERATIONS_ERROR;
1408                 }
1409         }
1410
1411         if (list->count == 0) {
1412                 return LDB_ERR_NO_SUCH_OBJECT;
1413         }
1414
1415         return LDB_SUCCESS;
1416 }
1417
1418
1419 /*
1420   NOT an index results
1421  */
1422 static int ltdb_index_dn_not(struct ldb_module *module,
1423                              struct ltdb_private *ltdb,
1424                              const struct ldb_parse_tree *tree,
1425                              struct dn_list *list)
1426 {
1427         /* the only way to do an indexed not would be if we could
1428            negate the not via another not or if we knew the total
1429            number of database elements so we could know that the
1430            existing expression covered the whole database.
1431
1432            instead, we just give up, and rely on a full index scan
1433            (unless an outer & manages to reduce the list)
1434         */
1435         return LDB_ERR_OPERATIONS_ERROR;
1436 }
1437
1438 /*
1439  * These things are unique, so avoid a full scan if this is a search
1440  * by GUID, DN or a unique attribute
1441  */
1442 static bool ltdb_index_unique(struct ldb_context *ldb,
1443                               struct ltdb_private *ltdb,
1444                               const char *attr)
1445 {
1446         const struct ldb_schema_attribute *a;
1447         if (ltdb->cache->GUID_index_attribute != NULL) {
1448                 if (ldb_attr_cmp(attr, ltdb->cache->GUID_index_attribute) == 0) {
1449                         return true;
1450                 }
1451         }
1452         if (ldb_attr_dn(attr) == 0) {
1453                 return true;
1454         }
1455
1456         a = ldb_schema_attribute_by_name(ldb, attr);
1457         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1458                 return true;
1459         }
1460         return false;
1461 }
1462
1463 /*
1464   process an AND expression (intersection)
1465  */
1466 static int ltdb_index_dn_and(struct ldb_module *module,
1467                              struct ltdb_private *ltdb,
1468                              const struct ldb_parse_tree *tree,
1469                              struct dn_list *list)
1470 {
1471         struct ldb_context *ldb;
1472         unsigned int i;
1473         bool found;
1474
1475         ldb = ldb_module_get_ctx(module);
1476
1477         list->dn = NULL;
1478         list->count = 0;
1479
1480         /* in the first pass we only look for unique simple
1481            equality tests, in the hope of avoiding having to look
1482            at any others */
1483         for (i=0; i<tree->u.list.num_elements; i++) {
1484                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1485                 int ret;
1486
1487                 if (subtree->operation != LDB_OP_EQUALITY ||
1488                     !ltdb_index_unique(ldb, ltdb,
1489                                        subtree->u.equality.attr)) {
1490                         continue;
1491                 }
1492
1493                 ret = ltdb_index_dn(module, ltdb, subtree, list);
1494                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1495                         /* 0 && X == 0 */
1496                         return LDB_ERR_NO_SUCH_OBJECT;
1497                 }
1498                 if (ret == LDB_SUCCESS) {
1499                         /* a unique index match means we can
1500                          * stop. Note that we don't care if we return
1501                          * a few too many objects, due to later
1502                          * filtering */
1503                         return LDB_SUCCESS;
1504                 }
1505         }
1506
1507         /* now do a full intersection */
1508         found = false;
1509
1510         for (i=0; i<tree->u.list.num_elements; i++) {
1511                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1512                 struct dn_list *list2;
1513                 int ret;
1514
1515                 list2 = talloc_zero(list, struct dn_list);
1516                 if (list2 == NULL) {
1517                         return ldb_module_oom(module);
1518                 }
1519
1520                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
1521
1522                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1523                         /* X && 0 == 0 */
1524                         list->dn = NULL;
1525                         list->count = 0;
1526                         talloc_free(list2);
1527                         return LDB_ERR_NO_SUCH_OBJECT;
1528                 }
1529
1530                 if (ret != LDB_SUCCESS) {
1531                         /* this didn't adding anything */
1532                         talloc_free(list2);
1533                         continue;
1534                 }
1535
1536                 if (!found) {
1537                         talloc_reparent(list2, list, list->dn);
1538                         list->dn = list2->dn;
1539                         list->count = list2->count;
1540                         found = true;
1541                 } else if (!list_intersect(ldb, ltdb,
1542                                            list, list2)) {
1543                         talloc_free(list2);
1544                         return LDB_ERR_OPERATIONS_ERROR;
1545                 }
1546
1547                 if (list->count == 0) {
1548                         list->dn = NULL;
1549                         return LDB_ERR_NO_SUCH_OBJECT;
1550                 }
1551
1552                 if (list->count < 2) {
1553                         /* it isn't worth loading the next part of the tree */
1554                         return LDB_SUCCESS;
1555                 }
1556         }
1557
1558         if (!found) {
1559                 /* none of the attributes were indexed */
1560                 return LDB_ERR_OPERATIONS_ERROR;
1561         }
1562
1563         return LDB_SUCCESS;
1564 }
1565
1566 /*
1567   return a list of matching objects using a one-level index
1568  */
1569 static int ltdb_index_dn_attr(struct ldb_module *module,
1570                               struct ltdb_private *ltdb,
1571                               const char *attr,
1572                               struct ldb_dn *dn,
1573                               struct dn_list *list,
1574                               enum key_truncation *truncation)
1575 {
1576         struct ldb_context *ldb;
1577         struct ldb_dn *key;
1578         struct ldb_val val;
1579         int ret;
1580
1581         ldb = ldb_module_get_ctx(module);
1582
1583         /* work out the index key from the parent DN */
1584         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1585         val.length = strlen((char *)val.data);
1586         key = ltdb_index_key(ldb, ltdb, attr, &val, NULL, truncation);
1587         if (!key) {
1588                 ldb_oom(ldb);
1589                 return LDB_ERR_OPERATIONS_ERROR;
1590         }
1591
1592         ret = ltdb_dn_list_load(module, ltdb, key, list);
1593         talloc_free(key);
1594         if (ret != LDB_SUCCESS) {
1595                 return ret;
1596         }
1597
1598         if (list->count == 0) {
1599                 return LDB_ERR_NO_SUCH_OBJECT;
1600         }
1601
1602         return LDB_SUCCESS;
1603 }
1604
1605 /*
1606   return a list of matching objects using a one-level index
1607  */
1608 static int ltdb_index_dn_one(struct ldb_module *module,
1609                              struct ltdb_private *ltdb,
1610                              struct ldb_dn *parent_dn,
1611                              struct dn_list *list,
1612                              enum key_truncation *truncation)
1613 {
1614         /* Ensure we do not shortcut on intersection for this list */
1615         list->strict = true;
1616         return ltdb_index_dn_attr(module, ltdb,
1617                                   LTDB_IDXONE, parent_dn, list, truncation);
1618
1619 }
1620
1621 /*
1622   return a list of matching objects using the DN index
1623  */
1624 static int ltdb_index_dn_base_dn(struct ldb_module *module,
1625                                  struct ltdb_private *ltdb,
1626                                  struct ldb_dn *base_dn,
1627                                  struct dn_list *dn_list,
1628                                  enum key_truncation *truncation)
1629 {
1630         const struct ldb_val *guid_val = NULL;
1631         if (ltdb->cache->GUID_index_attribute == NULL) {
1632                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1633                 if (dn_list->dn == NULL) {
1634                         return ldb_module_oom(module);
1635                 }
1636                 dn_list->dn[0].data = discard_const_p(unsigned char,
1637                                                       ldb_dn_get_linearized(base_dn));
1638                 if (dn_list->dn[0].data == NULL) {
1639                         return ldb_module_oom(module);
1640                 }
1641                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1642                 dn_list->count = 1;
1643
1644                 return LDB_SUCCESS;
1645         }
1646
1647         if (ltdb->cache->GUID_index_dn_component != NULL) {
1648                 guid_val = ldb_dn_get_extended_component(base_dn,
1649                                                          ltdb->cache->GUID_index_dn_component);
1650         }
1651
1652         if (guid_val != NULL) {
1653                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1654                 if (dn_list->dn == NULL) {
1655                         return ldb_module_oom(module);
1656                 }
1657                 dn_list->dn[0].data = guid_val->data;
1658                 dn_list->dn[0].length = guid_val->length;
1659                 dn_list->count = 1;
1660
1661                 return LDB_SUCCESS;
1662         }
1663
1664         return ltdb_index_dn_attr(module, ltdb,
1665                                   LTDB_IDXDN, base_dn, dn_list, truncation);
1666 }
1667
1668 /*
1669   return a list of dn's that might match a indexed search or
1670   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1671  */
1672 static int ltdb_index_dn(struct ldb_module *module,
1673                          struct ltdb_private *ltdb,
1674                          const struct ldb_parse_tree *tree,
1675                          struct dn_list *list)
1676 {
1677         int ret = LDB_ERR_OPERATIONS_ERROR;
1678
1679         switch (tree->operation) {
1680         case LDB_OP_AND:
1681                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1682                 break;
1683
1684         case LDB_OP_OR:
1685                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1686                 break;
1687
1688         case LDB_OP_NOT:
1689                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1690                 break;
1691
1692         case LDB_OP_EQUALITY:
1693                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1694                 break;
1695
1696         case LDB_OP_SUBSTRING:
1697         case LDB_OP_GREATER:
1698         case LDB_OP_LESS:
1699         case LDB_OP_PRESENT:
1700         case LDB_OP_APPROX:
1701         case LDB_OP_EXTENDED:
1702                 /* we can't index with fancy bitops yet */
1703                 ret = LDB_ERR_OPERATIONS_ERROR;
1704                 break;
1705         }
1706
1707         return ret;
1708 }
1709
1710 /*
1711   filter a candidate dn_list from an indexed search into a set of results
1712   extracting just the given attributes
1713 */
1714 static int ltdb_index_filter(struct ltdb_private *ltdb,
1715                              const struct dn_list *dn_list,
1716                              struct ltdb_context *ac,
1717                              uint32_t *match_count,
1718                              enum key_truncation scope_one_truncation)
1719 {
1720         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1721         struct ldb_message *msg;
1722         struct ldb_message *filtered_msg;
1723         unsigned int i;
1724         unsigned int num_keys = 0;
1725         uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
1726         TDB_DATA *keys = NULL;
1727
1728         /*
1729          * We have to allocate the key list (rather than just walk the
1730          * caller supplied list) as the callback could change the list
1731          * (by modifying an indexed attribute hosted in the in-memory
1732          * index cache!)
1733          */
1734         keys = talloc_array(ac, TDB_DATA, dn_list->count);
1735         if (keys == NULL) {
1736                 return ldb_module_oom(ac->module);
1737         }
1738
1739         if (ltdb->cache->GUID_index_attribute != NULL) {
1740                 /*
1741                  * We speculate that the keys will be GUID based and so
1742                  * pre-fill in enough space for a GUID (avoiding a pile of
1743                  * small allocations)
1744                  */
1745                 struct guid_tdb_key {
1746                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
1747                 } *key_values = NULL;
1748
1749                 key_values = talloc_array(keys,
1750                                           struct guid_tdb_key,
1751                                           dn_list->count);
1752
1753                 for (i = 0; i < dn_list->count; i++) {
1754                         keys[i].dptr = key_values[i].guid_key;
1755                         keys[i].dsize = sizeof(key_values[i].guid_key);
1756                 }
1757                 if (key_values == NULL) {
1758                         return ldb_module_oom(ac->module);
1759                 }
1760         } else {
1761                 for (i = 0; i < dn_list->count; i++) {
1762                         keys[i].dptr = NULL;
1763                         keys[i].dsize = 0;
1764                 }
1765         }
1766
1767         for (i = 0; i < dn_list->count; i++) {
1768                 int ret;
1769
1770                 ret = ltdb_idx_to_key(ac->module,
1771                                       ltdb,
1772                                       keys,
1773                                       &dn_list->dn[i],
1774                                       &keys[num_keys]);
1775                 if (ret != LDB_SUCCESS) {
1776                         return ret;
1777                 }
1778
1779                 if (ltdb->cache->GUID_index_attribute != NULL) {
1780                         /*
1781                          * If we are in GUID index mode, then the dn_list is
1782                          * sorted.  If we got a duplicate, forget about it, as
1783                          * otherwise we would send the same entry back more
1784                          * than once.
1785                          *
1786                          * This is needed in the truncated DN case, or if a
1787                          * duplicate was forced in via
1788                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1789                          */
1790
1791                         if (memcmp(previous_guid_key,
1792                                    keys[num_keys].dptr,
1793                                    sizeof(previous_guid_key)) == 0) {
1794                                 continue;
1795                         }
1796
1797                         memcpy(previous_guid_key,
1798                                keys[num_keys].dptr,
1799                                sizeof(previous_guid_key));
1800                 }
1801                 num_keys++;
1802         }
1803
1804
1805         /*
1806          * Now that the list is a safe copy, send the callbacks
1807          */
1808         for (i = 0; i < num_keys; i++) {
1809                 int ret;
1810                 bool matched;
1811                 msg = ldb_msg_new(ac);
1812                 if (!msg) {
1813                         return LDB_ERR_OPERATIONS_ERROR;
1814                 }
1815
1816                 ret = ltdb_search_key(ac->module, ltdb,
1817                                       keys[i], msg,
1818                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1819                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1820                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1821                         /* the record has disappeared? yes, this can happen */
1822                         talloc_free(msg);
1823                         continue;
1824                 }
1825
1826                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1827                         /* an internal error */
1828                         talloc_free(msg);
1829                         return LDB_ERR_OPERATIONS_ERROR;
1830                 }
1831
1832                 /*
1833                  * We trust the index for LDB_SCOPE_ONELEVEL
1834                  * unless the index key has been truncated.
1835                  *
1836                  * LDB_SCOPE_BASE is not passed in by our only caller.
1837                  */
1838                 if (ac->scope == LDB_SCOPE_ONELEVEL
1839                     && ltdb->cache->one_level_indexes
1840                     && scope_one_truncation == KEY_NOT_TRUNCATED) {
1841                         ret = ldb_match_message(ldb, msg, ac->tree,
1842                                                 ac->scope, &matched);
1843                 } else {
1844                         ret = ldb_match_msg_error(ldb, msg,
1845                                                   ac->tree, ac->base,
1846                                                   ac->scope, &matched);
1847                 }
1848
1849                 if (ret != LDB_SUCCESS) {
1850                         talloc_free(msg);
1851                         return ret;
1852                 }
1853                 if (!matched) {
1854                         talloc_free(msg);
1855                         continue;
1856                 }
1857
1858                 /* filter the attributes that the user wants */
1859                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1860
1861                 talloc_free(msg);
1862
1863                 if (ret == -1) {
1864                         return LDB_ERR_OPERATIONS_ERROR;
1865                 }
1866
1867                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1868                 if (ret != LDB_SUCCESS) {
1869                         /* Regardless of success or failure, the msg
1870                          * is the callbacks responsiblity, and should
1871                          * not be talloc_free()'ed */
1872                         ac->request_terminated = true;
1873                         return ret;
1874                 }
1875
1876                 (*match_count)++;
1877         }
1878
1879         TALLOC_FREE(keys);
1880         return LDB_SUCCESS;
1881 }
1882
1883 /*
1884   sort a DN list
1885  */
1886 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
1887                               struct dn_list *list)
1888 {
1889         if (list->count < 2) {
1890                 return;
1891         }
1892
1893         /* We know the list is sorted when using the GUID index */
1894         if (ltdb->cache->GUID_index_attribute != NULL) {
1895                 return;
1896         }
1897
1898         TYPESAFE_QSORT(list->dn, list->count,
1899                        ldb_val_equal_exact_for_qsort);
1900 }
1901
1902 /*
1903   search the database with a LDAP-like expression using indexes
1904   returns -1 if an indexed search is not possible, in which
1905   case the caller should call ltdb_search_full()
1906 */
1907 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1908 {
1909         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1910         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1911         struct dn_list *dn_list;
1912         int ret;
1913         enum ldb_scope index_scope;
1914         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1915
1916         /* see if indexing is enabled */
1917         if (!ltdb->cache->attribute_indexes &&
1918             !ltdb->cache->one_level_indexes &&
1919             ac->scope != LDB_SCOPE_BASE) {
1920                 /* fallback to a full search */
1921                 return LDB_ERR_OPERATIONS_ERROR;
1922         }
1923
1924         dn_list = talloc_zero(ac, struct dn_list);
1925         if (dn_list == NULL) {
1926                 return ldb_module_oom(ac->module);
1927         }
1928
1929         /*
1930          * For the purposes of selecting the switch arm below, if we
1931          * don't have a one-level index then treat it like a subtree
1932          * search
1933          */
1934         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1935             !ltdb->cache->one_level_indexes) {
1936                 index_scope = LDB_SCOPE_SUBTREE;
1937         } else {
1938                 index_scope = ac->scope;
1939         }
1940
1941         switch (index_scope) {
1942         case LDB_SCOPE_BASE:
1943                 /*
1944                  * The only caller will have filtered the operation out
1945                  * so we should never get here
1946                  */
1947                 return ldb_operr(ldb);
1948
1949         case LDB_SCOPE_ONELEVEL:
1950                 /*
1951                  * If we ever start to also load the index values for
1952                  * the tree, we must ensure we strictly intersect with
1953                  * this list, as we trust the ONELEVEL index
1954                  */
1955                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list,
1956                                         &scope_one_truncation);
1957                 if (ret != LDB_SUCCESS) {
1958                         talloc_free(dn_list);
1959                         return ret;
1960                 }
1961
1962                 /*
1963                  * If we have too many matches, running the filter
1964                  * tree over the SCOPE_ONELEVEL can be quite expensive
1965                  * so we now check the filter tree index as well.
1966                  *
1967                  * We only do this in the GUID index mode, which is
1968                  * O(n*log(m)) otherwise the intersection below will
1969                  * be too costly at O(n*m).
1970                  *
1971                  * We don't set a heuristic for 'too many' but instead
1972                  * do it always and rely on the index lookup being
1973                  * fast enough in the small case.
1974                  */
1975                 if (ltdb->cache->GUID_index_attribute != NULL) {
1976                         struct dn_list *idx_one_tree_list
1977                                 = talloc_zero(ac, struct dn_list);
1978                         if (idx_one_tree_list == NULL) {
1979                                 return ldb_module_oom(ac->module);
1980                         }
1981
1982                         if (!ltdb->cache->attribute_indexes) {
1983                                 talloc_free(idx_one_tree_list);
1984                                 talloc_free(dn_list);
1985                                 return LDB_ERR_OPERATIONS_ERROR;
1986                         }
1987                         /*
1988                          * Here we load the index for the tree.
1989                          */
1990                         ret = ltdb_index_dn(ac->module, ltdb, ac->tree,
1991                                             idx_one_tree_list);
1992                         if (ret != LDB_SUCCESS) {
1993                                 talloc_free(idx_one_tree_list);
1994                                 talloc_free(dn_list);
1995                                 return ret;
1996                         }
1997
1998                         if (!list_intersect(ldb, ltdb,
1999                                             dn_list, idx_one_tree_list)) {
2000                                 talloc_free(idx_one_tree_list);
2001                                 talloc_free(dn_list);
2002                                 return LDB_ERR_OPERATIONS_ERROR;
2003                         }
2004                 }
2005                 break;
2006
2007         case LDB_SCOPE_SUBTREE:
2008         case LDB_SCOPE_DEFAULT:
2009                 if (!ltdb->cache->attribute_indexes) {
2010                         talloc_free(dn_list);
2011                         return LDB_ERR_OPERATIONS_ERROR;
2012                 }
2013                 /*
2014                  * Here we load the index for the tree.  We have no
2015                  * index for the subtree.
2016                  */
2017                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
2018                 if (ret != LDB_SUCCESS) {
2019                         talloc_free(dn_list);
2020                         return ret;
2021                 }
2022                 break;
2023         }
2024
2025         /*
2026          * It is critical that this function do the re-filter even
2027          * on things found by the index as the index can over-match
2028          * in cases of truncation (as well as when it decides it is
2029          * not worth further filtering)
2030          *
2031          * If this changes, then the index code above would need to
2032          * pass up a flag to say if any index was truncated during
2033          * processing as the truncation here refers only to the
2034          * SCOPE_ONELEVEL index.
2035          */
2036         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count,
2037                                 scope_one_truncation);
2038         talloc_free(dn_list);
2039         return ret;
2040 }
2041
2042 /**
2043  * @brief Add a DN in the index list of a given attribute name/value pair
2044  *
2045  * This function will add the DN in the index list for the index for
2046  * the given attribute name and value.
2047  *
2048  * @param[in]  module       A ldb_module structure
2049  *
2050  * @param[in]  dn           The string representation of the DN as it
2051  *                          will be stored in the index entry
2052  *
2053  * @param[in]  el           A ldb_message_element array, one of the entry
2054  *                          referred by the v_idx is the attribute name and
2055  *                          value pair which will be used to construct the
2056  *                          index name
2057  *
2058  * @param[in]  v_idx        The index of element in the el array to use
2059  *
2060  * @return                  An ldb error code
2061  */
2062 static int ltdb_index_add1(struct ldb_module *module,
2063                            struct ltdb_private *ltdb,
2064                            const struct ldb_message *msg,
2065                            struct ldb_message_element *el, int v_idx)
2066 {
2067         struct ldb_context *ldb;
2068         struct ldb_dn *dn_key;
2069         int ret;
2070         const struct ldb_schema_attribute *a;
2071         struct dn_list *list;
2072         unsigned alloc_len;
2073         enum key_truncation truncation = KEY_TRUNCATED;
2074
2075
2076         ldb = ldb_module_get_ctx(module);
2077
2078         list = talloc_zero(module, struct dn_list);
2079         if (list == NULL) {
2080                 return LDB_ERR_OPERATIONS_ERROR;
2081         }
2082
2083         dn_key = ltdb_index_key(ldb, ltdb,
2084                                 el->name, &el->values[v_idx], &a, &truncation);
2085         if (!dn_key) {
2086                 talloc_free(list);
2087                 return LDB_ERR_OPERATIONS_ERROR;
2088         }
2089         /*
2090          * Samba only maintains unique indexes on the objectSID and objectGUID
2091          * so if a unique index key exceeds the maximum length there is a
2092          * problem.
2093          */
2094         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2095                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2096                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2097
2098                 ldb_asprintf_errstring(
2099                         ldb,
2100                         __location__ ": unique index key on %s in %s, "
2101                         "exceeds maximum key length of %u (encoded).",
2102                         el->name,
2103                         ldb_dn_get_linearized(msg->dn),
2104                         ltdb->max_key_length);
2105                 talloc_free(list);
2106                 return LDB_ERR_CONSTRAINT_VIOLATION;
2107         }
2108         talloc_steal(list, dn_key);
2109
2110         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2111         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2112                 talloc_free(list);
2113                 return ret;
2114         }
2115
2116         /*
2117          * Check for duplicates in the @IDXDN DN -> GUID record
2118          *
2119          * This is very normal, it just means a duplicate DN creation
2120          * was attempted, so don't set the error string or print scary
2121          * messages.
2122          */
2123         if (list->count > 0 &&
2124             ldb_attr_cmp(el->name, LTDB_IDXDN) == 0 &&
2125             truncation == KEY_NOT_TRUNCATED) {
2126
2127                 talloc_free(list);
2128                 return LDB_ERR_CONSTRAINT_VIOLATION;
2129
2130         } else if (list->count > 0
2131                    && ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
2132
2133                 /*
2134                  * At least one existing entry in the DN->GUID index, which
2135                  * arises when the DN indexes have been truncated
2136                  *
2137                  * So need to pull the DN's to check if it's really a duplicate
2138                  */
2139                 int i;
2140                 for (i=0; i < list->count; i++) {
2141                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
2142                         TDB_DATA key = {
2143                                 .dptr = guid_key,
2144                                 .dsize = sizeof(guid_key)
2145                         };
2146                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2147                         struct ldb_message *rec = ldb_msg_new(ldb);
2148                         if (rec == NULL) {
2149                                 return LDB_ERR_OPERATIONS_ERROR;
2150                         }
2151
2152                         ret = ltdb_idx_to_key(module, ltdb,
2153                                               ldb, &list->dn[i],
2154                                               &key);
2155                         if (ret != LDB_SUCCESS) {
2156                                 TALLOC_FREE(list);
2157                                 TALLOC_FREE(rec);
2158                                 return ret;
2159                         }
2160
2161                         ret = ltdb_search_key(module, ltdb, key,
2162                                               rec, flags);
2163                         if (key.dptr != guid_key) {
2164                                 TALLOC_FREE(key.dptr);
2165                         }
2166                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2167                                 /*
2168                                  * the record has disappeared?
2169                                  * yes, this can happen
2170                                  */
2171                                 talloc_free(rec);
2172                                 continue;
2173                         }
2174
2175                         if (ret != LDB_SUCCESS) {
2176                                 /* an internal error */
2177                                 TALLOC_FREE(rec);
2178                                 TALLOC_FREE(list);
2179                                 return LDB_ERR_OPERATIONS_ERROR;
2180                         }
2181                         /*
2182                          * The DN we are trying to add to the DB and index
2183                          * is already here, so we must deny the addition
2184                          */
2185                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2186                                 TALLOC_FREE(rec);
2187                                 TALLOC_FREE(list);
2188                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2189                         }
2190                 }
2191         }
2192
2193         /*
2194          * Check for duplicates in unique indexes
2195          *
2196          * We don't need to do a loop test like the @IDXDN case
2197          * above as we have a ban on long unique index values
2198          * at the start of this function.
2199          */
2200         if (list->count > 0 &&
2201             ((a != NULL
2202               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2203                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2204                 /*
2205                  * We do not want to print info about a possibly
2206                  * confidential DN that the conflict was with in the
2207                  * user-visible error string
2208                  */
2209
2210                 if (ltdb->cache->GUID_index_attribute == NULL) {
2211                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2212                                   __location__
2213                                   ": unique index violation on %s in %s, "
2214                                   "conficts with %*.*s in %s",
2215                                   el->name, ldb_dn_get_linearized(msg->dn),
2216                                   (int)list->dn[0].length,
2217                                   (int)list->dn[0].length,
2218                                   list->dn[0].data,
2219                                   ldb_dn_get_linearized(dn_key));
2220                 } else {
2221                         /* This can't fail, gives a default at worst */
2222                         const struct ldb_schema_attribute *attr
2223                                 = ldb_schema_attribute_by_name(
2224                                         ldb,
2225                                         ltdb->cache->GUID_index_attribute);
2226                         struct ldb_val v;
2227                         ret = attr->syntax->ldif_write_fn(ldb, list,
2228                                                           &list->dn[0], &v);
2229                         if (ret == LDB_SUCCESS) {
2230                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2231                                           __location__
2232                                           ": unique index violation on %s in "
2233                                           "%s, conficts with %s %*.*s in %s",
2234                                           el->name,
2235                                           ldb_dn_get_linearized(msg->dn),
2236                                           ltdb->cache->GUID_index_attribute,
2237                                           (int)v.length,
2238                                           (int)v.length,
2239                                           v.data,
2240                                           ldb_dn_get_linearized(dn_key));
2241                         }
2242                 }
2243                 ldb_asprintf_errstring(ldb,
2244                                        __location__ ": unique index violation "
2245                                        "on %s in %s",
2246                                        el->name,
2247                                        ldb_dn_get_linearized(msg->dn));
2248                 talloc_free(list);
2249                 return LDB_ERR_CONSTRAINT_VIOLATION;
2250         }
2251
2252         /* overallocate the list a bit, to reduce the number of
2253          * realloc trigered copies */
2254         alloc_len = ((list->count+1)+7) & ~7;
2255         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2256         if (list->dn == NULL) {
2257                 talloc_free(list);
2258                 return LDB_ERR_OPERATIONS_ERROR;
2259         }
2260
2261         if (ltdb->cache->GUID_index_attribute == NULL) {
2262                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2263                 list->dn[list->count].data
2264                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2265                 if (list->dn[list->count].data == NULL) {
2266                         talloc_free(list);
2267                         return LDB_ERR_OPERATIONS_ERROR;
2268                 }
2269                 list->dn[list->count].length = strlen(dn_str);
2270         } else {
2271                 const struct ldb_val *key_val;
2272                 struct ldb_val *exact = NULL, *next = NULL;
2273                 key_val = ldb_msg_find_ldb_val(msg,
2274                                                ltdb->cache->GUID_index_attribute);
2275                 if (key_val == NULL) {
2276                         talloc_free(list);
2277                         return ldb_module_operr(module);
2278                 }
2279
2280                 if (key_val->length != LTDB_GUID_SIZE) {
2281                         talloc_free(list);
2282                         return ldb_module_operr(module);
2283                 }
2284
2285                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2286                                         *key_val, ldb_val_equal_exact_ordered,
2287                                         exact, next);
2288
2289                 /*
2290                  * Give a warning rather than fail, this could be a
2291                  * duplicate value in the record allowed by a caller
2292                  * forcing in the value with
2293                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2294                  */
2295                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2296                         /* This can't fail, gives a default at worst */
2297                         const struct ldb_schema_attribute *attr
2298                                 = ldb_schema_attribute_by_name(
2299                                         ldb,
2300                                         ltdb->cache->GUID_index_attribute);
2301                         struct ldb_val v;
2302                         ret = attr->syntax->ldif_write_fn(ldb, list,
2303                                                           exact, &v);
2304                         if (ret == LDB_SUCCESS) {
2305                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2306                                           __location__
2307                                           ": duplicate attribute value in %s "
2308                                           "for index on %s, "
2309                                           "duplicate of %s %*.*s in %s",
2310                                           ldb_dn_get_linearized(msg->dn),
2311                                           el->name,
2312                                           ltdb->cache->GUID_index_attribute,
2313                                           (int)v.length,
2314                                           (int)v.length,
2315                                           v.data,
2316                                           ldb_dn_get_linearized(dn_key));
2317                         }
2318                 }
2319
2320                 if (next == NULL) {
2321                         next = &list->dn[list->count];
2322                 } else {
2323                         memmove(&next[1], next,
2324                                 sizeof(*next) * (list->count - (next - list->dn)));
2325                 }
2326                 *next = ldb_val_dup(list->dn, key_val);
2327                 if (next->data == NULL) {
2328                         talloc_free(list);
2329                         return ldb_module_operr(module);
2330                 }
2331         }
2332         list->count++;
2333
2334         ret = ltdb_dn_list_store(module, dn_key, list);
2335
2336         talloc_free(list);
2337
2338         return ret;
2339 }
2340
2341 /*
2342   add index entries for one elements in a message
2343  */
2344 static int ltdb_index_add_el(struct ldb_module *module,
2345                              struct ltdb_private *ltdb,
2346                              const struct ldb_message *msg,
2347                              struct ldb_message_element *el)
2348 {
2349         unsigned int i;
2350         for (i = 0; i < el->num_values; i++) {
2351                 int ret = ltdb_index_add1(module, ltdb,
2352                                           msg, el, i);
2353                 if (ret != LDB_SUCCESS) {
2354                         return ret;
2355                 }
2356         }
2357
2358         return LDB_SUCCESS;
2359 }
2360
2361 /*
2362   add index entries for all elements in a message
2363  */
2364 static int ltdb_index_add_all(struct ldb_module *module,
2365                               struct ltdb_private *ltdb,
2366                               const struct ldb_message *msg)
2367 {
2368         struct ldb_message_element *elements = msg->elements;
2369         unsigned int i;
2370         const char *dn_str;
2371         int ret;
2372
2373         if (ldb_dn_is_special(msg->dn)) {
2374                 return LDB_SUCCESS;
2375         }
2376
2377         dn_str = ldb_dn_get_linearized(msg->dn);
2378         if (dn_str == NULL) {
2379                 return LDB_ERR_OPERATIONS_ERROR;
2380         }
2381
2382         ret = ltdb_write_index_dn_guid(module, msg, 1);
2383         if (ret != LDB_SUCCESS) {
2384                 return ret;
2385         }
2386
2387         if (!ltdb->cache->attribute_indexes) {
2388                 /* no indexed fields */
2389                 return LDB_SUCCESS;
2390         }
2391
2392         for (i = 0; i < msg->num_elements; i++) {
2393                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
2394                         continue;
2395                 }
2396                 ret = ltdb_index_add_el(module, ltdb,
2397                                         msg, &elements[i]);
2398                 if (ret != LDB_SUCCESS) {
2399                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2400                         ldb_asprintf_errstring(ldb,
2401                                                __location__ ": Failed to re-index %s in %s - %s",
2402                                                elements[i].name, dn_str,
2403                                                ldb_errstring(ldb));
2404                         return ret;
2405                 }
2406         }
2407
2408         return LDB_SUCCESS;
2409 }
2410
2411
2412 /*
2413   insert a DN index for a message
2414 */
2415 static int ltdb_modify_index_dn(struct ldb_module *module,
2416                                 struct ltdb_private *ltdb,
2417                                 const struct ldb_message *msg,
2418                                 struct ldb_dn *dn,
2419                                 const char *index, int add)
2420 {
2421         struct ldb_message_element el;
2422         struct ldb_val val;
2423         int ret;
2424
2425         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2426         if (val.data == NULL) {
2427                 const char *dn_str = ldb_dn_get_linearized(dn);
2428                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2429                                        __location__
2430                                        ": Failed to modify %s "
2431                                        "against %s in %s: failed "
2432                                        "to get casefold DN",
2433                                        index,
2434                                        ltdb->cache->GUID_index_attribute,
2435                                        dn_str);
2436                 return LDB_ERR_OPERATIONS_ERROR;
2437         }
2438
2439         val.length = strlen((char *)val.data);
2440         el.name = index;
2441         el.values = &val;
2442         el.num_values = 1;
2443
2444         if (add) {
2445                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
2446         } else { /* delete */
2447                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
2448         }
2449
2450         if (ret != LDB_SUCCESS) {
2451                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2452                 const char *dn_str = ldb_dn_get_linearized(dn);
2453                 ldb_asprintf_errstring(ldb,
2454                                        __location__
2455                                        ": Failed to modify %s "
2456                                        "against %s in %s - %s",
2457                                        index,
2458                                        ltdb->cache->GUID_index_attribute,
2459                                        dn_str, ldb_errstring(ldb));
2460                 return ret;
2461         }
2462         return ret;
2463 }
2464
2465 /*
2466   insert a one level index for a message
2467 */
2468 static int ltdb_index_onelevel(struct ldb_module *module,
2469                                const struct ldb_message *msg, int add)
2470 {
2471         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2472                                                     struct ltdb_private);
2473         struct ldb_dn *pdn;
2474         int ret;
2475
2476         /* We index for ONE Level only if requested */
2477         if (!ltdb->cache->one_level_indexes) {
2478                 return LDB_SUCCESS;
2479         }
2480
2481         pdn = ldb_dn_get_parent(module, msg->dn);
2482         if (pdn == NULL) {
2483                 return LDB_ERR_OPERATIONS_ERROR;
2484         }
2485         ret = ltdb_modify_index_dn(module, ltdb,
2486                                    msg, pdn, LTDB_IDXONE, add);
2487
2488         talloc_free(pdn);
2489
2490         return ret;
2491 }
2492
2493 /*
2494   insert a one level index for a message
2495 */
2496 static int ltdb_write_index_dn_guid(struct ldb_module *module,
2497                                     const struct ldb_message *msg,
2498                                     int add)
2499 {
2500         int ret;
2501         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2502                                                     struct ltdb_private);
2503
2504         /* We index for DN only if using a GUID index */
2505         if (ltdb->cache->GUID_index_attribute == NULL) {
2506                 return LDB_SUCCESS;
2507         }
2508
2509         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
2510                                    LTDB_IDXDN, add);
2511
2512         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2513                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2514                                        "Entry %s already exists",
2515                                        ldb_dn_get_linearized(msg->dn));
2516                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2517         }
2518         return ret;
2519 }
2520
2521 /*
2522   add the index entries for a new element in a record
2523   The caller guarantees that these element values are not yet indexed
2524 */
2525 int ltdb_index_add_element(struct ldb_module *module,
2526                            struct ltdb_private *ltdb,
2527                            const struct ldb_message *msg,
2528                            struct ldb_message_element *el)
2529 {
2530         if (ldb_dn_is_special(msg->dn)) {
2531                 return LDB_SUCCESS;
2532         }
2533         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2534                 return LDB_SUCCESS;
2535         }
2536         return ltdb_index_add_el(module, ltdb, msg, el);
2537 }
2538
2539 /*
2540   add the index entries for a new record
2541 */
2542 int ltdb_index_add_new(struct ldb_module *module,
2543                        struct ltdb_private *ltdb,
2544                        const struct ldb_message *msg)
2545 {
2546         int ret;
2547
2548         if (ldb_dn_is_special(msg->dn)) {
2549                 return LDB_SUCCESS;
2550         }
2551
2552         ret = ltdb_index_add_all(module, ltdb, msg);
2553         if (ret != LDB_SUCCESS) {
2554                 /*
2555                  * Because we can't trust the caller to be doing
2556                  * transactions properly, clean up any index for this
2557                  * entry rather than relying on a transaction
2558                  * cleanup
2559                  */
2560
2561                 ltdb_index_delete(module, msg);
2562                 return ret;
2563         }
2564
2565         ret = ltdb_index_onelevel(module, msg, 1);
2566         if (ret != LDB_SUCCESS) {
2567                 /*
2568                  * Because we can't trust the caller to be doing
2569                  * transactions properly, clean up any index for this
2570                  * entry rather than relying on a transaction
2571                  * cleanup
2572                  */
2573                 ltdb_index_delete(module, msg);
2574                 return ret;
2575         }
2576         return ret;
2577 }
2578
2579
2580 /*
2581   delete an index entry for one message element
2582 */
2583 int ltdb_index_del_value(struct ldb_module *module,
2584                          struct ltdb_private *ltdb,
2585                          const struct ldb_message *msg,
2586                          struct ldb_message_element *el, unsigned int v_idx)
2587 {
2588         struct ldb_context *ldb;
2589         struct ldb_dn *dn_key;
2590         const char *dn_str;
2591         int ret, i;
2592         unsigned int j;
2593         struct dn_list *list;
2594         struct ldb_dn *dn = msg->dn;
2595         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2596
2597         ldb = ldb_module_get_ctx(module);
2598
2599         dn_str = ldb_dn_get_linearized(dn);
2600         if (dn_str == NULL) {
2601                 return LDB_ERR_OPERATIONS_ERROR;
2602         }
2603
2604         if (dn_str[0] == '@') {
2605                 return LDB_SUCCESS;
2606         }
2607
2608         dn_key = ltdb_index_key(ldb, ltdb,
2609                                 el->name, &el->values[v_idx],
2610                                 NULL, &truncation);
2611         /*
2612          * We ignore key truncation in ltdb_index_add1() so
2613          * match that by ignoring it here as well
2614          *
2615          * Multiple values are legitimate and accepted
2616          */
2617         if (!dn_key) {
2618                 return LDB_ERR_OPERATIONS_ERROR;
2619         }
2620
2621         list = talloc_zero(dn_key, struct dn_list);
2622         if (list == NULL) {
2623                 talloc_free(dn_key);
2624                 return LDB_ERR_OPERATIONS_ERROR;
2625         }
2626
2627         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2628         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2629                 /* it wasn't indexed. Did we have an earlier error? If we did then
2630                    its gone now */
2631                 talloc_free(dn_key);
2632                 return LDB_SUCCESS;
2633         }
2634
2635         if (ret != LDB_SUCCESS) {
2636                 talloc_free(dn_key);
2637                 return ret;
2638         }
2639
2640         /*
2641          * Find one of the values matching this message to remove
2642          */
2643         i = ltdb_dn_list_find_msg(ltdb, list, msg);
2644         if (i == -1) {
2645                 /* nothing to delete */
2646                 talloc_free(dn_key);
2647                 return LDB_SUCCESS;
2648         }
2649
2650         j = (unsigned int) i;
2651         if (j != list->count - 1) {
2652                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2653         }
2654         list->count--;
2655         if (list->count == 0) {
2656                 talloc_free(list->dn);
2657                 list->dn = NULL;
2658         } else {
2659                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2660         }
2661
2662         ret = ltdb_dn_list_store(module, dn_key, list);
2663
2664         talloc_free(dn_key);
2665
2666         return ret;
2667 }
2668
2669 /*
2670   delete the index entries for a element
2671   return -1 on failure
2672 */
2673 int ltdb_index_del_element(struct ldb_module *module,
2674                            struct ltdb_private *ltdb,
2675                            const struct ldb_message *msg,
2676                            struct ldb_message_element *el)
2677 {
2678         const char *dn_str;
2679         int ret;
2680         unsigned int i;
2681
2682         if (!ltdb->cache->attribute_indexes) {
2683                 /* no indexed fields */
2684                 return LDB_SUCCESS;
2685         }
2686
2687         dn_str = ldb_dn_get_linearized(msg->dn);
2688         if (dn_str == NULL) {
2689                 return LDB_ERR_OPERATIONS_ERROR;
2690         }
2691
2692         if (dn_str[0] == '@') {
2693                 return LDB_SUCCESS;
2694         }
2695
2696         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2697                 return LDB_SUCCESS;
2698         }
2699         for (i = 0; i < el->num_values; i++) {
2700                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
2701                 if (ret != LDB_SUCCESS) {
2702                         return ret;
2703                 }
2704         }
2705
2706         return LDB_SUCCESS;
2707 }
2708
2709 /*
2710   delete the index entries for a record
2711   return -1 on failure
2712 */
2713 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
2714 {
2715         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2716         int ret;
2717         unsigned int i;
2718
2719         if (ldb_dn_is_special(msg->dn)) {
2720                 return LDB_SUCCESS;
2721         }
2722
2723         ret = ltdb_index_onelevel(module, msg, 0);
2724         if (ret != LDB_SUCCESS) {
2725                 return ret;
2726         }
2727
2728         ret = ltdb_write_index_dn_guid(module, msg, 0);
2729         if (ret != LDB_SUCCESS) {
2730                 return ret;
2731         }
2732
2733         if (!ltdb->cache->attribute_indexes) {
2734                 /* no indexed fields */
2735                 return LDB_SUCCESS;
2736         }
2737
2738         for (i = 0; i < msg->num_elements; i++) {
2739                 ret = ltdb_index_del_element(module, ltdb,
2740                                              msg, &msg->elements[i]);
2741                 if (ret != LDB_SUCCESS) {
2742                         return ret;
2743                 }
2744         }
2745
2746         return LDB_SUCCESS;
2747 }
2748
2749
2750 /*
2751   traversal function that deletes all @INDEX records in the in-memory
2752   TDB.
2753
2754   This does not touch the actual DB, that is done at transaction
2755   commit, which in turn greatly reduces DB churn as we will likely
2756   be able to do a direct update into the old record.
2757 */
2758 static int delete_index(struct ltdb_private *ltdb, struct ldb_val key, struct ldb_val data, void *state)
2759 {
2760         struct ldb_module *module = state;
2761         const char *dnstr = "DN=" LTDB_INDEX ":";
2762         struct dn_list list;
2763         struct ldb_dn *dn;
2764         struct ldb_val v;
2765         int ret;
2766
2767         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2768                 return 0;
2769         }
2770         /* we need to put a empty list in the internal tdb for this
2771          * index entry */
2772         list.dn = NULL;
2773         list.count = 0;
2774
2775         /* the offset of 3 is to remove the DN= prefix. */
2776         v.data = key.data + 3;
2777         v.length = strnlen((char *)key.data, key.length) - 3;
2778
2779         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
2780
2781         /*
2782          * This does not actually touch the DB quite yet, just
2783          * the in-memory index cache
2784          */
2785         ret = ltdb_dn_list_store(module, dn, &list);
2786         if (ret != LDB_SUCCESS) {
2787                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2788                                        "Unable to store null index for %s\n",
2789                                                 ldb_dn_get_linearized(dn));
2790                 talloc_free(dn);
2791                 return -1;
2792         }
2793         talloc_free(dn);
2794         return 0;
2795 }
2796
2797 /*
2798   traversal function that adds @INDEX records during a re index TODO wrong comment
2799 */
2800 static int re_key(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2801 {
2802         struct ldb_context *ldb;
2803         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2804         struct ldb_module *module = ctx->module;
2805         struct ldb_message *msg;
2806         unsigned int nb_elements_in_db;
2807         int ret;
2808         TDB_DATA key2;
2809         bool is_record;
2810         TDB_DATA key = {
2811                 .dptr = ldb_key.data,
2812                 .dsize = ldb_key.length
2813         };
2814         
2815         ldb = ldb_module_get_ctx(module);
2816
2817         if (key.dsize > 4 &&
2818             memcmp(key.dptr, "DN=@", 4) == 0) {
2819                 return 0;
2820         }
2821
2822         is_record = ltdb_key_is_record(key);
2823         if (is_record == false) {
2824                 return 0;
2825         }
2826         
2827         msg = ldb_msg_new(module);
2828         if (msg == NULL) {
2829                 return -1;
2830         }
2831
2832         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2833                                                    msg,
2834                                                    NULL, 0,
2835                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2836                                                    &nb_elements_in_db);
2837         if (ret != 0) {
2838                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2839                                                 ldb_dn_get_linearized(msg->dn));
2840                 ctx->error = ret;
2841                 talloc_free(msg);
2842                 return -1;
2843         }
2844
2845         if (msg->dn == NULL) {
2846                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2847                           "Refusing to re-index as GUID "
2848                           "key %*.*s with no DN\n",
2849                           (int)key.dsize, (int)key.dsize,
2850                           (char *)key.dptr);
2851                 talloc_free(msg);
2852                 return -1;
2853         }
2854         
2855         /* check if the DN key has changed, perhaps due to the case
2856            insensitivity of an element changing, or a change from DN
2857            to GUID keys */
2858         key2 = ltdb_key_msg(module, msg, msg);
2859         if (key2.dptr == NULL) {
2860                 /* probably a corrupt record ... darn */
2861                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2862                                                 ldb_dn_get_linearized(msg->dn));
2863                 talloc_free(msg);
2864                 return 0;
2865         }
2866         if (key.dsize != key2.dsize ||
2867             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
2868                 struct ldb_val ldb_key2 = {
2869                         .data = key2.dptr,
2870                         .length = key2.dsize
2871                 };
2872                 ltdb->kv_ops->update_in_iterate(ltdb, ldb_key, ldb_key2, val, ctx);
2873         }
2874         talloc_free(key2.dptr);
2875
2876         talloc_free(msg);
2877
2878         ctx->count++;
2879         if (ctx->count % 10000 == 0) {
2880                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2881                           "Reindexing: re-keyed %u records so far",
2882                           ctx->count);
2883         }
2884
2885         return 0;
2886 }
2887
2888 /*
2889   traversal function that adds @INDEX records during a re index
2890 */
2891 static int re_index(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2892 {
2893         struct ldb_context *ldb;
2894         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2895         struct ldb_module *module = ctx->module;
2896         struct ldb_message *msg;
2897         unsigned int nb_elements_in_db;
2898         TDB_DATA key = {
2899                 .dptr = ldb_key.data,
2900                 .dsize = ldb_key.length
2901         };
2902         int ret;
2903         bool is_record;
2904         
2905         ldb = ldb_module_get_ctx(module);
2906
2907         if (key.dsize > 4 &&
2908             memcmp(key.dptr, "DN=@", 4) == 0) {
2909                 return 0;
2910         }
2911
2912         is_record = ltdb_key_is_record(key);
2913         if (is_record == false) {
2914                 return 0;
2915         }
2916         
2917         msg = ldb_msg_new(module);
2918         if (msg == NULL) {
2919                 return -1;
2920         }
2921
2922         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2923                                                    msg,
2924                                                    NULL, 0,
2925                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2926                                                    &nb_elements_in_db);
2927         if (ret != 0) {
2928                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2929                                                 ldb_dn_get_linearized(msg->dn));
2930                 ctx->error = ret;
2931                 talloc_free(msg);
2932                 return -1;
2933         }
2934
2935         if (msg->dn == NULL) {
2936                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2937                           "Refusing to re-index as GUID "
2938                           "key %*.*s with no DN\n",
2939                           (int)key.dsize, (int)key.dsize,
2940                           (char *)key.dptr);
2941                 talloc_free(msg);
2942                 return -1;
2943         }
2944
2945         ret = ltdb_index_onelevel(module, msg, 1);
2946         if (ret != LDB_SUCCESS) {
2947                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2948                           "Adding special ONE LEVEL index failed (%s)!",
2949                                                 ldb_dn_get_linearized(msg->dn));
2950                 talloc_free(msg);
2951                 return -1;
2952         }
2953
2954         ret = ltdb_index_add_all(module, ltdb, msg);
2955
2956         if (ret != LDB_SUCCESS) {
2957                 ctx->error = ret;
2958                 talloc_free(msg);
2959                 return -1;
2960         }
2961
2962         talloc_free(msg);
2963
2964         ctx->count++;
2965         if (ctx->count % 10000 == 0) {
2966                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2967                           "Reindexing: re-indexed %u records so far",
2968                           ctx->count);
2969         }
2970
2971         return 0;
2972 }
2973
2974 /*
2975   force a complete reindex of the database
2976 */
2977 int ltdb_reindex(struct ldb_module *module)
2978 {
2979         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2980         int ret;
2981         struct ltdb_reindex_context ctx;
2982
2983         /*
2984          * Only triggered after a modification, but make clear we do
2985          * not re-index a read-only DB
2986          */
2987         if (ltdb->read_only) {
2988                 return LDB_ERR_UNWILLING_TO_PERFORM;
2989         }
2990
2991         if (ltdb_cache_reload(module) != 0) {
2992                 return LDB_ERR_OPERATIONS_ERROR;
2993         }
2994
2995         /*
2996          * Ensure we read (and so remove) the entries from the real
2997          * DB, no values stored so far are any use as we want to do a
2998          * re-index
2999          */
3000         ltdb_index_transaction_cancel(module);
3001
3002         ret = ltdb_index_transaction_start(module);
3003         if (ret != LDB_SUCCESS) {
3004                 return ret;
3005         }
3006
3007         /* first traverse the database deleting any @INDEX records by
3008          * putting NULL entries in the in-memory tdb
3009          */
3010         ret = ltdb->kv_ops->iterate(ltdb, delete_index, module);
3011         if (ret < 0) {
3012                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3013                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3014                                        ldb_errstring(ldb));
3015                 return LDB_ERR_OPERATIONS_ERROR;
3016         }
3017
3018         ctx.module = module;
3019         ctx.error = 0;
3020         ctx.count = 0;
3021
3022         ret = ltdb->kv_ops->iterate(ltdb, re_key, &ctx);
3023         if (ret < 0) {
3024                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3025                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3026                                        ldb_errstring(ldb));
3027                 return LDB_ERR_OPERATIONS_ERROR;
3028         }
3029
3030         if (ctx.error != LDB_SUCCESS) {
3031                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3032                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3033                 return ctx.error;
3034         }
3035
3036         ctx.error = 0;
3037         ctx.count = 0;
3038
3039         /* now traverse adding any indexes for normal LDB records */
3040         ret = ltdb->kv_ops->iterate(ltdb, re_index, &ctx);
3041         if (ret < 0) {
3042                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3043                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3044                                        ldb_errstring(ldb));
3045                 return LDB_ERR_OPERATIONS_ERROR;
3046         }
3047
3048         if (ctx.error != LDB_SUCCESS) {
3049                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3050                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3051                 return ctx.error;
3052         }
3053
3054         if (ctx.count > 10000) {
3055                 ldb_debug(ldb_module_get_ctx(module),
3056                           LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
3057                           "final index write-out will be in transaction commit",
3058                           ltdb->kv_ops->name(ltdb));
3059         }
3060         return LDB_SUCCESS;
3061 }