cdbef3944b531fa70922d43e5cc9970d69f4e93d
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 /*
37   the idxptr code is a bit unusual. The way it works is to replace
38   @IDX elements in records during a transaction with @IDXPTR
39   elements. The @IDXPTR elements don't contain the actual index entry
40   values, but contain a pointer to a linked list of values.
41
42   This means we are storing pointers in a database, which is normally
43   not allowed, but in this case we are storing them only for the
44   duration of a transaction, and re-writing them into the normal @IDX
45   format at the end of the transaction. That means no other processes
46   are ever exposed to the @IDXPTR values.
47
48   The advantage is that the linked list doesn't cause huge
49   fragmentation during a transaction. Without the @IDXPTR method we
50   often ended up with a ldb that was between 10x and 100x larger then
51   it needs to be due to massive fragmentation caused by re-writing
52   @INDEX records many times during indexing.
53  */
54 struct ldb_index_pointer {
55         struct ldb_index_pointer *next, *prev;
56         struct ldb_val value;
57 };
58
59 struct ltdb_idxptr {
60         int num_dns;
61         const char **dn_list;
62         bool repack;
63 };
64
65 /*
66   add to the list of DNs that need to be fixed on transaction end
67  */
68 static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
69 {
70         void *data = ldb_module_get_private(module);
71         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
72         ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
73                                                const char *, ltdb->idxptr->num_dns+1);
74         if (ltdb->idxptr->dn_list == NULL) {
75                 ltdb->idxptr->num_dns = 0;
76                 return LDB_ERR_OPERATIONS_ERROR;
77         }
78         ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] =
79                 talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
80         if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
81                 return LDB_ERR_OPERATIONS_ERROR;
82         }
83         ltdb->idxptr->num_dns++;
84         return LDB_SUCCESS;
85 }
86
87 /* free an idxptr record */
88 static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
89 {
90         struct ldb_val val;
91         struct ldb_index_pointer *ptr;
92
93         if (el->num_values != 1) {
94                 return LDB_ERR_OPERATIONS_ERROR;
95         }
96
97         val = el->values[0];
98         if (val.length != sizeof(void *)) {
99                 return LDB_ERR_OPERATIONS_ERROR;
100         }
101
102         ptr = *(struct ldb_index_pointer **)val.data;
103         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
104                 return LDB_ERR_OPERATIONS_ERROR;
105         }
106
107         while (ptr) {
108                 struct ldb_index_pointer *tmp = ptr;
109                 DLIST_REMOVE(ptr, ptr);
110                 talloc_free(tmp);
111         }
112
113         return LDB_SUCCESS;
114 }
115
116
117 /* convert from the IDXPTR format to a ldb_message_element format */
118 static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
119 {
120         struct ldb_val val;
121         struct ldb_index_pointer *ptr, *tmp;
122         int i;
123         struct ldb_val *val2;
124
125         if (el->num_values != 1) {
126                 return LDB_ERR_OPERATIONS_ERROR;
127         }
128
129         val = el->values[0];
130         if (val.length != sizeof(void *)) {
131                 return LDB_ERR_OPERATIONS_ERROR;
132         }
133
134         ptr = *(struct ldb_index_pointer **)val.data;
135         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
136                 return LDB_ERR_OPERATIONS_ERROR;
137         }
138
139         /* count the length of the list */
140         for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
141                 i++;
142         }
143
144         /* allocate the new values array */
145         val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
146         if (val2 == NULL) {
147                 return LDB_ERR_OPERATIONS_ERROR;
148         }
149         el->values = val2;
150         el->num_values = i;
151
152         /* populate the values array */
153         for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
154                 el->values[i].length = tmp->value.length;
155                 /* we need to over-allocate here as there are still some places
156                    in ldb that rely on null termination. */
157                 el->values[i].data = talloc_size(el->values, tmp->value.length+1);
158                 if (el->values[i].data == NULL) {
159                         return LDB_ERR_OPERATIONS_ERROR;
160                 }
161                 memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
162                 el->values[i].data[tmp->value.length] = 0;
163         }
164
165         /* update the name */
166         el->name = LTDB_IDX;
167
168         return LDB_SUCCESS;
169 }
170
171
172 /* convert to the IDXPTR format from a ldb_message_element format */
173 static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
174 {
175         struct ldb_index_pointer *ptr, *tmp;
176         int i;
177         struct ldb_val *val2;
178         void *data = ldb_module_get_private(module);
179         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
180
181         ptr = NULL;
182
183         for (i=0;i<el->num_values;i++) {
184                 tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
185                 if (tmp == NULL) {
186                         return LDB_ERR_OPERATIONS_ERROR;
187                 }
188                 tmp->value = el->values[i];
189                 tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
190                 if (tmp->value.data == NULL) {
191                         return LDB_ERR_OPERATIONS_ERROR;
192                 }
193                 DLIST_ADD(ptr, tmp);
194         }
195
196         /* allocate the new values array */
197         val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
198         if (val2 == NULL) {
199                 return LDB_ERR_OPERATIONS_ERROR;
200         }
201         el->values = val2;
202         el->num_values = 1;
203
204         el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
205         el->values[0].length = sizeof(ptr);
206
207         /* update the name */
208         el->name = LTDB_IDXPTR;
209
210         return LDB_SUCCESS;
211 }
212
213
214 /* enable the idxptr mode when transactions start */
215 int ltdb_index_transaction_start(struct ldb_module *module)
216 {
217         void *data = ldb_module_get_private(module);
218         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
219         ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
220         return LDB_SUCCESS;
221 }
222
223 /*
224   a wrapper around ltdb_search_dn1() which translates pointer based index records
225   and maps them into normal ldb message structures
226  */
227 static int ltdb_search_dn1_index(struct ldb_module *module,
228                                 struct ldb_dn *dn, struct ldb_message *msg)
229 {
230         int ret, i;
231         ret = ltdb_search_dn1(module, dn, msg);
232         if (ret != LDB_SUCCESS) {
233                 return ret;
234         }
235
236         /* if this isn't a @INDEX record then don't munge it */
237         if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
238                 return LDB_ERR_OPERATIONS_ERROR;
239         }
240
241         for (i=0;i<msg->num_elements;i++) {
242                 struct ldb_message_element *el = &msg->elements[i];
243                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
244                         ret = ltdb_convert_from_idxptr(module, el);
245                         if (ret != LDB_SUCCESS) {
246                                 return ret;
247                         }
248                 }
249         }
250
251         return ret;
252 }
253
254
255
256 /*
257   fixup the idxptr for one DN
258  */
259 static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
260 {
261         struct ldb_context *ldb;
262         struct ldb_dn *dn;
263         struct ldb_message *msg = ldb_msg_new(module);
264         int ret;
265
266         ldb = ldb_module_get_ctx(module);
267
268         dn = ldb_dn_new(msg, ldb, strdn);
269         if (ltdb_search_dn1_index(module, dn, msg) == LDB_SUCCESS) {
270                 ret = ltdb_store(module, msg, TDB_REPLACE);
271         }
272         talloc_free(msg);
273         return ret;
274 }
275
276 /* cleanup the idxptr mode when transaction commits */
277 int ltdb_index_transaction_commit(struct ldb_module *module)
278 {
279         int i;
280         void *data = ldb_module_get_private(module);
281         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
282
283         /* fix all the DNs that we have modified */
284         if (ltdb->idxptr) {
285                 for (i=0;i<ltdb->idxptr->num_dns;i++) {
286                         ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
287                 }
288
289                 if (ltdb->idxptr->repack) {
290                         tdb_repack(ltdb->tdb);
291                 }
292         }
293
294         talloc_free(ltdb->idxptr);
295         ltdb->idxptr = NULL;
296         return LDB_SUCCESS;
297 }
298
299 /* cleanup the idxptr mode when transaction cancels */
300 int ltdb_index_transaction_cancel(struct ldb_module *module)
301 {
302         void *data = ldb_module_get_private(module);
303         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
304         talloc_free(ltdb->idxptr);
305         ltdb->idxptr = NULL;
306         return LDB_SUCCESS;
307 }
308
309
310
311 /* a wrapper around ltdb_store() for the index code which
312    stores in IDXPTR format when idxptr mode is enabled
313
314    WARNING: This modifies the msg which is passed in
315 */
316 int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
317 {
318         void *data = ldb_module_get_private(module);
319         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
320         int ret;
321
322         if (ltdb->idxptr) {
323                 int i;
324                 struct ldb_message *msg2 = ldb_msg_new(module);
325
326                 /* free any old pointer */
327                 ret = ltdb_search_dn1(module, msg->dn, msg2);
328                 if (ret == 0) {
329                         for (i=0;i<msg2->num_elements;i++) {
330                                 struct ldb_message_element *el = &msg2->elements[i];
331                                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
332                                         ret = ltdb_free_idxptr(module, el);
333                                         if (ret != LDB_SUCCESS) {
334                                                 return ret;
335                                         }
336                                 }
337                         }
338                 }
339                 talloc_free(msg2);
340
341                 for (i=0;i<msg->num_elements;i++) {
342                         struct ldb_message_element *el = &msg->elements[i];
343                         if (strcmp(el->name, LTDB_IDX) == 0) {
344                                 ret = ltdb_convert_to_idxptr(module, el);
345                                 if (ret != LDB_SUCCESS) {
346                                         return ret;
347                                 }
348                         }
349                 }
350
351                 if (ltdb_idxptr_add(module, msg) != 0) {
352                         return LDB_ERR_OPERATIONS_ERROR;
353                 }
354         }
355
356         ret = ltdb_store(module, msg, flgs);
357         return ret;
358 }
359
360
361 /*
362   find an element in a list, using the given comparison function and
363   assuming that the list is already sorted using comp_fn
364
365   return -1 if not found, or the index of the first occurance of needle if found
366 */
367 static int ldb_list_find(const void *needle,
368                          const void *base, size_t nmemb, size_t size,
369                          comparison_fn_t comp_fn)
370 {
371         const char *base_p = (const char *)base;
372         size_t min_i, max_i, test_i;
373
374         if (nmemb == 0) {
375                 return -1;
376         }
377
378         min_i = 0;
379         max_i = nmemb-1;
380
381         while (min_i < max_i) {
382                 int r;
383
384                 test_i = (min_i + max_i) / 2;
385                 /* the following cast looks strange, but is
386                  correct. The key to understanding it is that base_p
387                  is a pointer to an array of pointers, so we have to
388                  dereference it after casting to void **. The strange
389                  const in the middle gives us the right type of pointer
390                  after the dereference  (tridge) */
391                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
392                 if (r == 0) {
393                         /* scan back for first element */
394                         while (test_i > 0 &&
395                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
396                                 test_i--;
397                         }
398                         return test_i;
399                 }
400                 if (r < 0) {
401                         if (test_i == 0) {
402                                 return -1;
403                         }
404                         max_i = test_i - 1;
405                 }
406                 if (r > 0) {
407                         min_i = test_i + 1;
408                 }
409         }
410
411         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
412                 return min_i;
413         }
414
415         return -1;
416 }
417
418 struct dn_list {
419         unsigned int count;
420         char **dn;
421 };
422
423 /*
424   return the dn key to be used for an index
425   caller frees
426 */
427 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
428                                      const char *attr, const struct ldb_val *value)
429 {
430         struct ldb_dn *ret;
431         struct ldb_val v;
432         const struct ldb_schema_attribute *a;
433         char *attr_folded;
434         int r;
435
436         attr_folded = ldb_attr_casefold(ldb, attr);
437         if (!attr_folded) {
438                 return NULL;
439         }
440
441         a = ldb_schema_attribute_by_name(ldb, attr);
442         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
443         if (r != LDB_SUCCESS) {
444                 const char *errstr = ldb_errstring(ldb);
445                 /* canonicalisation can be refused. For example, 
446                    a attribute that takes wildcards will refuse to canonicalise
447                    if the value contains a wildcard */
448                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
449                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
450                 talloc_free(attr_folded);
451                 return NULL;
452         }
453         if (ldb_should_b64_encode(&v)) {
454                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
455                 if (!vstr) return NULL;
456                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
457                 talloc_free(vstr);
458         } else {
459                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
460         }
461
462         if (v.data != value->data) {
463                 talloc_free(v.data);
464         }
465         talloc_free(attr_folded);
466
467         return ret;
468 }
469
470 /*
471   see if a attribute value is in the list of indexed attributes
472 */
473 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
474                             unsigned int *v_idx, const char *key)
475 {
476         unsigned int i, j;
477         for (i=0;i<msg->num_elements;i++) {
478                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
479                         const struct ldb_message_element *el = &msg->elements[i];
480
481                         if (attr == NULL) {
482                                 /* in this case we are just looking to see if key is present,
483                                    we are not spearching for a specific index */
484                                 return 0;
485                         }
486
487                         for (j=0;j<el->num_values;j++) {
488                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
489                                         if (v_idx) {
490                                                 *v_idx = j;
491                                         }
492                                         return i;
493                                 }
494                         }
495                 }
496         }
497         return -1;
498 }
499
500 /* used in sorting dn lists */
501 static int list_cmp(const char **s1, const char **s2)
502 {
503         return strcmp(*s1, *s2);
504 }
505
506 /*
507   return a list of dn's that might match a simple indexed search or
508  */
509 static int ltdb_index_dn_simple(struct ldb_module *module,
510                                 const struct ldb_parse_tree *tree,
511                                 const struct ldb_message *index_list,
512                                 struct dn_list *list)
513 {
514         struct ldb_context *ldb;
515         struct ldb_dn *dn;
516         int ret;
517         unsigned int i, j;
518         struct ldb_message *msg;
519
520         ldb = ldb_module_get_ctx(module);
521
522         list->count = 0;
523         list->dn = NULL;
524
525         /* if the attribute isn't in the list of indexed attributes then
526            this node needs a full search */
527         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
528                 return LDB_ERR_OPERATIONS_ERROR;
529         }
530
531         /* the attribute is indexed. Pull the list of DNs that match the 
532            search criterion */
533         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
534         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
535
536         msg = talloc(list, struct ldb_message);
537         if (msg == NULL) {
538                 return LDB_ERR_OPERATIONS_ERROR;
539         }
540
541         ret = ltdb_search_dn1_index(module, dn, msg);
542         talloc_free(dn);
543         if (ret != LDB_SUCCESS) {
544                 return ret;
545         }
546
547         for (i=0;i<msg->num_elements;i++) {
548                 struct ldb_message_element *el;
549
550                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
551                         continue;
552                 }
553
554                 el = &msg->elements[i];
555
556                 list->dn = talloc_array(list, char *, el->num_values);
557                 if (!list->dn) {
558                         talloc_free(msg);
559                         return LDB_ERR_OPERATIONS_ERROR;
560                 }
561
562                 for (j=0;j<el->num_values;j++) {
563                         list->dn[list->count] =
564                                 talloc_strdup(list->dn, (char *)el->values[j].data);
565                         if (!list->dn[list->count]) {
566                                 talloc_free(msg);
567                                 return LDB_ERR_OPERATIONS_ERROR;
568                         }
569                         list->count++;
570                 }
571         }
572
573         talloc_free(msg);
574
575         if (list->count > 1) {
576                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
577         }
578
579         return LDB_SUCCESS;
580 }
581
582
583 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
584
585 /*
586   return a list of dn's that might match a leaf indexed search
587  */
588 static int ltdb_index_dn_leaf(struct ldb_module *module,
589                               const struct ldb_parse_tree *tree,
590                               const struct ldb_message *index_list,
591                               struct dn_list *list)
592 {
593         struct ldb_context *ldb;
594         ldb = ldb_module_get_ctx(module);
595
596         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
597                 list->dn = talloc_array(list, char *, 1);
598                 if (list->dn == NULL) {
599                         ldb_oom(ldb);
600                         return LDB_ERR_OPERATIONS_ERROR;
601                 }
602                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
603                 if (list->dn[0] == NULL) {
604                         ldb_oom(ldb);
605                         return LDB_ERR_OPERATIONS_ERROR;
606                 }
607                 list->count = 1;
608                 return LDB_SUCCESS;
609         }
610         return ltdb_index_dn_simple(module, tree, index_list, list);
611 }
612
613
614 /*
615   list intersection
616   list = list & list2
617   relies on the lists being sorted
618 */
619 static int list_intersect(struct ldb_context *ldb,
620                           struct dn_list *list, const struct dn_list *list2)
621 {
622         struct dn_list *list3;
623         unsigned int i;
624
625         if (list->count == 0 || list2->count == 0) {
626                 /* 0 & X == 0 */
627                 return LDB_ERR_NO_SUCH_OBJECT;
628         }
629
630         list3 = talloc(ldb, struct dn_list);
631         if (list3 == NULL) {
632                 return LDB_ERR_OPERATIONS_ERROR;
633         }
634
635         list3->dn = talloc_array(list3, char *, list->count);
636         if (!list3->dn) {
637                 talloc_free(list3);
638                 return LDB_ERR_OPERATIONS_ERROR;
639         }
640         list3->count = 0;
641
642         for (i=0;i<list->count;i++) {
643                 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
644                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
645                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
646                         list3->count++;
647                 } else {
648                         talloc_free(list->dn[i]);
649                 }
650         }
651
652         talloc_free(list->dn);
653         list->dn = talloc_move(list, &list3->dn);
654         list->count = list3->count;
655         talloc_free(list3);
656
657         return LDB_ERR_NO_SUCH_OBJECT;
658 }
659
660
661 /*
662   list union
663   list = list | list2
664   relies on the lists being sorted
665 */
666 static int list_union(struct ldb_context *ldb,
667                       struct dn_list *list, const struct dn_list *list2)
668 {
669         unsigned int i;
670         char **d;
671         unsigned int count = list->count;
672
673         if (list->count == 0 && list2->count == 0) {
674                 /* 0 | 0 == 0 */
675                 return LDB_ERR_NO_SUCH_OBJECT;
676         }
677
678         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
679         if (!d) {
680                 return LDB_ERR_OPERATIONS_ERROR;
681         }
682         list->dn = d;
683
684         for (i=0;i<list2->count;i++) {
685                 if (ldb_list_find(list2->dn[i], list->dn, count,
686                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
687                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
688                         if (!list->dn[list->count]) {
689                                 return LDB_ERR_OPERATIONS_ERROR;
690                         }
691                         list->count++;
692                 }
693         }
694
695         if (list->count != count) {
696                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
697         }
698
699         return LDB_ERR_NO_SUCH_OBJECT;
700 }
701
702 static int ltdb_index_dn(struct ldb_module *module,
703                          const struct ldb_parse_tree *tree,
704                          const struct ldb_message *index_list,
705                          struct dn_list *list);
706
707
708 /*
709   OR two index results
710  */
711 static int ltdb_index_dn_or(struct ldb_module *module,
712                             const struct ldb_parse_tree *tree,
713                             const struct ldb_message *index_list,
714                             struct dn_list *list)
715 {
716         struct ldb_context *ldb;
717         unsigned int i;
718         int ret;
719
720         ldb = ldb_module_get_ctx(module);
721
722         ret = LDB_ERR_OPERATIONS_ERROR;
723         list->dn = NULL;
724         list->count = 0;
725
726         for (i=0;i<tree->u.list.num_elements;i++) {
727                 struct dn_list *list2;
728                 int v;
729
730                 list2 = talloc(module, struct dn_list);
731                 if (list2 == NULL) {
732                         return LDB_ERR_OPERATIONS_ERROR;
733                 }
734
735                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
736
737                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
738                         /* 0 || X == X */
739                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
740                                 ret = v;
741                         }
742                         talloc_free(list2);
743                         continue;
744                 }
745
746                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
747                         /* 1 || X == 1 */
748                         talloc_free(list->dn);
749                         talloc_free(list2);
750                         return v;
751                 }
752
753                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
754                         ret = LDB_SUCCESS;
755                         list->dn = talloc_move(list, &list2->dn);
756                         list->count = list2->count;
757                 } else {
758                         if (list_union(ldb, list, list2) == -1) {
759                                 talloc_free(list2);
760                                 return LDB_ERR_OPERATIONS_ERROR;
761                         }
762                         ret = LDB_SUCCESS;
763                 }
764                 talloc_free(list2);
765         }
766
767         if (list->count == 0) {
768                 return LDB_ERR_NO_SUCH_OBJECT;
769         }
770
771         return ret;
772 }
773
774
775 /*
776   NOT an index results
777  */
778 static int ltdb_index_dn_not(struct ldb_module *module,
779                              const struct ldb_parse_tree *tree,
780                              const struct ldb_message *index_list,
781                              struct dn_list *list)
782 {
783         /* the only way to do an indexed not would be if we could
784            negate the not via another not or if we knew the total
785            number of database elements so we could know that the
786            existing expression covered the whole database.
787
788            instead, we just give up, and rely on a full index scan
789            (unless an outer & manages to reduce the list)
790         */
791         return LDB_ERR_OPERATIONS_ERROR;
792 }
793
794 /*
795   AND two index results
796  */
797 static int ltdb_index_dn_and(struct ldb_module *module,
798                              const struct ldb_parse_tree *tree,
799                              const struct ldb_message *index_list,
800                              struct dn_list *list)
801 {
802         struct ldb_context *ldb;
803         unsigned int i;
804         int ret;
805
806         ldb = ldb_module_get_ctx(module);
807
808         ret = LDB_ERR_OPERATIONS_ERROR;
809         list->dn = NULL;
810         list->count = 0;
811
812         for (i=0;i<tree->u.list.num_elements;i++) {
813                 struct dn_list *list2;
814                 int v;
815
816                 list2 = talloc(module, struct dn_list);
817                 if (list2 == NULL) {
818                         return LDB_ERR_OPERATIONS_ERROR;
819                 }
820
821                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
822
823                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
824                         /* 0 && X == 0 */
825                         talloc_free(list->dn);
826                         talloc_free(list2);
827                         return LDB_ERR_NO_SUCH_OBJECT;
828                 }
829
830                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
831                         talloc_free(list2);
832                         continue;
833                 }
834
835                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
836                         ret = LDB_SUCCESS;
837                         talloc_free(list->dn);
838                         list->dn = talloc_move(list, &list2->dn);
839                         list->count = list2->count;
840                 } else {
841                         if (list_intersect(ldb, list, list2) == -1) {
842                                 talloc_free(list2);
843                                 return LDB_ERR_OPERATIONS_ERROR;
844                         }
845                 }
846
847                 talloc_free(list2);
848
849                 if (list->count == 0) {
850                         talloc_free(list->dn);
851                         return LDB_ERR_NO_SUCH_OBJECT;
852                 }
853         }
854
855         return ret;
856 }
857
858 /*
859   AND index results and ONE level special index
860  */
861 static int ltdb_index_dn_one(struct ldb_module *module,
862                              struct ldb_dn *parent_dn,
863                              struct dn_list *list)
864 {
865         struct ldb_context *ldb;
866         struct dn_list *list2;
867         struct ldb_message *msg;
868         struct ldb_dn *key;
869         struct ldb_val val;
870         unsigned int i, j;
871         int ret;
872
873         ldb = ldb_module_get_ctx(module);
874
875         list2 = talloc_zero(module, struct dn_list);
876         if (list2 == NULL) {
877                 return LDB_ERR_OPERATIONS_ERROR;
878         }
879
880         /* the attribute is indexed. Pull the list of DNs that match the
881            search criterion */
882         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
883         val.length = strlen((char *)val.data);
884         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
885         if (!key) {
886                 talloc_free(list2);
887                 return LDB_ERR_OPERATIONS_ERROR;
888         }
889
890         msg = talloc(list2, struct ldb_message);
891         if (msg == NULL) {
892                 talloc_free(list2);
893                 return LDB_ERR_OPERATIONS_ERROR;
894         }
895
896         ret = ltdb_search_dn1_index(module, key, msg);
897         talloc_free(key);
898         if (ret != LDB_SUCCESS) {
899                 return ret;
900         }
901
902         for (i = 0; i < msg->num_elements; i++) {
903                 struct ldb_message_element *el;
904
905                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
906                         continue;
907                 }
908
909                 el = &msg->elements[i];
910
911                 list2->dn = talloc_array(list2, char *, el->num_values);
912                 if (!list2->dn) {
913                         talloc_free(list2);
914                         return LDB_ERR_OPERATIONS_ERROR;
915                 }
916
917                 for (j = 0; j < el->num_values; j++) {
918                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
919                         if (!list2->dn[list2->count]) {
920                                 talloc_free(list2);
921                                 return LDB_ERR_OPERATIONS_ERROR;
922                         }
923                         list2->count++;
924                 }
925         }
926
927         if (list2->count == 0) {
928                 talloc_free(list2);
929                 return LDB_ERR_NO_SUCH_OBJECT;
930         }
931
932         if (list2->count > 1) {
933                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
934         }
935
936         if (list->count > 0) {
937                 if (list_intersect(ldb, list, list2) == -1) {
938                         talloc_free(list2);
939                         return LDB_ERR_OPERATIONS_ERROR;
940                 }
941
942                 if (list->count == 0) {
943                         talloc_free(list->dn);
944                         talloc_free(list2);
945                         return LDB_ERR_NO_SUCH_OBJECT;
946                 }
947         } else {
948                 list->dn = talloc_move(list, &list2->dn);
949                 list->count = list2->count;
950         }
951
952         talloc_free(list2);
953
954         return LDB_SUCCESS;
955 }
956
957 /*
958   return a list of dn's that might match a indexed search or
959   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
960  */
961 static int ltdb_index_dn(struct ldb_module *module,
962                          const struct ldb_parse_tree *tree,
963                          const struct ldb_message *index_list,
964                          struct dn_list *list)
965 {
966         int ret = LDB_ERR_OPERATIONS_ERROR;
967
968         switch (tree->operation) {
969         case LDB_OP_AND:
970                 ret = ltdb_index_dn_and(module, tree, index_list, list);
971                 break;
972
973         case LDB_OP_OR:
974                 ret = ltdb_index_dn_or(module, tree, index_list, list);
975                 break;
976
977         case LDB_OP_NOT:
978                 ret = ltdb_index_dn_not(module, tree, index_list, list);
979                 break;
980
981         case LDB_OP_EQUALITY:
982                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
983                 break;
984
985         case LDB_OP_SUBSTRING:
986         case LDB_OP_GREATER:
987         case LDB_OP_LESS:
988         case LDB_OP_PRESENT:
989         case LDB_OP_APPROX:
990         case LDB_OP_EXTENDED:
991                 /* we can't index with fancy bitops yet */
992                 ret = LDB_ERR_OPERATIONS_ERROR;
993                 break;
994         }
995
996         return ret;
997 }
998
999 /*
1000   filter a candidate dn_list from an indexed search into a set of results
1001   extracting just the given attributes
1002 */
1003 static int ltdb_index_filter(const struct dn_list *dn_list,
1004                              struct ltdb_context *ac)
1005 {
1006         struct ldb_context *ldb;
1007         struct ldb_message *msg;
1008         unsigned int i;
1009
1010         ldb = ldb_module_get_ctx(ac->module);
1011
1012         for (i = 0; i < dn_list->count; i++) {
1013                 struct ldb_dn *dn;
1014                 int ret;
1015
1016                 msg = ldb_msg_new(ac);
1017                 if (!msg) {
1018                         return LDB_ERR_OPERATIONS_ERROR;
1019                 }
1020
1021                 dn = ldb_dn_new(msg, ldb, dn_list->dn[i]);
1022                 if (dn == NULL) {
1023                         talloc_free(msg);
1024                         return LDB_ERR_OPERATIONS_ERROR;
1025                 }
1026
1027                 ret = ltdb_search_dn1(ac->module, dn, msg);
1028                 talloc_free(dn);
1029                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1030                         /* the record has disappeared? yes, this can happen */
1031                         talloc_free(msg);
1032                         continue;
1033                 }
1034
1035                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1036                         /* an internal error */
1037                         talloc_free(msg);
1038                         return LDB_ERR_OPERATIONS_ERROR;
1039                 }
1040
1041                 if (!ldb_match_msg(ldb, msg,
1042                                    ac->tree, ac->base, ac->scope)) {
1043                         talloc_free(msg);
1044                         continue;
1045                 }
1046
1047                 /* filter the attributes that the user wants */
1048                 ret = ltdb_filter_attrs(msg, ac->attrs);
1049
1050                 if (ret == -1) {
1051                         talloc_free(msg);
1052                         return LDB_ERR_OPERATIONS_ERROR;
1053                 }
1054
1055                 ret = ldb_module_send_entry(ac->req, msg, NULL);
1056                 if (ret != LDB_SUCCESS) {
1057                         ac->callback_failed = true;
1058                         return ret;
1059                 }
1060         }
1061
1062         return LDB_SUCCESS;
1063 }
1064
1065 /*
1066   search the database with a LDAP-like expression using indexes
1067   returns -1 if an indexed search is not possible, in which
1068   case the caller should call ltdb_search_full()
1069 */
1070 int ltdb_search_indexed(struct ltdb_context *ac)
1071 {
1072         struct ldb_context *ldb;
1073         void *data = ldb_module_get_private(ac->module);
1074         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1075         struct dn_list *dn_list;
1076         int ret, idxattr, idxone;
1077
1078         ldb = ldb_module_get_ctx(ac->module);
1079
1080         idxattr = idxone = 0;
1081         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
1082         if (ret == 0 ) {
1083                 idxattr = 1;
1084         }
1085
1086         /* We do one level indexing only if requested */
1087         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1088         if (ret == 0 ) {
1089                 idxone = 1;
1090         }
1091
1092         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
1093             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
1094                 /* no indexes? must do full search */
1095                 return LDB_ERR_OPERATIONS_ERROR;
1096         }
1097
1098         ret = LDB_ERR_OPERATIONS_ERROR;
1099
1100         dn_list = talloc_zero(ac, struct dn_list);
1101         if (dn_list == NULL) {
1102                 return LDB_ERR_OPERATIONS_ERROR;
1103         }
1104
1105         if (ac->scope == LDB_SCOPE_BASE) {
1106                 /* with BASE searches only one DN can match */
1107                 dn_list->dn = talloc_array(dn_list, char *, 1);
1108                 if (dn_list->dn == NULL) {
1109                         ldb_oom(ldb);
1110                         return LDB_ERR_OPERATIONS_ERROR;
1111                 }
1112                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
1113                 if (dn_list->dn[0] == NULL) {
1114                         ldb_oom(ldb);
1115                         return LDB_ERR_OPERATIONS_ERROR;
1116                 }
1117                 dn_list->count = 1;
1118                 ret = LDB_SUCCESS;
1119         }
1120
1121         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
1122                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1123
1124                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1125                         talloc_free(dn_list);
1126                         return ret;
1127                 }
1128         }
1129
1130         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
1131                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1132         }
1133
1134         if (ret == LDB_SUCCESS) {
1135                 /* we've got a candidate list - now filter by the full tree
1136                    and extract the needed attributes */
1137                 ret = ltdb_index_filter(dn_list, ac);
1138         }
1139
1140         talloc_free(dn_list);
1141
1142         return ret;
1143 }
1144
1145 /*
1146   add a index element where this is the first indexed DN for this value
1147 */
1148 static int ltdb_index_add1_new(struct ldb_context *ldb,
1149                                struct ldb_message *msg,
1150                                const char *dn)
1151 {
1152         struct ldb_message_element *el;
1153
1154         /* add another entry */
1155         el = talloc_realloc(msg, msg->elements,
1156                                struct ldb_message_element, msg->num_elements+1);
1157         if (!el) {
1158                 return LDB_ERR_OPERATIONS_ERROR;
1159         }
1160
1161         msg->elements = el;
1162         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
1163         if (!msg->elements[msg->num_elements].name) {
1164                 return LDB_ERR_OPERATIONS_ERROR;
1165         }
1166         msg->elements[msg->num_elements].num_values = 0;
1167         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
1168         if (!msg->elements[msg->num_elements].values) {
1169                 return LDB_ERR_OPERATIONS_ERROR;
1170         }
1171         msg->elements[msg->num_elements].values[0].length = strlen(dn);
1172         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
1173         msg->elements[msg->num_elements].num_values = 1;
1174         msg->num_elements++;
1175
1176         return LDB_SUCCESS;
1177 }
1178
1179
1180 /*
1181   add a index element where this is not the first indexed DN for this
1182   value
1183 */
1184 static int ltdb_index_add1_add(struct ldb_context *ldb,
1185                                struct ldb_message *msg,
1186                                int idx,
1187                                const char *dn)
1188 {
1189         struct ldb_val *v2;
1190         unsigned int i;
1191
1192         /* for multi-valued attributes we can end up with repeats */
1193         for (i=0;i<msg->elements[idx].num_values;i++) {
1194                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
1195                         return LDB_SUCCESS;
1196                 }
1197         }
1198
1199         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
1200                               struct ldb_val,
1201                               msg->elements[idx].num_values+1);
1202         if (!v2) {
1203                 return LDB_ERR_OPERATIONS_ERROR;
1204         }
1205         msg->elements[idx].values = v2;
1206
1207         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
1208         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
1209         msg->elements[idx].num_values++;
1210
1211         return LDB_SUCCESS;
1212 }
1213
1214 /*
1215   add an index entry for one message element
1216 */
1217 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1218                            struct ldb_message_element *el, int v_idx)
1219 {
1220         struct ldb_context *ldb;
1221         struct ldb_message *msg;
1222         struct ldb_dn *dn_key;
1223         int ret;
1224         unsigned int i;
1225
1226         ldb = ldb_module_get_ctx(module);
1227
1228         msg = talloc(module, struct ldb_message);
1229         if (msg == NULL) {
1230                 errno = ENOMEM;
1231                 return LDB_ERR_OPERATIONS_ERROR;
1232         }
1233
1234         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1235         if (!dn_key) {
1236                 talloc_free(msg);
1237                 return LDB_ERR_OPERATIONS_ERROR;
1238         }
1239         talloc_steal(msg, dn_key);
1240
1241         ret = ltdb_search_dn1_index(module, dn_key, msg);
1242         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1243                 talloc_free(msg);
1244                 return ret;
1245         }
1246
1247         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1248                 msg->dn = dn_key;
1249                 msg->num_elements = 0;
1250                 msg->elements = NULL;
1251         }
1252
1253         for (i=0;i<msg->num_elements;i++) {
1254                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
1255                         break;
1256                 }
1257         }
1258
1259         if (i == msg->num_elements) {
1260                 ret = ltdb_index_add1_new(ldb, msg, dn);
1261         } else {
1262                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
1263         }
1264
1265         if (ret == LDB_SUCCESS) {
1266                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1267         }
1268
1269         talloc_free(msg);
1270
1271         return ret;
1272 }
1273
1274 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1275                            struct ldb_message_element *elements, int num_el)
1276 {
1277         void *data = ldb_module_get_private(module);
1278         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1279         int ret;
1280         unsigned int i, j;
1281
1282         if (dn[0] == '@') {
1283                 return LDB_SUCCESS;
1284         }
1285
1286         if (ltdb->cache->indexlist->num_elements == 0) {
1287                 /* no indexed fields */
1288                 return LDB_SUCCESS;
1289         }
1290
1291         for (i = 0; i < num_el; i++) {
1292                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
1293                                        NULL, LTDB_IDXATTR);
1294                 if (ret == -1) {
1295                         continue;
1296                 }
1297                 for (j = 0; j < elements[i].num_values; j++) {
1298                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1299                         if (ret != LDB_SUCCESS) {
1300                                 return ret;
1301                         }
1302                 }
1303         }
1304
1305         return LDB_SUCCESS;
1306 }
1307
1308 /*
1309   add the index entries for a new record
1310 */
1311 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1312 {
1313         const char *dn;
1314         int ret;
1315
1316         dn = ldb_dn_get_linearized(msg->dn);
1317         if (dn == NULL) {
1318                 return LDB_ERR_OPERATIONS_ERROR;
1319         }
1320
1321         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1322
1323         return ret;
1324 }
1325
1326
1327 /*
1328   delete an index entry for one message element
1329 */
1330 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1331                          struct ldb_message_element *el, int v_idx)
1332 {
1333         struct ldb_context *ldb;
1334         struct ldb_message *msg;
1335         struct ldb_dn *dn_key;
1336         int ret, i;
1337         unsigned int j;
1338
1339         ldb = ldb_module_get_ctx(module);
1340
1341         if (dn[0] == '@') {
1342                 return LDB_SUCCESS;
1343         }
1344
1345         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1346         if (!dn_key) {
1347                 return LDB_ERR_OPERATIONS_ERROR;
1348         }
1349
1350         msg = talloc(dn_key, struct ldb_message);
1351         if (msg == NULL) {
1352                 talloc_free(dn_key);
1353                 return LDB_ERR_OPERATIONS_ERROR;
1354         }
1355
1356         ret = ltdb_search_dn1_index(module, dn_key, msg);
1357         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1358                 talloc_free(dn_key);
1359                 return ret;
1360         }
1361
1362         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1363                 /* it wasn't indexed. Did we have an earlier error? If we did then
1364                    its gone now */
1365                 talloc_free(dn_key);
1366                 return LDB_SUCCESS;
1367         }
1368
1369         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1370         if (i == -1) {
1371                 struct ldb_ldif ldif;
1372
1373                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1374                                 "ERROR: dn %s not found in %s\n", dn,
1375                                 ldb_dn_get_linearized(dn_key));
1376                 ldif.changetype = LDB_CHANGETYPE_NONE;
1377                 ldif.msg = msg;
1378                 ldb_ldif_write_file(ldb, stdout, &ldif);
1379                 sleep(100);
1380                 /* it ain't there. hmmm */
1381                 talloc_free(dn_key);
1382                 return LDB_SUCCESS;
1383         }
1384
1385         if (j != msg->elements[i].num_values - 1) {
1386                 memmove(&msg->elements[i].values[j],
1387                         &msg->elements[i].values[j+1],
1388                         (msg->elements[i].num_values-(j+1)) *
1389                         sizeof(msg->elements[i].values[0]));
1390         }
1391         msg->elements[i].num_values--;
1392
1393         if (msg->elements[i].num_values == 0) {
1394                 ret = ltdb_delete_noindex(module, dn_key);
1395         } else {
1396                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1397         }
1398
1399         talloc_free(dn_key);
1400
1401         return ret;
1402 }
1403
1404 /*
1405   delete the index entries for a record
1406   return -1 on failure
1407 */
1408 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1409 {
1410         void *data = ldb_module_get_private(module);
1411         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1412         int ret;
1413         const char *dn;
1414         unsigned int i, j;
1415
1416         /* find the list of indexed fields */
1417         if (ltdb->cache->indexlist->num_elements == 0) {
1418                 /* no indexed fields */
1419                 return LDB_SUCCESS;
1420         }
1421
1422         if (ldb_dn_is_special(msg->dn)) {
1423                 return LDB_SUCCESS;
1424         }
1425
1426         dn = ldb_dn_get_linearized(msg->dn);
1427         if (dn == NULL) {
1428                 return LDB_ERR_OPERATIONS_ERROR;
1429         }
1430
1431         for (i = 0; i < msg->num_elements; i++) {
1432                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1433                                        NULL, LTDB_IDXATTR);
1434                 if (ret == -1) {
1435                         continue;
1436                 }
1437                 for (j = 0; j < msg->elements[i].num_values; j++) {
1438                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1439                         if (ret != LDB_SUCCESS) {
1440                                 return ret;
1441                         }
1442                 }
1443         }
1444
1445         return LDB_SUCCESS;
1446 }
1447
1448 /*
1449   handle special index for one level searches
1450 */
1451 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1452 {
1453         void *data = ldb_module_get_private(module);
1454         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1455         struct ldb_message_element el;
1456         struct ldb_val val;
1457         struct ldb_dn *pdn;
1458         const char *dn;
1459         int ret;
1460
1461         /* We index for ONE Level only if requested */
1462         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1463         if (ret != 0) {
1464                 return LDB_SUCCESS;
1465         }
1466
1467         pdn = ldb_dn_get_parent(module, msg->dn);
1468         if (pdn == NULL) {
1469                 return LDB_ERR_OPERATIONS_ERROR;
1470         }
1471
1472         dn = ldb_dn_get_linearized(msg->dn);
1473         if (dn == NULL) {
1474                 talloc_free(pdn);
1475                 return LDB_ERR_OPERATIONS_ERROR;
1476         }
1477
1478         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1479         if (val.data == NULL) {
1480                 talloc_free(pdn);
1481                 return LDB_ERR_OPERATIONS_ERROR;
1482         }
1483
1484         val.length = strlen((char *)val.data);
1485         el.name = LTDB_IDXONE;
1486         el.values = &val;
1487         el.num_values = 1;
1488
1489         if (add) {
1490                 ret = ltdb_index_add1(module, dn, &el, 0);
1491         } else { /* delete */
1492                 ret = ltdb_index_del_value(module, dn, &el, 0);
1493         }
1494
1495         talloc_free(pdn);
1496
1497         return ret;
1498 }
1499
1500
1501 /*
1502   traversal function that deletes all @INDEX records
1503 */
1504 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1505 {
1506         const char *dn = "DN=" LTDB_INDEX ":";
1507         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1508                 return tdb_delete(tdb, key);
1509         }
1510         return 0;
1511 }
1512
1513 /*
1514   traversal function that adds @INDEX records during a re index
1515 */
1516 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1517 {
1518         struct ldb_context *ldb;
1519         struct ldb_module *module = (struct ldb_module *)state;
1520         struct ldb_message *msg;
1521         const char *dn = NULL;
1522         int ret;
1523         TDB_DATA key2;
1524
1525         ldb = ldb_module_get_ctx(module);
1526
1527         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1528             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1529                 return 0;
1530         }
1531
1532         msg = talloc(module, struct ldb_message);
1533         if (msg == NULL) {
1534                 return -1;
1535         }
1536
1537         ret = ltdb_unpack_data(module, &data, msg);
1538         if (ret != 0) {
1539                 talloc_free(msg);
1540                 return -1;
1541         }
1542
1543         /* check if the DN key has changed, perhaps due to the
1544            case insensitivity of an element changing */
1545         key2 = ltdb_key(module, msg->dn);
1546         if (key2.dptr == NULL) {
1547                 /* probably a corrupt record ... darn */
1548                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1549                                                         ldb_dn_get_linearized(msg->dn));
1550                 talloc_free(msg);
1551                 return 0;
1552         }
1553         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1554                 tdb_delete(tdb, key);
1555                 tdb_store(tdb, key2, data, 0);
1556         }
1557         talloc_free(key2.dptr);
1558
1559         if (msg->dn == NULL) {
1560                 dn = (char *)key.dptr + 3;
1561         } else {
1562                 dn = ldb_dn_get_linearized(msg->dn);
1563         }
1564
1565         ret = ltdb_index_one(module, msg, 1);
1566         if (ret == LDB_SUCCESS) {
1567                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1568         } else {
1569                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1570                         "Adding special ONE LEVEL index failed (%s)!\n",
1571                         ldb_dn_get_linearized(msg->dn));
1572         }
1573
1574         talloc_free(msg);
1575
1576         if (ret != LDB_SUCCESS) return -1;
1577
1578         return 0;
1579 }
1580
1581 /*
1582   force a complete reindex of the database
1583 */
1584 int ltdb_reindex(struct ldb_module *module)
1585 {
1586         void *data = ldb_module_get_private(module);
1587         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1588         int ret;
1589
1590         if (ltdb_cache_reload(module) != 0) {
1591                 return LDB_ERR_OPERATIONS_ERROR;
1592         }
1593
1594         /* first traverse the database deleting any @INDEX records */
1595         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1596         if (ret == -1) {
1597                 return LDB_ERR_OPERATIONS_ERROR;
1598         }
1599
1600         /* if we don't have indexes we have nothing todo */
1601         if (ltdb->cache->indexlist->num_elements == 0) {
1602                 return LDB_SUCCESS;
1603         }
1604
1605         /* now traverse adding any indexes for normal LDB records */
1606         ret = tdb_traverse(ltdb->tdb, re_index, module);
1607         if (ret == -1) {
1608                 return LDB_ERR_OPERATIONS_ERROR;
1609         }
1610
1611         if (ltdb->idxptr) {
1612                 ltdb->idxptr->repack = true;
1613         }
1614
1615         return LDB_SUCCESS;
1616 }