Merge branch 'master' of git://git.samba.org/samba into minschema
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "dlinklist.h"
36
37 /*
38   the idxptr code is a bit unusual. The way it works is to replace
39   @IDX elements in records during a transaction with @IDXPTR
40   elements. The @IDXPTR elements don't contain the actual index entry
41   values, but contain a pointer to a linked list of values.
42
43   This means we are storing pointers in a database, which is normally
44   not allowed, but in this case we are storing them only for the
45   duration of a transaction, and re-writing them into the normal @IDX
46   format at the end of the transaction. That means no other processes
47   are ever exposed to the @IDXPTR values.
48
49   The advantage is that the linked list doesn't cause huge
50   fragmentation during a transaction. Without the @IDXPTR method we
51   often ended up with a ldb that was between 10x and 100x larger then
52   it needs to be due to massive fragmentation caused by re-writing
53   @INDEX records many times during indexing.
54  */
55 struct ldb_index_pointer {
56         struct ldb_index_pointer *next, *prev;
57         struct ldb_val value;
58 };
59
60 struct ltdb_idxptr {
61         int num_dns;
62         const char **dn_list;
63         bool repack;
64 };
65
66 /*
67   add to the list of DNs that need to be fixed on transaction end
68  */
69 static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
70 {
71         void *data = ldb_module_get_private(module);
72         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
73         ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
74                                                const char *, ltdb->idxptr->num_dns+1);
75         if (ltdb->idxptr->dn_list == NULL) {
76                 ltdb->idxptr->num_dns = 0;
77                 return LDB_ERR_OPERATIONS_ERROR;
78         }
79         ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] =
80                 talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
81         if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
82                 return LDB_ERR_OPERATIONS_ERROR;
83         }
84         ltdb->idxptr->num_dns++;
85         return LDB_SUCCESS;
86 }
87
88 /* free an idxptr record */
89 static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
90 {
91         struct ldb_val val;
92         struct ldb_index_pointer *ptr;
93
94         if (el->num_values != 1) {
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97
98         val = el->values[0];
99         if (val.length != sizeof(void *)) {
100                 return LDB_ERR_OPERATIONS_ERROR;
101         }
102
103         ptr = *(struct ldb_index_pointer **)val.data;
104         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
105                 return LDB_ERR_OPERATIONS_ERROR;
106         }
107
108         while (ptr) {
109                 struct ldb_index_pointer *tmp = ptr;
110                 DLIST_REMOVE(ptr, ptr);
111                 talloc_free(tmp);
112         }
113
114         return LDB_SUCCESS;
115 }
116
117
118 /* convert from the IDXPTR format to a ldb_message_element format */
119 static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
120 {
121         struct ldb_val val;
122         struct ldb_index_pointer *ptr, *tmp;
123         int i;
124         struct ldb_val *val2;
125
126         if (el->num_values != 1) {
127                 return LDB_ERR_OPERATIONS_ERROR;
128         }
129
130         val = el->values[0];
131         if (val.length != sizeof(void *)) {
132                 return LDB_ERR_OPERATIONS_ERROR;
133         }
134
135         ptr = *(struct ldb_index_pointer **)val.data;
136         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
137                 return LDB_ERR_OPERATIONS_ERROR;
138         }
139
140         /* count the length of the list */
141         for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
142                 i++;
143         }
144
145         /* allocate the new values array */
146         val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
147         if (val2 == NULL) {
148                 return LDB_ERR_OPERATIONS_ERROR;
149         }
150         el->values = val2;
151         el->num_values = i;
152
153         /* populate the values array */
154         for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
155                 el->values[i].length = tmp->value.length;
156                 /* we need to over-allocate here as there are still some places
157                    in ldb that rely on null termination. */
158                 el->values[i].data = talloc_size(el->values, tmp->value.length+1);
159                 if (el->values[i].data == NULL) {
160                         return LDB_ERR_OPERATIONS_ERROR;
161                 }
162                 memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
163                 el->values[i].data[tmp->value.length] = 0;
164         }
165
166         /* update the name */
167         el->name = LTDB_IDX;
168
169         return LDB_SUCCESS;
170 }
171
172
173 /* convert to the IDXPTR format from a ldb_message_element format */
174 static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
175 {
176         struct ldb_index_pointer *ptr, *tmp;
177         int i;
178         struct ldb_val *val2;
179         void *data = ldb_module_get_private(module);
180         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
181
182         ptr = NULL;
183
184         for (i=0;i<el->num_values;i++) {
185                 tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
186                 if (tmp == NULL) {
187                         return LDB_ERR_OPERATIONS_ERROR;
188                 }
189                 tmp->value = el->values[i];
190                 tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
191                 if (tmp->value.data == NULL) {
192                         return LDB_ERR_OPERATIONS_ERROR;
193                 }
194                 DLIST_ADD(ptr, tmp);
195         }
196
197         /* allocate the new values array */
198         val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
199         if (val2 == NULL) {
200                 return LDB_ERR_OPERATIONS_ERROR;
201         }
202         el->values = val2;
203         el->num_values = 1;
204
205         el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
206         el->values[0].length = sizeof(ptr);
207
208         /* update the name */
209         el->name = LTDB_IDXPTR;
210
211         return LDB_SUCCESS;
212 }
213
214
215 /* enable the idxptr mode when transactions start */
216 int ltdb_index_transaction_start(struct ldb_module *module)
217 {
218         void *data = ldb_module_get_private(module);
219         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
220         ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
221         return LDB_SUCCESS;
222 }
223
224 /*
225   a wrapper around ltdb_search_dn1() which translates pointer based index records
226   and maps them into normal ldb message structures
227  */
228 static int ltdb_search_dn1_index(struct ldb_module *module,
229                                 struct ldb_dn *dn, struct ldb_message *msg)
230 {
231         int ret, i;
232         ret = ltdb_search_dn1(module, dn, msg);
233         if (ret != LDB_SUCCESS) {
234                 return ret;
235         }
236
237         /* if this isn't a @INDEX record then don't munge it */
238         if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         for (i=0;i<msg->num_elements;i++) {
243                 struct ldb_message_element *el = &msg->elements[i];
244                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
245                         ret = ltdb_convert_from_idxptr(module, el);
246                         if (ret != LDB_SUCCESS) {
247                                 return ret;
248                         }
249                 }
250         }
251
252         return ret;
253 }
254
255
256
257 /*
258   fixup the idxptr for one DN
259  */
260 static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
261 {
262         struct ldb_context *ldb;
263         struct ldb_dn *dn;
264         struct ldb_message *msg = ldb_msg_new(module);
265         int ret;
266
267         ldb = ldb_module_get_ctx(module);
268
269         dn = ldb_dn_new(msg, ldb, strdn);
270         if (ltdb_search_dn1_index(module, dn, msg) == LDB_SUCCESS) {
271                 ret = ltdb_store(module, msg, TDB_REPLACE);
272         }
273         talloc_free(msg);
274         return ret;
275 }
276
277 /* cleanup the idxptr mode when transaction commits */
278 int ltdb_index_transaction_commit(struct ldb_module *module)
279 {
280         int i;
281         void *data = ldb_module_get_private(module);
282         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
283
284         /* fix all the DNs that we have modified */
285         if (ltdb->idxptr) {
286                 for (i=0;i<ltdb->idxptr->num_dns;i++) {
287                         ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
288                 }
289
290                 if (ltdb->idxptr->repack) {
291                         tdb_repack(ltdb->tdb);
292                 }
293         }
294
295         talloc_free(ltdb->idxptr);
296         ltdb->idxptr = NULL;
297         return LDB_SUCCESS;
298 }
299
300 /* cleanup the idxptr mode when transaction cancels */
301 int ltdb_index_transaction_cancel(struct ldb_module *module)
302 {
303         void *data = ldb_module_get_private(module);
304         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
305         talloc_free(ltdb->idxptr);
306         ltdb->idxptr = NULL;
307         return LDB_SUCCESS;
308 }
309
310
311
312 /* a wrapper around ltdb_store() for the index code which
313    stores in IDXPTR format when idxptr mode is enabled
314
315    WARNING: This modifies the msg which is passed in
316 */
317 int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
318 {
319         void *data = ldb_module_get_private(module);
320         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
321         int ret;
322
323         if (ltdb->idxptr) {
324                 int i;
325                 struct ldb_message *msg2 = ldb_msg_new(module);
326
327                 /* free any old pointer */
328                 ret = ltdb_search_dn1(module, msg->dn, msg2);
329                 if (ret == 0) {
330                         for (i=0;i<msg2->num_elements;i++) {
331                                 struct ldb_message_element *el = &msg2->elements[i];
332                                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
333                                         ret = ltdb_free_idxptr(module, el);
334                                         if (ret != LDB_SUCCESS) {
335                                                 return ret;
336                                         }
337                                 }
338                         }
339                 }
340                 talloc_free(msg2);
341
342                 for (i=0;i<msg->num_elements;i++) {
343                         struct ldb_message_element *el = &msg->elements[i];
344                         if (strcmp(el->name, LTDB_IDX) == 0) {
345                                 ret = ltdb_convert_to_idxptr(module, el);
346                                 if (ret != LDB_SUCCESS) {
347                                         return ret;
348                                 }
349                         }
350                 }
351
352                 if (ltdb_idxptr_add(module, msg) != 0) {
353                         return LDB_ERR_OPERATIONS_ERROR;
354                 }
355         }
356
357         ret = ltdb_store(module, msg, flgs);
358         return ret;
359 }
360
361
362 /*
363   find an element in a list, using the given comparison function and
364   assuming that the list is already sorted using comp_fn
365
366   return -1 if not found, or the index of the first occurance of needle if found
367 */
368 static int ldb_list_find(const void *needle,
369                          const void *base, size_t nmemb, size_t size,
370                          comparison_fn_t comp_fn)
371 {
372         const char *base_p = (const char *)base;
373         size_t min_i, max_i, test_i;
374
375         if (nmemb == 0) {
376                 return -1;
377         }
378
379         min_i = 0;
380         max_i = nmemb-1;
381
382         while (min_i < max_i) {
383                 int r;
384
385                 test_i = (min_i + max_i) / 2;
386                 /* the following cast looks strange, but is
387                  correct. The key to understanding it is that base_p
388                  is a pointer to an array of pointers, so we have to
389                  dereference it after casting to void **. The strange
390                  const in the middle gives us the right type of pointer
391                  after the dereference  (tridge) */
392                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
393                 if (r == 0) {
394                         /* scan back for first element */
395                         while (test_i > 0 &&
396                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
397                                 test_i--;
398                         }
399                         return test_i;
400                 }
401                 if (r < 0) {
402                         if (test_i == 0) {
403                                 return -1;
404                         }
405                         max_i = test_i - 1;
406                 }
407                 if (r > 0) {
408                         min_i = test_i + 1;
409                 }
410         }
411
412         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
413                 return min_i;
414         }
415
416         return -1;
417 }
418
419 struct dn_list {
420         unsigned int count;
421         char **dn;
422 };
423
424 /*
425   return the dn key to be used for an index
426   caller frees
427 */
428 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
429                                      const char *attr, const struct ldb_val *value)
430 {
431         struct ldb_dn *ret;
432         struct ldb_val v;
433         const struct ldb_schema_attribute *a;
434         char *attr_folded;
435         int r;
436
437         attr_folded = ldb_attr_casefold(ldb, attr);
438         if (!attr_folded) {
439                 return NULL;
440         }
441
442         a = ldb_schema_attribute_by_name(ldb, attr);
443         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
444         if (r != LDB_SUCCESS) {
445                 const char *errstr = ldb_errstring(ldb);
446                 /* canonicalisation can be refused. For example, 
447                    a attribute that takes wildcards will refuse to canonicalise
448                    if the value contains a wildcard */
449                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
450                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
451                 talloc_free(attr_folded);
452                 return NULL;
453         }
454         if (ldb_should_b64_encode(&v)) {
455                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
456                 if (!vstr) return NULL;
457                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
458                 talloc_free(vstr);
459         } else {
460                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
461         }
462
463         if (v.data != value->data) {
464                 talloc_free(v.data);
465         }
466         talloc_free(attr_folded);
467
468         return ret;
469 }
470
471 /*
472   see if a attribute value is in the list of indexed attributes
473 */
474 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
475                             unsigned int *v_idx, const char *key)
476 {
477         unsigned int i, j;
478         for (i=0;i<msg->num_elements;i++) {
479                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
480                         const struct ldb_message_element *el = &msg->elements[i];
481
482                         if (attr == NULL) {
483                                 /* in this case we are just looking to see if key is present,
484                                    we are not spearching for a specific index */
485                                 return 0;
486                         }
487
488                         for (j=0;j<el->num_values;j++) {
489                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
490                                         if (v_idx) {
491                                                 *v_idx = j;
492                                         }
493                                         return i;
494                                 }
495                         }
496                 }
497         }
498         return -1;
499 }
500
501 /* used in sorting dn lists */
502 static int list_cmp(const char **s1, const char **s2)
503 {
504         return strcmp(*s1, *s2);
505 }
506
507 /*
508   return a list of dn's that might match a simple indexed search or
509  */
510 static int ltdb_index_dn_simple(struct ldb_module *module,
511                                 const struct ldb_parse_tree *tree,
512                                 const struct ldb_message *index_list,
513                                 struct dn_list *list)
514 {
515         struct ldb_context *ldb;
516         struct ldb_dn *dn;
517         int ret;
518         unsigned int i, j;
519         struct ldb_message *msg;
520
521         ldb = ldb_module_get_ctx(module);
522
523         list->count = 0;
524         list->dn = NULL;
525
526         /* if the attribute isn't in the list of indexed attributes then
527            this node needs a full search */
528         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
529                 return LDB_ERR_OPERATIONS_ERROR;
530         }
531
532         /* the attribute is indexed. Pull the list of DNs that match the 
533            search criterion */
534         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
535         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
536
537         msg = talloc(list, struct ldb_message);
538         if (msg == NULL) {
539                 return LDB_ERR_OPERATIONS_ERROR;
540         }
541
542         ret = ltdb_search_dn1_index(module, dn, msg);
543         talloc_free(dn);
544         if (ret != LDB_SUCCESS) {
545                 return ret;
546         }
547
548         for (i=0;i<msg->num_elements;i++) {
549                 struct ldb_message_element *el;
550
551                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
552                         continue;
553                 }
554
555                 el = &msg->elements[i];
556
557                 list->dn = talloc_array(list, char *, el->num_values);
558                 if (!list->dn) {
559                         talloc_free(msg);
560                         return LDB_ERR_OPERATIONS_ERROR;
561                 }
562
563                 for (j=0;j<el->num_values;j++) {
564                         list->dn[list->count] =
565                                 talloc_strdup(list->dn, (char *)el->values[j].data);
566                         if (!list->dn[list->count]) {
567                                 talloc_free(msg);
568                                 return LDB_ERR_OPERATIONS_ERROR;
569                         }
570                         list->count++;
571                 }
572         }
573
574         talloc_free(msg);
575
576         if (list->count > 1) {
577                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
578         }
579
580         return LDB_SUCCESS;
581 }
582
583
584 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
585
586 /*
587   return a list of dn's that might match a leaf indexed search
588  */
589 static int ltdb_index_dn_leaf(struct ldb_module *module,
590                               const struct ldb_parse_tree *tree,
591                               const struct ldb_message *index_list,
592                               struct dn_list *list)
593 {
594         struct ldb_context *ldb;
595         ldb = ldb_module_get_ctx(module);
596
597         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
598                 list->dn = talloc_array(list, char *, 1);
599                 if (list->dn == NULL) {
600                         ldb_oom(ldb);
601                         return LDB_ERR_OPERATIONS_ERROR;
602                 }
603                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
604                 if (list->dn[0] == NULL) {
605                         ldb_oom(ldb);
606                         return LDB_ERR_OPERATIONS_ERROR;
607                 }
608                 list->count = 1;
609                 return LDB_SUCCESS;
610         }
611         return ltdb_index_dn_simple(module, tree, index_list, list);
612 }
613
614
615 /*
616   list intersection
617   list = list & list2
618   relies on the lists being sorted
619 */
620 static int list_intersect(struct ldb_context *ldb,
621                           struct dn_list *list, const struct dn_list *list2)
622 {
623         struct dn_list *list3;
624         unsigned int i;
625
626         if (list->count == 0 || list2->count == 0) {
627                 /* 0 & X == 0 */
628                 return LDB_ERR_NO_SUCH_OBJECT;
629         }
630
631         list3 = talloc(ldb, struct dn_list);
632         if (list3 == NULL) {
633                 return LDB_ERR_OPERATIONS_ERROR;
634         }
635
636         list3->dn = talloc_array(list3, char *, list->count);
637         if (!list3->dn) {
638                 talloc_free(list3);
639                 return LDB_ERR_OPERATIONS_ERROR;
640         }
641         list3->count = 0;
642
643         for (i=0;i<list->count;i++) {
644                 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
645                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
646                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
647                         list3->count++;
648                 } else {
649                         talloc_free(list->dn[i]);
650                 }
651         }
652
653         talloc_free(list->dn);
654         list->dn = talloc_move(list, &list3->dn);
655         list->count = list3->count;
656         talloc_free(list3);
657
658         return LDB_ERR_NO_SUCH_OBJECT;
659 }
660
661
662 /*
663   list union
664   list = list | list2
665   relies on the lists being sorted
666 */
667 static int list_union(struct ldb_context *ldb,
668                       struct dn_list *list, const struct dn_list *list2)
669 {
670         unsigned int i;
671         char **d;
672         unsigned int count = list->count;
673
674         if (list->count == 0 && list2->count == 0) {
675                 /* 0 | 0 == 0 */
676                 return LDB_ERR_NO_SUCH_OBJECT;
677         }
678
679         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
680         if (!d) {
681                 return LDB_ERR_OPERATIONS_ERROR;
682         }
683         list->dn = d;
684
685         for (i=0;i<list2->count;i++) {
686                 if (ldb_list_find(list2->dn[i], list->dn, count,
687                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
688                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
689                         if (!list->dn[list->count]) {
690                                 return LDB_ERR_OPERATIONS_ERROR;
691                         }
692                         list->count++;
693                 }
694         }
695
696         if (list->count != count) {
697                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
698         }
699
700         return LDB_ERR_NO_SUCH_OBJECT;
701 }
702
703 static int ltdb_index_dn(struct ldb_module *module,
704                          const struct ldb_parse_tree *tree,
705                          const struct ldb_message *index_list,
706                          struct dn_list *list);
707
708
709 /*
710   OR two index results
711  */
712 static int ltdb_index_dn_or(struct ldb_module *module,
713                             const struct ldb_parse_tree *tree,
714                             const struct ldb_message *index_list,
715                             struct dn_list *list)
716 {
717         struct ldb_context *ldb;
718         unsigned int i;
719         int ret;
720
721         ldb = ldb_module_get_ctx(module);
722
723         ret = LDB_ERR_OPERATIONS_ERROR;
724         list->dn = NULL;
725         list->count = 0;
726
727         for (i=0;i<tree->u.list.num_elements;i++) {
728                 struct dn_list *list2;
729                 int v;
730
731                 list2 = talloc(module, struct dn_list);
732                 if (list2 == NULL) {
733                         return LDB_ERR_OPERATIONS_ERROR;
734                 }
735
736                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
737
738                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
739                         /* 0 || X == X */
740                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
741                                 ret = v;
742                         }
743                         talloc_free(list2);
744                         continue;
745                 }
746
747                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
748                         /* 1 || X == 1 */
749                         talloc_free(list->dn);
750                         talloc_free(list2);
751                         return v;
752                 }
753
754                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
755                         ret = LDB_SUCCESS;
756                         list->dn = talloc_move(list, &list2->dn);
757                         list->count = list2->count;
758                 } else {
759                         if (list_union(ldb, list, list2) == -1) {
760                                 talloc_free(list2);
761                                 return LDB_ERR_OPERATIONS_ERROR;
762                         }
763                         ret = LDB_SUCCESS;
764                 }
765                 talloc_free(list2);
766         }
767
768         if (list->count == 0) {
769                 return LDB_ERR_NO_SUCH_OBJECT;
770         }
771
772         return ret;
773 }
774
775
776 /*
777   NOT an index results
778  */
779 static int ltdb_index_dn_not(struct ldb_module *module,
780                              const struct ldb_parse_tree *tree,
781                              const struct ldb_message *index_list,
782                              struct dn_list *list)
783 {
784         /* the only way to do an indexed not would be if we could
785            negate the not via another not or if we knew the total
786            number of database elements so we could know that the
787            existing expression covered the whole database.
788
789            instead, we just give up, and rely on a full index scan
790            (unless an outer & manages to reduce the list)
791         */
792         return LDB_ERR_OPERATIONS_ERROR;
793 }
794
795 /*
796   AND two index results
797  */
798 static int ltdb_index_dn_and(struct ldb_module *module,
799                              const struct ldb_parse_tree *tree,
800                              const struct ldb_message *index_list,
801                              struct dn_list *list)
802 {
803         struct ldb_context *ldb;
804         unsigned int i;
805         int ret;
806
807         ldb = ldb_module_get_ctx(module);
808
809         ret = LDB_ERR_OPERATIONS_ERROR;
810         list->dn = NULL;
811         list->count = 0;
812
813         for (i=0;i<tree->u.list.num_elements;i++) {
814                 struct dn_list *list2;
815                 int v;
816
817                 list2 = talloc(module, struct dn_list);
818                 if (list2 == NULL) {
819                         return LDB_ERR_OPERATIONS_ERROR;
820                 }
821
822                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
823
824                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
825                         /* 0 && X == 0 */
826                         talloc_free(list->dn);
827                         talloc_free(list2);
828                         return LDB_ERR_NO_SUCH_OBJECT;
829                 }
830
831                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
832                         talloc_free(list2);
833                         continue;
834                 }
835
836                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
837                         ret = LDB_SUCCESS;
838                         talloc_free(list->dn);
839                         list->dn = talloc_move(list, &list2->dn);
840                         list->count = list2->count;
841                 } else {
842                         if (list_intersect(ldb, list, list2) == -1) {
843                                 talloc_free(list2);
844                                 return LDB_ERR_OPERATIONS_ERROR;
845                         }
846                 }
847
848                 talloc_free(list2);
849
850                 if (list->count == 0) {
851                         talloc_free(list->dn);
852                         return LDB_ERR_NO_SUCH_OBJECT;
853                 }
854         }
855
856         return ret;
857 }
858
859 /*
860   AND index results and ONE level special index
861  */
862 static int ltdb_index_dn_one(struct ldb_module *module,
863                              struct ldb_dn *parent_dn,
864                              struct dn_list *list)
865 {
866         struct ldb_context *ldb;
867         struct dn_list *list2;
868         struct ldb_message *msg;
869         struct ldb_dn *key;
870         struct ldb_val val;
871         unsigned int i, j;
872         int ret;
873
874         ldb = ldb_module_get_ctx(module);
875
876         list2 = talloc_zero(module, struct dn_list);
877         if (list2 == NULL) {
878                 return LDB_ERR_OPERATIONS_ERROR;
879         }
880
881         /* the attribute is indexed. Pull the list of DNs that match the
882            search criterion */
883         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
884         val.length = strlen((char *)val.data);
885         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
886         if (!key) {
887                 talloc_free(list2);
888                 return LDB_ERR_OPERATIONS_ERROR;
889         }
890
891         msg = talloc(list2, struct ldb_message);
892         if (msg == NULL) {
893                 talloc_free(list2);
894                 return LDB_ERR_OPERATIONS_ERROR;
895         }
896
897         ret = ltdb_search_dn1_index(module, key, msg);
898         talloc_free(key);
899         if (ret != LDB_SUCCESS) {
900                 return ret;
901         }
902
903         for (i = 0; i < msg->num_elements; i++) {
904                 struct ldb_message_element *el;
905
906                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
907                         continue;
908                 }
909
910                 el = &msg->elements[i];
911
912                 list2->dn = talloc_array(list2, char *, el->num_values);
913                 if (!list2->dn) {
914                         talloc_free(list2);
915                         return LDB_ERR_OPERATIONS_ERROR;
916                 }
917
918                 for (j = 0; j < el->num_values; j++) {
919                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
920                         if (!list2->dn[list2->count]) {
921                                 talloc_free(list2);
922                                 return LDB_ERR_OPERATIONS_ERROR;
923                         }
924                         list2->count++;
925                 }
926         }
927
928         if (list2->count == 0) {
929                 talloc_free(list2);
930                 return LDB_ERR_NO_SUCH_OBJECT;
931         }
932
933         if (list2->count > 1) {
934                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
935         }
936
937         if (list->count > 0) {
938                 if (list_intersect(ldb, list, list2) == -1) {
939                         talloc_free(list2);
940                         return LDB_ERR_OPERATIONS_ERROR;
941                 }
942
943                 if (list->count == 0) {
944                         talloc_free(list->dn);
945                         talloc_free(list2);
946                         return LDB_ERR_NO_SUCH_OBJECT;
947                 }
948         } else {
949                 list->dn = talloc_move(list, &list2->dn);
950                 list->count = list2->count;
951         }
952
953         talloc_free(list2);
954
955         return LDB_SUCCESS;
956 }
957
958 /*
959   return a list of dn's that might match a indexed search or
960   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
961  */
962 static int ltdb_index_dn(struct ldb_module *module,
963                          const struct ldb_parse_tree *tree,
964                          const struct ldb_message *index_list,
965                          struct dn_list *list)
966 {
967         int ret = LDB_ERR_OPERATIONS_ERROR;
968
969         switch (tree->operation) {
970         case LDB_OP_AND:
971                 ret = ltdb_index_dn_and(module, tree, index_list, list);
972                 break;
973
974         case LDB_OP_OR:
975                 ret = ltdb_index_dn_or(module, tree, index_list, list);
976                 break;
977
978         case LDB_OP_NOT:
979                 ret = ltdb_index_dn_not(module, tree, index_list, list);
980                 break;
981
982         case LDB_OP_EQUALITY:
983                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
984                 break;
985
986         case LDB_OP_SUBSTRING:
987         case LDB_OP_GREATER:
988         case LDB_OP_LESS:
989         case LDB_OP_PRESENT:
990         case LDB_OP_APPROX:
991         case LDB_OP_EXTENDED:
992                 /* we can't index with fancy bitops yet */
993                 ret = LDB_ERR_OPERATIONS_ERROR;
994                 break;
995         }
996
997         return ret;
998 }
999
1000 /*
1001   filter a candidate dn_list from an indexed search into a set of results
1002   extracting just the given attributes
1003 */
1004 static int ltdb_index_filter(const struct dn_list *dn_list,
1005                              struct ltdb_context *ac)
1006 {
1007         struct ldb_context *ldb;
1008         struct ldb_message *msg;
1009         unsigned int i;
1010
1011         ldb = ldb_module_get_ctx(ac->module);
1012
1013         for (i = 0; i < dn_list->count; i++) {
1014                 struct ldb_dn *dn;
1015                 int ret;
1016
1017                 msg = ldb_msg_new(ac);
1018                 if (!msg) {
1019                         return LDB_ERR_OPERATIONS_ERROR;
1020                 }
1021
1022                 dn = ldb_dn_new(msg, ldb, dn_list->dn[i]);
1023                 if (dn == NULL) {
1024                         talloc_free(msg);
1025                         return LDB_ERR_OPERATIONS_ERROR;
1026                 }
1027
1028                 ret = ltdb_search_dn1(ac->module, dn, msg);
1029                 talloc_free(dn);
1030                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1031                         /* the record has disappeared? yes, this can happen */
1032                         talloc_free(msg);
1033                         continue;
1034                 }
1035
1036                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1037                         /* an internal error */
1038                         talloc_free(msg);
1039                         return LDB_ERR_OPERATIONS_ERROR;
1040                 }
1041
1042                 if (!ldb_match_msg(ldb, msg,
1043                                    ac->tree, ac->base, ac->scope)) {
1044                         talloc_free(msg);
1045                         continue;
1046                 }
1047
1048                 /* filter the attributes that the user wants */
1049                 ret = ltdb_filter_attrs(msg, ac->attrs);
1050
1051                 if (ret == -1) {
1052                         talloc_free(msg);
1053                         return LDB_ERR_OPERATIONS_ERROR;
1054                 }
1055
1056                 ret = ldb_module_send_entry(ac->req, msg, NULL);
1057                 if (ret != LDB_SUCCESS) {
1058                         ac->request_terminated = true;
1059                         return ret;
1060                 }
1061         }
1062
1063         return LDB_SUCCESS;
1064 }
1065
1066 /*
1067   search the database with a LDAP-like expression using indexes
1068   returns -1 if an indexed search is not possible, in which
1069   case the caller should call ltdb_search_full()
1070 */
1071 int ltdb_search_indexed(struct ltdb_context *ac)
1072 {
1073         struct ldb_context *ldb;
1074         void *data = ldb_module_get_private(ac->module);
1075         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1076         struct dn_list *dn_list;
1077         int ret, idxattr, idxone;
1078
1079         ldb = ldb_module_get_ctx(ac->module);
1080
1081         idxattr = idxone = 0;
1082         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
1083         if (ret == 0 ) {
1084                 idxattr = 1;
1085         }
1086
1087         /* We do one level indexing only if requested */
1088         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1089         if (ret == 0 ) {
1090                 idxone = 1;
1091         }
1092
1093         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
1094             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
1095                 /* no indexes? must do full search */
1096                 return LDB_ERR_OPERATIONS_ERROR;
1097         }
1098
1099         ret = LDB_ERR_OPERATIONS_ERROR;
1100
1101         dn_list = talloc_zero(ac, struct dn_list);
1102         if (dn_list == NULL) {
1103                 return LDB_ERR_OPERATIONS_ERROR;
1104         }
1105
1106         if (ac->scope == LDB_SCOPE_BASE) {
1107                 /* with BASE searches only one DN can match */
1108                 dn_list->dn = talloc_array(dn_list, char *, 1);
1109                 if (dn_list->dn == NULL) {
1110                         ldb_oom(ldb);
1111                         return LDB_ERR_OPERATIONS_ERROR;
1112                 }
1113                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
1114                 if (dn_list->dn[0] == NULL) {
1115                         ldb_oom(ldb);
1116                         return LDB_ERR_OPERATIONS_ERROR;
1117                 }
1118                 dn_list->count = 1;
1119                 ret = LDB_SUCCESS;
1120         }
1121
1122         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
1123                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1124
1125                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1126                         talloc_free(dn_list);
1127                         return ret;
1128                 }
1129         }
1130
1131         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
1132                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1133         }
1134
1135         if (ret == LDB_SUCCESS) {
1136                 /* we've got a candidate list - now filter by the full tree
1137                    and extract the needed attributes */
1138                 ret = ltdb_index_filter(dn_list, ac);
1139         }
1140
1141         talloc_free(dn_list);
1142
1143         return ret;
1144 }
1145
1146 /*
1147   add a index element where this is the first indexed DN for this value
1148 */
1149 static int ltdb_index_add1_new(struct ldb_context *ldb,
1150                                struct ldb_message *msg,
1151                                const char *dn)
1152 {
1153         struct ldb_message_element *el;
1154
1155         /* add another entry */
1156         el = talloc_realloc(msg, msg->elements,
1157                                struct ldb_message_element, msg->num_elements+1);
1158         if (!el) {
1159                 return LDB_ERR_OPERATIONS_ERROR;
1160         }
1161
1162         msg->elements = el;
1163         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
1164         if (!msg->elements[msg->num_elements].name) {
1165                 return LDB_ERR_OPERATIONS_ERROR;
1166         }
1167         msg->elements[msg->num_elements].num_values = 0;
1168         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
1169         if (!msg->elements[msg->num_elements].values) {
1170                 return LDB_ERR_OPERATIONS_ERROR;
1171         }
1172         msg->elements[msg->num_elements].values[0].length = strlen(dn);
1173         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
1174         msg->elements[msg->num_elements].num_values = 1;
1175         msg->num_elements++;
1176
1177         return LDB_SUCCESS;
1178 }
1179
1180
1181 /*
1182   add a index element where this is not the first indexed DN for this
1183   value
1184 */
1185 static int ltdb_index_add1_add(struct ldb_context *ldb,
1186                                struct ldb_message *msg,
1187                                int idx,
1188                                const char *dn)
1189 {
1190         struct ldb_val *v2;
1191         unsigned int i;
1192
1193         /* for multi-valued attributes we can end up with repeats */
1194         for (i=0;i<msg->elements[idx].num_values;i++) {
1195                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
1196                         return LDB_SUCCESS;
1197                 }
1198         }
1199
1200         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
1201                               struct ldb_val,
1202                               msg->elements[idx].num_values+1);
1203         if (!v2) {
1204                 return LDB_ERR_OPERATIONS_ERROR;
1205         }
1206         msg->elements[idx].values = v2;
1207
1208         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
1209         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
1210         msg->elements[idx].num_values++;
1211
1212         return LDB_SUCCESS;
1213 }
1214
1215 /*
1216   add an index entry for one message element
1217 */
1218 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1219                            struct ldb_message_element *el, int v_idx)
1220 {
1221         struct ldb_context *ldb;
1222         struct ldb_message *msg;
1223         struct ldb_dn *dn_key;
1224         int ret;
1225         unsigned int i;
1226
1227         ldb = ldb_module_get_ctx(module);
1228
1229         msg = talloc(module, struct ldb_message);
1230         if (msg == NULL) {
1231                 errno = ENOMEM;
1232                 return LDB_ERR_OPERATIONS_ERROR;
1233         }
1234
1235         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1236         if (!dn_key) {
1237                 talloc_free(msg);
1238                 return LDB_ERR_OPERATIONS_ERROR;
1239         }
1240         talloc_steal(msg, dn_key);
1241
1242         ret = ltdb_search_dn1_index(module, dn_key, msg);
1243         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1244                 talloc_free(msg);
1245                 return ret;
1246         }
1247
1248         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1249                 msg->dn = dn_key;
1250                 msg->num_elements = 0;
1251                 msg->elements = NULL;
1252         }
1253
1254         for (i=0;i<msg->num_elements;i++) {
1255                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
1256                         break;
1257                 }
1258         }
1259
1260         if (i == msg->num_elements) {
1261                 ret = ltdb_index_add1_new(ldb, msg, dn);
1262         } else {
1263                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
1264         }
1265
1266         if (ret == LDB_SUCCESS) {
1267                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1268         }
1269
1270         talloc_free(msg);
1271
1272         return ret;
1273 }
1274
1275 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1276                            struct ldb_message_element *elements, int num_el)
1277 {
1278         void *data = ldb_module_get_private(module);
1279         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1280         int ret;
1281         unsigned int i, j;
1282
1283         if (dn[0] == '@') {
1284                 return LDB_SUCCESS;
1285         }
1286
1287         if (ltdb->cache->indexlist->num_elements == 0) {
1288                 /* no indexed fields */
1289                 return LDB_SUCCESS;
1290         }
1291
1292         for (i = 0; i < num_el; i++) {
1293                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
1294                                        NULL, LTDB_IDXATTR);
1295                 if (ret == -1) {
1296                         continue;
1297                 }
1298                 for (j = 0; j < elements[i].num_values; j++) {
1299                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1300                         if (ret != LDB_SUCCESS) {
1301                                 return ret;
1302                         }
1303                 }
1304         }
1305
1306         return LDB_SUCCESS;
1307 }
1308
1309 /*
1310   add the index entries for a new record
1311 */
1312 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1313 {
1314         const char *dn;
1315         int ret;
1316
1317         dn = ldb_dn_get_linearized(msg->dn);
1318         if (dn == NULL) {
1319                 return LDB_ERR_OPERATIONS_ERROR;
1320         }
1321
1322         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1323
1324         return ret;
1325 }
1326
1327
1328 /*
1329   delete an index entry for one message element
1330 */
1331 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1332                          struct ldb_message_element *el, int v_idx)
1333 {
1334         struct ldb_context *ldb;
1335         struct ldb_message *msg;
1336         struct ldb_dn *dn_key;
1337         int ret, i;
1338         unsigned int j;
1339
1340         ldb = ldb_module_get_ctx(module);
1341
1342         if (dn[0] == '@') {
1343                 return LDB_SUCCESS;
1344         }
1345
1346         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1347         if (!dn_key) {
1348                 return LDB_ERR_OPERATIONS_ERROR;
1349         }
1350
1351         msg = talloc(dn_key, struct ldb_message);
1352         if (msg == NULL) {
1353                 talloc_free(dn_key);
1354                 return LDB_ERR_OPERATIONS_ERROR;
1355         }
1356
1357         ret = ltdb_search_dn1_index(module, dn_key, msg);
1358         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1359                 talloc_free(dn_key);
1360                 return ret;
1361         }
1362
1363         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1364                 /* it wasn't indexed. Did we have an earlier error? If we did then
1365                    its gone now */
1366                 talloc_free(dn_key);
1367                 return LDB_SUCCESS;
1368         }
1369
1370         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1371         if (i == -1) {
1372                 struct ldb_ldif ldif;
1373
1374                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1375                                 "ERROR: dn %s not found in %s\n", dn,
1376                                 ldb_dn_get_linearized(dn_key));
1377                 ldif.changetype = LDB_CHANGETYPE_NONE;
1378                 ldif.msg = msg;
1379                 ldb_ldif_write_file(ldb, stdout, &ldif);
1380                 sleep(100);
1381                 /* it ain't there. hmmm */
1382                 talloc_free(dn_key);
1383                 return LDB_SUCCESS;
1384         }
1385
1386         if (j != msg->elements[i].num_values - 1) {
1387                 memmove(&msg->elements[i].values[j],
1388                         &msg->elements[i].values[j+1],
1389                         (msg->elements[i].num_values-(j+1)) *
1390                         sizeof(msg->elements[i].values[0]));
1391         }
1392         msg->elements[i].num_values--;
1393
1394         if (msg->elements[i].num_values == 0) {
1395                 ret = ltdb_delete_noindex(module, dn_key);
1396         } else {
1397                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1398         }
1399
1400         talloc_free(dn_key);
1401
1402         return ret;
1403 }
1404
1405 /*
1406   delete the index entries for a record
1407   return -1 on failure
1408 */
1409 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1410 {
1411         void *data = ldb_module_get_private(module);
1412         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1413         int ret;
1414         const char *dn;
1415         unsigned int i, j;
1416
1417         /* find the list of indexed fields */
1418         if (ltdb->cache->indexlist->num_elements == 0) {
1419                 /* no indexed fields */
1420                 return LDB_SUCCESS;
1421         }
1422
1423         if (ldb_dn_is_special(msg->dn)) {
1424                 return LDB_SUCCESS;
1425         }
1426
1427         dn = ldb_dn_get_linearized(msg->dn);
1428         if (dn == NULL) {
1429                 return LDB_ERR_OPERATIONS_ERROR;
1430         }
1431
1432         for (i = 0; i < msg->num_elements; i++) {
1433                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1434                                        NULL, LTDB_IDXATTR);
1435                 if (ret == -1) {
1436                         continue;
1437                 }
1438                 for (j = 0; j < msg->elements[i].num_values; j++) {
1439                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1440                         if (ret != LDB_SUCCESS) {
1441                                 return ret;
1442                         }
1443                 }
1444         }
1445
1446         return LDB_SUCCESS;
1447 }
1448
1449 /*
1450   handle special index for one level searches
1451 */
1452 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1453 {
1454         void *data = ldb_module_get_private(module);
1455         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1456         struct ldb_message_element el;
1457         struct ldb_val val;
1458         struct ldb_dn *pdn;
1459         const char *dn;
1460         int ret;
1461
1462         /* We index for ONE Level only if requested */
1463         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1464         if (ret != 0) {
1465                 return LDB_SUCCESS;
1466         }
1467
1468         pdn = ldb_dn_get_parent(module, msg->dn);
1469         if (pdn == NULL) {
1470                 return LDB_ERR_OPERATIONS_ERROR;
1471         }
1472
1473         dn = ldb_dn_get_linearized(msg->dn);
1474         if (dn == NULL) {
1475                 talloc_free(pdn);
1476                 return LDB_ERR_OPERATIONS_ERROR;
1477         }
1478
1479         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1480         if (val.data == NULL) {
1481                 talloc_free(pdn);
1482                 return LDB_ERR_OPERATIONS_ERROR;
1483         }
1484
1485         val.length = strlen((char *)val.data);
1486         el.name = LTDB_IDXONE;
1487         el.values = &val;
1488         el.num_values = 1;
1489
1490         if (add) {
1491                 ret = ltdb_index_add1(module, dn, &el, 0);
1492         } else { /* delete */
1493                 ret = ltdb_index_del_value(module, dn, &el, 0);
1494         }
1495
1496         talloc_free(pdn);
1497
1498         return ret;
1499 }
1500
1501
1502 /*
1503   traversal function that deletes all @INDEX records
1504 */
1505 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1506 {
1507         const char *dn = "DN=" LTDB_INDEX ":";
1508         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1509                 return tdb_delete(tdb, key);
1510         }
1511         return 0;
1512 }
1513
1514 /*
1515   traversal function that adds @INDEX records during a re index
1516 */
1517 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1518 {
1519         struct ldb_context *ldb;
1520         struct ldb_module *module = (struct ldb_module *)state;
1521         struct ldb_message *msg;
1522         const char *dn = NULL;
1523         int ret;
1524         TDB_DATA key2;
1525
1526         ldb = ldb_module_get_ctx(module);
1527
1528         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1529             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1530                 return 0;
1531         }
1532
1533         msg = talloc(module, struct ldb_message);
1534         if (msg == NULL) {
1535                 return -1;
1536         }
1537
1538         ret = ltdb_unpack_data(module, &data, msg);
1539         if (ret != 0) {
1540                 talloc_free(msg);
1541                 return -1;
1542         }
1543
1544         /* check if the DN key has changed, perhaps due to the
1545            case insensitivity of an element changing */
1546         key2 = ltdb_key(module, msg->dn);
1547         if (key2.dptr == NULL) {
1548                 /* probably a corrupt record ... darn */
1549                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1550                                                         ldb_dn_get_linearized(msg->dn));
1551                 talloc_free(msg);
1552                 return 0;
1553         }
1554         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1555                 tdb_delete(tdb, key);
1556                 tdb_store(tdb, key2, data, 0);
1557         }
1558         talloc_free(key2.dptr);
1559
1560         if (msg->dn == NULL) {
1561                 dn = (char *)key.dptr + 3;
1562         } else {
1563                 dn = ldb_dn_get_linearized(msg->dn);
1564         }
1565
1566         ret = ltdb_index_one(module, msg, 1);
1567         if (ret == LDB_SUCCESS) {
1568                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1569         } else {
1570                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1571                         "Adding special ONE LEVEL index failed (%s)!\n",
1572                         ldb_dn_get_linearized(msg->dn));
1573         }
1574
1575         talloc_free(msg);
1576
1577         if (ret != LDB_SUCCESS) return -1;
1578
1579         return 0;
1580 }
1581
1582 /*
1583   force a complete reindex of the database
1584 */
1585 int ltdb_reindex(struct ldb_module *module)
1586 {
1587         void *data = ldb_module_get_private(module);
1588         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1589         int ret;
1590
1591         if (ltdb_cache_reload(module) != 0) {
1592                 return LDB_ERR_OPERATIONS_ERROR;
1593         }
1594
1595         /* first traverse the database deleting any @INDEX records */
1596         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1597         if (ret == -1) {
1598                 return LDB_ERR_OPERATIONS_ERROR;
1599         }
1600
1601         /* if we don't have indexes we have nothing todo */
1602         if (ltdb->cache->indexlist->num_elements == 0) {
1603                 return LDB_SUCCESS;
1604         }
1605
1606         /* now traverse adding any indexes for normal LDB records */
1607         ret = tdb_traverse(ltdb->tdb, re_index, module);
1608         if (ret == -1) {
1609                 return LDB_ERR_OPERATIONS_ERROR;
1610         }
1611
1612         if (ltdb->idxptr) {
1613                 ltdb->idxptr->repack = true;
1614         }
1615
1616         return LDB_SUCCESS;
1617 }