fab60cb1253677a5daf96371a3160864f1c2992b
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "dlinklist.h"
36
37 /*
38   the idxptr code is a bit unusual. The way it works is to replace
39   @IDX elements in records during a transaction with @IDXPTR
40   elements. The @IDXPTR elements don't contain the actual index entry
41   values, but contain a pointer to a linked list of values.
42
43   This means we are storing pointers in a database, which is normally
44   not allowed, but in this case we are storing them only for the
45   duration of a transaction, and re-writing them into the normal @IDX
46   format at the end of the transaction. That means no other processes
47   are ever exposed to the @IDXPTR values.
48
49   The advantage is that the linked list doesn't cause huge
50   fragmentation during a transaction. Without the @IDXPTR method we
51   often ended up with a ldb that was between 10x and 100x larger then
52   it needs to be due to massive fragmentation caused by re-writing
53   @INDEX records many times during indexing.
54  */
55 struct ldb_index_pointer {
56         struct ldb_index_pointer *next, *prev;
57         struct ldb_val value;
58 };
59
60 struct ltdb_idxptr {
61         int num_dns;
62         const char **dn_list;
63         bool repack;
64 };
65
66 /*
67   add to the list of DNs that need to be fixed on transaction end
68  */
69 static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
70 {
71         void *data = ldb_module_get_private(module);
72         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
73         ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
74                                                const char *, ltdb->idxptr->num_dns+1);
75         if (ltdb->idxptr->dn_list == NULL) {
76                 ltdb->idxptr->num_dns = 0;
77                 return LDB_ERR_OPERATIONS_ERROR;
78         }
79         ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] =
80                 talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
81         if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
82                 return LDB_ERR_OPERATIONS_ERROR;
83         }
84         ltdb->idxptr->num_dns++;
85         return LDB_SUCCESS;
86 }
87
88 /* free an idxptr record */
89 static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
90 {
91         struct ldb_val val;
92         struct ldb_index_pointer *ptr;
93
94         if (el->num_values != 1) {
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97
98         val = el->values[0];
99         if (val.length != sizeof(void *)) {
100                 return LDB_ERR_OPERATIONS_ERROR;
101         }
102
103         ptr = *(struct ldb_index_pointer **)val.data;
104         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
105                 return LDB_ERR_OPERATIONS_ERROR;
106         }
107
108         while (ptr) {
109                 struct ldb_index_pointer *tmp = ptr;
110                 DLIST_REMOVE(ptr, ptr);
111                 talloc_free(tmp);
112         }
113
114         return LDB_SUCCESS;
115 }
116
117
118 /* convert from the IDXPTR format to a ldb_message_element format */
119 static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
120 {
121         struct ldb_val val;
122         struct ldb_index_pointer *ptr, *tmp;
123         int i;
124         struct ldb_val *val2;
125
126         if (el->num_values != 1) {
127                 return LDB_ERR_OPERATIONS_ERROR;
128         }
129
130         val = el->values[0];
131         if (val.length != sizeof(void *)) {
132                 return LDB_ERR_OPERATIONS_ERROR;
133         }
134
135         ptr = *(struct ldb_index_pointer **)val.data;
136         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
137                 return LDB_ERR_OPERATIONS_ERROR;
138         }
139
140         /* count the length of the list */
141         for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
142                 i++;
143         }
144
145         /* allocate the new values array */
146         val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
147         if (val2 == NULL) {
148                 return LDB_ERR_OPERATIONS_ERROR;
149         }
150         el->values = val2;
151         el->num_values = i;
152
153         /* populate the values array */
154         for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
155                 el->values[i].length = tmp->value.length;
156                 /* we need to over-allocate here as there are still some places
157                    in ldb that rely on null termination. */
158                 el->values[i].data = talloc_size(el->values, tmp->value.length+1);
159                 if (el->values[i].data == NULL) {
160                         return LDB_ERR_OPERATIONS_ERROR;
161                 }
162                 memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
163                 el->values[i].data[tmp->value.length] = 0;
164         }
165
166         /* update the name */
167         el->name = LTDB_IDX;
168
169         return LDB_SUCCESS;
170 }
171
172
173 /* convert to the IDXPTR format from a ldb_message_element format */
174 static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
175 {
176         struct ldb_index_pointer *ptr, *tmp;
177         int i;
178         struct ldb_val *val2;
179         void *data = ldb_module_get_private(module);
180         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
181
182         ptr = NULL;
183
184         for (i=0;i<el->num_values;i++) {
185                 tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
186                 if (tmp == NULL) {
187                         return LDB_ERR_OPERATIONS_ERROR;
188                 }
189                 tmp->value = el->values[i];
190                 tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
191                 if (tmp->value.data == NULL) {
192                         return LDB_ERR_OPERATIONS_ERROR;
193                 }
194                 DLIST_ADD(ptr, tmp);
195         }
196
197         /* allocate the new values array */
198         val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
199         if (val2 == NULL) {
200                 return LDB_ERR_OPERATIONS_ERROR;
201         }
202         el->values = val2;
203         el->num_values = 1;
204
205         el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
206         el->values[0].length = sizeof(ptr);
207
208         /* update the name */
209         el->name = LTDB_IDXPTR;
210
211         return LDB_SUCCESS;
212 }
213
214
215 /* enable the idxptr mode when transactions start */
216 int ltdb_index_transaction_start(struct ldb_module *module)
217 {
218         void *data = ldb_module_get_private(module);
219         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
220         ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
221         return LDB_SUCCESS;
222 }
223
224 /*
225   a wrapper around ltdb_search_dn1() which translates pointer based index records
226   and maps them into normal ldb message structures
227  */
228 static int ltdb_search_dn1_index(struct ldb_module *module,
229                                 struct ldb_dn *dn, struct ldb_message *msg)
230 {
231         int ret, i;
232         ret = ltdb_search_dn1(module, dn, msg);
233         if (ret != LDB_SUCCESS) {
234                 return ret;
235         }
236
237         /* if this isn't a @INDEX record then don't munge it */
238         if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         for (i=0;i<msg->num_elements;i++) {
243                 struct ldb_message_element *el = &msg->elements[i];
244                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
245                         ret = ltdb_convert_from_idxptr(module, el);
246                         if (ret != LDB_SUCCESS) {
247                                 return ret;
248                         }
249                 }
250         }
251
252         return ret;
253 }
254
255
256
257 /*
258   fixup the idxptr for one DN
259  */
260 static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
261 {
262         struct ldb_context *ldb;
263         struct ldb_dn *dn;
264         struct ldb_message *msg = ldb_msg_new(module);
265         int ret;
266
267         ldb = ldb_module_get_ctx(module);
268
269         dn = ldb_dn_new(msg, ldb, strdn);
270         if (ltdb_search_dn1_index(module, dn, msg) == LDB_SUCCESS) {
271                 ret = ltdb_store(module, msg, TDB_REPLACE);
272         }
273         talloc_free(msg);
274         return ret;
275 }
276
277 /* cleanup the idxptr mode when transaction commits */
278 int ltdb_index_transaction_commit(struct ldb_module *module)
279 {
280         int i;
281         void *data = ldb_module_get_private(module);
282         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
283
284         /* fix all the DNs that we have modified */
285         if (ltdb->idxptr) {
286                 for (i=0;i<ltdb->idxptr->num_dns;i++) {
287                         ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
288                 }
289
290                 if (ltdb->idxptr->repack) {
291                         tdb_repack(ltdb->tdb);
292                 }
293         }
294
295         talloc_free(ltdb->idxptr);
296         ltdb->idxptr = NULL;
297         return LDB_SUCCESS;
298 }
299
300 /* cleanup the idxptr mode when transaction cancels */
301 int ltdb_index_transaction_cancel(struct ldb_module *module)
302 {
303         void *data = ldb_module_get_private(module);
304         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
305         talloc_free(ltdb->idxptr);
306         ltdb->idxptr = NULL;
307         return LDB_SUCCESS;
308 }
309
310
311
312 /* a wrapper around ltdb_store() for the index code which
313    stores in IDXPTR format when idxptr mode is enabled
314
315    WARNING: This modifies the msg which is passed in
316 */
317 int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
318 {
319         void *data = ldb_module_get_private(module);
320         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
321         int ret;
322
323         if (ltdb->idxptr) {
324                 int i;
325                 struct ldb_message *msg2 = ldb_msg_new(module);
326
327                 /* free any old pointer */
328                 ret = ltdb_search_dn1(module, msg->dn, msg2);
329                 if (ret == 0) {
330                         for (i=0;i<msg2->num_elements;i++) {
331                                 struct ldb_message_element *el = &msg2->elements[i];
332                                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
333                                         ret = ltdb_free_idxptr(module, el);
334                                         if (ret != LDB_SUCCESS) {
335                                                 return ret;
336                                         }
337                                 }
338                         }
339                 }
340                 talloc_free(msg2);
341
342                 for (i=0;i<msg->num_elements;i++) {
343                         struct ldb_message_element *el = &msg->elements[i];
344                         if (strcmp(el->name, LTDB_IDX) == 0) {
345                                 ret = ltdb_convert_to_idxptr(module, el);
346                                 if (ret != LDB_SUCCESS) {
347                                         return ret;
348                                 }
349                         }
350                 }
351
352                 if (ltdb_idxptr_add(module, msg) != 0) {
353                         return LDB_ERR_OPERATIONS_ERROR;
354                 }
355         }
356
357         ret = ltdb_store(module, msg, flgs);
358         return ret;
359 }
360
361
362 /*
363   find an element in a list, using the given comparison function and
364   assuming that the list is already sorted using comp_fn
365
366   return -1 if not found, or the index of the first occurance of needle if found
367 */
368 static int ldb_list_find(const void *needle,
369                          const void *base, size_t nmemb, size_t size,
370                          comparison_fn_t comp_fn)
371 {
372         const char *base_p = (const char *)base;
373         size_t min_i, max_i, test_i;
374
375         if (nmemb == 0) {
376                 return -1;
377         }
378
379         min_i = 0;
380         max_i = nmemb-1;
381
382         while (min_i < max_i) {
383                 int r;
384
385                 test_i = (min_i + max_i) / 2;
386                 /* the following cast looks strange, but is
387                  correct. The key to understanding it is that base_p
388                  is a pointer to an array of pointers, so we have to
389                  dereference it after casting to void **. The strange
390                  const in the middle gives us the right type of pointer
391                  after the dereference  (tridge) */
392                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
393                 if (r == 0) {
394                         /* scan back for first element */
395                         while (test_i > 0 &&
396                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
397                                 test_i--;
398                         }
399                         return test_i;
400                 }
401                 if (r < 0) {
402                         if (test_i == 0) {
403                                 return -1;
404                         }
405                         max_i = test_i - 1;
406                 }
407                 if (r > 0) {
408                         min_i = test_i + 1;
409                 }
410         }
411
412         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
413                 return min_i;
414         }
415
416         return -1;
417 }
418
419 struct dn_list {
420         unsigned int count;
421         char **dn;
422 };
423
424 /*
425   return the dn key to be used for an index
426   caller frees
427 */
428 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
429                                      const char *attr, const struct ldb_val *value,
430                                      const struct ldb_schema_attribute **ap)
431 {
432         struct ldb_dn *ret;
433         struct ldb_val v;
434         const struct ldb_schema_attribute *a;
435         char *attr_folded;
436         int r;
437
438         attr_folded = ldb_attr_casefold(ldb, attr);
439         if (!attr_folded) {
440                 return NULL;
441         }
442
443         a = ldb_schema_attribute_by_name(ldb, attr);
444         if (ap) {
445                 *ap = a;
446         }
447         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
448         if (r != LDB_SUCCESS) {
449                 const char *errstr = ldb_errstring(ldb);
450                 /* canonicalisation can be refused. For example, 
451                    a attribute that takes wildcards will refuse to canonicalise
452                    if the value contains a wildcard */
453                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
454                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
455                 talloc_free(attr_folded);
456                 return NULL;
457         }
458         if (ldb_should_b64_encode(&v)) {
459                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
460                 if (!vstr) return NULL;
461                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
462                 talloc_free(vstr);
463         } else {
464                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
465         }
466
467         if (v.data != value->data) {
468                 talloc_free(v.data);
469         }
470         talloc_free(attr_folded);
471
472         return ret;
473 }
474
475 /*
476   see if a attribute value is in the list of indexed attributes
477 */
478 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
479                             unsigned int *v_idx, const char *key)
480 {
481         unsigned int i, j;
482         for (i=0;i<msg->num_elements;i++) {
483                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
484                         const struct ldb_message_element *el = &msg->elements[i];
485
486                         if (attr == NULL) {
487                                 /* in this case we are just looking to see if key is present,
488                                    we are not spearching for a specific index */
489                                 return 0;
490                         }
491
492                         for (j=0;j<el->num_values;j++) {
493                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
494                                         if (v_idx) {
495                                                 *v_idx = j;
496                                         }
497                                         return i;
498                                 }
499                         }
500                 }
501         }
502         return -1;
503 }
504
505 /* used in sorting dn lists */
506 static int list_cmp(const char **s1, const char **s2)
507 {
508         return strcmp(*s1, *s2);
509 }
510
511 /*
512   return a list of dn's that might match a simple indexed search or
513  */
514 static int ltdb_index_dn_simple(struct ldb_module *module,
515                                 const struct ldb_parse_tree *tree,
516                                 const struct ldb_message *index_list,
517                                 struct dn_list *list)
518 {
519         struct ldb_context *ldb;
520         struct ldb_dn *dn;
521         int ret;
522         unsigned int i, j;
523         struct ldb_message *msg;
524
525         ldb = ldb_module_get_ctx(module);
526
527         list->count = 0;
528         list->dn = NULL;
529
530         /* if the attribute isn't in the list of indexed attributes then
531            this node needs a full search */
532         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
533                 return LDB_ERR_OPERATIONS_ERROR;
534         }
535
536         /* the attribute is indexed. Pull the list of DNs that match the 
537            search criterion */
538         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
539         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
540
541         msg = talloc(list, struct ldb_message);
542         if (msg == NULL) {
543                 return LDB_ERR_OPERATIONS_ERROR;
544         }
545
546         ret = ltdb_search_dn1_index(module, dn, msg);
547         talloc_free(dn);
548         if (ret != LDB_SUCCESS) {
549                 return ret;
550         }
551
552         for (i=0;i<msg->num_elements;i++) {
553                 struct ldb_message_element *el;
554
555                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
556                         continue;
557                 }
558
559                 el = &msg->elements[i];
560
561                 list->dn = talloc_array(list, char *, el->num_values);
562                 if (!list->dn) {
563                         talloc_free(msg);
564                         return LDB_ERR_OPERATIONS_ERROR;
565                 }
566
567                 for (j=0;j<el->num_values;j++) {
568                         list->dn[list->count] =
569                                 talloc_strdup(list->dn, (char *)el->values[j].data);
570                         if (!list->dn[list->count]) {
571                                 talloc_free(msg);
572                                 return LDB_ERR_OPERATIONS_ERROR;
573                         }
574                         list->count++;
575                 }
576         }
577
578         talloc_free(msg);
579
580         if (list->count > 1) {
581                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
582         }
583
584         return LDB_SUCCESS;
585 }
586
587
588 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
589
590 /*
591   return a list of dn's that might match a leaf indexed search
592  */
593 static int ltdb_index_dn_leaf(struct ldb_module *module,
594                               const struct ldb_parse_tree *tree,
595                               const struct ldb_message *index_list,
596                               struct dn_list *list)
597 {
598         struct ldb_context *ldb;
599         ldb = ldb_module_get_ctx(module);
600
601         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
602                 list->dn = talloc_array(list, char *, 1);
603                 if (list->dn == NULL) {
604                         ldb_oom(ldb);
605                         return LDB_ERR_OPERATIONS_ERROR;
606                 }
607                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
608                 if (list->dn[0] == NULL) {
609                         ldb_oom(ldb);
610                         return LDB_ERR_OPERATIONS_ERROR;
611                 }
612                 list->count = 1;
613                 return LDB_SUCCESS;
614         }
615         return ltdb_index_dn_simple(module, tree, index_list, list);
616 }
617
618
619 /*
620   list intersection
621   list = list & list2
622   relies on the lists being sorted
623 */
624 static int list_intersect(struct ldb_context *ldb,
625                           struct dn_list *list, const struct dn_list *list2)
626 {
627         struct dn_list *list3;
628         unsigned int i;
629
630         if (list->count == 0 || list2->count == 0) {
631                 /* 0 & X == 0 */
632                 return LDB_ERR_NO_SUCH_OBJECT;
633         }
634
635         list3 = talloc(ldb, struct dn_list);
636         if (list3 == NULL) {
637                 return LDB_ERR_OPERATIONS_ERROR;
638         }
639
640         list3->dn = talloc_array(list3, char *, list->count);
641         if (!list3->dn) {
642                 talloc_free(list3);
643                 return LDB_ERR_OPERATIONS_ERROR;
644         }
645         list3->count = 0;
646
647         for (i=0;i<list->count;i++) {
648                 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
649                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
650                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
651                         list3->count++;
652                 } else {
653                         talloc_free(list->dn[i]);
654                 }
655         }
656
657         talloc_free(list->dn);
658         list->dn = talloc_move(list, &list3->dn);
659         list->count = list3->count;
660         talloc_free(list3);
661
662         return LDB_ERR_NO_SUCH_OBJECT;
663 }
664
665
666 /*
667   list union
668   list = list | list2
669   relies on the lists being sorted
670 */
671 static int list_union(struct ldb_context *ldb,
672                       struct dn_list *list, const struct dn_list *list2)
673 {
674         unsigned int i;
675         char **d;
676         unsigned int count = list->count;
677
678         if (list->count == 0 && list2->count == 0) {
679                 /* 0 | 0 == 0 */
680                 return LDB_ERR_NO_SUCH_OBJECT;
681         }
682
683         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
684         if (!d) {
685                 return LDB_ERR_OPERATIONS_ERROR;
686         }
687         list->dn = d;
688
689         for (i=0;i<list2->count;i++) {
690                 if (ldb_list_find(list2->dn[i], list->dn, count,
691                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
692                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
693                         if (!list->dn[list->count]) {
694                                 return LDB_ERR_OPERATIONS_ERROR;
695                         }
696                         list->count++;
697                 }
698         }
699
700         if (list->count != count) {
701                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
702         }
703
704         return LDB_ERR_NO_SUCH_OBJECT;
705 }
706
707 static int ltdb_index_dn(struct ldb_module *module,
708                          const struct ldb_parse_tree *tree,
709                          const struct ldb_message *index_list,
710                          struct dn_list *list);
711
712
713 /*
714   OR two index results
715  */
716 static int ltdb_index_dn_or(struct ldb_module *module,
717                             const struct ldb_parse_tree *tree,
718                             const struct ldb_message *index_list,
719                             struct dn_list *list)
720 {
721         struct ldb_context *ldb;
722         unsigned int i;
723         int ret;
724
725         ldb = ldb_module_get_ctx(module);
726
727         ret = LDB_ERR_OPERATIONS_ERROR;
728         list->dn = NULL;
729         list->count = 0;
730
731         for (i=0;i<tree->u.list.num_elements;i++) {
732                 struct dn_list *list2;
733                 int v;
734
735                 list2 = talloc(module, struct dn_list);
736                 if (list2 == NULL) {
737                         return LDB_ERR_OPERATIONS_ERROR;
738                 }
739
740                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
741
742                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
743                         /* 0 || X == X */
744                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
745                                 ret = v;
746                         }
747                         talloc_free(list2);
748                         continue;
749                 }
750
751                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
752                         /* 1 || X == 1 */
753                         talloc_free(list->dn);
754                         talloc_free(list2);
755                         return v;
756                 }
757
758                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
759                         ret = LDB_SUCCESS;
760                         list->dn = talloc_move(list, &list2->dn);
761                         list->count = list2->count;
762                 } else {
763                         if (list_union(ldb, list, list2) == -1) {
764                                 talloc_free(list2);
765                                 return LDB_ERR_OPERATIONS_ERROR;
766                         }
767                         ret = LDB_SUCCESS;
768                 }
769                 talloc_free(list2);
770         }
771
772         if (list->count == 0) {
773                 return LDB_ERR_NO_SUCH_OBJECT;
774         }
775
776         return ret;
777 }
778
779
780 /*
781   NOT an index results
782  */
783 static int ltdb_index_dn_not(struct ldb_module *module,
784                              const struct ldb_parse_tree *tree,
785                              const struct ldb_message *index_list,
786                              struct dn_list *list)
787 {
788         /* the only way to do an indexed not would be if we could
789            negate the not via another not or if we knew the total
790            number of database elements so we could know that the
791            existing expression covered the whole database.
792
793            instead, we just give up, and rely on a full index scan
794            (unless an outer & manages to reduce the list)
795         */
796         return LDB_ERR_OPERATIONS_ERROR;
797 }
798
799
800 static bool ltdb_index_unique(struct ldb_context *ldb,
801                               const char *attr)
802 {
803         const struct ldb_schema_attribute *a;
804         a = ldb_schema_attribute_by_name(ldb, attr);
805         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
806                 return true;
807         }
808         return false;
809 }
810
811 /*
812   AND two index results
813  */
814 static int ltdb_index_dn_and(struct ldb_module *module,
815                              const struct ldb_parse_tree *tree,
816                              const struct ldb_message *index_list,
817                              struct dn_list *list)
818 {
819         struct ldb_context *ldb;
820         unsigned int i;
821         int ret, pass;
822
823         ldb = ldb_module_get_ctx(module);
824
825         ret = LDB_ERR_OPERATIONS_ERROR;
826         list->dn = NULL;
827         list->count = 0;
828
829         for (pass=0;pass<=1;pass++) {
830                 /* in the first pass we only look for unique simple
831                    equality tests, in the hope of avoiding having to look
832                    at any others */
833                 bool only_unique = pass==0?true:false;
834
835                 for (i=0;i<tree->u.list.num_elements;i++) {
836                         struct dn_list *list2;
837                         int v;
838                         bool is_unique = false;
839                         const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
840
841                         if (subtree->operation == LDB_OP_EQUALITY &&
842                             ltdb_index_unique(ldb, subtree->u.equality.attr)) {
843                                 is_unique = true;
844                         }
845                         if (is_unique != only_unique) continue;
846                         
847                         list2 = talloc(module, struct dn_list);
848                         if (list2 == NULL) {
849                                 return LDB_ERR_OPERATIONS_ERROR;
850                         }
851                         
852                         v = ltdb_index_dn(module, subtree, index_list, list2);
853
854                         if (v == LDB_ERR_NO_SUCH_OBJECT) {
855                                 /* 0 && X == 0 */
856                                 talloc_free(list->dn);
857                                 talloc_free(list2);
858                                 return LDB_ERR_NO_SUCH_OBJECT;
859                         }
860                         
861                         if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
862                                 talloc_free(list2);
863                                 continue;
864                         }
865                         
866                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
867                                 ret = LDB_SUCCESS;
868                                 talloc_free(list->dn);
869                                 list->dn = talloc_move(list, &list2->dn);
870                                 list->count = list2->count;
871                         } else {
872                                 if (list_intersect(ldb, list, list2) == -1) {
873                                         talloc_free(list2);
874                                         return LDB_ERR_OPERATIONS_ERROR;
875                                 }
876                         }
877                         
878                         talloc_free(list2);
879                         
880                         if (list->count == 0) {
881                                 talloc_free(list->dn);
882                                 return LDB_ERR_NO_SUCH_OBJECT;
883                         }
884                         
885                         if (list->count == 1) {
886                                 /* it isn't worth loading the next part of the tree */
887                                 return ret;
888                         }
889                 }
890         }       
891         return ret;
892 }
893         
894 /*
895   AND index results and ONE level special index
896  */
897 static int ltdb_index_dn_one(struct ldb_module *module,
898                              struct ldb_dn *parent_dn,
899                              struct dn_list *list)
900 {
901         struct ldb_context *ldb;
902         struct dn_list *list2;
903         struct ldb_message *msg;
904         struct ldb_dn *key;
905         struct ldb_val val;
906         unsigned int i, j;
907         int ret;
908
909         ldb = ldb_module_get_ctx(module);
910
911         list2 = talloc_zero(module, struct dn_list);
912         if (list2 == NULL) {
913                 return LDB_ERR_OPERATIONS_ERROR;
914         }
915
916         /* the attribute is indexed. Pull the list of DNs that match the
917            search criterion */
918         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
919         val.length = strlen((char *)val.data);
920         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
921         if (!key) {
922                 talloc_free(list2);
923                 return LDB_ERR_OPERATIONS_ERROR;
924         }
925
926         msg = talloc(list2, struct ldb_message);
927         if (msg == NULL) {
928                 talloc_free(list2);
929                 return LDB_ERR_OPERATIONS_ERROR;
930         }
931
932         ret = ltdb_search_dn1_index(module, key, msg);
933         talloc_free(key);
934         if (ret != LDB_SUCCESS) {
935                 return ret;
936         }
937
938         for (i = 0; i < msg->num_elements; i++) {
939                 struct ldb_message_element *el;
940
941                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
942                         continue;
943                 }
944
945                 el = &msg->elements[i];
946
947                 list2->dn = talloc_array(list2, char *, el->num_values);
948                 if (!list2->dn) {
949                         talloc_free(list2);
950                         return LDB_ERR_OPERATIONS_ERROR;
951                 }
952
953                 for (j = 0; j < el->num_values; j++) {
954                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
955                         if (!list2->dn[list2->count]) {
956                                 talloc_free(list2);
957                                 return LDB_ERR_OPERATIONS_ERROR;
958                         }
959                         list2->count++;
960                 }
961         }
962
963         if (list2->count == 0) {
964                 talloc_free(list2);
965                 return LDB_ERR_NO_SUCH_OBJECT;
966         }
967
968         if (list2->count > 1) {
969                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
970         }
971
972         if (list->count > 0) {
973                 if (list_intersect(ldb, list, list2) == -1) {
974                         talloc_free(list2);
975                         return LDB_ERR_OPERATIONS_ERROR;
976                 }
977
978                 if (list->count == 0) {
979                         talloc_free(list->dn);
980                         talloc_free(list2);
981                         return LDB_ERR_NO_SUCH_OBJECT;
982                 }
983         } else {
984                 list->dn = talloc_move(list, &list2->dn);
985                 list->count = list2->count;
986         }
987
988         talloc_free(list2);
989
990         return LDB_SUCCESS;
991 }
992
993 /*
994   return a list of dn's that might match a indexed search or
995   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
996  */
997 static int ltdb_index_dn(struct ldb_module *module,
998                          const struct ldb_parse_tree *tree,
999                          const struct ldb_message *index_list,
1000                          struct dn_list *list)
1001 {
1002         int ret = LDB_ERR_OPERATIONS_ERROR;
1003
1004         switch (tree->operation) {
1005         case LDB_OP_AND:
1006                 ret = ltdb_index_dn_and(module, tree, index_list, list);
1007                 break;
1008
1009         case LDB_OP_OR:
1010                 ret = ltdb_index_dn_or(module, tree, index_list, list);
1011                 break;
1012
1013         case LDB_OP_NOT:
1014                 ret = ltdb_index_dn_not(module, tree, index_list, list);
1015                 break;
1016
1017         case LDB_OP_EQUALITY:
1018                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
1019                 break;
1020
1021         case LDB_OP_SUBSTRING:
1022         case LDB_OP_GREATER:
1023         case LDB_OP_LESS:
1024         case LDB_OP_PRESENT:
1025         case LDB_OP_APPROX:
1026         case LDB_OP_EXTENDED:
1027                 /* we can't index with fancy bitops yet */
1028                 ret = LDB_ERR_OPERATIONS_ERROR;
1029                 break;
1030         }
1031
1032         return ret;
1033 }
1034
1035 /*
1036   filter a candidate dn_list from an indexed search into a set of results
1037   extracting just the given attributes
1038 */
1039 static int ltdb_index_filter(const struct dn_list *dn_list,
1040                              struct ltdb_context *ac)
1041 {
1042         struct ldb_context *ldb;
1043         struct ldb_message *msg;
1044         unsigned int i;
1045
1046         ldb = ldb_module_get_ctx(ac->module);
1047
1048         for (i = 0; i < dn_list->count; i++) {
1049                 struct ldb_dn *dn;
1050                 int ret;
1051
1052                 msg = ldb_msg_new(ac);
1053                 if (!msg) {
1054                         return LDB_ERR_OPERATIONS_ERROR;
1055                 }
1056
1057                 dn = ldb_dn_new(msg, ldb, dn_list->dn[i]);
1058                 if (dn == NULL) {
1059                         talloc_free(msg);
1060                         return LDB_ERR_OPERATIONS_ERROR;
1061                 }
1062
1063                 ret = ltdb_search_dn1(ac->module, dn, msg);
1064                 talloc_free(dn);
1065                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1066                         /* the record has disappeared? yes, this can happen */
1067                         talloc_free(msg);
1068                         continue;
1069                 }
1070
1071                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1072                         /* an internal error */
1073                         talloc_free(msg);
1074                         return LDB_ERR_OPERATIONS_ERROR;
1075                 }
1076
1077                 if (!ldb_match_msg(ldb, msg,
1078                                    ac->tree, ac->base, ac->scope)) {
1079                         talloc_free(msg);
1080                         continue;
1081                 }
1082
1083                 /* filter the attributes that the user wants */
1084                 ret = ltdb_filter_attrs(msg, ac->attrs);
1085
1086                 if (ret == -1) {
1087                         talloc_free(msg);
1088                         return LDB_ERR_OPERATIONS_ERROR;
1089                 }
1090
1091                 ret = ldb_module_send_entry(ac->req, msg, NULL);
1092                 if (ret != LDB_SUCCESS) {
1093                         ac->request_terminated = true;
1094                         return ret;
1095                 }
1096         }
1097
1098         return LDB_SUCCESS;
1099 }
1100
1101 /*
1102   search the database with a LDAP-like expression using indexes
1103   returns -1 if an indexed search is not possible, in which
1104   case the caller should call ltdb_search_full()
1105 */
1106 int ltdb_search_indexed(struct ltdb_context *ac)
1107 {
1108         struct ldb_context *ldb;
1109         void *data = ldb_module_get_private(ac->module);
1110         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1111         struct dn_list *dn_list;
1112         int ret, idxattr, idxone;
1113
1114         ldb = ldb_module_get_ctx(ac->module);
1115
1116         idxattr = idxone = 0;
1117         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
1118         if (ret == 0 ) {
1119                 idxattr = 1;
1120         }
1121
1122         /* We do one level indexing only if requested */
1123         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1124         if (ret == 0 ) {
1125                 idxone = 1;
1126         }
1127
1128         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
1129             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
1130                 /* no indexes? must do full search */
1131                 return LDB_ERR_OPERATIONS_ERROR;
1132         }
1133
1134         ret = LDB_ERR_OPERATIONS_ERROR;
1135
1136         dn_list = talloc_zero(ac, struct dn_list);
1137         if (dn_list == NULL) {
1138                 return LDB_ERR_OPERATIONS_ERROR;
1139         }
1140
1141         if (ac->scope == LDB_SCOPE_BASE) {
1142                 /* with BASE searches only one DN can match */
1143                 dn_list->dn = talloc_array(dn_list, char *, 1);
1144                 if (dn_list->dn == NULL) {
1145                         ldb_oom(ldb);
1146                         return LDB_ERR_OPERATIONS_ERROR;
1147                 }
1148                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
1149                 if (dn_list->dn[0] == NULL) {
1150                         ldb_oom(ldb);
1151                         return LDB_ERR_OPERATIONS_ERROR;
1152                 }
1153                 dn_list->count = 1;
1154                 ret = LDB_SUCCESS;
1155         }
1156
1157         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
1158                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1159         }
1160
1161         if (ret == LDB_ERR_OPERATIONS_ERROR &&
1162             ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
1163                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1164         }
1165
1166         if (ret == LDB_SUCCESS) {
1167                 /* we've got a candidate list - now filter by the full tree
1168                    and extract the needed attributes */
1169                 ret = ltdb_index_filter(dn_list, ac);
1170         }
1171
1172         talloc_free(dn_list);
1173
1174         return ret;
1175 }
1176
1177 /*
1178   add a index element where this is the first indexed DN for this value
1179 */
1180 static int ltdb_index_add1_new(struct ldb_context *ldb,
1181                                struct ldb_message *msg,
1182                                const char *dn)
1183 {
1184         struct ldb_message_element *el;
1185
1186         /* add another entry */
1187         el = talloc_realloc(msg, msg->elements,
1188                                struct ldb_message_element, msg->num_elements+1);
1189         if (!el) {
1190                 return LDB_ERR_OPERATIONS_ERROR;
1191         }
1192
1193         msg->elements = el;
1194         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
1195         if (!msg->elements[msg->num_elements].name) {
1196                 return LDB_ERR_OPERATIONS_ERROR;
1197         }
1198         msg->elements[msg->num_elements].num_values = 0;
1199         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
1200         if (!msg->elements[msg->num_elements].values) {
1201                 return LDB_ERR_OPERATIONS_ERROR;
1202         }
1203         msg->elements[msg->num_elements].values[0].length = strlen(dn);
1204         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
1205         msg->elements[msg->num_elements].num_values = 1;
1206         msg->num_elements++;
1207
1208         return LDB_SUCCESS;
1209 }
1210
1211
1212 /*
1213   add a index element where this is not the first indexed DN for this
1214   value
1215 */
1216 static int ltdb_index_add1_add(struct ldb_context *ldb,
1217                                struct ldb_message *msg,
1218                                int idx,
1219                                const char *dn,
1220                                const struct ldb_schema_attribute *a)
1221 {
1222         struct ldb_val *v2;
1223         unsigned int i;
1224
1225         /* for multi-valued attributes we can end up with repeats */
1226         for (i=0;i<msg->elements[idx].num_values;i++) {
1227                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
1228                         return LDB_SUCCESS;
1229                 }
1230         }
1231
1232         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1233                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1234         }
1235
1236         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
1237                               struct ldb_val,
1238                               msg->elements[idx].num_values+1);
1239         if (!v2) {
1240                 return LDB_ERR_OPERATIONS_ERROR;
1241         }
1242         msg->elements[idx].values = v2;
1243
1244         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
1245         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
1246         msg->elements[idx].num_values++;
1247
1248         return LDB_SUCCESS;
1249 }
1250
1251 /*
1252   add an index entry for one message element
1253 */
1254 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1255                            struct ldb_message_element *el, int v_idx)
1256 {
1257         struct ldb_context *ldb;
1258         struct ldb_message *msg;
1259         struct ldb_dn *dn_key;
1260         int ret;
1261         unsigned int i;
1262         const struct ldb_schema_attribute *a;
1263
1264         ldb = ldb_module_get_ctx(module);
1265
1266         msg = talloc(module, struct ldb_message);
1267         if (msg == NULL) {
1268                 errno = ENOMEM;
1269                 return LDB_ERR_OPERATIONS_ERROR;
1270         }
1271
1272         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1273         if (!dn_key) {
1274                 talloc_free(msg);
1275                 return LDB_ERR_OPERATIONS_ERROR;
1276         }
1277         talloc_steal(msg, dn_key);
1278
1279         ret = ltdb_search_dn1_index(module, dn_key, msg);
1280         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1281                 talloc_free(msg);
1282                 return ret;
1283         }
1284
1285         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1286                 msg->dn = dn_key;
1287                 msg->num_elements = 0;
1288                 msg->elements = NULL;
1289         }
1290
1291         for (i=0;i<msg->num_elements;i++) {
1292                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
1293                         break;
1294                 }
1295         }
1296
1297         if (i == msg->num_elements) {
1298                 ret = ltdb_index_add1_new(ldb, msg, dn);
1299         } else {
1300                 ret = ltdb_index_add1_add(ldb, msg, i, dn, a);
1301         }
1302
1303         if (ret == LDB_SUCCESS) {
1304                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1305         }
1306
1307         talloc_free(msg);
1308
1309         return ret;
1310 }
1311
1312 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1313                            struct ldb_message_element *elements, int num_el)
1314 {
1315         void *data = ldb_module_get_private(module);
1316         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1317         int ret;
1318         unsigned int i, j;
1319
1320         if (dn[0] == '@') {
1321                 return LDB_SUCCESS;
1322         }
1323
1324         if (ltdb->cache->indexlist->num_elements == 0) {
1325                 /* no indexed fields */
1326                 return LDB_SUCCESS;
1327         }
1328
1329         for (i = 0; i < num_el; i++) {
1330                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
1331                                        NULL, LTDB_IDXATTR);
1332                 if (ret == -1) {
1333                         continue;
1334                 }
1335                 for (j = 0; j < elements[i].num_values; j++) {
1336                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1337                         if (ret != LDB_SUCCESS) {
1338                                 return ret;
1339                         }
1340                 }
1341         }
1342
1343         return LDB_SUCCESS;
1344 }
1345
1346 /*
1347   add the index entries for a new record
1348 */
1349 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1350 {
1351         const char *dn;
1352         int ret;
1353
1354         dn = ldb_dn_get_linearized(msg->dn);
1355         if (dn == NULL) {
1356                 return LDB_ERR_OPERATIONS_ERROR;
1357         }
1358
1359         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1360
1361         return ret;
1362 }
1363
1364
1365 /*
1366   delete an index entry for one message element
1367 */
1368 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1369                          struct ldb_message_element *el, int v_idx)
1370 {
1371         struct ldb_context *ldb;
1372         struct ldb_message *msg;
1373         struct ldb_dn *dn_key;
1374         int ret, i;
1375         unsigned int j;
1376
1377         ldb = ldb_module_get_ctx(module);
1378
1379         if (dn[0] == '@') {
1380                 return LDB_SUCCESS;
1381         }
1382
1383         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1384         if (!dn_key) {
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         msg = talloc(dn_key, struct ldb_message);
1389         if (msg == NULL) {
1390                 talloc_free(dn_key);
1391                 return LDB_ERR_OPERATIONS_ERROR;
1392         }
1393
1394         ret = ltdb_search_dn1_index(module, dn_key, msg);
1395         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1396                 talloc_free(dn_key);
1397                 return ret;
1398         }
1399
1400         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1401                 /* it wasn't indexed. Did we have an earlier error? If we did then
1402                    its gone now */
1403                 talloc_free(dn_key);
1404                 return LDB_SUCCESS;
1405         }
1406
1407         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1408         if (i == -1) {
1409                 struct ldb_ldif ldif;
1410
1411                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1412                                 "ERROR: dn %s not found in %s\n", dn,
1413                                 ldb_dn_get_linearized(dn_key));
1414                 ldif.changetype = LDB_CHANGETYPE_NONE;
1415                 ldif.msg = msg;
1416                 ldb_ldif_write_file(ldb, stdout, &ldif);
1417                 sleep(100);
1418                 /* it ain't there. hmmm */
1419                 talloc_free(dn_key);
1420                 return LDB_SUCCESS;
1421         }
1422
1423         if (j != msg->elements[i].num_values - 1) {
1424                 memmove(&msg->elements[i].values[j],
1425                         &msg->elements[i].values[j+1],
1426                         (msg->elements[i].num_values-(j+1)) *
1427                         sizeof(msg->elements[i].values[0]));
1428         }
1429         msg->elements[i].num_values--;
1430
1431         if (msg->elements[i].num_values == 0) {
1432                 ret = ltdb_delete_noindex(module, dn_key);
1433         } else {
1434                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1435         }
1436
1437         talloc_free(dn_key);
1438
1439         return ret;
1440 }
1441
1442 /*
1443   delete the index entries for a record
1444   return -1 on failure
1445 */
1446 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1447 {
1448         void *data = ldb_module_get_private(module);
1449         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1450         int ret;
1451         const char *dn;
1452         unsigned int i, j;
1453
1454         /* find the list of indexed fields */
1455         if (ltdb->cache->indexlist->num_elements == 0) {
1456                 /* no indexed fields */
1457                 return LDB_SUCCESS;
1458         }
1459
1460         if (ldb_dn_is_special(msg->dn)) {
1461                 return LDB_SUCCESS;
1462         }
1463
1464         dn = ldb_dn_get_linearized(msg->dn);
1465         if (dn == NULL) {
1466                 return LDB_ERR_OPERATIONS_ERROR;
1467         }
1468
1469         for (i = 0; i < msg->num_elements; i++) {
1470                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1471                                        NULL, LTDB_IDXATTR);
1472                 if (ret == -1) {
1473                         continue;
1474                 }
1475                 for (j = 0; j < msg->elements[i].num_values; j++) {
1476                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1477                         if (ret != LDB_SUCCESS) {
1478                                 return ret;
1479                         }
1480                 }
1481         }
1482
1483         return LDB_SUCCESS;
1484 }
1485
1486 /*
1487   handle special index for one level searches
1488 */
1489 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1490 {
1491         void *data = ldb_module_get_private(module);
1492         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1493         struct ldb_message_element el;
1494         struct ldb_val val;
1495         struct ldb_dn *pdn;
1496         const char *dn;
1497         int ret;
1498
1499         if (ldb_dn_is_special(msg->dn)) {
1500                 return LDB_SUCCESS;
1501         }
1502
1503         /* We index for ONE Level only if requested */
1504         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1505         if (ret != 0) {
1506                 return LDB_SUCCESS;
1507         }
1508
1509         pdn = ldb_dn_get_parent(module, msg->dn);
1510         if (pdn == NULL) {
1511                 return LDB_ERR_OPERATIONS_ERROR;
1512         }
1513
1514         dn = ldb_dn_get_linearized(msg->dn);
1515         if (dn == NULL) {
1516                 talloc_free(pdn);
1517                 return LDB_ERR_OPERATIONS_ERROR;
1518         }
1519
1520         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1521         if (val.data == NULL) {
1522                 talloc_free(pdn);
1523                 return LDB_ERR_OPERATIONS_ERROR;
1524         }
1525
1526         val.length = strlen((char *)val.data);
1527         el.name = LTDB_IDXONE;
1528         el.values = &val;
1529         el.num_values = 1;
1530
1531         if (add) {
1532                 ret = ltdb_index_add1(module, dn, &el, 0);
1533         } else { /* delete */
1534                 ret = ltdb_index_del_value(module, dn, &el, 0);
1535         }
1536
1537         talloc_free(pdn);
1538
1539         return ret;
1540 }
1541
1542
1543 /*
1544   traversal function that deletes all @INDEX records
1545 */
1546 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1547 {
1548         const char *dn = "DN=" LTDB_INDEX ":";
1549         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1550                 return tdb_delete(tdb, key);
1551         }
1552         return 0;
1553 }
1554
1555 /*
1556   traversal function that adds @INDEX records during a re index
1557 */
1558 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1559 {
1560         struct ldb_context *ldb;
1561         struct ldb_module *module = (struct ldb_module *)state;
1562         struct ldb_message *msg;
1563         const char *dn = NULL;
1564         int ret;
1565         TDB_DATA key2;
1566
1567         ldb = ldb_module_get_ctx(module);
1568
1569         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1570             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1571                 return 0;
1572         }
1573
1574         msg = talloc(module, struct ldb_message);
1575         if (msg == NULL) {
1576                 return -1;
1577         }
1578
1579         ret = ltdb_unpack_data(module, &data, msg);
1580         if (ret != 0) {
1581                 talloc_free(msg);
1582                 return -1;
1583         }
1584
1585         /* check if the DN key has changed, perhaps due to the
1586            case insensitivity of an element changing */
1587         key2 = ltdb_key(module, msg->dn);
1588         if (key2.dptr == NULL) {
1589                 /* probably a corrupt record ... darn */
1590                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1591                                                         ldb_dn_get_linearized(msg->dn));
1592                 talloc_free(msg);
1593                 return 0;
1594         }
1595         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1596                 tdb_delete(tdb, key);
1597                 tdb_store(tdb, key2, data, 0);
1598         }
1599         talloc_free(key2.dptr);
1600
1601         if (msg->dn == NULL) {
1602                 dn = (char *)key.dptr + 3;
1603         } else {
1604                 dn = ldb_dn_get_linearized(msg->dn);
1605         }
1606
1607         ret = ltdb_index_one(module, msg, 1);
1608         if (ret == LDB_SUCCESS) {
1609                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1610         } else {
1611                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1612                         "Adding special ONE LEVEL index failed (%s)!\n",
1613                         ldb_dn_get_linearized(msg->dn));
1614         }
1615
1616         talloc_free(msg);
1617
1618         if (ret != LDB_SUCCESS) return -1;
1619
1620         return 0;
1621 }
1622
1623 /*
1624   force a complete reindex of the database
1625 */
1626 int ltdb_reindex(struct ldb_module *module)
1627 {
1628         void *data = ldb_module_get_private(module);
1629         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1630         int ret;
1631
1632         if (ltdb_cache_reload(module) != 0) {
1633                 return LDB_ERR_OPERATIONS_ERROR;
1634         }
1635
1636         /* first traverse the database deleting any @INDEX records */
1637         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1638         if (ret == -1) {
1639                 return LDB_ERR_OPERATIONS_ERROR;
1640         }
1641
1642         /* if we don't have indexes we have nothing todo */
1643         if (ltdb->cache->indexlist->num_elements == 0) {
1644                 return LDB_SUCCESS;
1645         }
1646
1647         /* now traverse adding any indexes for normal LDB records */
1648         ret = tdb_traverse(ltdb->tdb, re_index, module);
1649         if (ret == -1) {
1650                 return LDB_ERR_OPERATIONS_ERROR;
1651         }
1652
1653         if (ltdb->idxptr) {
1654                 ltdb->idxptr->repack = true;
1655         }
1656
1657         return LDB_SUCCESS;
1658 }