Revert "s4:ldb Fix ldb_list_find() folowing the change from char * to TDB_DATA"
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "dlinklist.h"
36
37 /*
38   the idxptr code is a bit unusual. The way it works is to replace
39   @IDX elements in records during a transaction with @IDXPTR
40   elements. The @IDXPTR elements don't contain the actual index entry
41   values, but contain a pointer to a linked list of values.
42
43   This means we are storing pointers in a database, which is normally
44   not allowed, but in this case we are storing them only for the
45   duration of a transaction, and re-writing them into the normal @IDX
46   format at the end of the transaction. That means no other processes
47   are ever exposed to the @IDXPTR values.
48
49   The advantage is that the linked list doesn't cause huge
50   fragmentation during a transaction. Without the @IDXPTR method we
51   often ended up with a ldb that was between 10x and 100x larger then
52   it needs to be due to massive fragmentation caused by re-writing
53   @INDEX records many times during indexing.
54  */
55 struct ldb_index_pointer {
56         struct ldb_index_pointer *next, *prev;
57         struct ldb_val value;
58 };
59
60 struct ltdb_idxptr {
61         int num_dns;
62         const char **dn_list;
63         bool repack;
64 };
65
66 /*
67   add to the list of DNs that need to be fixed on transaction end
68  */
69 static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
70 {
71         void *data = ldb_module_get_private(module);
72         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
73         ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
74                                                const char *, ltdb->idxptr->num_dns+1);
75         if (ltdb->idxptr->dn_list == NULL) {
76                 ltdb->idxptr->num_dns = 0;
77                 return LDB_ERR_OPERATIONS_ERROR;
78         }
79         ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] =
80                 talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
81         if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
82                 return LDB_ERR_OPERATIONS_ERROR;
83         }
84         ltdb->idxptr->num_dns++;
85         return LDB_SUCCESS;
86 }
87
88 /* free an idxptr record */
89 static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
90 {
91         struct ldb_val val;
92         struct ldb_index_pointer *ptr;
93
94         if (el->num_values != 1) {
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97
98         val = el->values[0];
99         if (val.length != sizeof(void *)) {
100                 return LDB_ERR_OPERATIONS_ERROR;
101         }
102
103         ptr = *(struct ldb_index_pointer **)val.data;
104         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
105                 return LDB_ERR_OPERATIONS_ERROR;
106         }
107
108         while (ptr) {
109                 struct ldb_index_pointer *tmp = ptr;
110                 DLIST_REMOVE(ptr, ptr);
111                 talloc_free(tmp);
112         }
113
114         return LDB_SUCCESS;
115 }
116
117
118 /* convert from the IDXPTR format to a ldb_message_element format */
119 static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
120 {
121         struct ldb_val val;
122         struct ldb_index_pointer *ptr, *tmp;
123         int i;
124         struct ldb_val *val2;
125
126         if (el->num_values != 1) {
127                 return LDB_ERR_OPERATIONS_ERROR;
128         }
129
130         val = el->values[0];
131         if (val.length != sizeof(void *)) {
132                 return LDB_ERR_OPERATIONS_ERROR;
133         }
134
135         ptr = *(struct ldb_index_pointer **)val.data;
136         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
137                 return LDB_ERR_OPERATIONS_ERROR;
138         }
139
140         /* count the length of the list */
141         for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
142                 i++;
143         }
144
145         /* allocate the new values array */
146         val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
147         if (val2 == NULL) {
148                 return LDB_ERR_OPERATIONS_ERROR;
149         }
150         el->values = val2;
151         el->num_values = i;
152
153         /* populate the values array */
154         for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
155                 el->values[i].length = tmp->value.length;
156                 /* we need to over-allocate here as there are still some places
157                    in ldb that rely on null termination. */
158                 el->values[i].data = talloc_size(el->values, tmp->value.length+1);
159                 if (el->values[i].data == NULL) {
160                         return LDB_ERR_OPERATIONS_ERROR;
161                 }
162                 memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
163                 el->values[i].data[tmp->value.length] = 0;
164         }
165
166         /* update the name */
167         el->name = LTDB_IDX;
168
169         return LDB_SUCCESS;
170 }
171
172
173 /* convert to the IDXPTR format from a ldb_message_element format */
174 static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
175 {
176         struct ldb_index_pointer *ptr, *tmp;
177         int i;
178         struct ldb_val *val2;
179         void *data = ldb_module_get_private(module);
180         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
181
182         ptr = NULL;
183
184         for (i=0;i<el->num_values;i++) {
185                 tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
186                 if (tmp == NULL) {
187                         return LDB_ERR_OPERATIONS_ERROR;
188                 }
189                 tmp->value = el->values[i];
190                 tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
191                 if (tmp->value.data == NULL) {
192                         return LDB_ERR_OPERATIONS_ERROR;
193                 }
194                 DLIST_ADD(ptr, tmp);
195         }
196
197         /* allocate the new values array */
198         val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
199         if (val2 == NULL) {
200                 return LDB_ERR_OPERATIONS_ERROR;
201         }
202         el->values = val2;
203         el->num_values = 1;
204
205         el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
206         el->values[0].length = sizeof(ptr);
207
208         /* update the name */
209         el->name = LTDB_IDXPTR;
210
211         return LDB_SUCCESS;
212 }
213
214
215 /* enable the idxptr mode when transactions start */
216 int ltdb_index_transaction_start(struct ldb_module *module)
217 {
218         void *data = ldb_module_get_private(module);
219         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
220         ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
221         return LDB_SUCCESS;
222 }
223
224 /*
225   a wrapper around ltdb_search_dn1() which translates pointer based index records
226   and maps them into normal ldb message structures
227  */
228 static int ltdb_search_dn1_index(struct ldb_module *module,
229                                 struct ldb_dn *dn, struct ldb_message *msg)
230 {
231         int ret, i;
232         ret = ltdb_search_dn1(module, dn, msg);
233         if (ret != LDB_SUCCESS) {
234                 return ret;
235         }
236
237         /* if this isn't a @INDEX record then don't munge it */
238         if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         for (i=0;i<msg->num_elements;i++) {
243                 struct ldb_message_element *el = &msg->elements[i];
244                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
245                         ret = ltdb_convert_from_idxptr(module, el);
246                         if (ret != LDB_SUCCESS) {
247                                 return ret;
248                         }
249                 }
250         }
251
252         return ret;
253 }
254
255
256
257 /*
258   fixup the idxptr for one DN
259  */
260 static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
261 {
262         struct ldb_context *ldb;
263         struct ldb_dn *dn;
264         struct ldb_message *msg = ldb_msg_new(module);
265         int ret;
266
267         ldb = ldb_module_get_ctx(module);
268
269         dn = ldb_dn_new(msg, ldb, strdn);
270         if (ltdb_search_dn1_index(module, dn, msg) == LDB_SUCCESS) {
271                 ret = ltdb_store(module, msg, TDB_REPLACE);
272         }
273         talloc_free(msg);
274         return ret;
275 }
276
277 /* cleanup the idxptr mode when transaction commits */
278 int ltdb_index_transaction_commit(struct ldb_module *module)
279 {
280         int i;
281         void *data = ldb_module_get_private(module);
282         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
283
284         /* fix all the DNs that we have modified */
285         if (ltdb->idxptr) {
286                 for (i=0;i<ltdb->idxptr->num_dns;i++) {
287                         ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
288                 }
289
290                 if (ltdb->idxptr->repack) {
291                         tdb_repack(ltdb->tdb);
292                 }
293         }
294
295         talloc_free(ltdb->idxptr);
296         ltdb->idxptr = NULL;
297         return LDB_SUCCESS;
298 }
299
300 /* cleanup the idxptr mode when transaction cancels */
301 int ltdb_index_transaction_cancel(struct ldb_module *module)
302 {
303         void *data = ldb_module_get_private(module);
304         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
305         talloc_free(ltdb->idxptr);
306         ltdb->idxptr = NULL;
307         return LDB_SUCCESS;
308 }
309
310
311
312 /* a wrapper around ltdb_store() for the index code which
313    stores in IDXPTR format when idxptr mode is enabled
314
315    WARNING: This modifies the msg which is passed in
316 */
317 int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
318 {
319         void *data = ldb_module_get_private(module);
320         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
321         int ret;
322
323         if (ltdb->idxptr) {
324                 int i;
325                 struct ldb_message *msg2 = ldb_msg_new(module);
326
327                 /* free any old pointer */
328                 ret = ltdb_search_dn1(module, msg->dn, msg2);
329                 if (ret == 0) {
330                         for (i=0;i<msg2->num_elements;i++) {
331                                 struct ldb_message_element *el = &msg2->elements[i];
332                                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
333                                         ret = ltdb_free_idxptr(module, el);
334                                         if (ret != LDB_SUCCESS) {
335                                                 return ret;
336                                         }
337                                 }
338                         }
339                 }
340                 talloc_free(msg2);
341
342                 for (i=0;i<msg->num_elements;i++) {
343                         struct ldb_message_element *el = &msg->elements[i];
344                         if (strcmp(el->name, LTDB_IDX) == 0) {
345                                 ret = ltdb_convert_to_idxptr(module, el);
346                                 if (ret != LDB_SUCCESS) {
347                                         return ret;
348                                 }
349                         }
350                 }
351
352                 if (ltdb_idxptr_add(module, msg) != 0) {
353                         return LDB_ERR_OPERATIONS_ERROR;
354                 }
355         }
356
357         ret = ltdb_store(module, msg, flgs);
358         return ret;
359 }
360
361
362 /*
363   find an element in a list, using the given comparison function and
364   assuming that the list is already sorted using comp_fn
365
366   return -1 if not found, or the index of the first occurance of needle if found
367 */
368 static int ldb_list_find(const void *needle,
369                          const void *base, size_t nmemb, size_t size,
370                          comparison_fn_t comp_fn)
371 {
372         const char *base_p = (const char *)base;
373         size_t min_i, max_i, test_i;
374
375         if (nmemb == 0) {
376                 return -1;
377         }
378
379         min_i = 0;
380         max_i = nmemb-1;
381
382         while (min_i < max_i) {
383                 int r;
384
385                 test_i = (min_i + max_i) / 2;
386                 /* the following cast looks strange, but is
387                  correct. The key to understanding it is that base_p
388                  is a pointer to an array of pointers, so we have to
389                  dereference it after casting to void **. The strange
390                  const in the middle gives us the right type of pointer
391                  after the dereference  (tridge) */
392                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
393                 if (r == 0) {
394                         /* scan back for first element */
395                         while (test_i > 0 &&
396                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
397                                 test_i--;
398                         }
399                         return test_i;
400                 }
401                 if (r < 0) {
402                         if (test_i == 0) {
403                                 return -1;
404                         }
405                         max_i = test_i - 1;
406                 }
407                 if (r > 0) {
408                         min_i = test_i + 1;
409                 }
410         }
411
412         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
413                 return min_i;
414         }
415
416         return -1;
417 }
418
419 struct dn_list {
420         unsigned int count;
421         char **dn;
422 };
423
424 /*
425   return the dn key to be used for an index
426   caller frees
427 */
428 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
429                                      const char *attr, const struct ldb_val *value,
430                                      const struct ldb_schema_attribute **ap)
431 {
432         struct ldb_dn *ret;
433         struct ldb_val v;
434         const struct ldb_schema_attribute *a;
435         char *attr_folded;
436         int r;
437
438         attr_folded = ldb_attr_casefold(ldb, attr);
439         if (!attr_folded) {
440                 return NULL;
441         }
442
443         a = ldb_schema_attribute_by_name(ldb, attr);
444         if (ap) {
445                 *ap = a;
446         }
447         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
448         if (r != LDB_SUCCESS) {
449                 const char *errstr = ldb_errstring(ldb);
450                 /* canonicalisation can be refused. For example, 
451                    a attribute that takes wildcards will refuse to canonicalise
452                    if the value contains a wildcard */
453                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
454                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
455                 talloc_free(attr_folded);
456                 return NULL;
457         }
458         if (ldb_should_b64_encode(ldb, &v)) {
459                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
460                 if (!vstr) return NULL;
461                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
462                 talloc_free(vstr);
463         } else {
464                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
465         }
466
467         if (v.data != value->data) {
468                 talloc_free(v.data);
469         }
470         talloc_free(attr_folded);
471
472         return ret;
473 }
474
475 /*
476   see if a attribute value is in the list of indexed attributes
477 */
478 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
479                             unsigned int *v_idx, const char *key)
480 {
481         unsigned int i, j;
482         for (i=0;i<msg->num_elements;i++) {
483                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
484                         const struct ldb_message_element *el = &msg->elements[i];
485
486                         if (attr == NULL) {
487                                 /* in this case we are just looking to see if key is present,
488                                    we are not spearching for a specific index */
489                                 return 0;
490                         }
491
492                         for (j=0;j<el->num_values;j++) {
493                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
494                                         if (v_idx) {
495                                                 *v_idx = j;
496                                         }
497                                         return i;
498                                 }
499                         }
500                 }
501         }
502         return -1;
503 }
504
505 /* used in sorting dn lists */
506 static int list_cmp(const char **s1, const char **s2)
507 {
508         return strcmp(*s1, *s2);
509 }
510
511 /*
512   return a list of dn's that might match a simple indexed search or
513  */
514 static int ltdb_index_dn_simple(struct ldb_module *module,
515                                 const struct ldb_parse_tree *tree,
516                                 const struct ldb_message *index_list,
517                                 struct dn_list *list)
518 {
519         struct ldb_context *ldb;
520         struct ldb_dn *dn;
521         int ret;
522         unsigned int i, j;
523         struct ldb_message *msg;
524
525         ldb = ldb_module_get_ctx(module);
526
527         list->count = 0;
528         list->dn = NULL;
529
530         /* if the attribute isn't in the list of indexed attributes then
531            this node needs a full search */
532         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
533                 return LDB_ERR_OPERATIONS_ERROR;
534         }
535
536         /* the attribute is indexed. Pull the list of DNs that match the 
537            search criterion */
538         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
539         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
540
541         msg = talloc(list, struct ldb_message);
542         if (msg == NULL) {
543                 return LDB_ERR_OPERATIONS_ERROR;
544         }
545
546         ret = ltdb_search_dn1_index(module, dn, msg);
547         talloc_free(dn);
548         if (ret != LDB_SUCCESS) {
549                 return ret;
550         }
551
552         for (i=0;i<msg->num_elements;i++) {
553                 struct ldb_message_element *el;
554
555                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
556                         continue;
557                 }
558
559                 el = &msg->elements[i];
560
561                 list->dn = talloc_array(list, char *, el->num_values);
562                 if (!list->dn) {
563                         talloc_free(msg);
564                         return LDB_ERR_OPERATIONS_ERROR;
565                 }
566
567                 for (j=0;j<el->num_values;j++) {
568                         list->dn[list->count] =
569                                 talloc_strdup(list->dn, (char *)el->values[j].data);
570                         if (!list->dn[list->count]) {
571                                 talloc_free(msg);
572                                 return LDB_ERR_OPERATIONS_ERROR;
573                         }
574                         list->count++;
575                 }
576         }
577
578         talloc_free(msg);
579
580         if (list->count > 1) {
581                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
582         }
583
584         return LDB_SUCCESS;
585 }
586
587
588 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
589
590 /*
591   return a list of dn's that might match a leaf indexed search
592  */
593 static int ltdb_index_dn_leaf(struct ldb_module *module,
594                               const struct ldb_parse_tree *tree,
595                               const struct ldb_message *index_list,
596                               struct dn_list *list)
597 {
598         struct ldb_context *ldb;
599         ldb = ldb_module_get_ctx(module);
600
601         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
602                 list->dn = talloc_array(list, char *, 1);
603                 if (list->dn == NULL) {
604                         ldb_oom(ldb);
605                         return LDB_ERR_OPERATIONS_ERROR;
606                 }
607                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
608                 if (list->dn[0] == NULL) {
609                         ldb_oom(ldb);
610                         return LDB_ERR_OPERATIONS_ERROR;
611                 }
612                 list->count = 1;
613                 return LDB_SUCCESS;
614         }
615         return ltdb_index_dn_simple(module, tree, index_list, list);
616 }
617
618
619 /*
620   list intersection
621   list = list & list2
622   relies on the lists being sorted
623 */
624 static int list_intersect(struct ldb_context *ldb,
625                           struct dn_list *list, const struct dn_list *list2)
626 {
627         struct dn_list *list3;
628         unsigned int i;
629
630         if (list->count == 0 || list2->count == 0) {
631                 /* 0 & X == 0 */
632                 return LDB_ERR_NO_SUCH_OBJECT;
633         }
634
635         list3 = talloc(ldb, struct dn_list);
636         if (list3 == NULL) {
637                 return LDB_ERR_OPERATIONS_ERROR;
638         }
639
640         list3->dn = talloc_array(list3, char *, list->count);
641         if (!list3->dn) {
642                 talloc_free(list3);
643                 return LDB_ERR_OPERATIONS_ERROR;
644         }
645         list3->count = 0;
646
647         for (i=0;i<list->count;i++) {
648                 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
649                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
650                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
651                         list3->count++;
652                 } else {
653                         talloc_free(list->dn[i]);
654                 }
655         }
656
657         talloc_free(list->dn);
658         list->dn = talloc_move(list, &list3->dn);
659         list->count = list3->count;
660         talloc_free(list3);
661
662         return LDB_ERR_NO_SUCH_OBJECT;
663 }
664
665
666 /*
667   list union
668   list = list | list2
669   relies on the lists being sorted
670 */
671 static int list_union(struct ldb_context *ldb,
672                       struct dn_list *list, const struct dn_list *list2)
673 {
674         unsigned int i;
675         char **d;
676         unsigned int count = list->count;
677
678         if (list->count == 0 && list2->count == 0) {
679                 /* 0 | 0 == 0 */
680                 return LDB_ERR_NO_SUCH_OBJECT;
681         }
682
683         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
684         if (!d) {
685                 return LDB_ERR_OPERATIONS_ERROR;
686         }
687         list->dn = d;
688
689         for (i=0;i<list2->count;i++) {
690                 if (ldb_list_find(list2->dn[i], list->dn, count,
691                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
692                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
693                         if (!list->dn[list->count]) {
694                                 return LDB_ERR_OPERATIONS_ERROR;
695                         }
696                         list->count++;
697                 }
698         }
699
700         if (list->count != count) {
701                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
702         }
703
704         return LDB_ERR_NO_SUCH_OBJECT;
705 }
706
707 static int ltdb_index_dn(struct ldb_module *module,
708                          const struct ldb_parse_tree *tree,
709                          const struct ldb_message *index_list,
710                          struct dn_list *list);
711
712
713 /*
714   OR two index results
715  */
716 static int ltdb_index_dn_or(struct ldb_module *module,
717                             const struct ldb_parse_tree *tree,
718                             const struct ldb_message *index_list,
719                             struct dn_list *list)
720 {
721         struct ldb_context *ldb;
722         unsigned int i;
723         int ret;
724
725         ldb = ldb_module_get_ctx(module);
726
727         ret = LDB_ERR_OPERATIONS_ERROR;
728         list->dn = NULL;
729         list->count = 0;
730
731         for (i=0;i<tree->u.list.num_elements;i++) {
732                 struct dn_list *list2;
733                 int v;
734
735                 list2 = talloc(module, struct dn_list);
736                 if (list2 == NULL) {
737                         return LDB_ERR_OPERATIONS_ERROR;
738                 }
739
740                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
741
742                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
743                         /* 0 || X == X */
744                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
745                                 ret = v;
746                         }
747                         talloc_free(list2);
748                         continue;
749                 }
750
751                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
752                         /* 1 || X == 1 */
753                         talloc_free(list->dn);
754                         talloc_free(list2);
755                         return v;
756                 }
757
758                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
759                         ret = LDB_SUCCESS;
760                         list->dn = talloc_move(list, &list2->dn);
761                         list->count = list2->count;
762                 } else {
763                         if (list_union(ldb, list, list2) == -1) {
764                                 talloc_free(list2);
765                                 return LDB_ERR_OPERATIONS_ERROR;
766                         }
767                         ret = LDB_SUCCESS;
768                 }
769                 talloc_free(list2);
770         }
771
772         if (list->count == 0) {
773                 return LDB_ERR_NO_SUCH_OBJECT;
774         }
775
776         return ret;
777 }
778
779
780 /*
781   NOT an index results
782  */
783 static int ltdb_index_dn_not(struct ldb_module *module,
784                              const struct ldb_parse_tree *tree,
785                              const struct ldb_message *index_list,
786                              struct dn_list *list)
787 {
788         /* the only way to do an indexed not would be if we could
789            negate the not via another not or if we knew the total
790            number of database elements so we could know that the
791            existing expression covered the whole database.
792
793            instead, we just give up, and rely on a full index scan
794            (unless an outer & manages to reduce the list)
795         */
796         return LDB_ERR_OPERATIONS_ERROR;
797 }
798
799
800 static bool ltdb_index_unique(struct ldb_context *ldb,
801                               const char *attr)
802 {
803         const struct ldb_schema_attribute *a;
804         a = ldb_schema_attribute_by_name(ldb, attr);
805         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
806                 return true;
807         }
808         return false;
809 }
810
811 /*
812   AND two index results
813  */
814 static int ltdb_index_dn_and(struct ldb_module *module,
815                              const struct ldb_parse_tree *tree,
816                              const struct ldb_message *index_list,
817                              struct dn_list *list)
818 {
819         struct ldb_context *ldb;
820         unsigned int i;
821         int ret, pass;
822
823         ldb = ldb_module_get_ctx(module);
824
825         ret = LDB_ERR_OPERATIONS_ERROR;
826         list->dn = NULL;
827         list->count = 0;
828
829         for (pass=0;pass<=1;pass++) {
830                 /* in the first pass we only look for unique simple
831                    equality tests, in the hope of avoiding having to look
832                    at any others */
833                 bool only_unique = pass==0?true:false;
834
835                 for (i=0;i<tree->u.list.num_elements;i++) {
836                         struct dn_list *list2;
837                         int v;
838                         bool is_unique = false;
839                         const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
840
841                         if (subtree->operation == LDB_OP_EQUALITY &&
842                             ltdb_index_unique(ldb, subtree->u.equality.attr)) {
843                                 is_unique = true;
844                         }
845                         if (is_unique != only_unique) continue;
846                         
847                         list2 = talloc(module, struct dn_list);
848                         if (list2 == NULL) {
849                                 return LDB_ERR_OPERATIONS_ERROR;
850                         }
851                         
852                         v = ltdb_index_dn(module, subtree, index_list, list2);
853
854                         if (v == LDB_ERR_NO_SUCH_OBJECT) {
855                                 /* 0 && X == 0 */
856                                 talloc_free(list->dn);
857                                 talloc_free(list2);
858                                 return LDB_ERR_NO_SUCH_OBJECT;
859                         }
860                         
861                         if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
862                                 talloc_free(list2);
863                                 continue;
864                         }
865                         
866                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
867                                 ret = LDB_SUCCESS;
868                                 talloc_free(list->dn);
869                                 list->dn = talloc_move(list, &list2->dn);
870                                 list->count = list2->count;
871                         } else {
872                                 if (list_intersect(ldb, list, list2) == -1) {
873                                         talloc_free(list2);
874                                         return LDB_ERR_OPERATIONS_ERROR;
875                                 }
876                         }
877                         
878                         talloc_free(list2);
879                         
880                         if (list->count == 0) {
881                                 talloc_free(list->dn);
882                                 return LDB_ERR_NO_SUCH_OBJECT;
883                         }
884                         
885                         if (list->count == 1) {
886                                 /* it isn't worth loading the next part of the tree */
887                                 return ret;
888                         }
889                 }
890         }       
891         return ret;
892 }
893         
894 /*
895   AND index results and ONE level special index
896  */
897 static int ltdb_index_dn_one(struct ldb_module *module,
898                              struct ldb_dn *parent_dn,
899                              struct dn_list *list)
900 {
901         struct ldb_context *ldb;
902         struct dn_list *list2;
903         struct ldb_message *msg;
904         struct ldb_dn *key;
905         struct ldb_val val;
906         unsigned int i, j;
907         int ret;
908
909         ldb = ldb_module_get_ctx(module);
910
911         list2 = talloc_zero(module, struct dn_list);
912         if (list2 == NULL) {
913                 return LDB_ERR_OPERATIONS_ERROR;
914         }
915
916         /* the attribute is indexed. Pull the list of DNs that match the
917            search criterion */
918         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
919         val.length = strlen((char *)val.data);
920         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
921         if (!key) {
922                 talloc_free(list2);
923                 return LDB_ERR_OPERATIONS_ERROR;
924         }
925
926         msg = talloc(list2, struct ldb_message);
927         if (msg == NULL) {
928                 talloc_free(list2);
929                 return LDB_ERR_OPERATIONS_ERROR;
930         }
931
932         ret = ltdb_search_dn1_index(module, key, msg);
933         talloc_free(key);
934         if (ret != LDB_SUCCESS) {
935                 return ret;
936         }
937
938         for (i = 0; i < msg->num_elements; i++) {
939                 struct ldb_message_element *el;
940
941                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
942                         continue;
943                 }
944
945                 el = &msg->elements[i];
946
947                 list2->dn = talloc_array(list2, char *, el->num_values);
948                 if (!list2->dn) {
949                         talloc_free(list2);
950                         return LDB_ERR_OPERATIONS_ERROR;
951                 }
952
953                 for (j = 0; j < el->num_values; j++) {
954                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
955                         if (!list2->dn[list2->count]) {
956                                 talloc_free(list2);
957                                 return LDB_ERR_OPERATIONS_ERROR;
958                         }
959                         list2->count++;
960                 }
961         }
962
963         if (list2->count == 0) {
964                 talloc_free(list2);
965                 return LDB_ERR_NO_SUCH_OBJECT;
966         }
967
968         if (list2->count > 1) {
969                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
970         }
971
972         if (list->count > 0) {
973                 if (list_intersect(ldb, list, list2) == -1) {
974                         talloc_free(list2);
975                         return LDB_ERR_OPERATIONS_ERROR;
976                 }
977
978                 if (list->count == 0) {
979                         talloc_free(list->dn);
980                         talloc_free(list2);
981                         return LDB_ERR_NO_SUCH_OBJECT;
982                 }
983         } else {
984                 list->dn = talloc_move(list, &list2->dn);
985                 list->count = list2->count;
986         }
987
988         talloc_free(list2);
989
990         return LDB_SUCCESS;
991 }
992
993 /*
994   return a list of dn's that might match a indexed search or
995   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
996  */
997 static int ltdb_index_dn(struct ldb_module *module,
998                          const struct ldb_parse_tree *tree,
999                          const struct ldb_message *index_list,
1000                          struct dn_list *list)
1001 {
1002         int ret = LDB_ERR_OPERATIONS_ERROR;
1003
1004         switch (tree->operation) {
1005         case LDB_OP_AND:
1006                 ret = ltdb_index_dn_and(module, tree, index_list, list);
1007                 break;
1008
1009         case LDB_OP_OR:
1010                 ret = ltdb_index_dn_or(module, tree, index_list, list);
1011                 break;
1012
1013         case LDB_OP_NOT:
1014                 ret = ltdb_index_dn_not(module, tree, index_list, list);
1015                 break;
1016
1017         case LDB_OP_EQUALITY:
1018                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
1019                 break;
1020
1021         case LDB_OP_SUBSTRING:
1022         case LDB_OP_GREATER:
1023         case LDB_OP_LESS:
1024         case LDB_OP_PRESENT:
1025         case LDB_OP_APPROX:
1026         case LDB_OP_EXTENDED:
1027                 /* we can't index with fancy bitops yet */
1028                 ret = LDB_ERR_OPERATIONS_ERROR;
1029                 break;
1030         }
1031
1032         return ret;
1033 }
1034
1035 /*
1036   filter a candidate dn_list from an indexed search into a set of results
1037   extracting just the given attributes
1038 */
1039 static int ltdb_index_filter(const struct dn_list *dn_list,
1040                              struct ltdb_context *ac, 
1041                              uint32_t *match_count)
1042 {
1043         struct ldb_context *ldb;
1044         struct ldb_message *msg;
1045         unsigned int i;
1046
1047         ldb = ldb_module_get_ctx(ac->module);
1048
1049         for (i = 0; i < dn_list->count; i++) {
1050                 struct ldb_dn *dn;
1051                 int ret;
1052
1053                 msg = ldb_msg_new(ac);
1054                 if (!msg) {
1055                         return LDB_ERR_OPERATIONS_ERROR;
1056                 }
1057
1058                 dn = ldb_dn_new(msg, ldb, dn_list->dn[i]);
1059                 if (dn == NULL) {
1060                         talloc_free(msg);
1061                         return LDB_ERR_OPERATIONS_ERROR;
1062                 }
1063
1064                 ret = ltdb_search_dn1(ac->module, dn, msg);
1065                 talloc_free(dn);
1066                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1067                         /* the record has disappeared? yes, this can happen */
1068                         talloc_free(msg);
1069                         continue;
1070                 }
1071
1072                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1073                         /* an internal error */
1074                         talloc_free(msg);
1075                         return LDB_ERR_OPERATIONS_ERROR;
1076                 }
1077
1078                 if (!ldb_match_msg(ldb, msg,
1079                                    ac->tree, ac->base, ac->scope)) {
1080                         talloc_free(msg);
1081                         continue;
1082                 }
1083
1084                 /* filter the attributes that the user wants */
1085                 ret = ltdb_filter_attrs(msg, ac->attrs);
1086
1087                 if (ret == -1) {
1088                         talloc_free(msg);
1089                         return LDB_ERR_OPERATIONS_ERROR;
1090                 }
1091
1092                 ret = ldb_module_send_entry(ac->req, msg, NULL);
1093                 if (ret != LDB_SUCCESS) {
1094                         ac->request_terminated = true;
1095                         return ret;
1096                 }
1097
1098                 (*match_count)++;
1099         }
1100
1101         return LDB_SUCCESS;
1102 }
1103
1104 /*
1105   search the database with a LDAP-like expression using indexes
1106   returns -1 if an indexed search is not possible, in which
1107   case the caller should call ltdb_search_full()
1108 */
1109 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1110 {
1111         struct ldb_context *ldb;
1112         void *data = ldb_module_get_private(ac->module);
1113         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1114         struct dn_list *dn_list;
1115         int ret, idxattr, idxone;
1116
1117         ldb = ldb_module_get_ctx(ac->module);
1118
1119         idxattr = idxone = 0;
1120         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
1121         if (ret == 0 ) {
1122                 idxattr = 1;
1123         }
1124
1125         /* We do one level indexing only if requested */
1126         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1127         if (ret == 0 ) {
1128                 idxone = 1;
1129         }
1130
1131         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
1132             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
1133                 /* no indexes? must do full search */
1134                 return LDB_ERR_OPERATIONS_ERROR;
1135         }
1136
1137         ret = LDB_ERR_OPERATIONS_ERROR;
1138
1139         dn_list = talloc_zero(ac, struct dn_list);
1140         if (dn_list == NULL) {
1141                 return LDB_ERR_OPERATIONS_ERROR;
1142         }
1143
1144         if (ac->scope == LDB_SCOPE_BASE) {
1145                 /* with BASE searches only one DN can match */
1146                 dn_list->dn = talloc_array(dn_list, char *, 1);
1147                 if (dn_list->dn == NULL) {
1148                         ldb_oom(ldb);
1149                         return LDB_ERR_OPERATIONS_ERROR;
1150                 }
1151                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
1152                 if (dn_list->dn[0] == NULL) {
1153                         ldb_oom(ldb);
1154                         return LDB_ERR_OPERATIONS_ERROR;
1155                 }
1156                 dn_list->count = 1;
1157                 ret = LDB_SUCCESS;
1158         }
1159
1160         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
1161                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1162         }
1163
1164         if (ret == LDB_ERR_OPERATIONS_ERROR &&
1165             ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
1166                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1167         }
1168
1169         if (ret == LDB_SUCCESS) {
1170                 /* we've got a candidate list - now filter by the full tree
1171                    and extract the needed attributes */
1172                 ret = ltdb_index_filter(dn_list, ac, match_count);
1173         }
1174
1175         talloc_free(dn_list);
1176
1177         return ret;
1178 }
1179
1180 /*
1181   add a index element where this is the first indexed DN for this value
1182 */
1183 static int ltdb_index_add1_new(struct ldb_context *ldb,
1184                                struct ldb_message *msg,
1185                                const char *dn)
1186 {
1187         struct ldb_message_element *el;
1188
1189         /* add another entry */
1190         el = talloc_realloc(msg, msg->elements,
1191                                struct ldb_message_element, msg->num_elements+1);
1192         if (!el) {
1193                 return LDB_ERR_OPERATIONS_ERROR;
1194         }
1195
1196         msg->elements = el;
1197         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
1198         if (!msg->elements[msg->num_elements].name) {
1199                 return LDB_ERR_OPERATIONS_ERROR;
1200         }
1201         msg->elements[msg->num_elements].num_values = 0;
1202         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
1203         if (!msg->elements[msg->num_elements].values) {
1204                 return LDB_ERR_OPERATIONS_ERROR;
1205         }
1206         msg->elements[msg->num_elements].values[0].length = strlen(dn);
1207         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
1208         msg->elements[msg->num_elements].num_values = 1;
1209         msg->num_elements++;
1210
1211         return LDB_SUCCESS;
1212 }
1213
1214
1215 /*
1216   add a index element where this is not the first indexed DN for this
1217   value
1218 */
1219 static int ltdb_index_add1_add(struct ldb_context *ldb,
1220                                struct ldb_message *msg,
1221                                int idx,
1222                                const char *dn,
1223                                const struct ldb_schema_attribute *a)
1224 {
1225         struct ldb_val *v2;
1226         unsigned int i;
1227
1228         /* for multi-valued attributes we can end up with repeats */
1229         for (i=0;i<msg->elements[idx].num_values;i++) {
1230                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
1231                         return LDB_SUCCESS;
1232                 }
1233         }
1234
1235         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1236                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1237         }
1238
1239         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
1240                               struct ldb_val,
1241                               msg->elements[idx].num_values+1);
1242         if (!v2) {
1243                 return LDB_ERR_OPERATIONS_ERROR;
1244         }
1245         msg->elements[idx].values = v2;
1246
1247         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
1248         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
1249         msg->elements[idx].num_values++;
1250
1251         return LDB_SUCCESS;
1252 }
1253
1254 /*
1255   add an index entry for one message element
1256 */
1257 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1258                            struct ldb_message_element *el, int v_idx)
1259 {
1260         struct ldb_context *ldb;
1261         struct ldb_message *msg;
1262         struct ldb_dn *dn_key;
1263         int ret;
1264         unsigned int i;
1265         const struct ldb_schema_attribute *a;
1266
1267         ldb = ldb_module_get_ctx(module);
1268
1269         msg = talloc(module, struct ldb_message);
1270         if (msg == NULL) {
1271                 errno = ENOMEM;
1272                 return LDB_ERR_OPERATIONS_ERROR;
1273         }
1274
1275         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1276         if (!dn_key) {
1277                 talloc_free(msg);
1278                 return LDB_ERR_OPERATIONS_ERROR;
1279         }
1280         talloc_steal(msg, dn_key);
1281
1282         ret = ltdb_search_dn1_index(module, dn_key, msg);
1283         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1284                 talloc_free(msg);
1285                 return ret;
1286         }
1287
1288         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1289                 msg->dn = dn_key;
1290                 msg->num_elements = 0;
1291                 msg->elements = NULL;
1292         }
1293
1294         for (i=0;i<msg->num_elements;i++) {
1295                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
1296                         break;
1297                 }
1298         }
1299
1300         if (i == msg->num_elements) {
1301                 ret = ltdb_index_add1_new(ldb, msg, dn);
1302         } else {
1303                 ret = ltdb_index_add1_add(ldb, msg, i, dn, a);
1304         }
1305
1306         if (ret == LDB_SUCCESS) {
1307                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1308         }
1309
1310         talloc_free(msg);
1311
1312         return ret;
1313 }
1314
1315 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1316                            struct ldb_message_element *elements, int num_el)
1317 {
1318         void *data = ldb_module_get_private(module);
1319         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1320         int ret;
1321         unsigned int i, j;
1322
1323         if (dn[0] == '@') {
1324                 return LDB_SUCCESS;
1325         }
1326
1327         if (ltdb->cache->indexlist->num_elements == 0) {
1328                 /* no indexed fields */
1329                 return LDB_SUCCESS;
1330         }
1331
1332         for (i = 0; i < num_el; i++) {
1333                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
1334                                        NULL, LTDB_IDXATTR);
1335                 if (ret == -1) {
1336                         continue;
1337                 }
1338                 for (j = 0; j < elements[i].num_values; j++) {
1339                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1340                         if (ret != LDB_SUCCESS) {
1341                                 return ret;
1342                         }
1343                 }
1344         }
1345
1346         return LDB_SUCCESS;
1347 }
1348
1349 /*
1350   add the index entries for a new record
1351 */
1352 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1353 {
1354         const char *dn;
1355         int ret;
1356
1357         dn = ldb_dn_get_linearized(msg->dn);
1358         if (dn == NULL) {
1359                 return LDB_ERR_OPERATIONS_ERROR;
1360         }
1361
1362         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1363
1364         return ret;
1365 }
1366
1367
1368 /*
1369   delete an index entry for one message element
1370 */
1371 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1372                          struct ldb_message_element *el, int v_idx)
1373 {
1374         struct ldb_context *ldb;
1375         struct ldb_message *msg;
1376         struct ldb_dn *dn_key;
1377         int ret, i;
1378         unsigned int j;
1379
1380         ldb = ldb_module_get_ctx(module);
1381
1382         if (dn[0] == '@') {
1383                 return LDB_SUCCESS;
1384         }
1385
1386         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1387         if (!dn_key) {
1388                 return LDB_ERR_OPERATIONS_ERROR;
1389         }
1390
1391         msg = talloc(dn_key, struct ldb_message);
1392         if (msg == NULL) {
1393                 talloc_free(dn_key);
1394                 return LDB_ERR_OPERATIONS_ERROR;
1395         }
1396
1397         ret = ltdb_search_dn1_index(module, dn_key, msg);
1398         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1399                 talloc_free(dn_key);
1400                 return ret;
1401         }
1402
1403         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1404                 /* it wasn't indexed. Did we have an earlier error? If we did then
1405                    its gone now */
1406                 talloc_free(dn_key);
1407                 return LDB_SUCCESS;
1408         }
1409
1410         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1411         if (i == -1) {
1412                 struct ldb_ldif ldif;
1413                 char *ldif_string;
1414                 ldif.changetype = LDB_CHANGETYPE_NONE;
1415                 ldif.msg = msg;
1416                 ldif_string = ldb_ldif_write_string(ldb, NULL, &ldif);
1417                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1418                           "ERROR: dn %s not found in %s", dn,
1419                           ldif_string);
1420                 talloc_free(ldif_string);
1421                 /* it ain't there. hmmm */
1422                 talloc_free(dn_key);
1423                 return LDB_SUCCESS;
1424         }
1425
1426         if (j != msg->elements[i].num_values - 1) {
1427                 memmove(&msg->elements[i].values[j],
1428                         &msg->elements[i].values[j+1],
1429                         (msg->elements[i].num_values-(j+1)) *
1430                         sizeof(msg->elements[i].values[0]));
1431         }
1432         msg->elements[i].num_values--;
1433
1434         if (msg->elements[i].num_values == 0) {
1435                 ret = ltdb_delete_noindex(module, dn_key);
1436         } else {
1437                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1438         }
1439
1440         talloc_free(dn_key);
1441
1442         return ret;
1443 }
1444
1445 /*
1446   delete the index entries for a record
1447   return -1 on failure
1448 */
1449 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1450 {
1451         void *data = ldb_module_get_private(module);
1452         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1453         int ret;
1454         const char *dn;
1455         unsigned int i, j;
1456
1457         /* find the list of indexed fields */
1458         if (ltdb->cache->indexlist->num_elements == 0) {
1459                 /* no indexed fields */
1460                 return LDB_SUCCESS;
1461         }
1462
1463         if (ldb_dn_is_special(msg->dn)) {
1464                 return LDB_SUCCESS;
1465         }
1466
1467         dn = ldb_dn_get_linearized(msg->dn);
1468         if (dn == NULL) {
1469                 return LDB_ERR_OPERATIONS_ERROR;
1470         }
1471
1472         for (i = 0; i < msg->num_elements; i++) {
1473                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1474                                        NULL, LTDB_IDXATTR);
1475                 if (ret == -1) {
1476                         continue;
1477                 }
1478                 for (j = 0; j < msg->elements[i].num_values; j++) {
1479                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1480                         if (ret != LDB_SUCCESS) {
1481                                 return ret;
1482                         }
1483                 }
1484         }
1485
1486         return LDB_SUCCESS;
1487 }
1488
1489 /*
1490   handle special index for one level searches
1491 */
1492 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1493 {
1494         void *data = ldb_module_get_private(module);
1495         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1496         struct ldb_message_element el;
1497         struct ldb_val val;
1498         struct ldb_dn *pdn;
1499         const char *dn;
1500         int ret;
1501
1502         if (ldb_dn_is_special(msg->dn)) {
1503                 return LDB_SUCCESS;
1504         }
1505
1506         /* We index for ONE Level only if requested */
1507         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1508         if (ret != 0) {
1509                 return LDB_SUCCESS;
1510         }
1511
1512         pdn = ldb_dn_get_parent(module, msg->dn);
1513         if (pdn == NULL) {
1514                 return LDB_ERR_OPERATIONS_ERROR;
1515         }
1516
1517         dn = ldb_dn_get_linearized(msg->dn);
1518         if (dn == NULL) {
1519                 talloc_free(pdn);
1520                 return LDB_ERR_OPERATIONS_ERROR;
1521         }
1522
1523         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1524         if (val.data == NULL) {
1525                 talloc_free(pdn);
1526                 return LDB_ERR_OPERATIONS_ERROR;
1527         }
1528
1529         val.length = strlen((char *)val.data);
1530         el.name = LTDB_IDXONE;
1531         el.values = &val;
1532         el.num_values = 1;
1533
1534         if (add) {
1535                 ret = ltdb_index_add1(module, dn, &el, 0);
1536         } else { /* delete */
1537                 ret = ltdb_index_del_value(module, dn, &el, 0);
1538         }
1539
1540         talloc_free(pdn);
1541
1542         return ret;
1543 }
1544
1545
1546 /*
1547   traversal function that deletes all @INDEX records
1548 */
1549 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1550 {
1551         const char *dn = "DN=" LTDB_INDEX ":";
1552         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1553                 return tdb_delete(tdb, key);
1554         }
1555         return 0;
1556 }
1557
1558 /*
1559   traversal function that adds @INDEX records during a re index
1560 */
1561 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1562 {
1563         struct ldb_context *ldb;
1564         struct ldb_module *module = (struct ldb_module *)state;
1565         struct ldb_message *msg;
1566         const char *dn = NULL;
1567         int ret;
1568         TDB_DATA key2;
1569
1570         ldb = ldb_module_get_ctx(module);
1571
1572         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1573             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1574                 return 0;
1575         }
1576
1577         msg = talloc(module, struct ldb_message);
1578         if (msg == NULL) {
1579                 return -1;
1580         }
1581
1582         ret = ltdb_unpack_data(module, &data, msg);
1583         if (ret != 0) {
1584                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1585                           ldb_dn_get_linearized(msg->dn));
1586                 talloc_free(msg);
1587                 return -1;
1588         }
1589
1590         /* check if the DN key has changed, perhaps due to the
1591            case insensitivity of an element changing */
1592         key2 = ltdb_key(module, msg->dn);
1593         if (key2.dptr == NULL) {
1594                 /* probably a corrupt record ... darn */
1595                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1596                                                         ldb_dn_get_linearized(msg->dn));
1597                 talloc_free(msg);
1598                 return 0;
1599         }
1600         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1601                 tdb_delete(tdb, key);
1602                 tdb_store(tdb, key2, data, 0);
1603         }
1604         talloc_free(key2.dptr);
1605
1606         if (msg->dn == NULL) {
1607                 dn = (char *)key.dptr + 3;
1608         } else {
1609                 dn = ldb_dn_get_linearized(msg->dn);
1610         }
1611
1612         ret = ltdb_index_one(module, msg, 1);
1613         if (ret == LDB_SUCCESS) {
1614                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1615         } else {
1616                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1617                         "Adding special ONE LEVEL index failed (%s)!",
1618                         ldb_dn_get_linearized(msg->dn));
1619         }
1620
1621         talloc_free(msg);
1622
1623         if (ret != LDB_SUCCESS) return -1;
1624
1625         return 0;
1626 }
1627
1628 /*
1629   force a complete reindex of the database
1630 */
1631 int ltdb_reindex(struct ldb_module *module)
1632 {
1633         void *data = ldb_module_get_private(module);
1634         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1635         int ret;
1636
1637         if (ltdb_cache_reload(module) != 0) {
1638                 return LDB_ERR_OPERATIONS_ERROR;
1639         }
1640
1641         /* first traverse the database deleting any @INDEX records */
1642         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1643         if (ret == -1) {
1644                 return LDB_ERR_OPERATIONS_ERROR;
1645         }
1646
1647         /* if we don't have indexes we have nothing todo */
1648         if (ltdb->cache->indexlist->num_elements == 0) {
1649                 return LDB_SUCCESS;
1650         }
1651
1652         /* now traverse adding any indexes for normal LDB records */
1653         ret = tdb_traverse(ltdb->tdb, re_index, module);
1654         if (ret == -1) {
1655                 return LDB_ERR_OPERATIONS_ERROR;
1656         }
1657
1658         if (ltdb->idxptr) {
1659                 ltdb->idxptr->repack = true;
1660         }
1661
1662         return LDB_SUCCESS;
1663 }